likes
comments
collection
share

DRF | APIView 源码阅读 + Authentication + Permission 认证组件分析

作者站长头像
站长
· 阅读数 20

APIView 的源码

APIView是REST framework提供的所有视图的基类,继承自Django的View类

APIView与View的不同点为:

传入到视图方法中的是REST framework的Request对象,而不是Django的HttpRequeset对象 视图方法可以返回REST framework的Response对象,视图会为响应数据设置(render)符合前端期望要求格式

任何APIException异常都会被捕获到,并且处理成合适格式(json)的响应信息返回给客户端 会重新声明了一个新的as_views方法并在dispatch()进行路由分发前,对请求的客户端进行身份认证、权限检查、流量控制

APIView还新增了如下的类属性:

authentication_classes 列表或元组,身份认证类 permissoin_classes 列表或元组,权限检查类 throttle_classes 列表或元祖,流量控制类

DRF | APIView 源码阅读 + Authentication + Permission 认证组件分析

DRF | APIView 源码阅读 + Authentication + Permission 认证组件分析

DRF | APIView 源码阅读 + Authentication + Permission 认证组件分析

好长好长.....我尽力讲我们经常用到的。

APIView 的类属性

The following policies may be set at either globally, or per-view.

以下内容可以被全局应用也可以局部应用

parser_classes = api_settings.DEFAULT_PARSER_CLASSES (分页器)常用

authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES (认证器)常用

throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES (拦截器)常用

permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES (权限器)常用

content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS (不常用)

renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES(渲染器)

metadata_class = api_settings.DEFAULT_METADATA_CLASS (不常用)

versioning_class = api_settings.DEFAULT_VERSIONING_CLASS(不常用)

as_view()

哈哈哈,我们又回到了as_view()! 来,我们先看一下代码!

@classmethod  
def as_view(cls, **initkwargs):  
    """  
    Store the original class on the view function.  

    This allows us to discover information about the view when we do URL  
    reverse lookups. Used for breadcrumb generation.  
    """  
    if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):  
        def force_evaluation():  
            raise RuntimeError(  
                'Do not evaluate the `.queryset` attribute directly, '  
                'as the result will be cached and reused between requests. '  
                'Use `.all()` or call `.get_queryset()` instead.'  
                )  
        cls.queryset._fetch_all = force_evaluation  

    view = super().as_view(**initkwargs)  
    view.cls = cls  
    view.initkwargs = initkwargs  

    # Note: session based authentication is explicitly CSRF validated,  
    # all other authentication is CSRF exempt.  
    return csrf_exempt(view)
This allows us to discover information about the view when we do URL reverse lookups.  Used for breadcrumb generation.


这个方法使我们就能在进行 URL 反向查找时发现视图的相关信息。 用于生成面包屑。

isinstance 方法:判断 a 是不是 b 的对象

if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):  
    def force_evaluation():  
        raise RuntimeError(  
            'Do not evaluate the `.queryset` attribute directly, '  
            'as the result will be cached and reused between requests. '  
            'Use `.all()` or call `.get_queryset()` instead.'  
            )  
    cls.queryset._fetch_all = force_evaluation
    

判断条件

if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet):

判断cls.queryset是不是 在模型层里面的qs对象。如果是的话,定义一个方法叫 force_evaluation, 用来捕获异常,报错

view = super().as_view(**initkwargs) 这句话直接跑到父级,从父级返回回来view给我们使用。

最后返回return csrf_exempt(view),一个去除了CSRF认证的view函数

认证组件

from rest_framework.authentication import BaseAuthentication, BasicAuthentication

BaseAuthentication

class BaseAuthentication:  
    """  
    All authentication classes should extend BaseAuthentication.  
    """  

    def authenticate(self, request):  
    """  
    Authenticate the request and return a two-tuple of (user, token).  
    """  
        raise NotImplementedError(".authenticate() must be overridden.")  

    def authenticate_header(self, request):  
    """  
    Return a string to be used as the value of the `WWW-Authenticate`  
    header in a `401 Unauthenticated` response, or `None` if the  
    authentication scheme should return `403 Permission Denied` responses.  
    """  
        pass

老样子,我们先看源码

