1.安装tortoise-orm
pip install tortoise-orm
2.新建models.py来创建数据库模型类
from tortoise import fields
from tortoise.models import Model
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
email = fields.CharField(max_length=100, unique=True)
age = fields.IntField()
from tortoise.models import Model
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
email = fields.CharField(max_length=100, unique=True)
age = fields.IntField()
3.在main.py中实现数据库增删改查
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise
import uvicorn
from models import User
app = FastAPI()
# 注册 Tortoise ORM
register_tortoise(
app,
#数据库为sqlite
db_url="sqlite://db.sqlite3",
#数据库为mysql
# db_url="mysql://username:password@host:port/database_name",
#数据库为postgresql
# db_url="postgres://username:password@host:port/database_name",
modules={"models": ["models"]},
generate_schemas=True, #自动生成数据库表结构,开发环境可以设置为True
add_exception_handlers=True, #添加全局异常处理程序,开发环境可以设置为True
)
# 创建用户
@app.post("/users")
async def create_user():
return await User.create(name="张三",email="zhangsan@mail.com",age=18)
# 获取所有用户
@app.get("/users")
async def get_users():
#只查询name和age字段
#users = await User.all().values("name", "age")
#根据条件查询年龄等于18岁的,并且只查询name和age字段
#users = await User.filter(age=18).values("name", "age")
#根据条件查询年龄,大于是__gt,小于是__lt,大于等于是__gte,小于等于是__lte
#users = await User.filter(age__gt=18).values("name", "age")
#查询所有字段
users = await User.all()
return users
# 获取单个用户
@app.get("/users/{user_id}")
async def get_user(user_id: int):
return await User.get(id=user_id)
# 更新用户
@app.put("/users/{user_id}")
async def update_user(user_id: int):
await User.filter(id=user_id).update(name="李四")
return await User.get(id=user_id)
# 删除用户
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
deleted_count = await User.filter(id=user_id).delete()
if not deleted_count:
return {"message": f"用户{user_id}未找到"}
return {"message": f"用户{user_id}删除成功"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000)
from tortoise.contrib.fastapi import register_tortoise
import uvicorn
from models import User
app = FastAPI()
# 注册 Tortoise ORM
register_tortoise(
app,
#数据库为sqlite
db_url="sqlite://db.sqlite3",
#数据库为mysql
# db_url="mysql://username:password@host:port/database_name",
#数据库为postgresql
# db_url="postgres://username:password@host:port/database_name",
modules={"models": ["models"]},
generate_schemas=True, #自动生成数据库表结构,开发环境可以设置为True
add_exception_handlers=True, #添加全局异常处理程序,开发环境可以设置为True
)
# 创建用户
@app.post("/users")
async def create_user():
return await User.create(name="张三",email="zhangsan@mail.com",age=18)
# 获取所有用户
@app.get("/users")
async def get_users():
#只查询name和age字段
#users = await User.all().values("name", "age")
#根据条件查询年龄等于18岁的,并且只查询name和age字段
#users = await User.filter(age=18).values("name", "age")
#根据条件查询年龄,大于是__gt,小于是__lt,大于等于是__gte,小于等于是__lte
#users = await User.filter(age__gt=18).values("name", "age")
#查询所有字段
users = await User.all()
return users
# 获取单个用户
@app.get("/users/{user_id}")
async def get_user(user_id: int):
return await User.get(id=user_id)
# 更新用户
@app.put("/users/{user_id}")
async def update_user(user_id: int):
await User.filter(id=user_id).update(name="李四")
return await User.get(id=user_id)
# 删除用户
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
deleted_count = await User.filter(id=user_id).delete()
if not deleted_count:
return {"message": f"用户{user_id}未找到"}
return {"message": f"用户{user_id}删除成功"}
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000)