Source code for fastapi_contrib.db.client

from bson import CodecOptions
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.results import InsertOneResult, DeleteResult, UpdateResult

from fastapi_contrib.db.models import MongoDBModel, notset
from fastapi_contrib.common.utils import get_current_app, get_timezone


[docs]class MongoDBClient(object): """ Singleton client for interacting with MongoDB. Operates mostly using models, specified when making DB queries. Implements only part of internal `motor` methods, but can be populated more Please don't use it directly, use `fastapi_contrib.db.utils.get_db_client`. """ __instance = None def __new__(cls) -> "MongoDBClient": if cls.__instance is None: cls.__instance = object.__new__(cls) app = get_current_app() tzinfo = get_timezone() cls.__instance.codec_options = CodecOptions( tz_aware=True, tzinfo=tzinfo) cls.__instance.mongodb = app.mongodb return cls.__instance
[docs] def get_collection(self, collection_name: str) -> Collection: return self.mongodb.get_collection( collection_name, codec_options=self.codec_options)
[docs] async def insert( self, model: MongoDBModel, session: ClientSession = None, include=None, exclude=None, ) -> InsertOneResult: data = model.dict(include=include, exclude=exclude) data["_id"] = data.pop("id") collection_name = model.get_db_collection() collection = self.get_collection(collection_name) return await collection.insert_one(data, session=session)
[docs] async def count( self, model: MongoDBModel, session: ClientSession = None, **kwargs ) -> int: _id = kwargs.pop("id", notset) if _id != notset: kwargs["_id"] = _id collection_name = model.get_db_collection() collection = self.get_collection(collection_name) res = await collection.count_documents(kwargs, session=session) return res
[docs] async def delete( self, model: MongoDBModel, session: ClientSession = None, **kwargs ) -> DeleteResult: _id = kwargs.pop("id", notset) if _id != notset: kwargs["_id"] = _id collection_name = model.get_db_collection() collection = self.get_collection(collection_name) res = await collection.delete_many(kwargs, session=session) return res
[docs] async def update_one( self, model: MongoDBModel, filter_kwargs: dict, session: ClientSession = None, **kwargs ) -> UpdateResult: _id = filter_kwargs.pop("id", notset) if _id != notset: filter_kwargs["_id"] = _id collection_name = model.get_db_collection() collection = self.get_collection(collection_name) res = await collection.update_one( filter_kwargs, kwargs, session=session ) return res
[docs] async def update_many( self, model: MongoDBModel, filter_kwargs: dict, session: ClientSession = None, **kwargs ) -> UpdateResult: _id = filter_kwargs.pop("id", notset) if _id != notset: filter_kwargs["_id"] = _id collection_name = model.get_db_collection() collection = self.get_collection(collection_name) res = await collection.update_many( filter_kwargs, kwargs, session=session ) return res
[docs] async def get( self, model: MongoDBModel, session: ClientSession = None, **kwargs ) -> dict: _id = kwargs.pop("id", notset) if _id != notset: kwargs["_id"] = _id collection_name = model.get_db_collection() collection = self.get_collection(collection_name) res = await collection.find_one(kwargs, session=session) return res
[docs] def list( self, model: MongoDBModel, session: ClientSession = None, _offset: int = 0, _limit: int = 0, _sort: list = None, **kwargs ) -> Cursor: _id = kwargs.pop("id", notset) if _id != notset: kwargs["_id"] = _id collection_name = model.get_db_collection() collection = self.get_collection(collection_name) return collection.find( kwargs, session=session, skip=_offset, limit=_limit, sort=_sort )