|
@@ -58,7 +58,9 @@ def extract_token_from_auth_header(auth_header: str):
|
|
return auth_header[len("Bearer ") :]
|
|
return auth_header[len("Bearer ") :]
|
|
|
|
|
|
|
|
|
|
-def get_current_user(auth_token: HTTPAuthorizationCredentials = Depends(bearer_security)):
|
|
|
|
|
|
+def get_current_user(
|
|
|
|
+ auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
|
|
|
|
+):
|
|
data = decode_token(auth_token.credentials)
|
|
data = decode_token(auth_token.credentials)
|
|
if data != None and "id" in data:
|
|
if data != None and "id" in data:
|
|
user = Users.get_user_by_id(data["id"])
|
|
user = Users.get_user_by_id(data["id"])
|
|
@@ -75,17 +77,19 @@ def get_current_user(auth_token: HTTPAuthorizationCredentials = Depends(bearer_s
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
-def get_verified_user(user: Users = Depends(get_current_user)):
|
|
|
|
|
|
+def get_verified_user(user=Depends(get_current_user)):
|
|
if user.role not in {"user", "admin"}:
|
|
if user.role not in {"user", "admin"}:
|
|
raise HTTPException(
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
|
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
|
)
|
|
)
|
|
|
|
+ return user
|
|
|
|
|
|
|
|
|
|
-def get_admin_user(user: Users = Depends(get_current_user)):
|
|
|
|
|
|
+def get_admin_user(user=Depends(get_current_user)):
|
|
if user.role != "admin":
|
|
if user.role != "admin":
|
|
raise HTTPException(
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
|
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
|
)
|
|
)
|
|
|
|
+ return user
|