instrumentors.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import logging
  2. import traceback
  3. from typing import Collection
  4. from chromadb.telemetry.opentelemetry.fastapi import instrument_fastapi
  5. from opentelemetry.instrumentation.httpx import (
  6. HTTPXClientInstrumentor,
  7. RequestInfo,
  8. ResponseInfo,
  9. )
  10. from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
  11. from opentelemetry.instrumentation.logging import LoggingInstrumentor
  12. from opentelemetry.instrumentation.redis import RedisInstrumentor
  13. from opentelemetry.instrumentation.requests import RequestsInstrumentor
  14. from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
  15. from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
  16. from opentelemetry.trace import Span, StatusCode
  17. from redis import Redis
  18. from requests import PreparedRequest, Response
  19. from open_webui.utils.trace.constants import SPAN_REDIS_TYPE, SpanAttributes
  20. from open_webui.env import SRC_LOG_LEVELS
  21. logger = logging.getLogger(__name__)
  22. logger.setLevel(SRC_LOG_LEVELS["MAIN"])
  23. def requests_hook(span: Span, request: PreparedRequest):
  24. """
  25. Http Request Hook
  26. """
  27. span.update_name(f"{request.method} {request.url}")
  28. span.set_attributes(
  29. attributes={
  30. SpanAttributes.HTTP_URL: request.url,
  31. SpanAttributes.HTTP_METHOD: request.method,
  32. }
  33. )
  34. def response_hook(span: Span, request: PreparedRequest, response: Response):
  35. """
  36. HTTP Response Hook
  37. """
  38. span.set_attributes(
  39. attributes={
  40. SpanAttributes.HTTP_STATUS_CODE: response.status_code,
  41. }
  42. )
  43. span.set_status(StatusCode.ERROR if response.status_code >= 400 else StatusCode.OK)
  44. def redis_request_hook(span: Span, instance: Redis, args, kwargs):
  45. """
  46. Redis Request Hook
  47. """
  48. try:
  49. connection_kwargs: dict = instance.connection_pool.connection_kwargs
  50. host = connection_kwargs.get("host")
  51. port = connection_kwargs.get("port")
  52. db = connection_kwargs.get("db")
  53. span.set_attributes(
  54. {
  55. SpanAttributes.DB_INSTANCE: f"{host}/{db}",
  56. SpanAttributes.DB_NAME: f"{host}/{db}",
  57. SpanAttributes.DB_TYPE: SPAN_REDIS_TYPE,
  58. SpanAttributes.DB_PORT: port,
  59. SpanAttributes.DB_IP: host,
  60. SpanAttributes.DB_STATEMENT: " ".join([str(i) for i in args]),
  61. SpanAttributes.DB_OPERATION: str(args[0]),
  62. }
  63. )
  64. except Exception: # pylint: disable=W0718
  65. logger.error(traceback.format_exc())
  66. def httpx_request_hook(span: Span, request: RequestInfo):
  67. """
  68. HTTPX Request Hook
  69. """
  70. span.update_name(f"{request.method.decode()} {str(request.url)}")
  71. span.set_attributes(
  72. attributes={
  73. SpanAttributes.HTTP_URL: str(request.url),
  74. SpanAttributes.HTTP_METHOD: request.method.decode(),
  75. }
  76. )
  77. def httpx_response_hook(span: Span, request: RequestInfo, response: ResponseInfo):
  78. """
  79. HTTPX Response Hook
  80. """
  81. span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.status_code)
  82. span.set_status(
  83. StatusCode.ERROR
  84. if response.status_code >= status.HTTP_400_BAD_REQUEST
  85. else StatusCode.OK
  86. )
  87. async def httpx_async_request_hook(span, request):
  88. """
  89. Async Request Hook
  90. """
  91. httpx_request_hook(span, request)
  92. async def httpx_async_response_hook(span, request, response):
  93. """
  94. Async Response Hook
  95. """
  96. httpx_response_hook(span, request, response)
  97. class Instrumentor(BaseInstrumentor):
  98. """
  99. Instrument OT
  100. """
  101. def __init__(self, app):
  102. self.app = app
  103. def instrumentation_dependencies(self) -> Collection[str]:
  104. return []
  105. def _instrument(self, **kwargs):
  106. instrument_fastapi(app=self.app)
  107. SQLAlchemyInstrumentor().instrument()
  108. RedisInstrumentor().instrument(request_hook=redis_request_hook)
  109. RequestsInstrumentor().instrument(
  110. request_hook=requests_hook, response_hook=response_hook
  111. )
  112. LoggingInstrumentor().instrument()
  113. HTTPXClientInstrumentor().instrument(
  114. request_hook=httpx_request_hook,
  115. response_hook=httpx_response_hook,
  116. async_request_hook=httpx_async_request_hook,
  117. async_response_hook=httpx_async_response_hook,
  118. )
  119. AioHttpClientInstrumentor().instrument()
  120. def _uninstrument(self, **kwargs):
  121. if getattr(self, "instrumentors", None) is None:
  122. return
  123. for instrumentor in self.instrumentors:
  124. instrumentor.uninstrument()