| @@ -1,13 +1,56 @@ | |||
| import os | |||
| import re | |||
| from pathlib import Path | |||
| from dotenv import load_dotenv | |||
| load_dotenv() | |||
| def _load_dotenv_chain() -> None: | |||
| # Load local .env first | |||
| load_dotenv() | |||
| # Support shell-style source directives in .env, e.g. ". /data/conf/presence/core.conf" | |||
| local_env = Path(".env") | |||
| if not local_env.exists(): | |||
| return | |||
| for raw_line in local_env.read_text(encoding="utf-8").splitlines(): | |||
| line = raw_line.strip() | |||
| if not line or line.startswith("#"): | |||
| continue | |||
| if line.startswith(". "): | |||
| target = line[2:].strip().strip('"').strip("'") | |||
| elif line.startswith("source "): | |||
| target = line[7:].strip().strip('"').strip("'") | |||
| else: | |||
| continue | |||
| if not target: | |||
| continue | |||
| source_path = Path(target) | |||
| if not source_path.is_absolute(): | |||
| source_path = (local_env.parent / source_path).resolve() | |||
| if source_path.exists(): | |||
| load_dotenv(dotenv_path=source_path, override=False) | |||
| _load_dotenv_chain() | |||
| _SHELL_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)") | |||
| def _clean(value: str) -> str: | |||
| if value is None: | |||
| return "" | |||
| return str(value).strip().strip('"').strip("'") | |||
| text = str(value).strip().strip('"').strip("'") | |||
| if not text: | |||
| return "" | |||
| # Expand ${VAR} and $VAR references using already loaded environment variables. | |||
| def _replace(match: re.Match) -> str: | |||
| var_name = match.group(1) or match.group(2) | |||
| return os.getenv(var_name, "") | |||
| return _SHELL_VAR_PATTERN.sub(_replace, text) | |||
| def _has_http_scheme(value: str) -> bool: | |||
| @@ -31,21 +74,31 @@ def _absolute_or_join(value: str, base_url: str) -> str: | |||
| return "" | |||
| if _has_http_scheme(value): | |||
| return value | |||
| if value.startswith("/") and base_url: | |||
| return f"{base_url.rstrip('/')}{value}" | |||
| if value.startswith("/"): | |||
| if base_url: | |||
| return f"{base_url.rstrip('/')}{value}" | |||
| return "" | |||
| return _ensure_http_scheme(value) | |||
| def _env_first(*names: str) -> str: | |||
| for name in names: | |||
| value = _clean(os.getenv(name)) | |||
| if value: | |||
| return value | |||
| return "" | |||
| # Keycloak configuration (look in the .env) | |||
| SECRET = os.getenv("SECRET") | |||
| KEYCLOAK_AUDIENCE = os.getenv("KEYCLOAK_AUDIENCE") | |||
| SECRET = _env_first("SECRET", "client_secret") | |||
| KEYCLOAK_AUDIENCE = _env_first("KEYCLOAK_AUDIENCE", "keycloak_audience") | |||
| _raw_keycloak_server = _clean(os.getenv("KEYCLOAK_SERVER")) | |||
| _raw_keycloak_server = _env_first("KEYCLOAK_SERVER", "keycloak_server") | |||
| KEYCLOAK_SERVER = _ensure_http_scheme(_raw_keycloak_server) | |||
| _default_realm = _clean(os.getenv("KEYCLOAK_REALM")) or "API.Server.local" | |||
| _default_realm = _env_first("KEYCLOAK_REALM", "keycloak_realm") or "API.Server.local" | |||
| _raw_keycloak_issuer = _clean(os.getenv("KEYCLOAK_ISSUER")) | |||
| _raw_keycloak_issuer = _env_first("KEYCLOAK_ISSUER", "keycloak_issuer") | |||
| if _raw_keycloak_issuer and "${" not in _raw_keycloak_issuer: | |||
| KEYCLOAK_ISSUER = _absolute_or_join(_raw_keycloak_issuer, KEYCLOAK_SERVER) | |||
| elif KEYCLOAK_SERVER: | |||
| @@ -53,7 +106,7 @@ elif KEYCLOAK_SERVER: | |||
| else: | |||
| KEYCLOAK_ISSUER = "" | |||
| _raw_keycloak_protocol = _clean(os.getenv("KEYCLOAK_PROTOCOL_ENDPOINT")) | |||
| _raw_keycloak_protocol = _env_first("KEYCLOAK_PROTOCOL_ENDPOINT", "keycloak_protocol_endpoint") | |||
| if _raw_keycloak_protocol and "${" not in _raw_keycloak_protocol: | |||
| KEYCLOAK_PROTOCOL_ENDPOINT = _absolute_or_join(_raw_keycloak_protocol, KEYCLOAK_SERVER) | |||
| elif KEYCLOAK_ISSUER: | |||
| @@ -61,7 +114,7 @@ elif KEYCLOAK_ISSUER: | |||
| else: | |||
| KEYCLOAK_PROTOCOL_ENDPOINT = "" | |||
| _raw_jwks = _clean(os.getenv("KEYCLOAK_JWKS_URL")) | |||
| _raw_jwks = _env_first("KEYCLOAK_JWKS_URL", "keycloak_jwks_url") | |||
| if _raw_jwks and "${" not in _raw_jwks: | |||
| KEYCLOAK_JWKS_URL = _absolute_or_join(_raw_jwks, KEYCLOAK_SERVER) | |||
| elif KEYCLOAK_PROTOCOL_ENDPOINT: | |||
| @@ -69,7 +122,7 @@ elif KEYCLOAK_PROTOCOL_ENDPOINT: | |||
| else: | |||
| KEYCLOAK_JWKS_URL = "" | |||
| _raw_auth = _clean(os.getenv("KEYCLOAK_AUTH_URL")) | |||
| _raw_auth = _env_first("KEYCLOAK_AUTH_URL", "keycloak_auth_url") | |||
| if _raw_auth and "${" not in _raw_auth: | |||
| KEYCLOAK_AUTH_URL = _absolute_or_join(_raw_auth, KEYCLOAK_SERVER) | |||
| elif KEYCLOAK_PROTOCOL_ENDPOINT: | |||
| @@ -77,7 +130,7 @@ elif KEYCLOAK_PROTOCOL_ENDPOINT: | |||
| else: | |||
| KEYCLOAK_AUTH_URL = "" | |||
| _raw_token = _clean(os.getenv("KEYCLOAK_TOKEN_URL")) | |||
| _raw_token = _env_first("KEYCLOAK_TOKEN_URL", "keycloak_token_url") | |||
| if _raw_token and "${" not in _raw_token: | |||
| KEYCLOAK_TOKEN_URL = _absolute_or_join(_raw_token, KEYCLOAK_SERVER) | |||
| elif KEYCLOAK_PROTOCOL_ENDPOINT: | |||