学习fastapi
一、FastAPI请求响应数据1.1新建项目1.2两种方式运行代码命令行和手动运行更改代码后均可以自动刷新uvicorn main:app --reloadctrlc停掉代码1.3访问交互式文档在端口号后面加上docs1.4路由路由就是URL,地址和处理函数之间的映射关系它决定了当用户访问某个特定网址时服务器应该执行哪段代码来返回结果#访问/hello/{name},响应结果Hello {name} app.get(/hello/{name}) async def say_hello(name: str): return {message: fHello {name}}1.5参数同一段接口逻辑可以根据参数不同返回不同的数据参数就是客户端发送请求时附带的额外信息和指令参数的作用是让同一个接口能根据不同的输入返回不同的输出实现动态交互常见参数有三类路径参数、查询参数、请求体1、路径参数位置URL路径的一部分/hello/{name}作用指向唯一的、特定的资源方法GET#访问/hello/{name},响应结果Hello {name} app.get(/hello/{name}) async def say_hello(name: str): return {message: fHello {name}}路径参数-类型注解Path1、导入Pathfrom fastapi import FastAPI,Path2、给参数加限制#限制新闻id范围为1-100 app.get(/news/{id}) async def news_id(id:intPath(...,gt0,lt101,descriptionid的范围是1-100)): return {id:id,msg:f新闻的id是{id}} #限制新闻的标题长度是5-10 app.get(/{title}) async def news_title(title:strPath(...,min_length2,max_length10)): return {f新闻的标题是{title}}2、查询参数声明的参数不是路径参数时路径操作函数会把参数自动解释为查询参数位置在UPL问好后面的部分例如URL?k1v1k2v2作用对资源集合进行过滤、排序、分页等操作方法GET为查询参数添加类型注解有python原生注解和Query注解1、导入Queryfrom fastapi import FastAPI,Query2、给参数加限制app.get(/news/news_list) async def get_news_list( skip:intQuery(default0,ge0,le10,description开始的新闻数), limit:intQuery(default10,ge10,le100,description每页新闻数)): return {skip:skip,limit:limit}3、请求体参数位置HTTP请求中的消息体body中作用创建、更新资源、携带大量数据如JSON方法POST、PUT等在HTTP协议中一个完整的请求由下列三部分组成请求行包含方法、URL、协议版本请求头元数据信息内容类型、鉴权信息等请求体实际要发送的数据内容1、导入基类from pydantic import BaseModel2、根据需求定义类型写post提交数据#注册用户名和密码-str class User(BaseModel): username: str password: str app.post(/register) async def register(user: User): return user请求体参数-类型注解Field1、导入pydantic的Field函数from pydantic import BaseModel,Field2、给类加上Field注解#title不能为空author长度1-10出版社默认黑马出版社售价不能为空价格大于0 class Book(BaseModel): title: strField(...) author: strField(min_length1,max_length10) isbn: strField(default黑马出版社) price: floatField(...,gt0) app.post(/get_book) async def get_book(book: Book): return book1.6请求与响应响应类型默认情况下Fast API会自动将路径操作函数返回的Python对象字典、列表、Pydantic模型等经由jsonable_encoder转化为JSON兼容格式并包装为JSONResponse返回。这省去了手动序列化的步骤让开发者更专注于业务逻辑如果需要返回非JSON数据如HTML、文件流FastAPI提供了丰富的相应类型来返回不同的数据响应类型用途示例JSONResponse默认响应返回JSON数据return {key:value}HTMLResponse返回HTML内容return HTMLResponsehtml_contentPlainTextResponse返回纯文本return PlainTextResponse(text)FieldResponse返回文件下载return FileResponse(path)StreamingResponse流式响应生成器返回数据RedirectResponse重定向return RedirectResponse(url)1、JSONResponse之前写的任意一个python接口,返回的都是JSON数据其他响应类型设置方式1、装饰器指定响应类适用场景接口固定了返回类型HTML、纯文本等2、返回响应对象场景文件下载、图片、流式响应装饰器指定响应类例如设置响应类为HTMLResponse当前接口即可返回HTML内容1、导入类from fastapi.responses import HTMLResponse2、装饰器直接指定响应类#响应html内容 app.get(/html,response_classHTMLResponse) async def get_html(): return h1一级标题/h1返回响应对象例如响应文件格式FileResponse是FastAPI提供的专门用于高效返回文件内容如图片、PDF、Excel、音视频等的相应类。他能够只能处理文件路径、媒体类型推断、范围请求和缓存头部是服务静态文件的推荐方式1、导入类from fastapi.responses import FileResponse2、返回时指定响应类型#响应file内容 app.get(/file) async def get_file(): path ./file/1.pdf return FileResponse(path)自定义响应数据类型格式response_model是路径操作装饰器如app.get或app.post的关键参数它通过一个Pydantic模型来严格定义和约束API端点的输出格式这一机制在自动数据验证和序列化的同时更是保障数据安全性的第一道防线1、导入类2、自定义数据类型格式class News(BaseModel): id:int title:str content:str #正确代码 app.get(/news/{id},response_modelNews) async def get_news(id:int): return { id:id, title:f这是第{id}书, content:这是本好书, } #错误代码 app.get(/news/{id},response_modelNews) async def get_news(id:int): return { id:id, title:f这是第{id}书, }正确错误1.7异常响应处理对于客户端引发的错误4xx如资源未找到认证失败使用fastapi.HTTPException来终端处理流程并返回标准错误响应1、导入HTTPExceptionfrom fastapi import HTTPException2、代码#需求-按照id查找新闻 app.get(/new/{id}) async def get_new(id:int): id_list[1,2,3,4,5] if id not in id_list: raise HTTPException(status_code404,detail没有找到这条新闻) return {f第{id}条新闻}结果1.8中间件中间件的作用为每个请求添加统一的处理逻辑记录日志、身份认证、跨域、设置响应头、性能监控等中间件的定义函数的顶部的hi用装饰器app.midddleware(http)#中间件1 app.middleware(http) #request指的是请求包含请求路径方法等 # call_next是自动向下执行 async def middleware1(request,call_next): print(中间件1开始) # 让请求继续往后执行知道路由处理完拿到返回的响应再回到中间件 responseawait call_next(request) print(中间件1结束) return response #中间件2 app.middleware(http) async def middleware1(request,call_next): print(中间件2开始) responseawait call_next(request) print(中间件2结束) return response app.get(/) async def root(): return {message: Hello World}多个中间价的执行顺序自下而上上面代码的结果如下1.9依赖注入作用抽取可复用的组件实现代码复用、解耦且可以轻松替换依赖项进行测试怎么使用依赖注入系统创建依赖项-导入Depends-声明依赖项from fastapi import FastAPI,Depends,Query #2、导入depends # 分页逻辑公用新闻列表和用户列表 #1、依赖项 async def common_list( skip:intQuery(0,ge0), limit:intQuery(10,le60), ): return {skip:skip,limit:limit} #3、注入依赖项 app.get(/news/news_list) async def get_news_list(commonsDepends(common_list)): return commons app.get(/user/user_list) async def get_user_list(): return {msg:你好}结果get_news_list成功注入依赖项二、ORMORMObject-RelationMapping,对象关系映射是一种编程技术用于在面向对象编程语言和关系型数据库之间建立映射它允许开发者通过操作对象的方式与数据库进行交互无需直接编写复杂的SQL语句2.1安装包pip install sqlalchemy[asyncio] aiomysql2.2建库、建表三步走创建数据库引擎-定义模型类-启动应用时建表1、创建数据库引擎先用mysql创建一个数据库MySQL安装包MySQL :: Download MySQL Installerhttps://dev.mysql.com/downloads/installer/安装完后进入MySQL 创建一个数据库create database fastapi_one从SQLAlchemy的异步扩展模块里导入创建异步数据库引擎的功能from sqlalchemy.ext.asyncio import create_async_engine然后创建异步引擎# 1、创建异步引擎 ASYNC_DATABASE_URL mysqlaiomysql://root:123456localhost:3306/fastapi_one?charsetutf8 async_enginecreate_async_engine( ASYNC_DATABASE_URL, echoTrue, #可选输出SQL日志 #最多处理30个数据库操作平时保持10个连接pool_size10, pool_size10,#设置连接池活跃的连接数 max_overflow20,#允许额外的连接数 )2、ORM-定义模型类先导入所需模块#所有表模型必须继承DeclarativeBase才能映射成数据库表 from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column from sqlalchemy import DateTime, func, String,Float from datetime import datetime定义模型类1、基类通用属性和字段的映射继承DeclarativeBase2、一个数据库表对应一个模型类#2、定义模型类:基类表对应的模型类 # 基类是指通用的字段这里包括创建时间、更新时间书籍表id,作者价格出版社 class Base(DeclarativeBase): create_time:Mapped[datetime]mapped_column(DateTime,defaultfunc.now(),comment创建时间) update_time:Mapped[datetime]mapped_column(DateTime,defaultfunc.now(),onupdatefunc.now,comment更新时间) #继承上面的基类 class Book(Base): __tablename__ book id:Mapped[int]mapped_column(primary_keyTrue,comment书籍id) bookname:Mapped[str]mapped_column(String(255),comment书名) author:Mapped[str]mapped_column(String(255),comment作者) price:Mapped[float]mapped_column(Float,comment价格) publisher:Mapped[str]mapped_column(String(255),comment出版社)3、建表先定义一个函数去建表#根据写的模型类自动去数据库建表 async def create_tables(): #获取异步引擎创建事务-表 #async with 异步引擎.开始连接() as 连接对象:打开一个异步的数据库连接 async with async_engine.begin() as conn: #run_sync让同步代码能在异步里运行 #Base.metadata所有继承了Base的模型类这里只有Book #create_allcreate all tables → 创建所有表 await conn.run_sync(Base.metadata.create_all) #Base模型类的元数据创建在FastAPI启动时去调用建表的函数立即去建表#app.在事件(“启动时”):FastAPI 一启动就立刻执行下面的函数 app.on_event(startup) async def startup(): #调用这个函数去建表 await create_tables()查看是否把表建好了先运行跑起来没报错然后点Database选择Mysql填写用户名、密码以及数据库进行连接。连接成功可以看到已经成功创建表了2.3在路由中使用ORM核心创建依赖项使用Depends 将依赖项注入到处理函数中1、导入会话类和会话工厂类from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession2、在路由中注入依赖项用来获取数据库会话# 需求查询用户功能的接口创建异步会话工厂-创建依赖项-注入依赖项到路由处理函数中 #创建异步会话工厂 #造一个专门生产异步数据库会话的工厂绑定好数据库连接生产出来的会话必须是异步版本 AsyncSessionLocalasync_sessionmaker( bindasync_engine,#绑定数据库引擎 expire_on_commitFalse,#提交后会话不会过期不会重新查询数据库 class_AsyncSession #生产异步会话 ) #依赖项用来获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session #返回会话给路由处理函数 await session.commit() #提交事务 except Exception: await session.rollback() #有异常回滚 raise finally: await session.close() #关闭会话 app.get(/user/users) #注入依赖 async def get_users_list(db: AsyncSession Depends(get_database)): res await db.execute(select(User)) users res.scalars().all() return users2.4数据库操作1、查询核心语句await db.execute(select(模型类)),返回一个ORM对象app.get(/users) #依赖注入 async def get_users_list_all(db: AsyncSession Depends(get_database)): res await db.execute(select(User)) users res.scalars().all() #查询所有 return users app.get(/user_first) async def get_user_first(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User)) user_firstres.scalars().first() return user_first app.get(/get_user) async def get_user(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User)) get_uawait db.get(User,5)#根据主键获取单条数据 return get_u2、查询条件核心语句select(User).where(条件条件2...)条件比较判断、、、、模糊查询like(或与非查询、|、~包含查询in_()比较判断# orm条件查询 app.get(/users/{user_id}) async def get_user_id(user_id:int,db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(User.iduser_id)) #条件查询查询参数等于User表中id的的数据 userres.scalar_one_or_none() #可能查不到也可能查到这里要用scalar_one_or_none()方法注意scalar没有s return user app.get(/search_user) async def get_search_user(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(User.id4)) #查询所有id小于4的用户 usersres.scalars().all() return users模糊查询%零个、一个或多个字符_单个字符#模糊查询 app.get(/search_like_user1) async def get_search_like_user1(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(User.user_name.like(见%))) #%匹配多个字符 userres.scalars().all() return user app.get(/search_like_user2) async def get_search_like_user2(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(User.user_name.like(见_))) #_匹配单个字符 userres.scalars().all() return user或与非查询#and app.get(/search_and) async def get_search_and(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(User.user_name.like(%天)(User.id5))) #并且 userres.scalars().all() return user #or app.get(/search_or) async def get_search_or(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(User.user_name.like(%天)|(User.id5))) #或 userres.scalars().all() return user #not app.get(/search_not) async def get_search_not(db:AsyncSessionDepends(get_database)): resawait db.execute(select(User).where(~User.user_name.like(%天))) #非 userres.scalars().all() return user包含查询app.get(/search_include) async def get_search_include(db:AsyncSessionDepends(get_database)): id_list[1,3,4,7,8] resawait db.execute(select(User).where(User.id.in_(id_list))) #包含查询 userres.scalars().all() return user3、聚合查询聚合计算select(func.方法(模型类属性))count:统计行数量avg:求平均值max:求最大值min:求最小值sum:求和app.get(/get_user) async def get_user(db:AsyncSessionDepends(get_database)): # resawait db.execute(select(func.count(User.id))) #count统计数量 # res await db.execute(select(func.sum(User.id))) #sum统计总和 # res await db.execute(select(func.max(User.id))) #max统计最大值 # res await db.execute(select(func.min(User.id))) #统计最小值 res await db.execute(select(func.avg(User.id))) #统计平均值 numres.scalar() return num4、分页查询分页查询select(模型类).offset().limit()offset括号里填写跳过的记录树limit括号里填写返回的记录数当前页码每页数量limit跳过数量offest1100210103102041030offset值(当前页码-1)*每页的数量limitapp.get(/get_user) async def get_user( page:int 1, page_size:int 2, db:AsyncSessionDepends(get_database)): skip(page-1)*page_size stselect(User).offset(skip).limit(page_size) resawait db.execute(st) userres.scalars().all() return user5、新增数据核心步骤定义ORM对象创建请求体参数-创建ORM对象-添加对象到事务add-commit提交到数据库#需求输入用户信息id,user_name,password,设置请求体参数BaseModel class UserBase(BaseModel): id:int #字段名需要完全一致 user_name:str password:str #新增数据-提交用post请求 app.post(/add_user) async def add_user(user:UserBase,db:AsyncSessionDepends(get_database)): #创建ORM对象-然后新增数据——提交这个数据 user_objUser(**user.__dict__) db.add(user_obj) await db.commit() return user6、更新数据核心步骤先查询数据get/scalars-属性重新赋值-commit提交到数据库7、删除数据核心步骤先查数据-delete删除数据-commit提交到数据库#删除用delete app.delete(/delete_user/{user_id}) async def delete_user(user_id: int,db:AsyncSessionDepends(get_database)): id_dbawait db.get(User,user_id) if id_db is None: raise HTTPException(status_code404,detail查无此人) #删除图书 await db.delete(id_db) #将修改的数据提交给数据库 await db.commit() return {msg:删除成功}写在最后学这个好难啊但我居然看完了又给自己灌了很多鸡汤世界上有两条路一条是错的一条是难走的路前路是路错的路也是路后路也是路只有原地不动时没有路