123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- import logging
- import traceback
- from typing import Collection, Union
- from aiohttp import (
- TraceRequestStartParams,
- TraceRequestEndParams,
- TraceRequestExceptionParams,
- )
- from chromadb.telemetry.opentelemetry.fastapi import instrument_fastapi
- from fastapi import FastAPI
- from opentelemetry.instrumentation.httpx import (
- HTTPXClientInstrumentor,
- RequestInfo,
- ResponseInfo,
- )
- from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
- from opentelemetry.instrumentation.logging import LoggingInstrumentor
- from opentelemetry.instrumentation.redis import RedisInstrumentor
- from opentelemetry.instrumentation.requests import RequestsInstrumentor
- from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
- from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
- from opentelemetry.trace import Span, StatusCode
- from redis import Redis
- from requests import PreparedRequest, Response
- from sqlalchemy import Engine
- from fastapi import status
- from open_webui.utils.trace.constants import SPAN_REDIS_TYPE, SpanAttributes
- from open_webui.env import SRC_LOG_LEVELS
- logger = logging.getLogger(__name__)
- logger.setLevel(SRC_LOG_LEVELS["MAIN"])
- def requests_hook(span: Span, request: PreparedRequest):
- """
- Http Request Hook
- """
- span.update_name(f"{request.method} {request.url}")
- span.set_attributes(
- attributes={
- SpanAttributes.HTTP_URL: request.url,
- SpanAttributes.HTTP_METHOD: request.method,
- }
- )
- def response_hook(span: Span, request: PreparedRequest, response: Response):
- """
- HTTP Response Hook
- """
- span.set_attributes(
- attributes={
- SpanAttributes.HTTP_STATUS_CODE: response.status_code,
- }
- )
- span.set_status(StatusCode.ERROR if response.status_code >= 400 else StatusCode.OK)
- def redis_request_hook(span: Span, instance: Redis, args, kwargs):
- """
- Redis Request Hook
- """
- try:
- connection_kwargs: dict = instance.connection_pool.connection_kwargs
- host = connection_kwargs.get("host")
- port = connection_kwargs.get("port")
- db = connection_kwargs.get("db")
- span.set_attributes(
- {
- SpanAttributes.DB_INSTANCE: f"{host}/{db}",
- SpanAttributes.DB_NAME: f"{host}/{db}",
- SpanAttributes.DB_TYPE: SPAN_REDIS_TYPE,
- SpanAttributes.DB_PORT: port,
- SpanAttributes.DB_IP: host,
- SpanAttributes.DB_STATEMENT: " ".join([str(i) for i in args]),
- SpanAttributes.DB_OPERATION: str(args[0]),
- }
- )
- except Exception: # pylint: disable=W0718
- logger.error(traceback.format_exc())
- def httpx_request_hook(span: Span, request: RequestInfo):
- """
- HTTPX Request Hook
- """
- span.update_name(f"{request.method.decode()} {str(request.url)}")
- span.set_attributes(
- attributes={
- SpanAttributes.HTTP_URL: str(request.url),
- SpanAttributes.HTTP_METHOD: request.method.decode(),
- }
- )
- def httpx_response_hook(span: Span, request: RequestInfo, response: ResponseInfo):
- """
- HTTPX Response Hook
- """
- span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.status_code)
- span.set_status(
- StatusCode.ERROR
- if response.status_code >= status.HTTP_400_BAD_REQUEST
- else StatusCode.OK
- )
- async def httpx_async_request_hook(span: Span, request: RequestInfo):
- """
- Async Request Hook
- """
- httpx_request_hook(span, request)
- async def httpx_async_response_hook(
- span: Span, request: RequestInfo, response: ResponseInfo
- ):
- """
- Async Response Hook
- """
- httpx_response_hook(span, request, response)
- def aiohttp_request_hook(span: Span, request: TraceRequestStartParams):
- """
- Aiohttp Request Hook
- """
- span.update_name(f"{request.method} {str(request.url)}")
- span.set_attributes(
- attributes={
- SpanAttributes.HTTP_URL: str(request.url),
- SpanAttributes.HTTP_METHOD: request.method,
- }
- )
- def aiohttp_response_hook(
- span: Span, response: Union[TraceRequestExceptionParams, TraceRequestEndParams]
- ):
- """
- Aiohttp Response Hook
- """
- if isinstance(response, TraceRequestEndParams):
- span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.response.status)
- span.set_status(
- StatusCode.ERROR
- if response.response.status >= status.HTTP_400_BAD_REQUEST
- else StatusCode.OK
- )
- elif isinstance(response, TraceRequestExceptionParams):
- span.set_status(StatusCode.ERROR)
- span.set_attribute(SpanAttributes.ERROR_MESSAGE, str(response.exception))
- class Instrumentor(BaseInstrumentor):
- """
- Instrument OT
- """
- def __init__(self, app: FastAPI, db_engine: Engine):
- self.app = app
- self.db_engine = db_engine
- def instrumentation_dependencies(self) -> Collection[str]:
- return []
- def _instrument(self, **kwargs):
- instrument_fastapi(app=self.app)
- SQLAlchemyInstrumentor().instrument(engine=self.db_engine)
- RedisInstrumentor().instrument(request_hook=redis_request_hook)
- RequestsInstrumentor().instrument(
- request_hook=requests_hook, response_hook=response_hook
- )
- LoggingInstrumentor().instrument()
- HTTPXClientInstrumentor().instrument(
- request_hook=httpx_request_hook,
- response_hook=httpx_response_hook,
- async_request_hook=httpx_async_request_hook,
- async_response_hook=httpx_async_response_hook,
- )
- AioHttpClientInstrumentor().instrument(
- request_hook=aiohttp_request_hook,
- response_hook=aiohttp_response_hook,
- )
- def _uninstrument(self, **kwargs):
- if getattr(self, "instrumentors", None) is None:
- return
- for instrumentor in self.instrumentors:
- instrumentor.uninstrument()
|