123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import logging
- import uuid
- from typing import Optional
- from open_webui.apps.webui.internal.db import Base, get_db
- from open_webui.apps.webui.models.users import UserModel, Users
- from open_webui.env import SRC_LOG_LEVELS
- from pydantic import BaseModel
- from sqlalchemy import Boolean, Column, String, Text
- from open_webui.utils.utils import verify_password
# Module-level logger; its level is taken from the "MODELS" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
- ####################
- # DB MODEL
- ####################
class Auth(Base):
    """SQLAlchemy model mapped to the ``auth`` table."""

    __tablename__ = "auth"

    # String primary key.
    id = Column(String, primary_key=True)
    # Email address used to look the row up during authentication.
    email = Column(String)
    # Stored password value (presumably a hash -- confirm against the caller
    # that supplies it).
    password = Column(Text)
    # Lookups by email filter on active=True, so inactive rows never authenticate.
    active = Column(Boolean)
class AuthModel(BaseModel):
    # Pydantic counterpart of an auth record: id, email, password and an
    # `active` flag that defaults to True.
    id: str
    email: str
    password: str
    active: bool = True
- ####################
- # Forms
- ####################
class Token(BaseModel):
    # A token string together with its type label.
    token: str
    token_type: str
class ApiKey(BaseModel):
    # Optional API key; None when no key is present.
    api_key: Optional[str] = None
class UserResponse(BaseModel):
    # User fields: identifier, email, display name, role and profile image URL.
    id: str
    email: str
    name: str
    role: str
    profile_image_url: str
class SigninResponse(Token, UserResponse):
    # Combines the Token and UserResponse fields; declares none of its own.
    pass
class SigninForm(BaseModel):
    # Sign-in payload: an email / password pair.
    email: str
    password: str
class ProfileImageUrlForm(BaseModel):
    # Payload carrying only a profile image URL.
    profile_image_url: str
class UpdateProfileForm(BaseModel):
    # Profile update payload: profile image URL and display name.
    profile_image_url: str
    name: str
class UpdatePasswordForm(BaseModel):
    # Password change payload: `password` (presumably the current one --
    # confirm against the route that consumes it) and the replacement.
    password: str
    new_password: str
class SignupForm(BaseModel):
    # Sign-up payload; the profile image falls back to "/user.png" when omitted.
    name: str
    email: str
    password: str
    profile_image_url: Optional[str] = "/user.png"
class AddUserForm(SignupForm):
    # SignupForm plus a role, which defaults to "pending".
    role: Optional[str] = "pending"
class AuthsTable:
    """Data-access helpers for rows of the ``auth`` table.

    Apart from ``insert_new_auth`` (which lets exceptions propagate), every
    method catches ``Exception``, logs it, and reports the failure through its
    return value: ``None`` for the ``authenticate_*`` lookups, ``False`` for
    the update/delete operations.
    """

    def insert_new_auth(
        self,
        email: str,
        password: str,
        name: str,
        profile_image_url: str = "/user.png",
        role: str = "pending",
        oauth_sub: Optional[str] = None,
    ) -> Optional[UserModel]:
        """Create an auth row and a user record that share a new UUID4 id.

        Args:
            email: Email stored on the auth row and passed to the user record.
            password: Value stored as-is in the auth row's ``password`` column.
            name: Passed through to ``Users.insert_new_user``.
            profile_image_url: Passed through to ``Users.insert_new_user``.
            role: Passed through to ``Users.insert_new_user``.
            oauth_sub: Passed through to ``Users.insert_new_user``.

        Returns:
            The value returned by ``Users.insert_new_user`` when it is truthy,
            otherwise ``None``.
        """
        with get_db() as db:
            log.info("insert_new_auth")

            auth_id = str(uuid.uuid4())
            auth = AuthModel(
                id=auth_id, email=email, password=password, active=True
            )
            result = Auth(**auth.model_dump())
            db.add(result)

            user = Users.insert_new_user(
                auth_id, name, email, profile_image_url, role, oauth_sub
            )

            # NOTE(review): the commit below runs even when insert_new_user
            # returned a falsy value -- confirm whether a rollback is wanted
            # in that case.
            db.commit()
            db.refresh(result)

            return user if result and user else None

    def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
        """Return the user for ``email`` when ``password`` verifies.

        Only auth rows with ``active=True`` are considered.

        Returns:
            The result of ``Users.get_user_by_id`` for the matching row, or
            ``None`` when no active row matches, the password does not verify,
            or an exception occurs.
        """
        log.info(f"authenticate_user: {email}")
        try:
            with get_db() as db:
                auth = db.query(Auth).filter_by(email=email, active=True).first()
                if not auth or not verify_password(password, auth.password):
                    return None
                return Users.get_user_by_id(auth.id)
        except Exception:
            log.exception("authenticate_user failed")
            return None

    def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
        """Return the user that owns ``api_key``.

        Returns:
            The user from ``Users.get_user_by_api_key``, or ``None`` when the
            key is empty, no user matches, or the lookup raises.
        """
        # The key is a credential, so it is deliberately kept out of the log.
        log.info("authenticate_user_by_api_key")
        if not api_key:
            return None

        try:
            user = Users.get_user_by_api_key(api_key)
            return user if user else None
        except Exception:
            log.exception("authenticate_user_by_api_key failed")
            # None (not False) keeps the result within Optional[UserModel],
            # matching the other authenticate_* methods.
            return None

    def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
        """Return the user for ``email`` without any password check.

        Only auth rows with ``active=True`` are considered.

        Returns:
            The result of ``Users.get_user_by_id`` for the matching row, or
            ``None`` when no active row matches or an exception occurs.
        """
        log.info(f"authenticate_user_by_trusted_header: {email}")
        try:
            with get_db() as db:
                auth = db.query(Auth).filter_by(email=email, active=True).first()
                if not auth:
                    return None
                return Users.get_user_by_id(auth.id)
        except Exception:
            log.exception("authenticate_user_by_trusted_header failed")
            return None

    def update_user_password_by_id(self, id: str, new_password: str) -> bool:
        """Store ``new_password`` (as given) on the auth row with this ``id``.

        Returns:
            ``True`` only when the update reports exactly one row; ``False``
            otherwise, including when an exception occurs.
        """
        try:
            with get_db() as db:
                updated = (
                    db.query(Auth).filter_by(id=id).update({"password": new_password})
                )
                db.commit()
                return updated == 1
        except Exception:
            log.exception("update_user_password_by_id failed")
            return False

    def update_email_by_id(self, id: str, email: str) -> bool:
        """Set the email on the auth row with this ``id``.

        Returns:
            ``True`` only when the update reports exactly one row; ``False``
            otherwise, including when an exception occurs.
        """
        try:
            with get_db() as db:
                updated = db.query(Auth).filter_by(id=id).update({"email": email})
                db.commit()
                return updated == 1
        except Exception:
            log.exception("update_email_by_id failed")
            return False

    def delete_auth_by_id(self, id: str) -> bool:
        """Delete the user record and then the auth row for ``id``.

        Returns:
            ``True`` when ``Users.delete_user_by_id`` succeeded and the auth
            delete was committed; ``False`` when the user delete reported
            failure or an exception occurs.
        """
        try:
            with get_db() as db:
                # The user record goes first; the auth row is only removed
                # once that deletion reported success.
                if not Users.delete_user_by_id(id):
                    return False

                db.query(Auth).filter_by(id=id).delete()
                db.commit()
                return True
        except Exception:
            log.exception("delete_auth_by_id failed")
            return False
# Module-level AuthsTable instance.
Auths = AuthsTable()
|