wrappers.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from contextvars import ContextVar
  2. from peewee import *
  3. from peewee import PostgresqlDatabase, InterfaceError as PeeWeeInterfaceError
  4. import logging
  5. from playhouse.db_url import connect, parse
  6. from playhouse.shortcuts import ReconnectMixin
  7. from config import SRC_LOG_LEVELS
# Module-level logger; its threshold is taken from the "DB" entry of the
# project's SRC_LOG_LEVELS mapping.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["DB"])
  10. db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
  11. db_state = ContextVar("db_state", default=db_state_default.copy())
  12. class PeeweeConnectionState(object):
  13. def __init__(self, **kwargs):
  14. super().__setattr__("_state", db_state)
  15. super().__init__(**kwargs)
  16. def __setattr__(self, name, value):
  17. self._state.get()[name] = value
  18. def __getattr__(self, name):
  19. value = self._state.get()[name]
  20. return value
class CustomReconnectMixin(ReconnectMixin):
    """ReconnectMixin with this module's own table of reconnect-worthy errors.

    Each ``reconnect_errors`` entry pairs an exception class with a fragment
    of an error message.
    NOTE(review): per the playhouse documentation the mixin retries on a fresh
    connection when a raised error matches an entry -- confirm the exact
    matching rules against the installed peewee version.
    """

    reconnect_errors = (
        # psycopg2 (label kept from the original author).
        # NOTE(review): these two names come from ``from peewee import *``,
        # so they look like peewee's own exception classes -- confirm.
        (OperationalError, "termin"),
        (InterfaceError, "closed"),
        # peewee
        (PeeWeeInterfaceError, "closed"),
    )
class ReconnectingPostgresqlDatabase(CustomReconnectMixin, PostgresqlDatabase):
    """PostgresqlDatabase combined with CustomReconnectMixin.

    Adds no members of its own; the mixin is listed first so it precedes
    PostgresqlDatabase in the method resolution order.
    """

    pass
  31. def register_connection(db_url):
  32. db = connect(db_url)
  33. if isinstance(db, PostgresqlDatabase):
  34. # Enable autoconnect for SQLite databases, managed by Peewee
  35. db.autoconnect = True
  36. db.reuse_if_open = True
  37. log.info("Connected to PostgreSQL database")
  38. # Get the connection details
  39. connection = parse(db_url)
  40. # Use our custom database class that supports reconnection
  41. db = ReconnectingPostgresqlDatabase(
  42. connection["database"],
  43. user=connection["user"],
  44. password=connection["password"],
  45. host=connection["host"],
  46. port=connection["port"],
  47. )
  48. db.connect(reuse_if_open=True)
  49. elif isinstance(db, SqliteDatabase):
  50. # Enable autoconnect for SQLite databases, managed by Peewee
  51. db.autoconnect = True
  52. db.reuse_if_open = True
  53. log.info("Connected to SQLite database")
  54. else:
  55. raise ValueError("Unsupported database connection")
  56. return db