Coverage for src/services/auth.py: 36%
83 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-01 22:29 +0200
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-01 22:29 +0200
1from datetime import datetime, timedelta, timezone
2from typing import Optional
4from redis.asyncio.client import Redis
5import pickle
7from fastapi import Depends, HTTPException, status
8from passlib.context import CryptContext
9from sqlalchemy.ext.asyncio import AsyncSession
10from jose import JWTError, jwt
12from src.entity.models import User
13from src.database.db import get_db, get_redis_client
14from src.repository import users as repositories_users
15from src.conf.config import SECRET_KEY, ALGORITHM, oauth2_scheme
17class Auth:
19 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
21 def verify_password(self, plain_password, hashed_password):
22 """
23 Verifies if the plain password matches the hashed password.
25 Args:
26 plain_password (str): The plain text password to be verified.
27 hashed_password (str): The hashed password to be compared with the plain text password.
29 Returns:
30 bool: Returns True if the plain password matches the hashed password, otherwise False.
32 Raises:
33 Exception: If the password context is not initialized.
34 """
35 return self.pwd_context.verify(plain_password, hashed_password)
37 def get_password_hash(self, password: str):
38 """
39 Hashes the given password using the bcrypt algorithm.
41 Args:
42 password (str): The plain text password to be hashed.
44 Returns:
45 str: The hashed password as a string.
47 Raises:
48 Exception: If the password context is not initialized.
49 """
50 return self.pwd_context.hash(password)
52 async def create_access_token(self, data: dict, expires_delta: Optional[float] = None):
53 """
54 Generates an access token for the user.
56 Args:
57 data (dict): A dictionary containing user-specific data.
58 expires_delta (Optional[float], optional): The time in seconds that the token should be valid for. Defaults to 15 minutes.
60 Returns:
61 str: The generated access token.
63 Raises:
64 Exception: If the token generation process fails.
66 Note:
67 The token is generated using the JWT library with the provided SECRET_KEY and ALGORITHM.
68 """
69 to_encode = data.copy()
70 now = datetime.now(timezone.utc)
71 if expires_delta:
72 expire = now + timedelta(seconds=expires_delta)
73 else:
74 expire = now + timedelta(minutes=15)
75 to_encode.update({"iat": now, "exp": expire, "scope": "access_token"})
76 encoded_access_token = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
77 return encoded_access_token
79 async def create_refresh_token(self, data: dict, expires_delta: Optional[float] = None):
80 """
81 Generates a refresh token for the user.
83 Args:
84 data (dict): A dictionary containing user-specific data.
85 expires_delta (Optional[float], optional): The time in seconds that the token should be valid for. Defaults to 7 days.
87 Returns:
88 str: The generated refresh token.
90 Raises:
91 Exception: If the token generation process fails.
93 Note:
94 The token is generated using the JWT library with the provided SECRET_KEY and ALGORITHM.
95 """
96 to_encode = data.copy()
97 now = datetime.now(timezone.utc)
98 if expires_delta:
99 expire = now + timedelta(seconds=expires_delta)
100 else:
101 expire = now + timedelta(days=7)
102 to_encode.update({"iat": now, "exp": expire, "scope": "refresh_token"})
103 encoded_refresh_token = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
104 return encoded_refresh_token
106 async def decode_refresh_token(self, refresh_token: str):
107 """
108 Decodes a refresh token to retrieve the user's email.
110 Args:
111 refresh_token (str): The refresh token to be decoded.
113 Returns:
114 str: The user's email if the token is valid and has a valid scope.
116 Raises:
117 HTTPException: If the token cannot be decoded or has an invalid scope.
119 Note:
120 The token is decoded using the JWT library with the provided SECRET_KEY and ALGORITHM.
121 """
122 try:
123 payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
124 if payload["scope"] == "refresh_token":
125 email = payload["sub"]
126 return email
127 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid scope for token")
128 except JWTError:
129 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials",)
131 async def get_current_user(
132 self,
133 token: str = Depends(oauth2_scheme),
134 db: AsyncSession = Depends(get_db),
135 redis_client: Redis = Depends(get_redis_client),
136 ):
137 """
138 Retrieves the current user from the token provided.
140 Args:
141 token (str): The token to be decoded and validated.
142 db (AsyncSession, optional): The database session to be used for retrieving user data. Defaults to Depends(get_db).
143 redis_client (Redis, optional): The Redis client to be used for caching user data. Defaults to Depends(get_redis_client).
145 Returns:
146 User: The current user object if the token is valid and has a valid scope.
148 Raises:
149 HTTPException: If the token cannot be decoded or has an invalid scope.
151 Note:
152 The token is decoded using the JWT library with the provided SECRET_KEY and ALGORITHM.
153 If the user data is found in Redis, it is returned directly. Otherwise, the user data is retrieved from the database and cached in Redis before being returned.
154 """
155 credentials_exception = HTTPException(
156 status_code=status.HTTP_401_UNAUTHORIZED,
157 detail="Could not validate credentials",
158 headers={"WWW-Authenticate": "Bearer"},
159 )
161 try:
162 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
163 if payload["scope"] == "access_token":
164 email = payload["sub"]
165 if email is None:
166 raise credentials_exception
167 else:
168 raise credentials_exception
169 except JWTError as e:
170 raise credentials_exception
171 redis_key = f'User data {email}'
172 user_data = await redis_client.get(redis_key)
173 if not user_data:
174 print('User is retrieved from Database')
175 user = await repositories_users.get_user_by_email(email, db)
176 if user is None:
177 raise credentials_exception
178 await redis_client.set(redis_key, pickle.dumps(user), ex=60*15)
179 else:
180 print("User is retrieved from Radis database")
181 user: User = pickle.loads(user_data)
182 return user
184 def create_email_token(self, data: dict):
185 """
186 Generates an email token for the user.
188 Args:
189 data (dict): A dictionary containing user-specific data.
191 Returns:
192 str: The generated email token.
194 Note:
195 The token is generated using the JWT library with the provided SECRET_KEY and ALGORITHM.
196 The token has a validity of 1 day.
197 """
198 to_encode = data.copy()
199 now = datetime.now(timezone.utc)
200 expire = now + timedelta(days=1)
201 to_encode.update({"iat": now, "exp": expire})
202 token = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
203 return token
205 async def get_email_from_token(self, token: str):
206 """
207 Decodes a token to retrieve the user's email.
209 Args:
210 token (str): The token to be decoded.
212 Returns:
213 str: The user's email if the token is valid.
215 Raises:
216 HTTPException: If the token cannot be decoded or has an invalid scope.
218 Note:
219 The token is decoded using the JWT library with the provided SECRET_KEY and ALGORITHM.
220 """
221 try:
222 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
223 email = payload["sub"]
224 return email
225 except JWTError as e:
226 print(e)
227 raise HTTPException(
228 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
229 detail="Invalid token for email verification",
230 )
233auth_serviсe = Auth()