|
@@ -0,0 +1,202 @@
|
|
|
+import logging
|
|
|
+import traceback
|
|
|
+from typing import Collection, Union
|
|
|
+
|
|
|
+from aiohttp import (
|
|
|
+ TraceRequestStartParams,
|
|
|
+ TraceRequestEndParams,
|
|
|
+ TraceRequestExceptionParams,
|
|
|
+)
|
|
|
+from chromadb.telemetry.opentelemetry.fastapi import instrument_fastapi
|
|
|
+from fastapi import FastAPI
|
|
|
+from opentelemetry.instrumentation.httpx import (
|
|
|
+ HTTPXClientInstrumentor,
|
|
|
+ RequestInfo,
|
|
|
+ ResponseInfo,
|
|
|
+)
|
|
|
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
|
|
+from opentelemetry.instrumentation.logging import LoggingInstrumentor
|
|
|
+from opentelemetry.instrumentation.redis import RedisInstrumentor
|
|
|
+from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
|
|
+from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
|
|
|
+from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
|
|
|
+from opentelemetry.trace import Span, StatusCode
|
|
|
+from redis import Redis
|
|
|
+from requests import PreparedRequest, Response
|
|
|
+from sqlalchemy import Engine
|
|
|
+from fastapi import status
|
|
|
+
|
|
|
+from open_webui.utils.telemetry.constants import SPAN_REDIS_TYPE, SpanAttributes
|
|
|
+
|
|
|
+from open_webui.env import SRC_LOG_LEVELS
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+logger.setLevel(SRC_LOG_LEVELS["MAIN"])
|
|
|
+
|
|
|
+
|
|
|
+def requests_hook(span: Span, request: PreparedRequest):
|
|
|
+ """
|
|
|
+ Http Request Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ span.update_name(f"{request.method} {request.url}")
|
|
|
+ span.set_attributes(
|
|
|
+ attributes={
|
|
|
+ SpanAttributes.HTTP_URL: request.url,
|
|
|
+ SpanAttributes.HTTP_METHOD: request.method,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def response_hook(span: Span, request: PreparedRequest, response: Response):
|
|
|
+ """
|
|
|
+ HTTP Response Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ span.set_attributes(
|
|
|
+ attributes={
|
|
|
+ SpanAttributes.HTTP_STATUS_CODE: response.status_code,
|
|
|
+ }
|
|
|
+ )
|
|
|
+ span.set_status(StatusCode.ERROR if response.status_code >= 400 else StatusCode.OK)
|
|
|
+
|
|
|
+
|
|
|
+def redis_request_hook(span: Span, instance: Redis, args, kwargs):
|
|
|
+ """
|
|
|
+ Redis Request Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ try:
|
|
|
+ connection_kwargs: dict = instance.connection_pool.connection_kwargs
|
|
|
+ host = connection_kwargs.get("host")
|
|
|
+ port = connection_kwargs.get("port")
|
|
|
+ db = connection_kwargs.get("db")
|
|
|
+ span.set_attributes(
|
|
|
+ {
|
|
|
+ SpanAttributes.DB_INSTANCE: f"{host}/{db}",
|
|
|
+ SpanAttributes.DB_NAME: f"{host}/{db}",
|
|
|
+ SpanAttributes.DB_TYPE: SPAN_REDIS_TYPE,
|
|
|
+ SpanAttributes.DB_PORT: port,
|
|
|
+ SpanAttributes.DB_IP: host,
|
|
|
+ SpanAttributes.DB_STATEMENT: " ".join([str(i) for i in args]),
|
|
|
+ SpanAttributes.DB_OPERATION: str(args[0]),
|
|
|
+ }
|
|
|
+ )
|
|
|
+ except Exception: # pylint: disable=W0718
|
|
|
+ logger.error(traceback.format_exc())
|
|
|
+
|
|
|
+
|
|
|
+def httpx_request_hook(span: Span, request: RequestInfo):
|
|
|
+ """
|
|
|
+ HTTPX Request Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ span.update_name(f"{request.method.decode()} {str(request.url)}")
|
|
|
+ span.set_attributes(
|
|
|
+ attributes={
|
|
|
+ SpanAttributes.HTTP_URL: str(request.url),
|
|
|
+ SpanAttributes.HTTP_METHOD: request.method.decode(),
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def httpx_response_hook(span: Span, request: RequestInfo, response: ResponseInfo):
|
|
|
+ """
|
|
|
+ HTTPX Response Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.status_code)
|
|
|
+ span.set_status(
|
|
|
+ StatusCode.ERROR
|
|
|
+ if response.status_code >= status.HTTP_400_BAD_REQUEST
|
|
|
+ else StatusCode.OK
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+async def httpx_async_request_hook(span: Span, request: RequestInfo):
|
|
|
+ """
|
|
|
+ Async Request Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ httpx_request_hook(span, request)
|
|
|
+
|
|
|
+
|
|
|
+async def httpx_async_response_hook(
|
|
|
+ span: Span, request: RequestInfo, response: ResponseInfo
|
|
|
+):
|
|
|
+ """
|
|
|
+ Async Response Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ httpx_response_hook(span, request, response)
|
|
|
+
|
|
|
+
|
|
|
+def aiohttp_request_hook(span: Span, request: TraceRequestStartParams):
|
|
|
+ """
|
|
|
+ Aiohttp Request Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ span.update_name(f"{request.method} {str(request.url)}")
|
|
|
+ span.set_attributes(
|
|
|
+ attributes={
|
|
|
+ SpanAttributes.HTTP_URL: str(request.url),
|
|
|
+ SpanAttributes.HTTP_METHOD: request.method,
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def aiohttp_response_hook(
|
|
|
+ span: Span, response: Union[TraceRequestExceptionParams, TraceRequestEndParams]
|
|
|
+):
|
|
|
+ """
|
|
|
+ Aiohttp Response Hook
|
|
|
+ """
|
|
|
+
|
|
|
+ if isinstance(response, TraceRequestEndParams):
|
|
|
+ span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.response.status)
|
|
|
+ span.set_status(
|
|
|
+ StatusCode.ERROR
|
|
|
+ if response.response.status >= status.HTTP_400_BAD_REQUEST
|
|
|
+ else StatusCode.OK
|
|
|
+ )
|
|
|
+ elif isinstance(response, TraceRequestExceptionParams):
|
|
|
+ span.set_status(StatusCode.ERROR)
|
|
|
+ span.set_attribute(SpanAttributes.ERROR_MESSAGE, str(response.exception))
|
|
|
+
|
|
|
+
|
|
|
+class Instrumentor(BaseInstrumentor):
|
|
|
+ """
|
|
|
+ Instrument OT
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, app: FastAPI, db_engine: Engine):
|
|
|
+ self.app = app
|
|
|
+ self.db_engine = db_engine
|
|
|
+
|
|
|
+ def instrumentation_dependencies(self) -> Collection[str]:
|
|
|
+ return []
|
|
|
+
|
|
|
+ def _instrument(self, **kwargs):
|
|
|
+ instrument_fastapi(app=self.app)
|
|
|
+ SQLAlchemyInstrumentor().instrument(engine=self.db_engine)
|
|
|
+ RedisInstrumentor().instrument(request_hook=redis_request_hook)
|
|
|
+ RequestsInstrumentor().instrument(
|
|
|
+ request_hook=requests_hook, response_hook=response_hook
|
|
|
+ )
|
|
|
+ LoggingInstrumentor().instrument()
|
|
|
+ HTTPXClientInstrumentor().instrument(
|
|
|
+ request_hook=httpx_request_hook,
|
|
|
+ response_hook=httpx_response_hook,
|
|
|
+ async_request_hook=httpx_async_request_hook,
|
|
|
+ async_response_hook=httpx_async_response_hook,
|
|
|
+ )
|
|
|
+ AioHttpClientInstrumentor().instrument(
|
|
|
+ request_hook=aiohttp_request_hook,
|
|
|
+ response_hook=aiohttp_response_hook,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _uninstrument(self, **kwargs):
|
|
|
+ if getattr(self, "instrumentors", None) is None:
|
|
|
+ return
|
|
|
+ for instrumentor in self.instrumentors:
|
|
|
+ instrumentor.uninstrument()
|