|
- import time
- import requests
-
- class TokenManager:
- def __init__(self, token_url: str, client_id: str, client_secret: str,
- username: str, password: str, audience: str | None = None,
- verify_tls: bool = True, timeout_s: float = 10.0):
- self.token_url = token_url
- self.client_id = client_id
- self.client_secret = client_secret
- self.username = username
- self.password = password
- self.audience = audience
- self.verify_tls = verify_tls
- self.timeout_s = float(timeout_s)
-
- self._token: str | None = None
- self._exp: int = 0 # epoch seconds
-
- def get_token(self) -> str:
- now = int(time.time())
- if self._token and now < (self._exp - 30):
- return self._token
-
- data = {
- "grant_type": "password",
- "client_id": self.client_id,
- "client_secret": self.client_secret,
- "username": self.username,
- "password": self.password,
- }
- if self.audience:
- data["audience"] = self.audience
-
- r = requests.post(
- self.token_url,
- data=data,
- headers={"Content-Type": "application/x-www-form-urlencoded"},
- timeout=self.timeout_s,
- verify=self.verify_tls,
- )
- r.raise_for_status()
- payload = r.json()
-
- token = payload["access_token"]
- expires_in = int(payload.get("expires_in", 60))
- self._token = token
- self._exp = now + expires_in
- return token
|