Files
pit-router/app/routes/auth.py

143 lines
3.6 KiB
Python
Raw Normal View History

2026-03-14 19:41:36 +08:00
"""
认证路由
"""
from flask import Blueprint, request, jsonify
from flask_jwt_extended import (
create_access_token, create_refresh_token,
jwt_required, get_jwt_identity
)
from datetime import datetime
import bcrypt
from app.models import db, User
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/register', methods=['POST'])
def register():
    """Register a new user.

    Expects a JSON object with ``username``, ``email`` and ``password``
    (all required, non-empty strings) and an optional ``nickname`` string.
    When no nickname is supplied the username is used instead.

    Returns:
        A ``(response, status)`` tuple:
        * 201 with a success message and ``user.to_dict()`` on success.
        * 400 when the body is not a JSON object, a required field is
          missing or not a string, the nickname is not a string, or the
          username / email is already taken.
    """
    data = request.get_json()
    # A body such as ``null``, ``[]`` or ``"text"`` parses fine but is not a
    # mapping; without this guard ``data.get`` would raise and yield a 500.
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400

    username = data.get('username')
    email = data.get('email')
    password = data.get('password')
    nickname = data.get('nickname')

    # Require real, non-empty strings: a numeric password would otherwise
    # crash on ``.encode`` below.
    required_fields = (username, email, password)
    if not all(isinstance(value, str) and value for value in required_fields):
        return jsonify({'error': 'Missing required fields'}), 400
    if nickname is not None and not isinstance(nickname, str):
        return jsonify({'error': 'Invalid nickname'}), 400

    # Reject duplicates up front.
    # NOTE(review): these checks and the insert below are not atomic; a
    # concurrent duplicate would only surface at commit time. Presumably the
    # model declares unique constraints -- confirm.
    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Username already exists'}), 400
    if User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email already exists'}), 400

    # Hash the password; the hash is stored as text, hence the decode.
    password_hash = bcrypt.hashpw(
        password.encode('utf-8'),
        bcrypt.gensalt()
    ).decode('utf-8')

    # Create and persist the user.
    user = User(
        username=username,
        email=email,
        password_hash=password_hash,
        nickname=nickname or username
    )
    db.session.add(user)
    db.session.commit()

    return jsonify({
        'message': 'User registered successfully',
        'user': user.to_dict()
    }), 201
@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate a user and issue JWT access and refresh tokens.

    Expects a JSON object with non-empty string ``username`` and
    ``password`` fields.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with ``access_token``, ``refresh_token`` and ``user.to_dict()``.
        * 400 when the body is not a JSON object or a credential is missing
          or not a string.
        * 401 when the username is unknown or the password does not match.
        * 403 when the account's ``status`` is not ``'active'``.
    """
    data = request.get_json()
    # A body such as ``null`` or ``[]`` parses fine but has no ``.get``;
    # answer with a 400 instead of crashing with a 500.
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing username or password'}), 400

    username = data.get('username')
    password = data.get('password')

    # Non-string credentials would crash on ``password.encode`` below.
    credentials = (username, password)
    if not all(isinstance(value, str) and value for value in credentials):
        return jsonify({'error': 'Missing username or password'}), 400

    # Look up the user; the error text is the same as for a wrong password.
    user = User.query.filter_by(username=username).first()
    if not user:
        return jsonify({'error': 'Invalid username or password'}), 401

    # Verify the password against the stored bcrypt hash.
    stored_hash = user.password_hash.encode('utf-8')
    if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
        return jsonify({'error': 'Invalid username or password'}), 401

    # Only active accounts may log in.
    if user.status != 'active':
        return jsonify({'error': 'Account is disabled'}), 403

    # Record the last login time (naive UTC timestamp).
    user.last_login_at = datetime.utcnow()
    db.session.commit()

    # Issue tokens keyed on the user's id.
    access_token = create_access_token(identity=user.id)
    refresh_token = create_refresh_token(identity=user.id)

    return jsonify({
        'access_token': access_token,
        'refresh_token': refresh_token,
        'user': user.to_dict()
    }), 200
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Issue a fresh access token for the identity in the presented JWT.

    The route is guarded by ``jwt_required(refresh=True)``.

    Returns:
        A ``(response, 200)`` tuple whose JSON body holds ``access_token``.
    """
    new_access_token = create_access_token(identity=get_jwt_identity())
    payload = {'access_token': new_access_token}
    return jsonify(payload), 200
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
    """Return the profile of the user identified by the request's JWT.

    Returns:
        A ``(response, status)`` tuple: 200 with ``user.to_dict()`` when the
        user exists, otherwise 404 with an error message.
    """
    current_user = User.query.get(get_jwt_identity())
    if current_user:
        return jsonify({'user': current_user.to_dict()}), 200
    # The token's identity no longer maps to a stored user.
    return jsonify({'error': 'User not found'}), 404
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
    """Acknowledge a logout request from an authenticated caller.

    This handler only returns a confirmation message; it does not itself
    revoke or otherwise modify the presented token.

    Returns:
        A ``(response, 200)`` tuple with a confirmation message.
    """
    response_body = {'message': 'Logged out successfully'}
    return jsonify(response_body), 200
@auth_bp.route('/verify', methods=['POST'])
@jwt_required()
def verify_token():
    """Report whether the presented JWT maps to an existing user.

    Returns:
        A ``(response, status)`` tuple: 200 with ``valid: True`` and
        ``user.to_dict()`` when the user exists, otherwise 401 with
        ``valid: False``.
    """
    token_owner = User.query.get(get_jwt_identity())
    if token_owner:
        body = {
            'valid': True,
            'user': token_owner.to_dict()
        }
        return jsonify(body), 200
    # The token was accepted by jwt_required but its user is gone.
    return jsonify({'valid': False}), 401