123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- import json
- import logging
- from typing import Optional
- from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks
- from pydantic import BaseModel
- from open_webui.socket.main import sio, SESSION_POOL
- from open_webui.models.users import Users, UserNameResponse
- from open_webui.models.channels import Channels, ChannelModel, ChannelForm
- from open_webui.models.messages import Messages, MessageModel, MessageForm
- from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT
- from open_webui.constants import ERROR_MESSAGES
- from open_webui.env import SRC_LOG_LEVELS, WEBUI_URL
- from open_webui.utils.auth import get_admin_user, get_verified_user
- from open_webui.utils.access_control import has_access, get_users_with_access
- from open_webui.utils.webhook import post_webhook
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["MODELS"])
- router = APIRouter()
- ############################
- # GetChannels
- ############################
@router.get("/", response_model=list[ChannelModel])
async def get_channels(user=Depends(get_verified_user)):
    """Return the channels the requesting user may see.

    Admins get the result of ``Channels.get_channels()``; every other
    verified user gets ``Channels.get_channels_by_user_id`` for their own id.
    """
    if user.role != "admin":
        return Channels.get_channels_by_user_id(user.id)
    return Channels.get_channels()
- ############################
- # CreateNewChannel
- ############################
@router.post("/create", response_model=Optional[ChannelModel])
async def create_new_channel(form_data: ChannelForm, user=Depends(get_admin_user)):
    """Create a channel on behalf of the calling admin.

    Raises:
        HTTPException: 400 when inserting or serializing the channel raises;
            the underlying exception is logged first.
    """
    try:
        created = Channels.insert_new_channel(form_data, user.id)
        payload = created.model_dump()
        return ChannelModel(**payload)
    except Exception as exc:
        log.exception(exc)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
- ############################
- # GetChannelById
- ############################
@router.get("/{id}", response_model=Optional[ChannelModel])
async def get_channel_by_id(id: str, user=Depends(get_verified_user)):
    """Fetch a single channel by its id.

    Raises:
        HTTPException: 404 when no channel has this id; 403 when the caller
            is not an admin and lacks "read" access to the channel.
    """
    found = Channels.get_channel_by_id(id)
    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Admins bypass the access-control lookup entirely (short-circuit).
    may_read = user.role == "admin" or has_access(
        user.id, type="read", access_control=found.access_control
    )
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.DEFAULT(),
        )

    return ChannelModel(**found.model_dump())
- ############################
- # UpdateChannelById
- ############################
@router.post("/{id}/update", response_model=Optional[ChannelModel])
async def update_channel_by_id(
    id: str, form_data: ChannelForm, user=Depends(get_admin_user)
):
    """Apply ``form_data`` to an existing channel (admin only).

    Raises:
        HTTPException: 404 when the channel does not exist; 400 when the
            update or the serialization of its result raises.
    """
    existing = Channels.get_channel_by_id(id)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        updated = Channels.update_channel_by_id(id, form_data)
        payload = updated.model_dump()
        return ChannelModel(**payload)
    except Exception as exc:
        log.exception(exc)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
- ############################
- # DeleteChannelById
- ############################
@router.delete("/{id}/delete", response_model=bool)
async def delete_channel_by_id(id: str, user=Depends(get_admin_user)):
    """Delete a channel (admin only) and report success as ``True``.

    Raises:
        HTTPException: 404 when the channel does not exist; 400 when the
            delete call raises.
    """
    target = Channels.get_channel_by_id(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        Channels.delete_channel_by_id(id)
    except Exception as exc:
        log.exception(exc)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
    else:
        return True
- ############################
- # GetChannelMessages
- ############################
class MessageUserModel(MessageModel):
    # A message together with the name-level profile of its author.
    user: UserNameResponse


@router.get("/{id}/messages", response_model=list[MessageUserModel])
async def get_channel_messages(
    id: str, skip: int = 0, limit: int = 50, user=Depends(get_verified_user)
):
    """Return one page of a channel's messages, each annotated with its author.

    Args:
        id: Channel id taken from the path.
        skip: Forwarded unchanged to ``Messages.get_messages_by_channel_id``.
        limit: Forwarded unchanged to ``Messages.get_messages_by_channel_id``.
        user: The authenticated caller.

    Raises:
        HTTPException: 404 when the channel does not exist; 403 when the
            caller is not an admin and lacks "read" access to the channel.
    """
    channel = Channels.get_channel_by_id(id)
    if not channel:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )

    if user.role != "admin" and not has_access(
        user.id, type="read", access_control=channel.access_control
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )

    message_list = Messages.get_messages_by_channel_id(id, skip, limit)

    # Cache authors by id so each distinct author is looked up only once.
    # A dedicated name is used so the authenticated ``user`` is never rebound.
    authors = {}
    messages = []
    for message in message_list:
        if message.user_id not in authors:
            authors[message.user_id] = Users.get_user_by_id(message.user_id)
        author = authors[message.user_id]

        # NOTE(review): if the lookup can return None (e.g. a removed
        # account), ``author.model_dump()`` raises here — confirm whether such
        # messages can exist.
        messages.append(
            MessageUserModel(
                **{
                    **message.model_dump(),
                    "user": UserNameResponse(**author.model_dump()),
                }
            )
        )

    return messages
