likes
comments
collection
share

tornado 并发编程系列(6)——umongo + motor: 构建高效MongoDB应用的最佳组合

作者站长头像
站长
· 阅读数 9

umongo 和 motor 是 MongoDB 的异步 Python 驱动程序。我们可以通过异步和非阻塞的方式与 MongoDB 进行交互,实现高效的数据访问。

在这篇文章中,我们将介绍如何在 Tornado 中集成 umongo 和 motor,并使用它们进行数据访问。

安装 umongo 和 motor

在开始之前,我们需要确保已经安装了 umongo 和 motor。 你可以通过运行以下命令来安装它们:

pip install umongo motor

定义数据模型

在 MongoDB 中,我们通常使用文档来表示数据。在 umongo 中,我们可以通过继承 umongo.Document 类来定义一个数据模型。以下是一个示例:


@instance.register
class User(APIDocument):
    name = fields.StringField(max_length=255, verbose_name="名称")
    age = fields.IntField(verbose_name="年龄")

    class Meta:
        """
        ODM Metadata
        """
        collection = "User"

在上面的代码中,我们定义了一个名为 User 的文档,并在其内部使用了 umongo.fields 中提供的各个字段类型,例如 umongo.fields.EmailField,umongo.fields.StrField 和 umongo.fields.IntField。

连接到 MongoDB

与 MongoDB 进行交互的第一步是创建一个数据库连接。 在 Tornado 应用程序中,我们可以通过创建一个异步的 MongoDB 客户端来实现这一点。以下是一种实现方式:

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.collection import Collection
from umongo.document import DocumentImplementation
from umongo.frameworks import MotorAsyncIOInstance
from src.utils.mongo.ops_mongo import OpsMongo
from src.config import settings

db = None
instance:MotorAsyncIOInstance = None


# 初始化ops mongo
async def init_ops_mongodb():
    # 原生mongo
    ops_mongo = OpsMongo(settings.mongo.ops.uri)

    return ops_mongo


def connect(loop):
    global db
    db = AsyncIOMotorClient(settings.mongo.ops.uri, io_loop=loop)[settings.mongo.ops.db]
    global instance
    instance = MotorAsyncIOInstance.from_db(db)
    return instance


def get_connection():
    try:
        if instance is not None:
            return instance
        raise Exception('DB connection not found.')
    except Exception as e:
        return connect(asyncio.get_event_loop())
  • 引入了多个第三方库和模块,包括 async、motor*asyncio、pymongo、umongo,以及自定义的一些操作类和配置文件

  • 定义了全局变量 db 和 instance,分别代表数据库连接和Mongodb实例对象*

  • 定义了一个异步函数 initopsmongodb,用于初始化OpsMongo数据库连接*

  • 定义了一个函数 connect,接收一个 event loop 对象作为参数,用于首次连接数据库,并返回 Mongodb 实例对象

  • 定义了一个函数 get*connection,用于获取数据库连接对象,如果之前已经连接过,则返回 instance 对象,否则调用 connect 函数连接数据库

实现了Mongodb数据库的异步连接和管理,方便其他程序模块进行数据读写操作。同时,在获取数据库连接时,兼顾了连接复用的性能考虑。

crud的集合类

OpsMongo为Mongo的crud的集合类

