auths.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from fastapi import Response, Request
  2. from fastapi import Depends, FastAPI, HTTPException, status
  3. from datetime import datetime, timedelta
  4. from typing import List, Union
  5. from fastapi import APIRouter, status
  6. from pydantic import BaseModel
  7. import time
  8. import uuid
  9. from apps.web.models.auths import (
  10. SigninForm,
  11. SignupForm,
  12. UpdateProfileForm,
  13. UpdatePasswordForm,
  14. UserResponse,
  15. SigninResponse,
  16. Auths,
  17. )
  18. from apps.web.models.users import Users
  19. from utils.utils import (
  20. get_password_hash,
  21. get_current_user,
  22. get_admin_user,
  23. create_token,
  24. )
  25. from utils.misc import get_gravatar_url, validate_email_format
  26. from constants import ERROR_MESSAGES
  27. router = APIRouter()
  28. ############################
  29. # GetSessionUser
  30. ############################
  31. @router.get("/", response_model=UserResponse)
  32. async def get_session_user(user=Depends(get_current_user)):
  33. return {
  34. "id": user.id,
  35. "email": user.email,
  36. "name": user.name,
  37. "role": user.role,
  38. "profile_image_url": user.profile_image_url,
  39. }
  40. ############################
  41. # Update Profile
  42. ############################
  43. @router.post("/update/profile", response_model=UserResponse)
  44. async def update_profile(
  45. form_data: UpdateProfileForm, session_user=Depends(get_current_user)
  46. ):
  47. if session_user:
  48. user = Users.update_user_by_id(
  49. session_user.id,
  50. {"profile_image_url": form_data.profile_image_url, "name": form_data.name},
  51. )
  52. if user:
  53. return user
  54. else:
  55. raise HTTPException(400, detail=ERROR_MESSAGES.DEFAULT())
  56. else:
  57. raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
  58. ############################
  59. # Update Password
  60. ############################
  61. @router.post("/update/password", response_model=bool)
  62. async def update_password(
  63. form_data: UpdatePasswordForm, session_user=Depends(get_current_user)
  64. ):
  65. if session_user:
  66. user = Auths.authenticate_user(session_user.email, form_data.password)
  67. if user:
  68. hashed = get_password_hash(form_data.new_password)
  69. return Auths.update_user_password_by_id(user.id, hashed)
  70. else:
  71. raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_PASSWORD)
  72. else:
  73. raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
  74. ############################
  75. # SignIn
  76. ############################
  77. @router.post("/signin", response_model=SigninResponse)
  78. async def signin(form_data: SigninForm):
  79. user = Auths.authenticate_user(form_data.email.lower(), form_data.password)
  80. if user:
  81. token = create_token(data={"id": user.id})
  82. return {
  83. "token": token,
  84. "token_type": "Bearer",
  85. "id": user.id,
  86. "email": user.email,
  87. "name": user.name,
  88. "role": user.role,
  89. "profile_image_url": user.profile_image_url,
  90. }
  91. else:
  92. raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
  93. ############################
  94. # SignUp
  95. ############################
  96. @router.post("/signup", response_model=SigninResponse)
  97. async def signup(request: Request, form_data: SignupForm):
  98. if not request.app.state.ENABLE_SIGNUP:
  99. raise HTTPException(
  100. status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
  101. )
  102. if not validate_email_format(form_data.email.lower()):
  103. raise HTTPException(
  104. status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT
  105. )
  106. if Users.get_user_by_email(form_data.email.lower()):
  107. raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
  108. try:
  109. role = (
  110. "admin"
  111. if Users.get_num_users() == 0
  112. else request.app.state.DEFAULT_USER_ROLE
  113. )
  114. hashed = get_password_hash(form_data.password)
  115. user = Auths.insert_new_auth(
  116. form_data.email.lower(), hashed, form_data.name, role
  117. )
  118. if user:
  119. token = create_token(data={"id": user.id})
  120. # response.set_cookie(key='token', value=token, httponly=True)
  121. return {
  122. "token": token,
  123. "token_type": "Bearer",
  124. "id": user.id,
  125. "email": user.email,
  126. "name": user.name,
  127. "role": user.role,
  128. "profile_image_url": user.profile_image_url,
  129. }
  130. else:
  131. raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)
  132. except Exception as err:
  133. raise HTTPException(500, detail=ERROR_MESSAGES.DEFAULT(err))
  134. ############################
  135. # ToggleSignUp
  136. ############################
  137. @router.get("/signup/enabled", response_model=bool)
  138. async def get_sign_up_status(request: Request, user=Depends(get_admin_user)):
  139. return request.app.state.ENABLE_SIGNUP
  140. @router.get("/signup/enabled/toggle", response_model=bool)
  141. async def toggle_sign_up(request: Request, user=Depends(get_admin_user)):
  142. request.app.state.ENABLE_SIGNUP = not request.app.state.ENABLE_SIGNUP
  143. return request.app.state.ENABLE_SIGNUP
  144. ############################
  145. # Default User Role
  146. ############################
  147. @router.get("/signup/user/role")
  148. async def get_default_user_role(request: Request, user=Depends(get_admin_user)):
  149. return request.app.state.DEFAULT_USER_ROLE
  150. class UpdateRoleForm(BaseModel):
  151. role: str
  152. @router.post("/signup/user/role")
  153. async def update_default_user_role(
  154. request: Request, form_data: UpdateRoleForm, user=Depends(get_admin_user)
  155. ):
  156. if form_data.role in ["pending", "user", "admin"]:
  157. request.app.state.DEFAULT_USER_ROLE = form_data.role
  158. return request.app.state.DEFAULT_USER_ROLE