# users.py
import logging
import time
from typing import List, Union, Optional

from peewee import *
from playhouse.shortcuts import model_to_dict
from pydantic import BaseModel

from apps.web.internal.db import DB
from utils.misc import get_gravatar_url
####################
# User DB Schema
####################
  11. class User(Model):
  12. id = CharField(unique=True)
  13. name = CharField()
  14. email = CharField()
  15. role = CharField()
  16. profile_image_url = CharField()
  17. timestamp = DateField()
  18. class Meta:
  19. database = DB
  20. class UserModel(BaseModel):
  21. id: str
  22. name: str
  23. email: str
  24. role: str = "pending"
  25. profile_image_url: str = "/user.png"
  26. timestamp: int # timestamp in epoch
####################
# Forms
####################
  30. class UserRoleUpdateForm(BaseModel):
  31. id: str
  32. role: str
  33. class UsersTable:
  34. def __init__(self, db):
  35. self.db = db
  36. self.db.create_tables([User])
  37. def insert_new_user(
  38. self, id: str, name: str, email: str, role: str = "pending"
  39. ) -> Optional[UserModel]:
  40. user = UserModel(
  41. **{
  42. "id": id,
  43. "name": name,
  44. "email": email,
  45. "role": role,
  46. "profile_image_url": get_gravatar_url(email),
  47. "timestamp": int(time.time()),
  48. }
  49. )
  50. result = User.create(**user.model_dump())
  51. if result:
  52. return user
  53. else:
  54. return None
  55. def get_user_by_id(self, id: str) -> Optional[UserModel]:
  56. try:
  57. user = User.get(User.id == id)
  58. return UserModel(**model_to_dict(user))
  59. except:
  60. return None
  61. def get_user_by_email(self, email: str) -> Optional[UserModel]:
  62. try:
  63. user = User.get(User.email == email)
  64. return UserModel(**model_to_dict(user))
  65. except:
  66. return None
  67. def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
  68. return [
  69. UserModel(**model_to_dict(user))
  70. for user in User.select().limit(limit).offset(skip)
  71. ]
  72. def get_num_users(self) -> Optional[int]:
  73. return User.select().count()
  74. def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
  75. try:
  76. query = User.update(role=role).where(User.id == id)
  77. query.execute()
  78. user = User.get(User.id == id)
  79. return UserModel(**model_to_dict(user))
  80. except:
  81. return None
  82. Users = UsersTable(DB)