-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecurity.py
More file actions
85 lines (70 loc) · 2.45 KB
/
security.py
File metadata and controls
85 lines (70 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""
Security utilities and dependencies for authentication.
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError
try:
from app.services.auth_service import auth_service
from app.models.user import User, TokenData
except Exception:
from auth_service import auth_service
from user import User, TokenData
# HTTP Bearer token scheme.
# Module-level instance shared by the dependencies in this file:
# get_current_user receives its HTTPAuthorizationCredentials through
# Depends(security).
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """
    Dependency to get the current authenticated user from JWT token.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.
            ``credentials.credentials`` is passed to
            ``auth_service.verify_token``.

    Returns:
        A ``User`` built from the verified token data (``user_id`` and
        ``email``). ``is_active`` is always ``True`` and ``created_at`` is
        ``None`` because no database lookup is performed here.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            token verification raises ``JWTError``, when it returns ``None``,
            or when the token data carries no ``user_id``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body limited to the single call expected to raise JWTError.
    try:
        token_data = auth_service.verify_token(credentials.credentials)
    except JWTError as exc:
        # Chain the cause so the underlying JWT failure stays visible in
        # tracebacks while the client still only sees the generic 401.
        raise credentials_exception from exc

    if token_data is None or token_data.user_id is None:
        raise credentials_exception

    # In a real implementation, you would fetch the user from database.
    # For now, a mock user is created from the token data alone.
    return User(
        id=token_data.user_id,
        email=token_data.email or "",
        created_at=None,
        is_active=True,
    )
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Dependency that yields the authenticated user only when it is active.

    The user resolved by ``get_current_user`` is returned unchanged when its
    ``is_active`` flag is truthy. Otherwise an ``HTTPException`` with status
    400 and detail ``"Inactive user"`` is raised.
    """
    # Happy path first: an active user passes straight through.
    if current_user.is_active:
        return current_user

    raise HTTPException(
        detail="Inactive user",
        status_code=status.HTTP_400_BAD_REQUEST,
    )
def create_user_session(user: User) -> dict:
    """
    Create a user session with JWT token.

    Args:
        user: The user to issue a token for. Its ``id`` and ``email`` are
            placed in the token payload under ``"sub"`` and ``"email"``.

    Returns:
        A dict with the keys ``access_token``, ``token_type`` (always
        ``"bearer"``), ``expires_in`` (the configured lifetime converted from
        minutes to seconds) and ``user`` (the object passed in).
    """
    from datetime import timedelta

    # Settings are imported from the packaged layout first, then from the flat
    # layout. Only a failed import should trigger the fallback; any other
    # error raised while loading the config module must surface as-is rather
    # than being masked by a second, misleading import error.
    try:
        from app.core.config import settings
    except ImportError:
        from config import settings

    # Read the lifetime once so the token expiry and the reported
    # ``expires_in`` are derived from the same value.
    expire_minutes = settings.access_token_expire_minutes

    # NOTE(review): the JWT "sub" claim is conventionally a string; confirm
    # that user.id is a str or that auth_service stringifies it.
    access_token = auth_service.create_access_token(
        data={"sub": user.id, "email": user.email},
        expires_delta=timedelta(minutes=expire_minutes),
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": expire_minutes * 60,
        "user": user,
    }