Coverage for src/services/auth.py: 36%

83 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-01 22:29 +0200

1from datetime import datetime, timedelta, timezone 

2from typing import Optional 

3 

4from redis.asyncio.client import Redis 

5import pickle 

6 

7from fastapi import Depends, HTTPException, status 

8from passlib.context import CryptContext 

9from sqlalchemy.ext.asyncio import AsyncSession 

10from jose import JWTError, jwt 

11 

12from src.entity.models import User 

13from src.database.db import get_db, get_redis_client 

14from src.repository import users as repositories_users 

15from src.conf.config import SECRET_KEY, ALGORITHM, oauth2_scheme 

16 

class Auth:
    """Password hashing and JWT helpers used for authentication.

    Bundles bcrypt password hashing/verification (via passlib), creation and
    decoding of access, refresh and email-verification JWTs signed with
    ``SECRET_KEY`` / ``ALGORITHM``, and a dependency-style coroutine that
    resolves the current user from a bearer token with a Redis cache in
    front of the database lookup.
    """

    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    # How long a pickled user stays in Redis before it is re-read from the DB.
    _USER_CACHE_TTL_SECONDS = 60 * 15

    def verify_password(self, plain_password, hashed_password) -> bool:
        """Check a plain-text password against a stored hash.

        Args:
            plain_password (str): The plain-text password to check.
            hashed_password (str): The stored hash to compare against.

        Returns:
            bool: The result of ``pwd_context.verify`` for the given pair.
        """
        return self.pwd_context.verify(plain_password, hashed_password)

    def get_password_hash(self, password: str) -> str:
        """Hash a password with the configured password context (bcrypt).

        Args:
            password (str): The plain-text password to hash.

        Returns:
            str: The hash produced by ``pwd_context.hash``.
        """
        return self.pwd_context.hash(password)

    @staticmethod
    def _encode_token(data: dict, lifetime: timedelta, scope: Optional[str] = None) -> str:
        """Build and sign a JWT from ``data`` plus ``iat``/``exp`` (and ``scope``).

        The caller's dict is copied, never mutated. Claims are added in the
        order ``iat``, ``exp``, ``scope`` so the encoded payload layout is
        the same as when each public method built its own claims.
        """
        claims = data.copy()
        issued_at = datetime.now(timezone.utc)
        claims.update({"iat": issued_at, "exp": issued_at + lifetime})
        if scope is not None:
            claims["scope"] = scope
        return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

    async def create_access_token(self, data: dict, expires_delta: Optional[float] = None) -> str:
        """Generate a signed JWT with scope ``"access_token"``.

        Args:
            data (dict): Claims to embed in the token (copied, not mutated).
            expires_delta (Optional[float]): Lifetime in seconds. When it is
                ``None`` or otherwise falsy (e.g. ``0``) the token is valid
                for 15 minutes.

        Returns:
            str: The encoded access token.
        """
        # Truthiness check kept on purpose: a zero lifetime falls back to the
        # default instead of producing an already-expired token.
        if expires_delta:
            lifetime = timedelta(seconds=expires_delta)
        else:
            lifetime = timedelta(minutes=15)
        return self._encode_token(data, lifetime, scope="access_token")

    async def create_refresh_token(self, data: dict, expires_delta: Optional[float] = None) -> str:
        """Generate a signed JWT with scope ``"refresh_token"``.

        Args:
            data (dict): Claims to embed in the token (copied, not mutated).
            expires_delta (Optional[float]): Lifetime in seconds. When it is
                ``None`` or otherwise falsy (e.g. ``0``) the token is valid
                for 7 days.

        Returns:
            str: The encoded refresh token.
        """
        if expires_delta:
            lifetime = timedelta(seconds=expires_delta)
        else:
            lifetime = timedelta(days=7)
        return self._encode_token(data, lifetime, scope="refresh_token")

    async def decode_refresh_token(self, refresh_token: str) -> str:
        """Decode a refresh token and return its ``sub`` claim.

        Args:
            refresh_token (str): The encoded refresh token.

        Returns:
            str: The value of the ``sub`` claim (the user's email).

        Raises:
            HTTPException: 401 "Could not validate credentials" when the
                token cannot be decoded or carries no ``sub`` claim;
                401 "Invalid scope for token" when the ``scope`` claim is
                missing or is not ``"refresh_token"``.
        """
        try:
            payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
            ) from exc

        # .get() so a signed token without a scope (e.g. an email token) is
        # rejected with 401 instead of surfacing a KeyError as a 500.
        if payload.get("scope") != "refresh_token":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid scope for token",
            )

        email = payload.get("sub")
        if email is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
            )
        return email

    async def get_current_user(
        self,
        token: str = Depends(oauth2_scheme),
        db: AsyncSession = Depends(get_db),
        redis_client: Redis = Depends(get_redis_client),
    ) -> User:
        """Resolve the user identified by an access token.

        The token must decode with ``SECRET_KEY`` / ``ALGORITHM``, carry
        scope ``"access_token"`` and a ``sub`` claim. The user is first
        looked up in Redis under the key ``User data <email>``; on a miss it
        is loaded with ``repositories_users.get_user_by_email`` and cached
        (pickled) for 15 minutes, so a cached user may lag behind the
        database by up to that long.

        Args:
            token (str): Encoded bearer token.
            db (AsyncSession): Database session used on a cache miss.
            redis_client (Redis): Async Redis client used as the user cache.

        Returns:
            User: The user from the cache or the database.

        Raises:
            HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header
                when the token is invalid, has the wrong or missing scope,
                has no ``sub`` claim, or no matching user exists.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError as exc:
            raise credentials_exception from exc

        # .get() instead of indexing: a missing claim must yield 401, not an
        # unhandled KeyError, and it makes the None check below reachable.
        if payload.get("scope") != "access_token":
            raise credentials_exception
        email = payload.get("sub")
        if email is None:
            raise credentials_exception

        redis_key = f'User data {email}'
        user_data = await redis_client.get(redis_key)
        if not user_data:
            print('User is retrieved from Database')
            user = await repositories_users.get_user_by_email(email, db)
            if user is None:
                raise credentials_exception
            await redis_client.set(
                redis_key, pickle.dumps(user), ex=self._USER_CACHE_TTL_SECONDS
            )
        else:
            print("User is retrieved from Radis database")
            # NOTE(security): pickle.loads can execute arbitrary code if the
            # cached bytes are attacker-controlled. This is only acceptable
            # while the Redis instance is fully trusted; consider a
            # schema-based (e.g. JSON) serialization instead.
            user: User = pickle.loads(user_data)
        return user

    def create_email_token(self, data: dict) -> str:
        """Generate a signed JWT for email verification, valid for 1 day.

        Unlike access and refresh tokens, no ``scope`` claim is added.

        Args:
            data (dict): Claims to embed in the token (copied, not mutated).

        Returns:
            str: The encoded email token.
        """
        return self._encode_token(data, timedelta(days=1))

    async def get_email_from_token(self, token: str) -> str:
        """Decode an email-verification token and return its ``sub`` claim.

        Args:
            token (str): The encoded token.

        Returns:
            str: The value of the ``sub`` claim (the user's email).

        Raises:
            HTTPException: 422 "Invalid token for email verification" when
                the token cannot be decoded or carries no ``sub`` claim.
        """
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError as e:
            print(e)
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid token for email verification",
            ) from e

        email = payload.get("sub")
        if email is None:
            # A decodable token without a subject is just as unusable as an
            # undecodable one; report it the same way instead of a KeyError.
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid token for email verification",
            )
        return email

231 

232 

# Module-level singleton shared by the application.
# NOTE(review): the "с" in ``auth_serviсe`` appears to be a Cyrillic letter
# (U+0441), not a Latin "c" - confirm at byte level. The original name is
# kept so existing imports keep working; ``auth_service`` is an ASCII alias
# for new code.
auth_serviсe = Auth()
auth_service = auth_serviсe