likes
comments
collection
share

基于FastAPI实现短链重定向(3) -SQLAlchemy创建表以及CURD操作

作者站长头像
站长
· 阅读数 6

序言

尊贵的读者朋友们,在上一章之末,我们已经成功完成了应用程序配置信息与模型定义部分的工作。在此,我们将要深入探讨如何运用SQLAlchemy工具包来便捷地创建数据库表,同时,我们也将演示用户信息的常见CURD(Create,Update,Read,Delete)操作以及短链接信息表的CURD封装流程。

创建表的秘诀:SQLAlchemy

在同步引擎环境下,假如您选用的是create_engine函数并赋值为sqlite:///,那么只需简单地调用Base.metadata.create_all(engine, checkfirst=True)即可轻松完成数据表的创建。这里的checkfirst参数设置为True,意味着首次初始化时才会创建表;若表已存在,再次执行此操作便不会产生任何影响。

而在异步引擎环境下,enginwe = create_async_engine('sqlite:///'),您可以按照以下步骤进行表的创建:

async def init_create_table():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

我们将表的创建过程集成到app启动事件回调中,修改代码如下:

@app.on_event("startup")
async def startup_event():
    from db.database import async_engine, Base
    from models.model import User,ShortUrl
    async def init_create_table():
        async with async_engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)
    await init_create_table()

在服务启动事件on_event("startup")回调中进行了数据库表的创建,通过async_engine.begin() as conn获取了一个连接数据库的会话对象,然后执行数据库表的删除和重新创建的操作,其中,Base.metadata.drop_all用于删除表,Base.metadata.create_all用于创建表。对表进行删除需谨慎 生成表时需要注意,要导入具体的模型类,如from models.model import User,ShortUrl。如果不导入,则执行创建表的操作时会因无法找到要生成的表模型而出错。 当启动服务之后,就可以看到在当前项目目录下生成了数据库文件。

用户信息CRUD

先来看实现代码,services/user.py

from sqlalchemy import select, update, delete,func
from sqlalchemy.ext.asyncio import AsyncSession

from models.model import User
from db.database import async_engine, Base


class UserServeries:


    @staticmethod
    async def get_user(async_session: AsyncSession, user_id: int):
        result = await async_session.execute(select(User).where(User.id == user_id))
        return result.scalars().first()

    @staticmethod
    async def get_user_by_name(async_session: AsyncSession, username: str) -> User:
        result = await async_session.execute(select(User).where(User.username == username))
        return result.scalars().first()

    @staticmethod
    async def get_users(async_session: AsyncSession):
        result = await async_session.execute(select(User).order_by(User.id))
        return result.scalars().fetchall()

    @staticmethod
    async def create_user(async_session: AsyncSession, **kwargs):
        new_user = User(**kwargs)
        async_session.add(new_user)
        await async_session.commit()
        return new_user

    @staticmethod
    async def update_user(async_session: AsyncSession, user_id: int, **kwargs):
        response = update(User).where(User.id == user_id)
        result = await async_session.execute(response.values(**kwargs))
        await async_session.commit()
        return result

    @staticmethod
    async def delete_user(async_session: AsyncSession, user_id: int):
        response = await async_session.execute(delete(User).where(User.id == user_id))
        await async_session.commit()
        return response

UserServeries类统一管理user表的操作,在该类中定义了与user表的增、删、改、查等操作对应的静态协程方法。在所有的静态协程方法中,第一个参数统一依赖于AsyncSession异步的会话对象。

  • create_user()方法用于创建User用户信息。
  • get_user()、get_user_by_name()、get_users()等方法用于根据不同的条件查询User用户信息。
  • update_user()方法用于根据用户ID修改User用户信息。
  • delete_user()方法用于根据用户ID删除User用户记录。

短链信息CURD

还是先看实现代码,services/short.py

from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from models.model import ShortUrl
from db.database import async_engine, Base
from typing import List
from schemas import SingleShortUrlCreate

class ShortServeries:

   
    @staticmethod
    async def get_short_url(async_session: AsyncSession, short_tag: str)->ShortUrl:
        result = await async_session.execute(select(ShortUrl).where(ShortUrl.short_tag == short_tag))
        return result.scalars().first()

    @staticmethod
    async def create_short_url(async_session: AsyncSession, **kwargs):
        new_short_url = ShortUrl(**kwargs)
        async_session.add(new_short_url)
        await async_session.commit()
        return new_short_url

    @staticmethod
    async def update_short_url(async_session: AsyncSession, short_url_id: int, **kwargs):
        response = update(ShortUrl).where(ShortUrl.id == short_url_id)
        result = await async_session.execute(response.values(**kwargs))
        await async_session.commit()
        return result

    @staticmethod
    async def delete_short_url(async_session: AsyncSession, short_url_id: int):
        response = await async_session.execute(delete(ShortUrl).where(ShortUrl.id == short_url_id))
        await async_session.commit()
        return response


    @staticmethod
    async def create_batch_short_url(async_session: AsyncSession, short_urls:List[SingleShortUrlCreate]):
        short_urls= [ShortUrl(**item.dict()) for item in short_urls]
        async_session.add_all(short_urls)
        await async_session.commit()
        return short_urls

ShortServeries类来统一管理短链信息表的操作,该类中定义了与短链信息表的增、删、改、查等操作对应的静态协程方法,这些静态协程方法包括:

  • get_short_url()方法用于根据short_tag查询短链记录。
  • create_short_url()方法用于创建单一的短链记录。
  • update_short_url()方法用于根据短链记录的ID更新短链信息。
  • delete_short_url()方法用于根据短链记录的ID删除短链记录。
  • create_batch_short_url()方法用于批量添加多条短链记录。

后续展望

在这篇文章中,主要是讲了怎样用SQLAlchemy这个强大工具来创建各种表,而且还有详细讲解了怎么操作那些用户信息表和短链信息表。

敬请期待:下一篇文章将介绍接口的定义以及路由配置。在此,我们诚挚邀请各位朋友继续关注此系列文章,若有所得,恳请您给予点赞和收藏以示鼓励,不胜感激!

转载自:https://juejin.cn/post/7357894315958845490
评论
请登录