- ############################
- # PostNewMessage
- ############################
async def send_notification(channel, message, active_user_ids):
    """Post a webhook notification about ``message`` to absent channel readers.

    Every user with "read" access to the channel who is not listed in
    ``active_user_ids`` and who has a ``notifications.webhook_url`` entry in
    their UI settings receives one ``post_webhook`` call.

    Args:
        channel: The channel the message was posted to.
        message: The newly created message.
        active_user_ids: Ids of users currently joined to the channel room;
            these users are skipped.
    """
    # Debug-level log instead of print(): keeps message content out of stdout.
    log.debug(
        "Sending notification to channel=%r, message=%r, active_user_ids=%r",
        channel,
        message,
        active_user_ids,
    )

    channel_url = f"{WEBUI_URL}/c/{channel.id}"

    for user in get_users_with_access("read", channel.access_control):
        if user.id in active_user_ids or not user.settings:
            continue

        # Tolerate a missing/None ``ui`` or ``notifications`` entry rather
        # than aborting the notifications for all remaining users.
        ui_settings = user.settings.ui or {}
        webhook_url = (ui_settings.get("notifications") or {}).get("webhook_url")
        if not webhook_url:
            continue

        # NOTE(review): post_webhook is invoked synchronously inside this
        # coroutine; if it performs blocking I/O it will stall the event
        # loop — confirm.
        post_webhook(
            webhook_url,
            f"#{channel.name} - {channel_url}\n\n{message.content}",
            {
                "action": "channel",
                "message": message.content,
                "title": channel.name,
                "url": channel_url,
            },
        )
@router.post("/{id}/messages/post", response_model=Optional[MessageModel])
async def post_new_message(
    id: str,
    form_data: MessageForm,
    background_tasks: BackgroundTasks,
    user=Depends(get_verified_user),
):
    """Create a message in a channel, broadcast it, and schedule notifications.

    After the insert, a ``channel-events`` socket event of type ``message``
    is emitted to the ``channel:<id>`` room, and ``send_notification`` is
    queued as a background task with the ids of users currently in that room.

    Raises:
        HTTPException: 404 when the channel does not exist; 403 when the
            caller is not an admin and lacks access to the channel; 400 when
            creating or broadcasting the message fails.
    """
    channel = Channels.get_channel_by_id(id)
    if not channel:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )

    # NOTE(review): posting is gated on the "read" permission, same as
    # viewing — confirm this is the intended policy.
    if user.role != "admin" and not has_access(
        user.id, type="read", access_control=channel.access_control
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )

    try:
        message = Messages.insert_new_message(form_data, channel.id, user.id)
        if not message:
            # Fail explicitly instead of tripping over ``None.model_dump()``
            # below; the handler still answers with the same HTTP 400.
            raise RuntimeError(f"Failed to create message in channel {channel.id}")

        message_payload = message.model_dump()
        sender = UserNameResponse(**user.model_dump()).model_dump()
        room = f"channel:{channel.id}"

        await sio.emit(
            "channel-events",
            {
                "channel_id": channel.id,
                "message_id": message.id,
                "data": {
                    "type": "message",
                    "data": {**message_payload, "user": sender},
                },
                "user": sender,
                "channel": channel.model_dump(),
            },
            to=room,
        )

        # Each participant entry's first element is the session id used as
        # the SESSION_POOL key; the set collapses multiple sessions per user.
        active_user_ids = list(
            {
                SESSION_POOL.get(participant[0])
                for participant in sio.manager.get_participants(
                    namespace="/", room=room
                )
            }
        )

        background_tasks.add_task(
            send_notification, channel, message, active_user_ids
        )

        return MessageModel(**message_payload)
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
        )
