import time
import logging
import sys

from aiocache import cached
from typing import Any, Optional
import random
import json
import inspect

from fastapi import Request
from starlette.responses import Response, StreamingResponse

from open_webui.socket.main import (
    get_event_call,
    get_event_emitter,
)
from open_webui.routers.tasks import generate_queries

from open_webui.models.users import UserModel
from open_webui.models.functions import Functions
from open_webui.models.models import Models

from open_webui.retrieval.utils import get_sources_from_files

from open_webui.utils.chat import generate_chat_completion
from open_webui.utils.task import (
    get_task_model_id,
    rag_template,
    tools_function_calling_generation_template,
)
from open_webui.utils.misc import (
    add_or_update_system_message,
    get_last_user_message,
    prepend_to_first_user_message_content,
)
from open_webui.utils.tools import get_tools
from open_webui.utils.plugin import load_function_module_by_id

from open_webui.config import DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE
from open_webui.env import SRC_LOG_LEVELS, GLOBAL_LOG_LEVEL, BYPASS_MODEL_ACCESS_CONTROL
from open_webui.constants import TASKS

logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])


async def chat_completion_filter_functions_handler(request, body, model, extra_params):
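    """Run the ``inlet`` hook of every enabled filter function attached to the
    model (global filters plus the model's ``filterIds``), in ascending
    priority order, letting each one rewrite the request body before the
    completion is generated. Returns the (possibly modified) body and an
    empty flags dict.
    """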
    skip_files = None

    def get_filter_function_ids(model):
        def get_priority(function_id):
            function = Functions.get_function_by_id(function_id)
            if function is not None and hasattr(function, "valves"):
                # TODO: Fix FunctionModel
                return (function.valves if function.valves else {}).get("priority", 0)
            return 0

        filter_ids = [
            function.id for function in Functions.get_global_filter_functions()
        ]
        if "info" in model and "meta" in model["info"]:
            filter_ids.extend(model["info"]["meta"].get("filterIds", []))
            filter_ids = list(set(filter_ids))

        enabled_filter_ids = [
            function.id
            for function in Functions.get_functions_by_type("filter", active_only=True)
        ]
        filter_ids = [
            filter_id for filter_id in filter_ids if filter_id in enabled_filter_ids
        ]

        filter_ids.sort(key=get_priority)
        return filter_ids

    filter_ids = get_filter_function_ids(model)
    for filter_id in filter_ids:
        filter = Functions.get_function_by_id(filter_id)
        if not filter:
            continue

        # Load the filter module once and cache it on app state
        if filter_id in request.app.state.FUNCTIONS:
            function_module = request.app.state.FUNCTIONS[filter_id]
        else:
            function_module, _, _ = load_function_module_by_id(filter_id)
            request.app.state.FUNCTIONS[filter_id] = function_module

        # Check if the function has a file_handler variable
        if hasattr(function_module, "file_handler"):
            skip_files = function_module.file_handler

        # Apply valves to the function
        if hasattr(function_module, "valves") and hasattr(function_module, "Valves"):
            valves = Functions.get_function_valves_by_id(filter_id)
            function_module.valves = function_module.Valves(
                **(valves if valves else {})
            )

        if hasattr(function_module, "inlet"):
            try:
                inlet = function_module.inlet

                # Create a dictionary of parameters to be passed to the function,
                # keeping only the ones the inlet signature actually accepts
                params = {"body": body} | {
                    k: v
                    for k, v in {
                        **extra_params,
                        "__model__": model,
                        "__id__": filter_id,
                    }.items()
                    if k in inspect.signature(inlet).parameters
                }

                if "__user__" in params and hasattr(function_module, "UserValves"):
                    try:
                        params["__user__"]["valves"] = function_module.UserValves(
                            **Functions.get_user_valves_by_id_and_user_id(
                                filter_id, params["__user__"]["id"]
                            )
                        )
                    except Exception as e:
                        log.exception(e)

                if inspect.iscoroutinefunction(inlet):
                    body = await inlet(**params)
                else:
                    body = inlet(**params)

            except Exception as e:
                log.exception(f"Error: {e}")
                raise e

    if skip_files and "files" in body.get("metadata", {}):
        del body["metadata"]["files"]

    return body, {}
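

# A minimal sketch of the filter-module interface the handler above probes
# for via hasattr (the field names inside Valves/UserValves are illustrative
# assumptions, not a pinned schema):
#
#     class Filter:
#         class Valves(BaseModel):
#             priority: int = 0  # used to order filter execution
#
#         class UserValves(BaseModel):
#             enabled: bool = True  # hypothetical per-user toggle
#
#         file_handler = False  # True: the filter consumes uploaded files itself
#
#         def inlet(self, body: dict, __user__: dict) -> dict:  # may be async
#             return body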


async def chat_completion_tools_handler(
    request: Request, body: dict, user: UserModel, models, extra_params: dict
) -> tuple[dict, dict]:
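    """If the request metadata carries ``tool_ids``, ask the task model to
    select a tool via the function-calling prompt template, invoke the chosen
    tool, and collect string output as a citable source. Returns the body and
    a flags dict of the form ``{"sources": [...]}``.
    """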
    async def get_content_from_response(response) -> Optional[str]:
        content = None
        if hasattr(response, "body_iterator"):
            # Drain the streaming response to get the final message content
            async for chunk in response.body_iterator:
                data = json.loads(chunk.decode("utf-8"))
                content = data["choices"][0]["message"]["content"]

            # Cleanup any remaining background tasks if necessary
            if response.background is not None:
                await response.background()
        else:
            content = response["choices"][0]["message"]["content"]
        return content

    def get_tools_function_calling_payload(messages, task_model_id, content):
        user_message = get_last_user_message(messages)
        history = "\n".join(
            f"{message['role'].upper()}: \"\"\"{message['content']}\"\"\""
            for message in messages[::-1][:4]
        )

        prompt = f"History:\n{history}\nQuery: {user_message}"

        return {
            "model": task_model_id,
            "messages": [
                {"role": "system", "content": content},
                {"role": "user", "content": f"Query: {prompt}"},
            ],
            "stream": False,
            "metadata": {"task": str(TASKS.FUNCTION_CALLING)},
        }

    # If tool_ids field is present, call the functions
    metadata = body.get("metadata", {})

    tool_ids = metadata.get("tool_ids", None)
    log.debug(f"{tool_ids=}")
    if not tool_ids:
        return body, {}

    skip_files = False
    sources = []

    task_model_id = get_task_model_id(
        body["model"],
        request.app.state.config.TASK_MODEL,
        request.app.state.config.TASK_MODEL_EXTERNAL,
        models,
    )
    tools = get_tools(
        request,
        tool_ids,
        user,
        {
            **extra_params,
            "__model__": models[task_model_id],
            "__messages__": body["messages"],
            "__files__": metadata.get("files", []),
        },
    )
    log.info(f"{tools=}")

    specs = [tool["spec"] for tool in tools.values()]
    tools_specs = json.dumps(specs)

    if request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE != "":
        template = request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE
    else:
        template = DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE

    tools_function_calling_prompt = tools_function_calling_generation_template(
        template, tools_specs
    )
    log.info(f"{tools_function_calling_prompt=}")
    payload = get_tools_function_calling_payload(
        body["messages"], task_model_id, tools_function_calling_prompt
    )

    try:
        response = await generate_chat_completion(request, form_data=payload, user=user)
        log.debug(f"{response=}")
        content = await get_content_from_response(response)
        log.debug(f"{content=}")

        if not content:
            return body, {}

        try:
            # Keep only the outermost JSON object in the reply
            content = content[content.find("{") : content.rfind("}") + 1]
            if not content:
                raise Exception("No JSON object found in the response")

            result = json.loads(content)

            tool_function_name = result.get("name", None)
            if tool_function_name not in tools:
                return body, {}

            tool_function_params = result.get("parameters", {})

            try:
                required_params = (
                    tools[tool_function_name]
                    .get("spec", {})
                    .get("parameters", {})
                    .get("required", [])
                )
                tool_function = tools[tool_function_name]["callable"]
                # Drop any parameters the spec does not list as required
                tool_function_params = {
                    k: v
                    for k, v in tool_function_params.items()
                    if k in required_params
                }
                tool_output = await tool_function(**tool_function_params)

            except Exception as e:
                tool_output = str(e)

            if isinstance(tool_output, str):
                if tools[tool_function_name]["citation"]:
                    sources.append(
                        {
                            "source": {
                                "name": f"TOOL:{tools[tool_function_name]['toolkit_id']}/{tool_function_name}"
                            },
                            "document": [tool_output],
                            "metadata": [
                                {
                                    "source": f"TOOL:{tools[tool_function_name]['toolkit_id']}/{tool_function_name}"
                                }
                            ],
                        }
                    )
                else:
                    sources.append(
                        {
                            "source": {},
                            "document": [tool_output],
                            "metadata": [
                                {
                                    "source": f"TOOL:{tools[tool_function_name]['toolkit_id']}/{tool_function_name}"
                                }
                            ],
                        }
                    )

                if tools[tool_function_name]["file_handler"]:
                    skip_files = True

        except Exception as e:
            log.exception(f"Error: {e}")
            content = None
    except Exception as e:
        log.exception(f"Error: {e}")
        content = None

    log.debug(f"tool_contexts: {sources}")

    if skip_files and "files" in body.get("metadata", {}):
        del body["metadata"]["files"]

    return body, {"sources": sources}
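

# Shape of the task model's function-calling reply, inferred from the
# result.get(...) parsing above (tool name and argument values are
# illustrative):
#
#     {"name": "get_current_weather", "parameters": {"city": "Berlin"}}
#
# Text outside the outermost {...} is stripped, and parameters the tool spec
# does not mark as required are dropped before the tool is invoked.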


async def chat_completion_files_handler(
    request: Request, body: dict, user: UserModel
) -> tuple[dict, dict[str, list]]:
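    """Generate retrieval queries from the chat history for any files attached
    in the request metadata, then fetch matching chunks as sources. Returns
    the body and a flags dict of the form ``{"sources": [...]}``.
    """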
    sources = []

    if files := body.get("metadata", {}).get("files", None):
        try:
            queries_response = await generate_queries(
                {
                    "model": body["model"],
                    "messages": body["messages"],
                    "type": "retrieval",
                },
                user,
            )
            queries_response = queries_response["choices"][0]["message"]["content"]

            try:
                bracket_start = queries_response.find("{")
                bracket_end = queries_response.rfind("}") + 1

                if bracket_start == -1 or bracket_end == 0:
                    raise Exception("No JSON object found in the response")

                queries_response = queries_response[bracket_start:bracket_end]
                queries_response = json.loads(queries_response)
            except Exception:
                # Not valid JSON; treat the raw reply as a single query
                queries_response = {"queries": [queries_response]}
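
            # Expected reply shape (inferred from the .get("queries") below):
            #     {"queries": ["first search query", "second search query"]}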
            queries = queries_response.get("queries", [])
        except Exception:
            queries = []

        # Fall back to the last user message if no queries were generated
        if len(queries) == 0:
            queries = [get_last_user_message(body["messages"])]

        sources = get_sources_from_files(
            files=files,
            queries=queries,
            embedding_function=request.app.state.EMBEDDING_FUNCTION,
            k=request.app.state.config.TOP_K,
            reranking_function=request.app.state.rf,
            r=request.app.state.config.RELEVANCE_THRESHOLD,
            hybrid_search=request.app.state.config.ENABLE_RAG_HYBRID_SEARCH,
        )

        log.debug(f"rag_contexts:sources: {sources}")

    return body, {"sources": sources}


def apply_params_to_form_data(form_data, model):
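    """Map the request's "params" block onto backend-specific fields: Ollama
    models get an "options" dict (plus top-level format/keep_alive), while
    other backends get individual top-level sampling keys.
    """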
    params = form_data.pop("params", {})
    if model.get("ollama"):
        form_data["options"] = params

        if "format" in params:
            form_data["format"] = params["format"]

        if "keep_alive" in params:
            form_data["keep_alive"] = params["keep_alive"]
    else:
        if "seed" in params:
            form_data["seed"] = params["seed"]

        if "stop" in params:
            form_data["stop"] = params["stop"]

        if "temperature" in params:
            form_data["temperature"] = params["temperature"]

        if "top_p" in params:
            form_data["top_p"] = params["top_p"]

        if "frequency_penalty" in params:
            form_data["frequency_penalty"] = params["frequency_penalty"]

    return form_data
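

# Example (illustrative values): params = {"temperature": 0.7, "seed": 42}
# becomes form_data["options"] == {"temperature": 0.7, "seed": 42} for an
# Ollama model, but top-level form_data["temperature"]/form_data["seed"] for
# anything else; keys outside the whitelist above are ignored in that branch.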


async def process_chat_payload(request, form_data, metadata, user, model):
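    """Run the full pre-processing pipeline for a chat completion request:
    apply model params, filter inlets, tool calling, and file retrieval, then
    inject any gathered context into the messages via the RAG template.
    Returns the final form_data plus a list of events to send to the client.
    """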
    form_data = apply_params_to_form_data(form_data, model)
    log.debug(f"form_data: {form_data}")

    extra_params = {
        "__event_emitter__": get_event_emitter(metadata),
        "__event_call__": get_event_call(metadata),
        "__user__": {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "role": user.role,
        },
        "__metadata__": metadata,
        "__request__": request,
    }

    # events: additional events to be sent to the client
    # sources: contexts/citations gathered for RAG
    models = request.app.state.MODELS
    events = []
    sources = []

    try:
        form_data, flags = await chat_completion_filter_functions_handler(
            request, form_data, model, extra_params
        )
    except Exception as e:
        raise Exception(f"Error: {e}")

    tool_ids = form_data.pop("tool_ids", None)
    files = form_data.pop("files", None)

    # Move tool_ids and files into the metadata so downstream handlers can
    # pick them up from there
    metadata = {
        **metadata,
        "tool_ids": tool_ids,
        "files": files,
    }
    form_data["metadata"] = metadata

    try:
        form_data, flags = await chat_completion_tools_handler(
            request, form_data, user, models, extra_params
        )
        sources.extend(flags.get("sources", []))
    except Exception as e:
        log.exception(e)

    try:
        form_data, flags = await chat_completion_files_handler(request, form_data, user)
        sources.extend(flags.get("sources", []))
    except Exception as e:
        log.exception(e)

    # If context is not empty, insert it into the messages
    if len(sources) > 0:
        context_string = ""
        for source_idx, source in enumerate(sources):
            source_id = source.get("source", {}).get("name", "")

            if "document" in source:
                for doc_idx, doc_context in enumerate(source["document"]):
                    metadata = source.get("metadata")
                    doc_source_id = None

                    if metadata:
                        doc_source_id = metadata[doc_idx].get("source", source_id)

                    if source_id:
                        context_string += f"<source><source_id>{doc_source_id if doc_source_id is not None else source_id}</source_id><source_context>{doc_context}</source_context></source>\n"
                    else:
                        # If there is no source_id, then do not include the source_id tag
                        context_string += f"<source><source_context>{doc_context}</source_context></source>\n"

        context_string = context_string.strip()
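
        # Each document is serialized as one XML-ish block, e.g. (values
        # illustrative):
        # <source><source_id>doc.pdf</source_id><source_context>chunk text</source_context></source>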
        prompt = get_last_user_message(form_data["messages"])
        if prompt is None:
            raise Exception("No user message found")

        if (
            request.app.state.config.RELEVANCE_THRESHOLD == 0
            and context_string.strip() == ""
        ):
            log.debug(
                "With a 0 relevancy threshold for RAG, the context cannot be empty"
            )

        # Workaround for Ollama 2.0+ system prompt issue
        # TODO: replace with add_or_update_system_message
        if model["owned_by"] == "ollama":
            form_data["messages"] = prepend_to_first_user_message_content(
                rag_template(
                    request.app.state.config.RAG_TEMPLATE, context_string, prompt
                ),
                form_data["messages"],
            )
        else:
            form_data["messages"] = add_or_update_system_message(
                rag_template(
                    request.app.state.config.RAG_TEMPLATE, context_string, prompt
                ),
                form_data["messages"],
            )

    # If there are citations, add them to the data_items
    sources = [source for source in sources if source.get("source", {}).get("name", "")]
    if len(sources) > 0:
        events.append({"sources": sources})

    return form_data, events


async def process_chat_response(response, events, metadata):
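    """Prepend the queued events to streaming (SSE or ndjson) responses using
    the matching wire framing; non-streaming responses pass through as-is.
    """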
    if not isinstance(response, StreamingResponse):
        return response

    content_type = response.headers["Content-Type"]
    is_openai = "text/event-stream" in content_type
    is_ollama = "application/x-ndjson" in content_type

    if not is_openai and not is_ollama:
        return response

    async def stream_wrapper(original_generator, events):
        def wrap_item(item):
            # SSE frames are "data: ...\n\n"; ndjson frames are one JSON per line
            return f"data: {item}\n\n" if is_openai else f"{item}\n"

        # Emit the queued events first, then pass the upstream stream through
        for event in events:
            yield wrap_item(json.dumps(event))

        async for data in original_generator:
            yield data

    return StreamingResponse(
        stream_wrapper(response.body_iterator, events),
        headers=dict(response.headers),
    )
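

# Wire framing of the prepended events (mirrors wrap_item above; payload
# illustrative):
#   OpenAI-style SSE:  data: {"sources": [...]}\n\n
#   Ollama ndjson:     {"sources": [...]}\n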