#!/usr/bin/python3 # -*- coding: utf-8 -*- """ OAuth2 authentication provider """ import logging from functools import wraps from flask import redirect, request, session, url_for from .base import AuthProvider class OAuthProvider(AuthProvider): """OAuth2 authentication provider. Supports multiple OAuth providers (Google, GitHub, GitLab, etc.) Configuration: provider: OAuth provider name ('google', 'github', 'gitlab', 'generic') client_id: OAuth client ID client_secret: OAuth client secret redirect_uri: OAuth redirect URI scopes: List of OAuth scopes (default: ['openid', 'email', 'profile']) authorized_domains: List of allowed email domains (optional) authorized_users: List of allowed email addresses (optional) login_view: Route name for login page (default: 'oauth_login') """ def __init__(self, config): super().__init__(config) self.logger = logging.getLogger("tisbackup.auth") self.provider_name = config.get("provider", "generic").lower() self.client_id = config.get("client_id") self.client_secret = config.get("client_secret") self.redirect_uri = config.get("redirect_uri") self.scopes = config.get("scopes", ["openid", "email", "profile"]) self.authorized_domains = config.get("authorized_domains", []) self.authorized_users = config.get("authorized_users", []) self.login_view = config.get("login_view", "oauth_login") if not self.client_id or not self.client_secret: raise ValueError("OAuth requires 'client_id' and 'client_secret' in config") # Provider-specific configurations self.provider_configs = { "google": { "authorization_endpoint": "https://accounts.google.com/o/oauth2/v2/auth", "token_endpoint": "https://oauth2.googleapis.com/token", "userinfo_endpoint": "https://www.googleapis.com/oauth2/v2/userinfo", "default_scopes": ["openid", "email", "profile"], }, "github": { "authorization_endpoint": "https://github.com/login/oauth/authorize", "token_endpoint": "https://github.com/login/oauth/access_token", "userinfo_endpoint": "https://api.github.com/user", "default_scopes": ["read:user", "user:email"], }, "gitlab": { "authorization_endpoint": "https://gitlab.com/oauth/authorize", "token_endpoint": "https://gitlab.com/oauth/token", "userinfo_endpoint": "https://gitlab.com/api/v4/user", "default_scopes": ["read_user", "email"], }, } # Get provider config or use generic self.provider_config = self.provider_configs.get(self.provider_name, {}) # Override with custom endpoints if provided self.authorization_endpoint = config.get( "authorization_endpoint", self.provider_config.get("authorization_endpoint") ) self.token_endpoint = config.get( "token_endpoint", self.provider_config.get("token_endpoint") ) self.userinfo_endpoint = config.get( "userinfo_endpoint", self.provider_config.get("userinfo_endpoint") ) # Use provider default scopes if not specified if not config.get("scopes"): self.scopes = self.provider_config.get("default_scopes", self.scopes) def init_app(self, app): """Initialize OAuth with the app.""" try: from authlib.integrations.flask_client import OAuth except ImportError: raise ImportError( "authlib library required for OAuth. " "Install with: pip install authlib requests" ) self.oauth = OAuth(app) # Register OAuth client self.oauth_client = self.oauth.register( name=self.provider_name, client_id=self.client_id, client_secret=self.client_secret, server_metadata_url=None, authorize_url=self.authorization_endpoint, access_token_url=self.token_endpoint, userinfo_endpoint=self.userinfo_endpoint, client_kwargs={"scope": " ".join(self.scopes)}, ) def is_user_authorized(self, user_info): """Check if user is authorized based on email/domain. Args: user_info: Dict with user information (must contain 'email') Returns: bool: True if authorized """ email = user_info.get("email", "").lower() # Check specific users if self.authorized_users: if email in [u.lower() for u in self.authorized_users]: return True # Check domains if self.authorized_domains: domain = email.split("@")[-1] if "@" in email else "" if domain in [d.lower() for d in self.authorized_domains]: return True # If no restrictions configured, allow all if not self.authorized_users and not self.authorized_domains: return True return False def is_authenticated(self): """Check if current user is authenticated via OAuth.""" return "oauth_user" in session def handle_unauthorized(self): """Redirect to OAuth login.""" return redirect(url_for(self.login_view)) def get_current_user(self): """Get current authenticated user.""" user_info = session.get("oauth_user") if user_info: return { **user_info, "auth_type": "oauth", "provider": self.provider_name } return None def logout(self): """Logout current user.""" session.pop("oauth_user", None) session.pop("oauth_token", None)