143 lines
3.6 KiB
Python
143 lines
3.6 KiB
Python
"""
|
|
认证路由
|
|
"""
|
|
from flask import Blueprint, request, jsonify
|
|
from flask_jwt_extended import (
|
|
create_access_token, create_refresh_token,
|
|
jwt_required, get_jwt_identity
|
|
)
|
|
from datetime import datetime
|
|
import bcrypt
|
|
from app.models import db, User
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
|
|
@auth_bp.route('/register', methods=['POST'])
|
|
def register():
|
|
"""注册新用户"""
|
|
data = request.get_json()
|
|
|
|
username = data.get('username')
|
|
email = data.get('email')
|
|
password = data.get('password')
|
|
nickname = data.get('nickname')
|
|
|
|
if not all([username, email, password]):
|
|
return jsonify({'error': 'Missing required fields'}), 400
|
|
|
|
# 检查用户是否已存在
|
|
if User.query.filter_by(username=username).first():
|
|
return jsonify({'error': 'Username already exists'}), 400
|
|
|
|
if User.query.filter_by(email=email).first():
|
|
return jsonify({'error': 'Email already exists'}), 400
|
|
|
|
# 密码哈希
|
|
password_hash = bcrypt.hashpw(
|
|
password.encode('utf-8'),
|
|
bcrypt.gensalt()
|
|
).decode('utf-8')
|
|
|
|
# 创建用户
|
|
user = User(
|
|
username=username,
|
|
email=email,
|
|
password_hash=password_hash,
|
|
nickname=nickname or username
|
|
)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'message': 'User registered successfully',
|
|
'user': user.to_dict()
|
|
}), 201
|
|
|
|
|
|
@auth_bp.route('/login', methods=['POST'])
|
|
def login():
|
|
"""用户登录"""
|
|
data = request.get_json()
|
|
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
|
|
if not all([username, password]):
|
|
return jsonify({'error': 'Missing username or password'}), 400
|
|
|
|
# 查找用户
|
|
user = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
return jsonify({'error': 'Invalid username or password'}), 401
|
|
|
|
# 验证密码
|
|
if not bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
|
|
return jsonify({'error': 'Invalid username or password'}), 401
|
|
|
|
# 检查用户状态
|
|
if user.status != 'active':
|
|
return jsonify({'error': 'Account is disabled'}), 403
|
|
|
|
# 更新最后登录时间
|
|
user.last_login_at = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
# 生成 Token
|
|
access_token = create_access_token(identity=user.id)
|
|
refresh_token = create_refresh_token(identity=user.id)
|
|
|
|
return jsonify({
|
|
'access_token': access_token,
|
|
'refresh_token': refresh_token,
|
|
'user': user.to_dict()
|
|
}), 200
|
|
|
|
|
|
@auth_bp.route('/refresh', methods=['POST'])
|
|
@jwt_required(refresh=True)
|
|
def refresh():
|
|
"""刷新 Token"""
|
|
identity = get_jwt_identity()
|
|
access_token = create_access_token(identity=identity)
|
|
|
|
return jsonify({
|
|
'access_token': access_token
|
|
}), 200
|
|
|
|
|
|
@auth_bp.route('/me', methods=['GET'])
|
|
@jwt_required()
|
|
def get_current_user():
|
|
"""获取当前用户信息"""
|
|
user_id = get_jwt_identity()
|
|
user = User.query.get(user_id)
|
|
|
|
if not user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
return jsonify({'user': user.to_dict()}), 200
|
|
|
|
|
|
@auth_bp.route('/logout', methods=['POST'])
|
|
@jwt_required()
|
|
def logout():
|
|
"""用户登出"""
|
|
return jsonify({'message': 'Logged out successfully'}), 200
|
|
|
|
|
|
@auth_bp.route('/verify', methods=['POST'])
|
|
@jwt_required()
|
|
def verify_token():
|
|
"""验证 Token 有效性"""
|
|
user_id = get_jwt_identity()
|
|
user = User.query.get(user_id)
|
|
|
|
if not user:
|
|
return jsonify({'valid': False}), 401
|
|
|
|
return jsonify({
|
|
'valid': True,
|
|
'user': user.to_dict()
|
|
}), 200
|