- ############################
- # UpdateMessageById
- ############################
@router.post(
    "/{id}/messages/{message_id}/update", response_model=Optional[MessageModel]
)
async def update_message_by_id(
    id: str, message_id: str, form_data: MessageForm, user=Depends(get_verified_user)
):
    """Update a channel message and broadcast the change.

    On success a ``channel-events`` socket event of type ``message:update``
    is emitted to the ``channel:<id>`` room.

    Raises:
        HTTPException: 404 when the channel or message does not exist; 403
            when the caller is not an admin and either lacks access to the
            channel or is not the message's author; 400 when the message
            belongs to another channel or the update fails.
    """
    channel = Channels.get_channel_by_id(id)
    if not channel:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )

    if user.role != "admin" and not has_access(
        user.id, type="read", access_control=channel.access_control
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )

    message = Messages.get_message_by_id(message_id)
    if not message:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )

    if message.channel_id != id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
        )

    # Channel access alone must not allow rewriting someone else's message:
    # only the author (or an admin) may edit it.
    if user.role != "admin" and message.user_id != user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )

    try:
        updated = Messages.update_message_by_id(message_id, form_data)
        if not updated:
            # Fail explicitly instead of tripping over ``None.model_dump()``
            # below; the handler still answers with the same HTTP 400.
            raise RuntimeError(f"Failed to update message {message_id}")

        updated_payload = updated.model_dump()
        # NOTE(review): the event carries the acting user, which differs from
        # the message author when an admin edits another user's message.
        actor = UserNameResponse(**user.model_dump()).model_dump()

        await sio.emit(
            "channel-events",
            {
                "channel_id": channel.id,
                "message_id": updated.id,
                "data": {
                    "type": "message:update",
                    "data": {**updated_payload, "user": actor},
                },
                "user": actor,
                "channel": channel.model_dump(),
            },
            to=f"channel:{channel.id}",
        )

        return MessageModel(**updated_payload)
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
        )
- ############################
- # DeleteMessageById
- ############################
@router.delete("/{id}/messages/{message_id}/delete", response_model=bool)
async def delete_message_by_id(
    id: str, message_id: str, user=Depends(get_verified_user)
):
    """Delete a channel message and broadcast the removal.

    On success a ``channel-events`` socket event of type ``message:delete``
    carrying the deleted message's data is emitted to the ``channel:<id>``
    room, and ``True`` is returned.

    Raises:
        HTTPException: 404 when the channel or message does not exist; 403
            when the caller is not an admin and either lacks access to the
            channel or is not the message's author; 400 when the message
            belongs to another channel or the delete fails.
    """
    channel = Channels.get_channel_by_id(id)
    if not channel:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )

    if user.role != "admin" and not has_access(
        user.id, type="read", access_control=channel.access_control
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )

    message = Messages.get_message_by_id(message_id)
    if not message:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )

    if message.channel_id != id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
        )

    # Channel access alone must not allow removing someone else's message:
    # only the author (or an admin) may delete it.
    if user.role != "admin" and message.user_id != user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )

    try:
        Messages.delete_message_by_id(message_id)

        # The event is built from the message as it was loaded before the
        # delete call.
        actor = UserNameResponse(**user.model_dump()).model_dump()
        await sio.emit(
            "channel-events",
            {
                "channel_id": channel.id,
                "message_id": message.id,
                "data": {
                    "type": "message:delete",
                    "data": {**message.model_dump(), "user": actor},
                },
                "user": actor,
                "channel": channel.model_dump(),
            },
            to=f"channel:{channel.id}",
        )

        return True
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
        )
|