class OpsMongo(Mongo):
    def __init__(self, uri):
        super().__init__(uri=uri)

    def close(self):
        self.CLIENT.close()

    @classmethod
    def return_result(cls, res: list | dict):
        if isinstance(res, list):
            l = []
            for data in res:
                return l.append(cls.return_result(data))
        elif isinstance(res, dict):
            if "_id" in res:
                res["id"] = res.pop("_id").__str__()
                return res
        else:
            return []

    async def insert_one(self, table, data):
        """
        mongo原生插入数据
        :param table:
        :param data:
        :return:
        """
        db = self.get_default_database()
        res = await db[table].insert_one(data)
        return res

    async def find_one(self, table, query):
        """
        mongo原生返回一条数据
        :param table:
        :param query:
        :return:
        """
        db = self.get_default_database()
        res = await db[table].find_one(query)
        if res:
            return self.return_result(res)
        else:
            return res

    async def find(self, table, query):
        """
        mongo 原生查询所有
        :param table:
        :param query:
        :return:
        """
        db = self.get_default_database()
        res = db[table].find(query)
        data_list = []
        async for data in res:
            data_list.append(self.return_result(data))
        return data_list

    async def update(self, table, filter, update):
        """
            print('matched %d, modified %d' %
          (result.matched_count, result.modified_count))
        :param filter:
        :param update:
        :return:
        """
        db = self.get_default_database()
        res = await db[table].update_one(filter, {"$set": update})

        return res

    async def update_many(self, table, filter, update):
        """
        :param table:
        :param filter:
        :param update:
        :return:
        """
        db = self.get_default_database()
        res = await db[table].update_many(filter, {"$set": update})
        return res

    async def paginate(self, table, query, current_page: int, page_size: int):
        """
        分页查询
        :param table:
        :param query:
        :param current_page:
        :param page_size:
        :return:
        """

        l_list = []
        db = self.get_default_database()

        total = await db[table].count_documents(query)

        async for doc in db[table].find(query).skip((current_page - 1) * page_size).limit(page_size):
            l_list.append(self.return_result(doc))

        return l_list, total

    async def aggregate(self, table, pipeline, **kwargs):
        """
        :param table:
        :param pipeline:
        :param kwargs:
        :return:
        """
        db = self.get_default_database()
        docs = []
        async for doc in db[table].aggregate(pipeline, **kwargs):
            docs.append(doc)
        return docs
  • 初始化方法__init__(通过调用父类Mongo的初始化方法);

  • 关闭数据库连接的方法close;

  • 静态方法return_result,用于将MongoDB查询的结果转换成可读性更好的格式,比如将_id字段改名为id,并将其转为字符串;

  • 插入数据的异步方法insert_one;

  • 查找一条数据的异步方法find_one;

  • 查找所有数据的异步方法find;

  • 更新单条数据的异步方法update;

  • 更新多条数据的异步方法update_many;

  • 分页查询数据的异步方法paginate,接收参数包括要查询的表名、查询条件、当前页数、页大小,返回查询结果和总条数;

  • 聚合查询的异步方法aggregate,接收参数包括要查询的表名、聚合管道(一个包含查询条件的列表)、其他可选参数,返回查询结果。

集合tornado


class Application(tornadoApp):
    """ 定制 Tornado Application 集成日志、sqlalchemy 等功能 """

    def __init__(self):
        self.ops_mongo = None  # ops mongo
        self.redis_cluster = None  # redis基础session
        self.redis_manager = None  # redis_manager session
        self.redis = None

        tornado_settings = {
            'autoreload': settings.service.server.autoreload,
            'debug': settings.service.server.debug,
            "default_handler_class": DefaultHandler,
        }
        route = getRoutes(settings)

        self.prepare()

        super(Application, self).__init__(handlers=route, **tornado_settings)

    def prepare(self):
        self.redis_cluster = asyncio.get_event_loop().run_until_complete(
            init_cluster()
        )
        self.ops_mongo = asyncio.get_event_loop().run_until_complete(
            init_ops_mongodb()
        )

        loop = asyncio.get_event_loop()
        self.redis = RedisPool(loop=loop).get_conn()
        self.redis_manager = RedisManager(self.redis)
        asyncio.get_event_loop().run_until_complete(
            create_all_indexes()
        )

    async def on_close(self, server_conn: object) -> None:
        await self.redis_manager.close()
        logger.info("close redis")
        self.ops_mongo.close()
        logger.info("close mongo")

调用self.prepare()实现数据库连接初始化,self.on_close实现连接关闭

orm创建索引

class Meta:
    """
    ODM Metadata
    """
    collection_name = CmdbManagerMonitorRule
    indexes = [[("name", ASCENDING), {"unique": True}]]
async def create_all_indexes():
    index_names = []
    name: str
    doc: DocumentImplementation
    for name, doc in instance._doc_lookup.items():
        indexes = getattr(doc.Meta, "indexes", [])
        for index in indexes:
            collection: Collection = doc.collection
            index_opts = {}
            if isinstance(index, list) and isinstance(index[-1], dict):
                index_opts = index.pop(-1)

            result = await collection.create_index(index, background=True, **index_opts)
            index_names.append(f"{name}.{result}")

通过create_all_indexes在项目启动的时候就创建索引

结论

在本文中,我们了解了如何在 Tornado 中集成 umongo 和 motor,并实现了与 MongoDB 进行交互的异步和非阻塞的方式。使用这些技术可以实现高效的数据访问,并构建出更加优秀的 Web 应用程序。