likes
comments
collection
share

(七)FastAPI的认证与授权

作者站长头像
站长
· 阅读数 32

认证与授权是 Web 应用程序中至关重要的两个部分。认证用于确认用户身份,授权则决定用户是否有权访问特定资源。FastAPI 提供了灵活而强大的认证与授权机制,下面我们将详细介绍如何使用这些功能。

1. 什么是认证和授权

  • 认证(Authentication) :验证用户身份的过程。例如,检查用户名和密码是否正确。
  • 授权(Authorization) :确定经过认证的用户是否有权访问特定资源或执行特定操作。

2. 认证方案

FastAPI 提供了多种认证方案,如基于 OAuth2、API Key、HTTP Basic Auth 等的认证方式。我们以 OAuth2 为例,演示如何实现认证。

2.1 使用 OAuth2 实现认证

OAuth2 是一种广泛使用的认证协议。FastAPI 提供了内置的支持来实现 OAuth2 认证。

2.1.1 安装依赖

首先,确保你已经安装了 fastapiuvicorn

pip install fastapi uvicorn
2.1.2 配置 OAuth2PasswordBearer

OAuth2PasswordBearer 是一个用于定义 Token URL 的类。创建一个 FastAPI 实例并配置 OAuth2PasswordBearer:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
    # token 将包含 Bearer 令牌
    # 在这里可以进一步处理和验证 token
    return {"token": token}
  • oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token"):这行代码初始化了一个 OAuth2PasswordBearer 实例。
  • tokenUrl 参数指定了获取访问令牌的 URL(在我们的例子中是 /token)。
  • 这个实例(oauth2_scheme)可以用作依赖项,FastAPI 会自动处理请求中的认证信息。
  • async def read_users_me(token: str = Depends(oauth2_scheme)):视图函数read_users_me使用 Depends(oauth2_scheme) 作为参数时,FastAPI 会执行以下步骤:
  1. 检查请求头部:FastAPI 查看 HTTP 请求头部中的 Authorization 字段。
  2. 提取令牌:从 Authorization 头部中提取 Bearer 令牌。例如,Authorization: Bearer <token> 中的 <token> 部分。
  3. 传递令牌:将提取到的令牌作为参数传递给依赖函数。
2.1.3 实现 Token 端点

实现 /token 端点来生成和返回 token。在这个例子中,我们将使用假数据来演示:

from jose import JWTError, jwt
from datetime import datetime, timedelta
from pydantic import BaseModel

SECRET_KEY = "mysecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    password: str

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "password": "secret"
    }
}

def verify_password(plain_password, hashed_password):
    return plain_password == hashed_password

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return User(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"} 
2.1.4 获取当前用户

使用 Dependsoauth2_scheme 来获取当前用户:

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
2.1.5 保护路由

使用 get_current_user 依赖项来保护需要认证的路由:

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

3. 授权机制

在确保用户已认证后,我们可以实现授权逻辑。例如,不同角色的用户可以访问不同的资源。

3.1 基于角色的授权

我们可以扩展用户模型来包含角色信息,并在路由中检查角色。

3.1.1 更新用户模型
class UserInDB(User):
    hashed_password: str
    role: str

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": "secret",
        "role": "admin"
    },
    "janedoe": {
        "username": "janedoe",
        "hashed_password": "secret",
        "role": "user"
    }
}
3.1.2 角色检查

在路由中添加角色检查逻辑:

def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")
    return current_user

@app.get("/admin/")
async def read_admin_data(current_user: User = Depends(get_current_active_user)):
    return {"msg": "This is admin data"}

4. 示例代码汇总

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

SECRET_KEY = "mysecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

class User(BaseModel):
    username: str
    role: str

class UserInDB(User):
    hashed_password: str

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": "secret",
        "role": "admin"
    },
    "janedoe": {
        "username": "janedoe",
        "hashed_password": "secret",
        "role": "user"
    }
}

def verify_password(plain_password, hashed_password):
    return plain_password == hashed_password

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")
    return current_user

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

@app.get("/admin/")
async def read_admin_data(current_user: User = Depends(get_current_active_user)):
    return {"msg": "This is admin data"}

通过本文的详细介绍,你现在应该能够在 FastAPI 中实现健壮的认证与授权机制。利用 OAuth2 实现认证,并基于角色实现简单的授权逻辑,可以有效保护应用程序的资源。希望本文能帮助你更好地理解并实现 FastAPI 的认证与授权功能。

转载自:https://juejin.cn/post/7379897208533286966
评论
请登录