class BasicAuthentication(BaseAuthentication):  
    """  
    HTTP Basic authentication against username/password.  
    """  
    www_authenticate_realm = 'api'  

    def authenticate(self, request):  
        """  
        Returns a `User` if a correct username and password have been supplied  
        using HTTP Basic authentication. Otherwise returns `None`.  
        """  
        auth = get_authorization_header(request).split()  

        if not auth or auth[0].lower() != b'basic':  
            return None  

        if len(auth) == 1:  
            msg = _('Invalid basic header. No credentials provided.')  
            raise exceptions.AuthenticationFailed(msg)  
            elif len(auth) > 2:  
            msg = _('Invalid basic header. Credentials string should not contain spaces.')  
            raise exceptions.AuthenticationFailed(msg)  

        try:  
            try:  
                auth_decoded = base64.b64decode(auth[1]).decode('utf-8')  
            except UnicodeDecodeError:  
                auth_decoded = base64.b64decode(auth[1]).decode('latin-1')  
                auth_parts = auth_decoded.partition(':')  
        except (TypeError, UnicodeDecodeError, binascii.Error):  
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')  
            raise exceptions.AuthenticationFailed(msg)  

        userid, password = auth_parts[0], auth_parts[2]  
        return self.authenticate_credentials(userid, password, request)  

    def authenticate_credentials(self, userid, password, request=None):  
        """  
        Authenticate the userid and password against username and password  
        with optional request for context.  
        """  
        credentials = {  
            get_user_model().USERNAME_FIELD: userid,  
            'password': password  
        }  
        user = authenticate(request=request, **credentials)  

        if user is None:  
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))  

        if not user.is_active:  
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))  

        return (user, None)  

    def authenticate_header(self, request):  
        return 'Basic realm="%s"' % self.www_authenticate_realm

def authenticate(self, request)

auth = get_authorization_header(request).split() 先通过类属性定义的方法,获取到request请求里面的认证头。

if not auth or auth[0].lower() != b'basic': return None 如果 auth里面没有东西,或者 auth取0下标,转小写不等于 basic的编码,都返回None

如果len(auth) == 1 或者 len(auth) > 2, 统统捕获异常。

尝试通过两种解码方式解码

auth_decoded = base64.b64decode(auth[1]).decode('utf-8')

或者

auth_decoded = base64.b64decode(auth[1]).decode('latin-1')

一个是 utf-8,一个是Latin-1.

如果解码成功,分割auth,用auth_parts = auth_decoded.partition(':')。如果解码不成功,捕获异常。

分割成功后,我们可以获得 userid, 和password

最后我们在作死的边缘反复横跳, return self.authenticate_credentials(userid, password, request)

def authenticate_credentials(self, userid, password, request=None)

def authenticate_credentials(self, userid, password, request=None):  
    """  
    Authenticate the userid and password against username and password  
    with optional request for context.  
    """  
    credentials = {  
        get_user_model().USERNAME_FIELD: userid,  
        'password': password  
    }  
    user = authenticate(request=request, **credentials)  

    if user is None:  
        raise exceptions.AuthenticationFailed(_('Invalid username/password.'))  

    if not user.is_active:  
        raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))  

    return (user, None)

分析一下,credentials传进来一个字典

credentials = {  
    get_user_model().USERNAME_FIELD: userid,  
    'password': password  
}

把字典打散传回__init__.py下面的authenticate(request=None, **credentials)方法。这里就先不看校验credential的方法了,但是通过这里,如果我们传进去的是一个合法的credentials,我们会得到一个user object对象

最后, 我们通过两个筛选条件if user is None:if not user.is_active:。我们会得到return (user, None)

权限组件

from rest_framework.permissions import BasePermission

AllowAny

允许所有授权(通常不会这么用,可以直接用空代替,但是这么写可以让扩展性更高)

class AllowAny(BasePermission):  
    """  
    允许所有授权(通常不会这么用,可以直接用空代替,但是这么写可以让扩展性更高)
    """  

    def has_permission(self, request, view):  
        return True

IsAuthenticated

只给认证后的用户授权

**class IsAuthenticated(BasePermission):  
    """  
    Allows access only to authenticated users.  (只给认证后的用户授权)
    """  

    def has_permission(self, request, view):  
        return bool(request.user and request.user.is_authenticated)**

通过调用认证组件里面的判断是否认证来返回布尔值,来进行授权。

判断是否认证的方法我们自己重新写。

def has_permission(self, request, view):