webhook.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import json
  2. import logging
  3. import requests
  4. from open_webui.config import WEBUI_FAVICON_URL
  5. from open_webui.env import SRC_LOG_LEVELS, VERSION
  6. log = logging.getLogger(__name__)
  7. log.setLevel(SRC_LOG_LEVELS["WEBHOOK"])
  8. def post_webhook(name: str, url: str, message: str, event_data: dict) -> bool:
  9. try:
  10. log.debug(f"post_webhook: {url}, {message}, {event_data}")
  11. payload = {}
  12. # Slack and Google Chat Webhooks
  13. if "https://hooks.slack.com" in url or "https://chat.googleapis.com" in url:
  14. payload["text"] = message
  15. # Discord Webhooks
  16. elif "https://discord.com/api/webhooks" in url:
  17. payload["content"] = (
  18. message
  19. if len(message) < 2000
  20. else f"{message[: 2000 - 20]}... (truncated)"
  21. )
  22. # Microsoft Teams Webhooks
  23. elif "webhook.office.com" in url:
  24. action = event_data.get("action", "undefined")
  25. facts = [
  26. {"name": name, "value": value}
  27. for name, value in json.loads(event_data.get("user", {})).items()
  28. ]
  29. payload = {
  30. "@type": "MessageCard",
  31. "@context": "http://schema.org/extensions",
  32. "themeColor": "0076D7",
  33. "summary": message,
  34. "sections": [
  35. {
  36. "activityTitle": message,
  37. "activitySubtitle": f"{name} ({VERSION}) - {action}",
  38. "activityImage": WEBUI_FAVICON_URL,
  39. "facts": facts,
  40. "markdown": True,
  41. }
  42. ],
  43. }
  44. # Default Payload
  45. else:
  46. payload = {**event_data}
  47. log.debug(f"payload: {payload}")
  48. r = requests.post(url, json=payload)
  49. r.raise_for_status()
  50. log.debug(f"r.text: {r.text}")
  51. return True
  52. except Exception as e:
  53. log.exception(e)
  54. return False