users.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.misc import get_gravatar_url
  7. from apps.web.internal.db import DB
  8. from apps.web.models.chats import Chats
  9. ####################
  10. # User DB Schema
  11. ####################
  12. class User(Model):
  13. id = CharField(unique=True)
  14. name = CharField()
  15. email = CharField()
  16. role = CharField()
  17. profile_image_url = CharField()
  18. timestamp = DateField()
  19. api_key = CharField(null=True, unique=True)
  20. class Meta:
  21. database = DB
  22. class UserModel(BaseModel):
  23. id: str
  24. name: str
  25. email: str
  26. role: str = "pending"
  27. profile_image_url: str = "/user.png"
  28. timestamp: int # timestamp in epoch
  29. api_key: Optional[str] = None
  30. ####################
  31. # Forms
  32. ####################
  33. class UserRoleUpdateForm(BaseModel):
  34. id: str
  35. role: str
  36. class UserUpdateForm(BaseModel):
  37. name: str
  38. email: str
  39. profile_image_url: str
  40. password: Optional[str] = None
  41. class UsersTable:
  42. def __init__(self, db):
  43. self.db = db
  44. self.db.create_tables([User])
  45. def insert_new_user(
  46. self, id: str, name: str, email: str, role: str = "pending"
  47. ) -> Optional[UserModel]:
  48. user = UserModel(
  49. **{
  50. "id": id,
  51. "name": name,
  52. "email": email,
  53. "role": role,
  54. "profile_image_url": "/user.png",
  55. "timestamp": int(time.time()),
  56. }
  57. )
  58. result = User.create(**user.model_dump())
  59. if result:
  60. return user
  61. else:
  62. return None
  63. def get_user_by_id(self, id: str) -> Optional[UserModel]:
  64. try:
  65. user = User.get(User.id == id)
  66. return UserModel(**model_to_dict(user))
  67. except:
  68. return None
  69. def get_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
  70. try:
  71. user = User.get(User.api_key == api_key)
  72. return UserModel(**model_to_dict(user))
  73. except:
  74. return None
  75. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  76. try:
  77. user = User.get(User.email == email)
  78. return UserModel(**model_to_dict(user))
  79. except:
  80. return None
  81. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  82. return [
  83. UserModel(**model_to_dict(user))
  84. for user in User.select()
  85. # .limit(limit).offset(skip)
  86. ]
  87. def get_num_users(self) -> Optional[int]:
  88. return User.select().count()
  89. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  90. try:
  91. query = User.update(role=role).where(User.id == id)
  92. query.execute()
  93. user = User.get(User.id == id)
  94. return UserModel(**model_to_dict(user))
  95. except:
  96. return None
  97. def update_user_profile_image_url_by_id(
  98. self, id: str, profile_image_url: str
  99. ) -> Optional[UserModel]:
  100. try:
  101. query = User.update(profile_image_url=profile_image_url).where(
  102. User.id == id
  103. )
  104. query.execute()
  105. user = User.get(User.id == id)
  106. return UserModel(**model_to_dict(user))
  107. except:
  108. return None
  109. def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
  110. try:
  111. query = User.update(**updated).where(User.id == id)
  112. query.execute()
  113. user = User.get(User.id == id)
  114. return UserModel(**model_to_dict(user))
  115. except:
  116. return None
  117. def delete_user_by_id(self, id: str) -> bool:
  118. try:
  119. # Delete User Chats
  120. result = Chats.delete_chats_by_user_id(id)
  121. if result:
  122. # Delete User
  123. query = User.delete().where(User.id == id)
  124. query.execute() # Remove the rows, return number of rows removed.
  125. return True
  126. else:
  127. return False
  128. except:
  129. return False
  130. def update_user_api_key_by_id(self, id: str, api_key: str) -> str:
  131. try:
  132. query = User.update(api_key=api_key).where(User.id == id)
  133. result = query.execute()
  134. return True if result == 1 else False
  135. except:
  136. return False
  137. def get_user_api_key_by_id(self, id: str) -> Optional[str]:
  138. try:
  139. user = User.get(User.id == id)
  140. return user.api_key
  141. except:
  142. return None
  143. Users = UsersTable(DB)