wrappers.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from contextvars import ContextVar
  2. from peewee import *
  3. from playhouse.db_url import connect
  4. from playhouse.pool import PooledPostgresqlExtDatabase
  5. from playhouse.pool import PooledSqliteDatabase
  6. db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
  7. db_state = ContextVar("db_state", default=db_state_default.copy())
  8. class PeeweeConnectionState(object):
  9. def __init__(self, **kwargs):
  10. super().__setattr__("_state", db_state)
  11. super().__init__(**kwargs)
  12. def __setattr__(self, name, value):
  13. self._state.get()[name] = value
  14. def __getattr__(self, name):
  15. value = self._state.get()[name]
  16. return value
  17. def register_connection(db_url):
  18. db = connect(db_url)
  19. if isinstance(db, PostgresqlDatabase):
  20. db = PooledPostgresqlExtDatabase(
  21. db.database,
  22. max_connections=8,
  23. stale_timeout=300,
  24. timeout=None,
  25. autoconnect=True,
  26. **db.connect_params
  27. )
  28. elif isinstance(db, SqliteDatabase):
  29. db = PooledSqliteDatabase(
  30. db.database,
  31. max_connections=8,
  32. stale_timeout=300,
  33. timeout=None,
  34. autoconnect=True,
  35. **db.connect_params
  36. )
  37. else:
  38. raise ValueError('Unsupported database connection')
  39. return db