Pārlūkot izejas kodu

Update auths.py

Timothy J. Baek 10 mēneši atpakaļ
vecāks
revīzija
8b13755d56
1 mainītis faili ar 51 papildinājumiem un 38 dzēšanām
  1. 51 38
      backend/apps/webui/models/auths.py

+ 51 - 38
backend/apps/webui/models/auths.py

@@ -102,40 +102,44 @@ class AuthsTable:
         role: str = "pending",
         role: str = "pending",
         oauth_sub: Optional[str] = None,
         oauth_sub: Optional[str] = None,
     ) -> Optional[UserModel]:
     ) -> Optional[UserModel]:
-        log.info("insert_new_auth")
+        with get_db() as db:
 
 
-        id = str(uuid.uuid4())
+            log.info("insert_new_auth")
 
 
-        auth = AuthModel(
-            **{"id": id, "email": email, "password": password, "active": True}
-        )
-        result = Auth(**auth.model_dump())
-        db.add(result)
+            id = str(uuid.uuid4())
 
 
-        user = Users.insert_new_user(
-            id, name, email, profile_image_url, role, oauth_sub
-        )
+            auth = AuthModel(
+                **{"id": id, "email": email, "password": password, "active": True}
+            )
+            result = Auth(**auth.model_dump())
+            db.add(result)
 
 
-        db.commit()
-        db.refresh(result)
+            user = Users.insert_new_user(
+                id, name, email, profile_image_url, role, oauth_sub
+            )
 
 
-        if result and user:
-            return user
-        else:
-            return None
+            db.commit()
+            db.refresh(result)
+
+            if result and user:
+                return user
+            else:
+                return None
 
 
     def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
     def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
         log.info(f"authenticate_user: {email}")
         log.info(f"authenticate_user: {email}")
         try:
         try:
-            auth = db.query(Auth).filter_by(email=email, active=True).first()
-            if auth:
-                if verify_password(password, auth.password):
-                    user = Users.get_user_by_id(auth.id)
-                    return user
+            with get_db() as db:
+
+                auth = db.query(Auth).filter_by(email=email, active=True).first()
+                if auth:
+                    if verify_password(password, auth.password):
+                        user = Users.get_user_by_id(auth.id)
+                        return user
+                    else:
+                        return None
                 else:
                 else:
                     return None
                     return None
-            else:
-                return None
         except:
         except:
             return None
             return None
 
 
@@ -154,38 +158,47 @@ class AuthsTable:
     def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
     def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
         log.info(f"authenticate_user_by_trusted_header: {email}")
         log.info(f"authenticate_user_by_trusted_header: {email}")
         try:
         try:
-            auth = db.query(Auth).filter(email=email, active=True).first()
-            if auth:
-                user = Users.get_user_by_id(auth.id)
-                return user
+            with get_db() as db:
+                auth = db.query(Auth).filter(email=email, active=True).first()
+                if auth:
+                    user = Users.get_user_by_id(auth.id)
+                    return user
         except:
         except:
             return None
             return None
 
 
     def update_user_password_by_id(self, id: str, new_password: str) -> bool:
     def update_user_password_by_id(self, id: str, new_password: str) -> bool:
         try:
         try:
-            result = db.query(Auth).filter_by(id=id).update({"password": new_password})
-            return True if result == 1 else False
+            with get_db() as db:
+
+                result = (
+                    db.query(Auth).filter_by(id=id).update({"password": new_password})
+                )
+                return True if result == 1 else False
         except:
         except:
             return False
             return False
 
 
     def update_email_by_id(self, id: str, email: str) -> bool:
     def update_email_by_id(self, id: str, email: str) -> bool:
         try:
         try:
-            result = db.query(Auth).filter_by(id=id).update({"email": email})
-            return True if result == 1 else False
+            with get_db() as db:
+
+                result = db.query(Auth).filter_by(id=id).update({"email": email})
+                return True if result == 1 else False
         except:
         except:
             return False
             return False
 
 
     def delete_auth_by_id(self, id: str) -> bool:
     def delete_auth_by_id(self, id: str) -> bool:
         try:
         try:
-            # Delete User
-            result = Users.delete_user_by_id(id)
+            with get_db() as db:
 
 
-            if result:
-                db.query(Auth).filter_by(id=id).delete()
+                # Delete User
+                result = Users.delete_user_by_id(id)
 
 
-                return True
-            else:
-                return False
+                if result:
+                    db.query(Auth).filter_by(id=id).delete()
+
+                    return True
+                else:
+                    return False
         except:
         except:
             return False
             return False