Source code for orghandbookapi.database.database
import contextlib
from collections.abc import AsyncIterator
from sqlalchemy.ext.asyncio import (
AsyncConnection,
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from orghandbookapi.loader import config
url = config.database.url_format.format(
host=config.database.host,
port=config.database.port,
name=config.database.name,
user=config.database.user,
password=config.database.password,
)
# url = "sqlite+aiosqlite:///data/orghandbookapi.db" # noqa: ERA001
[docs]
class DatabaseSessionManager: # noqa: D101
[docs]
def __init__(self): # noqa: D107
self._engine: AsyncEngine | None = None
self._sessionmaker: async_sessionmaker[AsyncSession] | None = None
[docs]
def init(self, db_url: str): # noqa: D102
if "postgresql" in db_url:
connect_args = {
"statement_cache_size": 0,
"prepared_statement_cache_size": 0,
}
else:
connect_args = {"check_same_thread": False}
self._engine = create_async_engine(
url=db_url, pool_pre_ping=True, connect_args=connect_args
)
self._sessionmaker = async_sessionmaker(
bind=self._engine, expire_on_commit=config.database.expire_on_commit
)
[docs]
async def close(self): # noqa: D102
if self._engine is None:
return
await self._engine.dispose()
self._engine = None
self._sessionmaker = None
[docs]
@contextlib.asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]: # noqa: D102
if self._sessionmaker is None:
raise OSError("DatabaseSessionManager is not initialized")
async with self._sessionmaker() as session:
try:
yield session
except Exception:
await session.rollback()
raise
[docs]
@contextlib.asynccontextmanager
async def connect(self) -> AsyncIterator[AsyncConnection]: # noqa: D102
if self._engine is None:
raise OSError("DatabaseSessionManager is not initialized")
async with self._engine.begin() as connection:
try:
yield connection
except Exception:
await connection.rollback()
raise
db_manager = DatabaseSessionManager()
[docs]
async def get_db_session() -> AsyncSession:
"""
Функция-зависимость для получения асинхронной сессии базы данных.
Returns:
AsyncSession: асинхронная сессия
Yields:
Iterator[AsyncSession]: итератор асинхронной сессии
"""
async with db_manager.session() as session:
yield session