(七)FastAPI的认证与授权
认证与授权是 Web 应用程序中至关重要的两个部分。认证用于确认用户身份,授权则决定用户是否有权访问特定资源。FastAPI 提供了灵活而强大的认证与授权机制,下面我们将详细介绍如何使用这些功能。
1. 什么是认证和授权
- 认证(Authentication) :验证用户身份的过程。例如,检查用户名和密码是否正确。
- 授权(Authorization) :确定经过认证的用户是否有权访问特定资源或执行特定操作。
2. 认证方案
FastAPI 提供了多种认证方案,如基于 OAuth2、API Key、HTTP Basic Auth 等的认证方式。我们以 OAuth2 为例,演示如何实现认证。
2.1 使用 OAuth2 实现认证
OAuth2 是一种广泛使用的认证协议。FastAPI 提供了内置的支持来实现 OAuth2 认证。
2.1.1 安装依赖
首先,确保你已经安装了 fastapi
和 uvicorn
:
pip install fastapi uvicorn
2.1.2 配置 OAuth2PasswordBearer
OAuth2PasswordBearer
是一个用于定义 Token URL 的类。创建一个 FastAPI 实例并配置 OAuth2PasswordBearer:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.get("/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
# token 将包含 Bearer 令牌
# 在这里可以进一步处理和验证 token
return {"token": token}
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
:这行代码初始化了一个OAuth2PasswordBearer
实例。tokenUrl
参数指定了获取访问令牌的 URL(在我们的例子中是/token
)。- 这个实例(
oauth2_scheme
)可以用作依赖项,FastAPI 会自动处理请求中的认证信息。 async def read_users_me(token: str = Depends(oauth2_scheme))
:视图函数read_users_me
使用Depends(oauth2_scheme)
作为参数时,FastAPI 会执行以下步骤:
- 检查请求头部:FastAPI 查看 HTTP 请求头部中的
Authorization
字段。 - 提取令牌:从
Authorization
头部中提取 Bearer 令牌。例如,Authorization: Bearer <token>
中的<token>
部分。 - 传递令牌:将提取到的令牌作为参数传递给依赖函数。
2.1.3 实现 Token 端点
实现 /token
端点来生成和返回 token。在这个例子中,我们将使用假数据来演示:
from jose import JWTError, jwt
from datetime import datetime, timedelta
from pydantic import BaseModel
SECRET_KEY = "mysecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
password: str
fake_users_db = {
"johndoe": {
"username": "johndoe",
"password": "secret"
}
}
def verify_password(plain_password, hashed_password):
return plain_password == hashed_password
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return User(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
2.1.4 获取当前用户
使用 Depends
和 oauth2_scheme
来获取当前用户:
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
2.1.5 保护路由
使用 get_current_user
依赖项来保护需要认证的路由:
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
3. 授权机制
在确保用户已认证后,我们可以实现授权逻辑。例如,不同角色的用户可以访问不同的资源。
3.1 基于角色的授权
我们可以扩展用户模型来包含角色信息,并在路由中检查角色。
3.1.1 更新用户模型
class UserInDB(User):
hashed_password: str
role: str
fake_users_db = {
"johndoe": {
"username": "johndoe",
"hashed_password": "secret",
"role": "admin"
},
"janedoe": {
"username": "janedoe",
"hashed_password": "secret",
"role": "user"
}
}
3.1.2 角色检查
在路由中添加角色检查逻辑:
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
return current_user
@app.get("/admin/")
async def read_admin_data(current_user: User = Depends(get_current_active_user)):
return {"msg": "This is admin data"}
4. 示例代码汇总
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
SECRET_KEY = "mysecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
role: str
class UserInDB(User):
hashed_password: str
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users_db = {
"johndoe": {
"username": "johndoe",
"hashed_password": "secret",
"role": "admin"
},
"janedoe": {
"username": "janedoe",
"hashed_password": "secret",
"role": "user"
}
}
def verify_password(plain_password, hashed_password):
return plain_password == hashed_password
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
return current_user
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
@app.get("/admin/")
async def read_admin_data(current_user: User = Depends(get_current_active_user)):
return {"msg": "This is admin data"}
通过本文的详细介绍,你现在应该能够在 FastAPI 中实现健壮的认证与授权机制。利用 OAuth2 实现认证,并基于角色实现简单的授权逻辑,可以有效保护应用程序的资源。希望本文能帮助你更好地理解并实现 FastAPI 的认证与授权功能。
转载自:https://juejin.cn/post/7379897208533286966