123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- from typing import Optional
- from open_webui.models.prompts import (
- PromptForm,
- PromptUserResponse,
- PromptModel,
- Prompts,
- )
- from open_webui.constants import ERROR_MESSAGES
- from fastapi import APIRouter, Depends, HTTPException, status, Request
- from open_webui.utils.auth import get_admin_user, get_verified_user
- from open_webui.utils.access_control import has_access, has_permission
- router = APIRouter()
- ############################
- # GetPrompts
- ############################
@router.get("/", response_model=list[PromptModel])
async def get_prompts(user=Depends(get_verified_user)):
    """Return the prompts available to the requesting user.

    Admins receive the result of ``Prompts.get_prompts()``. Any other
    verified user receives ``Prompts.get_prompts_by_user_id(user.id, "read")``.
    """
    # Non-admins are limited to the per-user lookup with the "read" argument.
    if user.role != "admin":
        return Prompts.get_prompts_by_user_id(user.id, "read")
    return Prompts.get_prompts()
@router.get("/list", response_model=list[PromptUserResponse])
async def get_prompt_list(user=Depends(get_verified_user)):
    """Return the prompt list for the requesting user.

    Admins receive the result of ``Prompts.get_prompts()``. Any other
    verified user receives ``Prompts.get_prompts_by_user_id(user.id, "write")``.
    """
    return (
        Prompts.get_prompts()
        if user.role == "admin"
        else Prompts.get_prompts_by_user_id(user.id, "write")
    )
- ############################
- # CreateNewPrompt
- ############################
@router.post("/create", response_model=Optional[PromptModel])
async def create_new_prompt(
    request: Request, form_data: PromptForm, user=Depends(get_verified_user)
):
    """Create a new prompt owned by the requesting user.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.UNAUTHORIZED`` when the user
            is not an admin and ``has_permission`` rejects
            ``"workspace.prompts"``; 400 with ``ERROR_MESSAGES.COMMAND_TAKEN``
            when a prompt with ``form_data.command`` already exists; 400 with
            ``ERROR_MESSAGES.DEFAULT()`` when the insert returns a falsy value.
    """
    # Admins skip the permission lookup entirely (short-circuit on the role).
    allowed = user.role == "admin" or has_permission(
        user.id, "workspace.prompts", request.app.state.config.USER_PERMISSIONS
    )
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    existing = Prompts.get_prompt_by_command(form_data.command)
    if existing is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.COMMAND_TAKEN,
        )

    created = Prompts.insert_new_prompt(user.id, form_data)
    if not created:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
    return created
- ############################
- # GetPromptByCommand
- ############################
@router.get("/command/{command}", response_model=Optional[PromptModel])
async def get_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Fetch a single prompt by its command name.

    The path parameter is looked up with a leading slash (``f"/{command}"``).
    The prompt is returned when the requester is an admin, is the prompt's
    ``user_id``, or passes ``has_access(user.id, "read", prompt.access_control)``.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no prompt
            matches the command; 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED``
            when the prompt exists but the requester may not read it.
    """
    prompt = Prompts.get_prompt_by_command(f"/{command}")
    if not prompt:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        user.role == "admin"
        or prompt.user_id == user.id
        or has_access(user.id, "read", prompt.access_control)
    ):
        return prompt

    # Every failure path must raise: falling off the end of the handler would
    # answer HTTP 200 with a null body. The status and detail match what the
    # update and delete endpoints use for the same condition.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
    )
- ############################
- # UpdatePromptByCommand
- ############################
@router.post("/command/{command}/update", response_model=Optional[PromptModel])
async def update_prompt_by_command(
    command: str,
    form_data: PromptForm,
    user=Depends(get_verified_user),
):
    """Update the prompt stored under ``f"/{command}"`` with ``form_data``.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no prompt
            matches the command; 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED``
            when the requester is not the prompt's ``user_id``, fails the
            ``"write"`` access check and is not an admin, or when the update
            call returns a falsy value.
    """
    full_command = f"/{command}"

    existing = Prompts.get_prompt_by_command(full_command)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Allowed for the original creator, anyone with write access, or an admin.
    may_write = (
        existing.user_id == user.id
        or has_access(user.id, "write", existing.access_control)
        or user.role == "admin"
    )
    if not may_write:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    updated = Prompts.update_prompt_by_command(full_command, form_data)
    if not updated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )
    return updated
- ############################
- # DeletePromptByCommand
- ############################
@router.delete("/command/{command}/delete", response_model=bool)
async def delete_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Delete the prompt stored under ``f"/{command}"``.

    Returns whatever ``Prompts.delete_prompt_by_command`` returns (the route
    declares ``response_model=bool``).

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no prompt
            matches the command; 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED``
            when the requester is not the prompt's ``user_id``, fails the
            ``"write"`` access check and is not an admin.
    """
    full_command = f"/{command}"

    target = Prompts.get_prompt_by_command(full_command)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Same rule as update: creator, write access, or admin.
    may_delete = (
        target.user_id == user.id
        or has_access(user.id, "write", target.access_control)
        or user.role == "admin"
    )
    if not may_delete:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return Prompts.delete_prompt_by_command(full_command)
|