1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- from pydantic import BaseModel
- from typing import List, Union, Optional
- from pymongo import ReturnDocument
- import time
- from utils.utils import decode_token
- from utils.misc import get_gravatar_url
- from config import DB
- ####################
- # User DB Schema
- ####################
- class UserModel(BaseModel):
- id: str
- name: str
- email: str
- role: str = "pending"
- profile_image_url: str = "/user.png"
- created_at: int # timestamp in epoch
- ####################
- # Forms
- ####################
- class UsersTable:
- def __init__(self, db):
- self.db = db
- self.table = db.users
- def insert_new_user(
- self, id: str, name: str, email: str, role: str = "pending"
- ) -> Optional[UserModel]:
- user = UserModel(
- **{
- "id": id,
- "name": name,
- "email": email,
- "role": role,
- "profile_image_url": get_gravatar_url(email),
- "created_at": int(time.time()),
- }
- )
- result = self.table.insert_one(user.model_dump())
- if result:
- return user
- else:
- return None
- def get_user_by_email(self, email: str) -> Optional[UserModel]:
- user = self.table.find_one({"email": email}, {"_id": False})
- if user:
- return UserModel(**user)
- else:
- return None
- def get_user_by_token(self, token: str) -> Optional[UserModel]:
- data = decode_token(token)
- if data != None and "email" in data:
- return self.get_user_by_email(data["email"])
- else:
- return None
- def get_users(self, skip: int = 0, limit: int = 50) -> Optional[UserModel]:
- return [
- UserModel(**user)
- for user in list(self.table.find({}, {"_id": False}))
- .skip(skip)
- .limit(limit)
- ]
- Users = UsersTable(DB)
|