webhook.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import json
  2. import logging
  3. import requests
  4. from open_webui.config import WEBUI_FAVICON_URL, WEBUI_NAME
  5. from open_webui.env import SRC_LOG_LEVELS, VERSION
  6. log = logging.getLogger(__name__)
  7. log.setLevel(SRC_LOG_LEVELS["WEBHOOK"])
  8. def post_webhook(url: str, message: str, event_data: dict) -> bool:
  9. try:
  10. log.debug(f"post_webhook: {url}, {message}, {event_data}")
  11. payload = {}
  12. # Slack and Google Chat Webhooks
  13. if "https://hooks.slack.com" in url or "https://chat.googleapis.com" in url:
  14. payload["text"] = message
  15. # Discord Webhooks
  16. elif "https://discord.com/api/webhooks" in url:
  17. payload["content"] = message
  18. # Microsoft Teams Webhooks
  19. elif "webhook.office.com" in url:
  20. action = event_data.get("action", "undefined")
  21. facts = [
  22. {"name": name, "value": value}
  23. for name, value in json.loads(event_data.get("user", {})).items()
  24. ]
  25. payload = {
  26. "@type": "MessageCard",
  27. "@context": "http://schema.org/extensions",
  28. "themeColor": "0076D7",
  29. "summary": message,
  30. "sections": [
  31. {
  32. "activityTitle": message,
  33. "activitySubtitle": f"{WEBUI_NAME} ({VERSION}) - {action}",
  34. "activityImage": WEBUI_FAVICON_URL,
  35. "facts": facts,
  36. "markdown": True,
  37. }
  38. ],
  39. }
  40. # Default Payload
  41. else:
  42. payload = {**event_data}
  43. log.debug(f"payload: {payload}")
  44. r = requests.post(url, json=payload)
  45. r.raise_for_status()
  46. log.debug(f"r.text: {r.text}")
  47. return True
  48. except Exception as e:
  49. log.exception(e)
  50. return False