位置: 文档库 > Python > 基于Bearer Token的REST API认证教程

基于Bearer Token的REST API认证教程

康熙帝 上传于 2022-10-12 16:39

《基于Bearer Token的REST API认证教程》

在当今的Web开发中,RESTful API因其简洁性和可扩展性被广泛应用。然而,随着API的普及,安全性问题也日益凸显。如何确保只有授权用户才能访问API资源,成为开发者必须解决的课题。Bearer Token认证机制凭借其简单高效的特点,成为REST API认证的主流方案之一。本文将通过Python实现一个完整的Bearer Token认证流程,涵盖Token生成、验证及API保护的全过程。

一、Bearer Token认证原理

Bearer Token是一种基于令牌的认证机制,属于OAuth 2.0协议的一部分。其核心思想是:客户端通过某种方式(如用户名密码)获取Token后,在后续请求中通过HTTP头部的Authorization: Bearer 字段携带该Token。服务端验证Token的有效性后,决定是否允许访问资源。

与传统的Session认证相比,Bearer Token具有以下优势:

  • 无状态性:服务端无需存储会话信息,降低服务器压力
  • 跨域支持:天然适合前后端分离架构
  • 灵活性:Token可设置过期时间、支持刷新机制

二、Python实现环境准备

我们将使用以下Python库:

  • Flask:轻量级Web框架
  • PyJWT:JWT(JSON Web Token)编解码库
  • datetime:处理Token过期时间

首先安装依赖:

pip install flask pyjwt

三、Token生成与验证实现

1. 生成JWT Token

JWT由三部分组成:Header、Payload、Signature。我们使用PyJWT库生成Token:

import jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key-here"  # 实际应用中应从环境变量获取
ALGORITHM = "HS256"

def generate_token(user_id, expiry_hours=1):
    """
    生成JWT Token
    :param user_id: 用户标识
    :param expiry_hours: Token有效期(小时)
    :return: JWT字符串
    """
    try:
        payload = {
            "user_id": user_id,
            "exp": datetime.utcnow() + timedelta(hours=expiry_hours)
        }
        token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
        return token
    except Exception as e:
        print(f"Token生成失败: {e}")
        return None

关键点说明:

  • exp字段设置Token过期时间(UTC时间)
  • SECRET_KEY必须保密,建议使用强密钥
  • ALGORITHM推荐使用HS256或RS256

2. 验证JWT Token

服务端需要验证Token的有效性:

def verify_token(token):
    """
    验证JWT Token
    :param token: 客户端传入的Token
    :return: (用户ID, 错误信息) 元组
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["user_id"], None
    except jwt.ExpiredSignatureError:
        return None, "Token已过期"
    except jwt.InvalidTokenError:
        return None, "无效的Token"
    except Exception as e:
        return None, f"验证失败: {e}"

常见异常处理:

  • ExpiredSignatureError:Token过期
  • InvalidTokenError:Token格式错误或签名不匹配

四、Flask API认证实现

1. 创建认证端点

实现登录接口,用户提供凭证后返回Token:

from flask import Flask, request, jsonify

app = Flask(__name__)

# 模拟用户数据库
users_db = {
    "admin": {"password": "admin123", "user_id": 1},
    "user1": {"password": "pass123", "user_id": 2}
}

@app.route("/api/login", methods=["POST"])
def login():
    auth_data = request.get_json()
    username = auth_data.get("username")
    password = auth_data.get("password")
    
    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400
    
    user = users_db.get(username)
    if not user or user["password"] != password:
        return jsonify({"error": "用户名或密码错误"}), 401
    
    token = generate_token(user["user_id"])
    return jsonify({"token": token})

2. 创建受保护资源

实现需要认证的API端点:

def token_required(f):
    """装饰器:验证Token的中间件"""
    def decorated(*args, **kwargs):
        token = None
        if "Authorization" in request.headers:
            auth_header = request.headers["Authorization"]
            if auth_header.startswith("Bearer "):
                token = auth_header.split(" ")[1]
        
        if not token:
            return jsonify({"error": "Token未提供"}), 401
        
        user_id, error = verify_token(token)
        if error:
            return jsonify({"error": error}), 401
        
        # 将用户ID添加到请求上下文
        return f(user_id, *args, **kwargs)
    return decorated

@app.route("/api/protected", methods=["GET"])
@token_required
def protected_route(user_id):
    """受保护的API端点"""
    return jsonify({"message": f"欢迎用户{user_id},您已成功访问受保护资源"})

3. 完整Flask应用

if __name__ == "__main__":
    app.run(debug=True)

五、测试认证流程

1. 获取Token

使用curl测试登录接口:

curl -X POST http://localhost:5000/api/login \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin123"}'

成功响应示例:

{
  "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE2MjM0NTY3ODh9.XXX"
}

2. 访问受保护资源

携带Token访问:

curl -X GET http://localhost:5000/api/protected \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE2MjM0NTY3ODh9.XXX"

成功响应示例:

{
  "message": "欢迎用户1,您已成功访问受保护资源"
}

3. 错误场景测试

  • 未提供Token:返回401错误
  • 过期Token:返回"Token已过期"
  • 无效Token:返回"无效的Token"

六、进阶优化

1. Token刷新机制

实现短期访问Token+长期刷新Token的模式:

def generate_refresh_token(user_id, expiry_days=7):
    payload = {
        "user_id": user_id,
        "type": "refresh",
        "exp": datetime.utcnow() + timedelta(days=expiry_days)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

@app.route("/api/refresh", methods=["POST"])
@token_required
def refresh_token(user_id):
    # 验证刷新Token的逻辑
    # 生成新的访问Token
    new_access_token = generate_token(user_id)
    return jsonify({"access_token": new_access_token})

2. 使用HTTPS

生产环境必须启用HTTPS,防止Token在传输过程中被截获。

3. 令牌撤销机制

对于需要立即失效的Token,可维护一个黑名单:

from flask import g

REVOKED_TOKENS = set()

def is_token_revoked(token):
    # 实际实现可能需要解析Token获取jti
    return token in REVOKED_TOKENS

def revoke_token(token):
    REVOKED_TOKENS.add(token)

七、常见问题解决方案

1. 时钟不同步问题

确保服务端和客户端时钟同步,或使用nbf(Not Before)字段设置生效时间。

2. Token存储安全

客户端应将Token存储在:

  • HTTP-only Cookie(防止XSS攻击)
  • 或安全的本地存储(如React的secure storage)

3. 跨域问题

在Flask中配置CORS:

from flask_cors import CORS
CORS(app, resources={r"/api/*": {"origins": "*"}})

八、完整代码示例

将所有代码整合后的完整实现:

import jwt
from datetime import datetime, timedelta
from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)
SECRET_KEY = "your-256-bit-secret"
ALGORITHM = "HS256"

users_db = {
    "admin": {"password": "admin123", "user_id": 1},
    "user1": {"password": "pass123", "user_id": 2}
}

def generate_token(user_id, expiry_hours=1):
    payload = {
        "user_id": user_id,
        "exp": datetime.utcnow() + timedelta(hours=expiry_hours)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["user_id"], None
    except Exception as e:
        return None, str(e)

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        if "Authorization" in request.headers:
            auth_header = request.headers["Authorization"]
            if auth_header.startswith("Bearer "):
                token = auth_header.split(" ")[1]
        
        if not token:
            return jsonify({"error": "Token未提供"}), 401
        
        user_id, error = verify_token(token)
        if error:
            return jsonify({"error": error}), 401
        
        return f(user_id, *args, **kwargs)
    return decorated

@app.route("/api/login", methods=["POST"])
def login():
    auth_data = request.get_json()
    username = auth_data.get("username")
    password = auth_data.get("password")
    
    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400
    
    user = users_db.get(username)
    if not user or user["password"] != password:
        return jsonify({"error": "用户名或密码错误"}), 401
    
    token = generate_token(user["user_id"])
    return jsonify({"token": token})

@app.route("/api/protected", methods=["GET"])
@token_required
def protected_route(user_id):
    return jsonify({"message": f"欢迎用户{user_id},您已成功访问受保护资源"})

if __name__ == "__main__":
    app.run(debug=True)

关键词

Bearer Token、REST API认证、JWT、Python、Flask、PyJWT、OAuth 2.0、Token生成、Token验证、HTTPS、CORS、令牌刷新、安全存储

简介

本文详细介绍了基于Bearer Token的REST API认证机制在Python中的实现方法。通过Flask框架和PyJWT库,演示了从Token生成、验证到API保护的全流程,涵盖了常见问题解决方案和进阶优化技巧,适合开发安全可靠的Web服务时参考。