wrappers.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
import logging
from contextvars import ContextVar

from peewee import *
from peewee import PostgresqlDatabase, InterfaceError as PeeWeeInterfaceError
from playhouse.db_url import connect, parse
from playhouse.shortcuts import ReconnectMixin

from config import SRC_LOG_LEVELS
  7. log = logging.getLogger(__name__)
  8. log.setLevel(SRC_LOG_LEVELS["DB"])
  9. db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
  10. db_state = ContextVar("db_state", default=db_state_default.copy())
  11. class PeeweeConnectionState(object):
  12. def __init__(self, **kwargs):
  13. super().__setattr__("_state", db_state)
  14. super().__init__(**kwargs)
  15. def __setattr__(self, name, value):
  16. self._state.get()[name] = value
  17. def __getattr__(self, name):
  18. value = self._state.get()[name]
  19. return value
  20. class CustomReconnectMixin(ReconnectMixin):
  21. reconnect_errors = (
  22. # psycopg2
  23. (OperationalError, 'termin'),
  24. (InterfaceError, 'closed'),
  25. # peewee
  26. (PeeWeeInterfaceError, 'closed'),
  27. )
  28. class ReconnectingPostgresqlDatabase(CustomReconnectMixin, PostgresqlDatabase):
  29. pass
  30. def register_connection(db_url):
  31. db = connect(db_url)
  32. if isinstance(db, PostgresqlDatabase):
  33. # Enable autoconnect for SQLite databases, managed by Peewee
  34. db.autoconnect = True
  35. db.reuse_if_open = True
  36. log.info("Connected to PostgreSQL database")
  37. connection = parse(db_url)
  38. db = ReconnectingPostgresqlDatabase(connection['database'], user=connection['user'], password=connection['password'],host=connection['host'], port=connection['port'])
  39. db.connect(reuse_if_open=True)
  40. elif isinstance(db, SqliteDatabase):
  41. # Enable autoconnect for SQLite databases, managed by Peewee
  42. db.autoconnect = True
  43. db.reuse_if_open = True
  44. log.info("Connected to SQLite database")
  45. else:
  46. raise ValueError('Unsupported database connection')
  47. return db