wrappers.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from contextvars import ContextVar
  2. from peewee import *
  3. from peewee import PostgresqlDatabase, InterfaceError as PeeWeeInterfaceError
  4. import logging
  5. from playhouse.db_url import connect, parse
  6. from playhouse.shortcuts import ReconnectMixin
  7. from env import SRC_LOG_LEVELS
  8. log = logging.getLogger(__name__)
  9. log.setLevel(SRC_LOG_LEVELS["DB"])
  10. db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
  11. db_state = ContextVar("db_state", default=db_state_default.copy())
  12. class PeeweeConnectionState(object):
  13. def __init__(self, **kwargs):
  14. super().__setattr__("_state", db_state)
  15. super().__init__(**kwargs)
  16. def __setattr__(self, name, value):
  17. self._state.get()[name] = value
  18. def __getattr__(self, name):
  19. value = self._state.get()[name]
  20. return value
  21. class CustomReconnectMixin(ReconnectMixin):
  22. reconnect_errors = (
  23. # psycopg2
  24. (OperationalError, "termin"),
  25. (InterfaceError, "closed"),
  26. # peewee
  27. (PeeWeeInterfaceError, "closed"),
  28. )
  29. class ReconnectingPostgresqlDatabase(CustomReconnectMixin, PostgresqlDatabase):
  30. pass
  31. def register_connection(db_url):
  32. db = connect(db_url, unquote_password=True)
  33. if isinstance(db, PostgresqlDatabase):
  34. # Enable autoconnect for SQLite databases, managed by Peewee
  35. db.autoconnect = True
  36. db.reuse_if_open = True
  37. log.info("Connected to PostgreSQL database")
  38. # Get the connection details
  39. connection = parse(db_url, unquote_password=True)
  40. # Use our custom database class that supports reconnection
  41. db = ReconnectingPostgresqlDatabase(**connection)
  42. db.connect(reuse_if_open=True)
  43. elif isinstance(db, SqliteDatabase):
  44. # Enable autoconnect for SQLite databases, managed by Peewee
  45. db.autoconnect = True
  46. db.reuse_if_open = True
  47. log.info("Connected to SQLite database")
  48. else:
  49. raise ValueError("Unsupported database connection")
  50. return db