Source code for ABConnect.api.auth

import os
import json
import logging
import requests
import time
from abc import ABC, abstractmethod
from ABConnect.exceptions import LoginFailedError
from ABConnect.config import Config, get_config
from appdirs import user_cache_dir

logger = logging.getLogger(__name__)


[docs] class TokenStorage(ABC): @property def identity_url(self): return Config.get_identity_url() def _calc_expires_at(self, expires_in: int, buffer: int = 300) -> float: return time.time() + expires_in - buffer @property def expired(self) -> bool: if not self._token: return True expires_at = self._token.get("expires_at") if not expires_at: return True return time.time() >= expires_at def _identity_body(self): return { "rememberMe": True, "scope": "offline_access", "client_id": get_config("ABC_CLIENT_ID"), "client_secret": get_config("ABC_CLIENT_SECRET"), } def _call_login(self, data): r = requests.post(self.identity_url, data=data) if r.ok: resp = r.json() self.set_token(resp) logger.info( f"Login successful for user {data.get('username')} using grant_type {data.get('grant_type')}" ) else: msg = f"Login failed for {data.get('username')}: {r.status_code} {r.text}" logger.error(msg) raise LoginFailedError(msg).no_traceback() def _login(self): data = { "grant_type": "password", **self._get_creds(), **self._identity_body(), } self._call_login(data)
[docs] @abstractmethod def get_token(self): """Return the current bearer token.""" pass
[docs] @abstractmethod def set_token(self, token): """Store a new token.""" pass
[docs] @abstractmethod def refresh_token(self): """Refresh the token if needed and return the updated token.""" pass
@abstractmethod def _get_creds(self): """Return dict with 'username' and 'password' keys for credential login.""" pass
[docs] class SessionTokenStorage(TokenStorage):
[docs] def __init__(self, *args, **kwargs): self._token = None self.request = kwargs["request"] self._load_token()
def _load_token(self): if "abc_token" in self.request.session: self._token = self.request.session["abc_token"] if not self.expired: logger.info(f"From ABConnect.api.auth._load_token() -- Success") return if ( hasattr(self.request.user, "refresh_token") and self.request.user.refresh_token ): self._token = {"refresh_token": self.request.user.refresh_token} if self.refresh_token(): return self._login() def _get_creds(self): if "username" in self._creds and "password" in self._creds: return { "username": self._creds["username"], "password": self._creds["password"], } else: if ( hasattr(self.request, "user") and hasattr(self.request.user, "username") and hasattr(self.request.user, "pw") ): return { "username": self.request.user.username, "password": self.request.user.pw, }
[docs] def get_token(self): if self.expired: if not self.refresh_token(): self._login() return self._token
[docs] def set_token(self, token): token["expires_at"] = self._calc_expires_at(token["expires_in"]) self._token = token self.request.session["abc_token"] = token if hasattr(self.request.user, "refresh_token"): self.request.user.refresh_token = token.get("refresh_token") self.request.user.save()
[docs] def refresh_token(self): if self._token and "refresh_token" in self._token: refresh_token = self._token["refresh_token"] elif self.request.user.refresh_token: refresh_token = self.request.user.refresh_token else: return False data = { "grant_type": "refresh_token", "refresh_token": refresh_token, **self._identity_body(), } try: self._call_login(data) return True except LoginFailedError: logger.info( "Refresh token expired or invalid, will attempt credential login" ) return False
[docs] class FileTokenStorage(TokenStorage):
[docs] def __init__(self, *args, **kwargs): self._resolve_creds(**kwargs) cache_dir = user_cache_dir("ABConnect") os.makedirs(cache_dir, exist_ok=True) # Include environment in filename to separate staging/production tokens env_suffix = "_staging" if Config._env == "staging" else "" self.path = os.path.join(cache_dir, f"token_{self.creds['username']}{env_suffix}.json") self._token = None self._load_token() if not self._token: raise RuntimeError("Failed to load or obtain a valid access token.")
def _resolve_creds(self, **kwargs): username = kwargs.get("username") password = kwargs.get("password") if username and password: username = username.lower() elif username: username = username.lower() password = get_config(f"AB_USER_{username.upper()}") if not password: raise LoginFailedError(f"No password configured for user '{username}'") else: username = get_config("ABCONNECT_USERNAME") password = get_config("ABCONNECT_PASSWORD") if not (username and password): raise LoginFailedError("Default credentials (ABCONNECT_USERNAME/PASSWORD) not set") self.creds = {"username": username.lower(), "password": password} def _get_creds(self): return self.creds def _load_token(self): if os.path.exists(self.path): try: with open(self.path, "r") as f: self._token = json.load(f) except Exception as e: logger.error(f"Error reading token file: {e}") self.get_token()
[docs] def get_token(self): if self.expired: logger.info(f"Token expired") if not self.refresh_token(): self._login() return self._token
[docs] def set_token(self, token): token["expires_at"] = self._calc_expires_at(token["expires_in"]) self._token = token try: with open(self.path, "w") as f: json.dump(self._token, f) except Exception as e: print(f"Error writing token file: {e}")
[docs] def refresh_token(self): if not self._token or "refresh_token" not in self._token: return False data = { "grant_type": "refresh_token", "refresh_token": self._token["refresh_token"], **self._identity_body(), } try: self._call_login(data) return True except LoginFailedError: logger.info( f"Refresh token failed for {self.creds['username']}" ) return False