# instrumentors.py (5.9 KB)
  1. import logging
  2. import traceback
  3. from typing import Collection, Union
  4. from aiohttp import (
  5. TraceRequestStartParams,
  6. TraceRequestEndParams,
  7. TraceRequestExceptionParams,
  8. )
  9. from chromadb.telemetry.opentelemetry.fastapi import instrument_fastapi
  10. from fastapi import FastAPI
  11. from opentelemetry.instrumentation.httpx import (
  12. HTTPXClientInstrumentor,
  13. RequestInfo,
  14. ResponseInfo,
  15. )
  16. from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
  17. from opentelemetry.instrumentation.logging import LoggingInstrumentor
  18. from opentelemetry.instrumentation.redis import RedisInstrumentor
  19. from opentelemetry.instrumentation.requests import RequestsInstrumentor
  20. from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
  21. from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
  22. from opentelemetry.trace import Span, StatusCode
  23. from redis import Redis
  24. from requests import PreparedRequest, Response
  25. from sqlalchemy import Engine
  26. from fastapi import status
  27. from open_webui.utils.trace.constants import SPAN_REDIS_TYPE, SpanAttributes
  28. from open_webui.env import SRC_LOG_LEVELS
  29. logger = logging.getLogger(__name__)
  30. logger.setLevel(SRC_LOG_LEVELS["MAIN"])
  31. def requests_hook(span: Span, request: PreparedRequest):
  32. """
  33. Http Request Hook
  34. """
  35. span.update_name(f"{request.method} {request.url}")
  36. span.set_attributes(
  37. attributes={
  38. SpanAttributes.HTTP_URL: request.url,
  39. SpanAttributes.HTTP_METHOD: request.method,
  40. }
  41. )
  42. def response_hook(span: Span, request: PreparedRequest, response: Response):
  43. """
  44. HTTP Response Hook
  45. """
  46. span.set_attributes(
  47. attributes={
  48. SpanAttributes.HTTP_STATUS_CODE: response.status_code,
  49. }
  50. )
  51. span.set_status(StatusCode.ERROR if response.status_code >= 400 else StatusCode.OK)
  52. def redis_request_hook(span: Span, instance: Redis, args, kwargs):
  53. """
  54. Redis Request Hook
  55. """
  56. try:
  57. connection_kwargs: dict = instance.connection_pool.connection_kwargs
  58. host = connection_kwargs.get("host")
  59. port = connection_kwargs.get("port")
  60. db = connection_kwargs.get("db")
  61. span.set_attributes(
  62. {
  63. SpanAttributes.DB_INSTANCE: f"{host}/{db}",
  64. SpanAttributes.DB_NAME: f"{host}/{db}",
  65. SpanAttributes.DB_TYPE: SPAN_REDIS_TYPE,
  66. SpanAttributes.DB_PORT: port,
  67. SpanAttributes.DB_IP: host,
  68. SpanAttributes.DB_STATEMENT: " ".join([str(i) for i in args]),
  69. SpanAttributes.DB_OPERATION: str(args[0]),
  70. }
  71. )
  72. except Exception: # pylint: disable=W0718
  73. logger.error(traceback.format_exc())
  74. def httpx_request_hook(span: Span, request: RequestInfo):
  75. """
  76. HTTPX Request Hook
  77. """
  78. span.update_name(f"{request.method.decode()} {str(request.url)}")
  79. span.set_attributes(
  80. attributes={
  81. SpanAttributes.HTTP_URL: str(request.url),
  82. SpanAttributes.HTTP_METHOD: request.method.decode(),
  83. }
  84. )
  85. def httpx_response_hook(span: Span, request: RequestInfo, response: ResponseInfo):
  86. """
  87. HTTPX Response Hook
  88. """
  89. span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.status_code)
  90. span.set_status(
  91. StatusCode.ERROR
  92. if response.status_code >= status.HTTP_400_BAD_REQUEST
  93. else StatusCode.OK
  94. )
  95. async def httpx_async_request_hook(span: Span, request: RequestInfo):
  96. """
  97. Async Request Hook
  98. """
  99. httpx_request_hook(span, request)
  100. async def httpx_async_response_hook(
  101. span: Span, request: RequestInfo, response: ResponseInfo
  102. ):
  103. """
  104. Async Response Hook
  105. """
  106. httpx_response_hook(span, request, response)
  107. def aiohttp_request_hook(span: Span, request: TraceRequestStartParams):
  108. """
  109. Aiohttp Request Hook
  110. """
  111. span.update_name(f"{request.method} {str(request.url)}")
  112. span.set_attributes(
  113. attributes={
  114. SpanAttributes.HTTP_URL: str(request.url),
  115. SpanAttributes.HTTP_METHOD: request.method,
  116. }
  117. )
  118. def aiohttp_response_hook(
  119. span: Span, response: Union[TraceRequestExceptionParams, TraceRequestEndParams]
  120. ):
  121. """
  122. Aiohttp Response Hook
  123. """
  124. if isinstance(response, TraceRequestEndParams):
  125. span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.response.status)
  126. span.set_status(
  127. StatusCode.ERROR
  128. if response.response.status >= status.HTTP_400_BAD_REQUEST
  129. else StatusCode.OK
  130. )
  131. elif isinstance(response, TraceRequestExceptionParams):
  132. span.set_status(StatusCode.ERROR)
  133. span.set_attribute(SpanAttributes.ERROR_MESSAGE, str(response.exception))
  134. class Instrumentor(BaseInstrumentor):
  135. """
  136. Instrument OT
  137. """
  138. def __init__(self, app: FastAPI, db_engine: Engine):
  139. self.app = app
  140. self.db_engine = db_engine
  141. def instrumentation_dependencies(self) -> Collection[str]:
  142. return []
  143. def _instrument(self, **kwargs):
  144. instrument_fastapi(app=self.app)
  145. SQLAlchemyInstrumentor().instrument(engine=self.db_engine)
  146. RedisInstrumentor().instrument(request_hook=redis_request_hook)
  147. RequestsInstrumentor().instrument(
  148. request_hook=requests_hook, response_hook=response_hook
  149. )
  150. LoggingInstrumentor().instrument()
  151. HTTPXClientInstrumentor().instrument(
  152. request_hook=httpx_request_hook,
  153. response_hook=httpx_response_hook,
  154. async_request_hook=httpx_async_request_hook,
  155. async_response_hook=httpx_async_response_hook,
  156. )
  157. AioHttpClientInstrumentor().instrument(
  158. request_hook=aiohttp_request_hook,
  159. response_hook=aiohttp_response_hook,
  160. )
  161. def _uninstrument(self, **kwargs):
  162. if getattr(self, "instrumentors", None) is None:
  163. return
  164. for instrumentor in self.instrumentors:
  165. instrumentor.uninstrument()