pipelines.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. from fastapi import APIRouter, Depends, HTTPException, Response, status
  2. from pydantic import BaseModel
  3. from starlette.responses import FileResponse
  4. from open_webui.models.chats import ChatTitleMessagesForm
  5. from open_webui.config import DATA_DIR, ENABLE_ADMIN_EXPORT
  6. from open_webui.constants import ERROR_MESSAGES
  7. from open_webui.utils.misc import get_gravatar_url
  8. from open_webui.utils.pdf_generator import PDFGenerator
  9. from open_webui.utils.auth import get_admin_user
  10. router = APIRouter()
  11. ##################################
  12. #
  13. # Pipelines Endpoints
  14. #
  15. ##################################
  16. # TODO: Refactor pipelines API endpoints below into a separate file
  @app.get("/api/pipelines/list")
  async def get_pipelines_list(user=Depends(get_admin_user)):
      # Report which configured base URLs answered with a "pipelines" entry.
      #
      # Awaits get_openai_models_responses() and keeps the position of every
      # response that is not None and contains the key "pipelines".
      #
      # Parameters:
      #   user: resolved by the get_admin_user dependency; not read in the body.
      #
      # Returns:
      #   {"data": [{"url": <base URL at that position>, "idx": <position>}, ...]}
      responses = await get_openai_models_responses()
      log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}")

      entries = []
      for position, model_response in enumerate(responses):
          # Skip connections that gave no response or do not mention pipelines.
          if model_response is None or "pipelines" not in model_response:
              continue
          entries.append(
              {
                  "url": openai_app.state.config.OPENAI_API_BASE_URLS[position],
                  "idx": position,
              }
          )

      return {"data": entries}
  @app.post("/api/pipelines/upload")
  async def upload_pipeline(
      urlIdx: int = Form(...), file: UploadFile = File(...), user=Depends(get_admin_user)
  ):
      # Accept a single ".py" upload, stage it under "{CACHE_DIR}/pipelines" and
      # forward it to "{url}/pipelines/upload" on the connection selected by
      # urlIdx. The staged copy is removed afterwards, on success or failure.
      #
      # Parameters:
      #   urlIdx: position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS to use.
      #   file:   the uploaded file; its name must end with ".py".
      #   user:   resolved by the get_admin_user dependency; not read in the body.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException 400 when the filename is missing or does not end in ".py".
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      #
      # NOTE(review): requests.post is a synchronous call inside an async
      # handler; consider moving it off the event loop.
      print("upload_pipeline", urlIdx, file.filename)

      # Check if the uploaded file is a python file
      if not (file.filename and file.filename.endswith(".py")):
          raise HTTPException(
              status_code=status.HTTP_400_BAD_REQUEST,
              detail="Only Python (.py) files are allowed.",
          )

      # file.filename is supplied by the client. Keep only its final path
      # component so that a name such as "../../app.py" or "/etc/app.py" cannot
      # make the staged copy (or the cleanup in `finally`) touch a path outside
      # upload_folder.
      safe_name = os.path.basename(file.filename)

      upload_folder = f"{CACHE_DIR}/pipelines"
      os.makedirs(upload_folder, exist_ok=True)
      file_path = os.path.join(upload_folder, safe_name)

      r = None
      try:
          # Save the uploaded file
          with open(file_path, "wb") as buffer:
              shutil.copyfileobj(file.file, buffer)

          url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx]
          key = openai_app.state.config.OPENAI_API_KEYS[urlIdx]
          headers = {"Authorization": f"Bearer {key}"}

          # Re-open the staged copy for reading and send it as multipart data.
          with open(file_path, "rb") as f:
              files = {"file": f}
              r = requests.post(f"{url}/pipelines/upload", headers=headers, files=files)

          r.raise_for_status()
          data = r.json()

          return {**data}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # Defaults used when no upstream response was obtained at all.
          detail = "Pipeline not found"
          status_code = status.HTTP_404_NOT_FOUND
          if r is not None:
              status_code = r.status_code
              try:
                  res = r.json()
                  if "detail" in res:
                      detail = res["detail"]
              except Exception:
                  # Best effort: the upstream body may not be JSON; keep the
                  # default detail in that case.
                  pass

          raise HTTPException(
              status_code=status_code,
              detail=detail,
          )
      finally:
          # Ensure the file is deleted after the upload is completed or on failure
          if os.path.exists(file_path):
              os.remove(file_path)
  class AddPipelineForm(BaseModel):
      # Request body accepted by add_pipeline.

      # Sent to the upstream server as the "url" field of the JSON payload.
      url: str
      # Position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS of the server to call.
      urlIdx: int
  @app.post("/api/pipelines/add")
  async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user)):
      # POST {"url": form_data.url} to "{base}/pipelines/add" on the connection
      # selected by form_data.urlIdx and relay the JSON answer.
      #
      # Parameters:
      #   form_data: carries the pipeline URL and the connection index.
      #   user:      resolved by the get_admin_user dependency; not read here.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      upstream = None
      try:
          idx = form_data.urlIdx
          base_url = openai_app.state.config.OPENAI_API_BASE_URLS[idx]
          api_key = openai_app.state.config.OPENAI_API_KEYS[idx]

          upstream = requests.post(
              f"{base_url}/pipelines/add",
              headers={"Authorization": f"Bearer {api_key}"},
              json={"url": form_data.url},
          )
          upstream.raise_for_status()

          return {**upstream.json()}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # No response object means the request never completed.
          if upstream is None:
              raise HTTPException(
                  status_code=status.HTTP_404_NOT_FOUND,
                  detail="Pipeline not found",
              )

          detail = "Pipeline not found"
          try:
              payload = upstream.json()
              if "detail" in payload:
                  detail = payload["detail"]
          except Exception:
              # The upstream body may not be JSON; keep the default detail.
              pass

          raise HTTPException(status_code=upstream.status_code, detail=detail)
  class DeletePipelineForm(BaseModel):
      # Request body accepted by delete_pipeline.

      # Sent to the upstream server as the "id" field of the JSON payload.
      id: str
      # Position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS of the server to call.
      urlIdx: int
  @app.delete("/api/pipelines/delete")
  async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_user)):
      # Send DELETE "{base}/pipelines/delete" with {"id": form_data.id} to the
      # connection selected by form_data.urlIdx and relay the JSON answer.
      #
      # Parameters:
      #   form_data: carries the pipeline id and the connection index.
      #   user:      resolved by the get_admin_user dependency; not read here.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      upstream = None
      try:
          idx = form_data.urlIdx
          base_url = openai_app.state.config.OPENAI_API_BASE_URLS[idx]
          api_key = openai_app.state.config.OPENAI_API_KEYS[idx]

          upstream = requests.delete(
              f"{base_url}/pipelines/delete",
              headers={"Authorization": f"Bearer {api_key}"},
              json={"id": form_data.id},
          )
          upstream.raise_for_status()

          return {**upstream.json()}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # No response object means the request never completed.
          if upstream is None:
              raise HTTPException(
                  status_code=status.HTTP_404_NOT_FOUND,
                  detail="Pipeline not found",
              )

          detail = "Pipeline not found"
          try:
              payload = upstream.json()
              if "detail" in payload:
                  detail = payload["detail"]
          except Exception:
              # The upstream body may not be JSON; keep the default detail.
              pass

          raise HTTPException(status_code=upstream.status_code, detail=detail)
  @app.get("/api/pipelines")
  async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_user)):
      # GET "{base}/pipelines" from the connection selected by urlIdx and relay
      # the JSON answer.
      #
      # Parameters:
      #   urlIdx: position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS to use.
      #           Defaults to None; the lookup below then raises and is reported
      #           through the generic error path.
      #   user:   resolved by the get_admin_user dependency; not read here.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      upstream = None
      try:
          base_url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx]
          api_key = openai_app.state.config.OPENAI_API_KEYS[urlIdx]

          upstream = requests.get(
              f"{base_url}/pipelines",
              headers={"Authorization": f"Bearer {api_key}"},
          )
          upstream.raise_for_status()

          return {**upstream.json()}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # No response object means the request never completed.
          if upstream is None:
              raise HTTPException(
                  status_code=status.HTTP_404_NOT_FOUND,
                  detail="Pipeline not found",
              )

          detail = "Pipeline not found"
          try:
              payload = upstream.json()
              if "detail" in payload:
                  detail = payload["detail"]
          except Exception:
              # The upstream body may not be JSON; keep the default detail.
              pass

          raise HTTPException(status_code=upstream.status_code, detail=detail)
  @app.get("/api/pipelines/{pipeline_id}/valves")
  async def get_pipeline_valves(
      urlIdx: Optional[int],
      pipeline_id: str,
      user=Depends(get_admin_user),
  ):
      # GET "{base}/{pipeline_id}/valves" from the connection selected by urlIdx
      # and relay the JSON answer.
      #
      # Parameters:
      #   urlIdx:      position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS to use.
      #   pipeline_id: path segment inserted verbatim into the upstream URL.
      #   user:        resolved by the get_admin_user dependency; not read here.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      upstream = None
      try:
          base_url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx]
          api_key = openai_app.state.config.OPENAI_API_KEYS[urlIdx]

          upstream = requests.get(
              f"{base_url}/{pipeline_id}/valves",
              headers={"Authorization": f"Bearer {api_key}"},
          )
          upstream.raise_for_status()

          return {**upstream.json()}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # No response object means the request never completed.
          if upstream is None:
              raise HTTPException(
                  status_code=status.HTTP_404_NOT_FOUND,
                  detail="Pipeline not found",
              )

          detail = "Pipeline not found"
          try:
              payload = upstream.json()
              if "detail" in payload:
                  detail = payload["detail"]
          except Exception:
              # The upstream body may not be JSON; keep the default detail.
              pass

          raise HTTPException(status_code=upstream.status_code, detail=detail)
  @app.get("/api/pipelines/{pipeline_id}/valves/spec")
  async def get_pipeline_valves_spec(
      urlIdx: Optional[int],
      pipeline_id: str,
      user=Depends(get_admin_user),
  ):
      # GET "{base}/{pipeline_id}/valves/spec" from the connection selected by
      # urlIdx and relay the JSON answer.
      #
      # Parameters:
      #   urlIdx:      position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS to use.
      #   pipeline_id: path segment inserted verbatim into the upstream URL.
      #   user:        resolved by the get_admin_user dependency; not read here.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      upstream = None
      try:
          base_url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx]
          api_key = openai_app.state.config.OPENAI_API_KEYS[urlIdx]

          upstream = requests.get(
              f"{base_url}/{pipeline_id}/valves/spec",
              headers={"Authorization": f"Bearer {api_key}"},
          )
          upstream.raise_for_status()

          return {**upstream.json()}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # No response object means the request never completed.
          if upstream is None:
              raise HTTPException(
                  status_code=status.HTTP_404_NOT_FOUND,
                  detail="Pipeline not found",
              )

          detail = "Pipeline not found"
          try:
              payload = upstream.json()
              if "detail" in payload:
                  detail = payload["detail"]
          except Exception:
              # The upstream body may not be JSON; keep the default detail.
              pass

          raise HTTPException(status_code=upstream.status_code, detail=detail)
  @app.post("/api/pipelines/{pipeline_id}/valves/update")
  async def update_pipeline_valves(
      urlIdx: Optional[int],
      pipeline_id: str,
      form_data: dict,
      user=Depends(get_admin_user),
  ):
      # POST a copy of form_data as JSON to "{base}/{pipeline_id}/valves/update"
      # on the connection selected by urlIdx and relay the JSON answer.
      #
      # Parameters:
      #   urlIdx:      position in OPENAI_API_BASE_URLS / OPENAI_API_KEYS to use.
      #   pipeline_id: path segment inserted verbatim into the upstream URL.
      #   form_data:   mapping forwarded unchanged as the JSON request body.
      #   user:        resolved by the get_admin_user dependency; not read here.
      #
      # Returns:
      #   The JSON body of the upstream response, copied into a new dict.
      #
      # Raises:
      #   HTTPException for any failure inside the try block: the upstream status
      #   code and its "detail" field when a response was received, otherwise
      #   404 with "Pipeline not found".
      upstream = None
      try:
          base_url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx]
          api_key = openai_app.state.config.OPENAI_API_KEYS[urlIdx]

          upstream = requests.post(
              f"{base_url}/{pipeline_id}/valves/update",
              headers={"Authorization": f"Bearer {api_key}"},
              json={**form_data},
          )
          upstream.raise_for_status()

          return {**upstream.json()}
      except Exception as e:
          # Handle connection error here
          print(f"Connection error: {e}")

          # No response object means the request never completed.
          if upstream is None:
              raise HTTPException(
                  status_code=status.HTTP_404_NOT_FOUND,
                  detail="Pipeline not found",
              )

          detail = "Pipeline not found"
          try:
              payload = upstream.json()
              if "detail" in payload:
                  detail = payload["detail"]
          except Exception:
              # The upstream body may not be JSON; keep the default detail.
              pass

          raise HTTPException(status_code=upstream.status_code, detail=detail)