# __init__.py (8.1 KB)
from enum import auto
from logging import error
from flasgger import swag_from
from app.util import BlueprintApi
from app.util import BlueprintApi
from flask import Blueprint, render_template, redirect, request, session, jsonify
from sqlalchemy import and_
from .models import *
from .oauth2 import authorization, require_oauth, generate_user_info
from authlib.oauth2 import OAuth2Error
from authlib.integrations.flask_oauth2 import current_token
from . import user_create, client_create, client_query, user_query, user_update, user_delete


def current_user():
    """Look up the user referenced by the current session.

    Returns:
        ``None`` when the session has no ``"id"`` key; otherwise whatever
        ``User.query.get`` returns for the stored id.
    """
    if "id" not in session:
        return None
    return User.query.get(session["id"])


def remove_user():
    """Remove the ``"id"`` key from the session if it resolves to a user.

    Nothing happens when ``current_user()`` yields a falsy value, so the
    session is left untouched in that case.
    """
    if not current_user():
        return
    session.pop("id")


def split_by_crlf(s):
    """Return the lines of *s* as a list, leaving out empty lines.

    Line boundaries are those recognised by ``str.splitlines``. Lines that
    contain only whitespace are not empty and are therefore kept.
    """
    return list(filter(None, s.splitlines()))


class DataManager(BlueprintApi):
    """OAuth2 authorization views plus user/client management endpoints.

    Every view below is registered on ``bp`` (URL prefix ``/auth``) while the
    class body executes, through the ``bp.route`` decorator. They are plain
    functions wrapped in ``staticmethod``; none of them uses instance state.
    """

    bp = Blueprint("Auth", __name__, url_prefix="/auth")

    @staticmethod
    @bp.route("/authorize", methods=("GET", "POST"))
    def authorize():
        """Handle the OAuth2 authorization request, including the login form.

        Flow:
            * ``GET`` validates the consent request. Without a session user
              the login/authorize page is rendered; with one, the request
              falls through and an authorization response is issued.
            * ``POST`` without a session user reads ``username`` and
              ``password`` from the form and looks the user up.
            * As soon as a user is known, their id is stored in the session
              and the authorization response is returned.
            * Otherwise the page is rendered again with an error message.

        Returns:
            A Flask response: the rendered template, the authorization
            response, or the JSON body of an ``OAuth2Error``.
        """
        user = current_user()

        if request.method == "GET":
            try:
                grant = authorization.validate_consent_request(end_user=user)
            except OAuth2Error as exc:
                return jsonify(dict(exc.get_body()))
            if not user:
                # Not logged in yet: show the login page.
                return render_template("auth/authorize.html", user=user, grant=grant)

        message = ""
        if not user:
            username = request.form.get("username")
            password = request.form.get("password")
            if not username:
                message = "用户名不可为空"
            elif not password:
                message = "密码不可为空"
            else:
                # NOTE(review): the submitted password is matched directly
                # against the stored column value -- confirm that hashing is
                # applied somewhere outside this view.
                user = User.query.filter_by(
                    username=username, password=password).first()
                if not user:
                    message = "账号或密码不正确"

        if user:
            session["id"] = user.id
            return authorization.create_authorization_response(grant_user=user)

        # Login failed: re-validate the request so the page can be rendered
        # again together with the error message.
        try:
            grant = authorization.validate_consent_request(end_user=user)
        except OAuth2Error as exc:
            return jsonify(dict(exc.get_body()))
        return render_template(
            "auth/authorize.html", user=user, grant=grant, error=message)

    @staticmethod
    @bp.route("/token", methods=["POST"])
    def issue_token():
        """Return the token response built by the authorization server."""
        return authorization.create_token_response()

    @staticmethod
    @bp.route("/userinfo")
    @require_oauth("profile")
    def api_me():
        """Return the user info for the current token as JSON.

        The view is guarded by ``require_oauth("profile")``. An
        ``OAuth2Error`` raised while building the payload is returned as its
        JSON body.
        """
        try:
            return jsonify(
                generate_user_info(current_token.user, current_token.scope))
        # The previous handler named ``logging.error`` (a function) here.
        # Python rejects non-exception objects in an ``except`` clause with a
        # TypeError, so no error was ever handled.
        except OAuth2Error as exc:
            return jsonify(dict(exc.get_body()))

    @staticmethod
    @bp.route("/logout", methods=["GET"])
    def logout():
        """Revoke the given access token, clear the session and redirect.

        Query parameters:
            accesstoken: the access token to revoke (optional).

        The request is validated with ``validate_consent_request`` to obtain
        the client; the user is then redirected to ``grant.client.client_uri``.

        Returns:
            A redirect to the client's URI, or the JSON body of an
            ``OAuth2Error`` when validation fails.
        """
        try:
            grant = authorization.validate_consent_request(
                end_user=current_user())
        except OAuth2Error as exc:
            return jsonify(dict(exc.get_body()))

        raw_token = request.args.get("accesstoken")
        token = None
        if raw_token:
            token = OAuth2Token.query.filter_by(access_token=raw_token).first()

        # A missing or unknown token used to raise AttributeError here (the
        # attribute was set before the None check). Now the token is revoked
        # only when it exists; the session is cleared either way.
        if token is not None:
            token.revoked = True
            db.session.commit()
        remove_user()

        # Always redirect to the client: redirecting to an empty URL, as the
        # old fallback did, gives the browser no usable target.
        return redirect(grant.client.client_uri)

    # ------------------------------------------------------------------
    # API endpoints
    #
    # The view names below match the imported modules of the same name. That
    # is safe: each ``swag_from(...)`` argument is evaluated before its ``def``
    # binds the name in the class namespace, and names used inside a function
    # body are looked up in module globals, never in the class scope.
    #
    # The docstrings are left untouched on purpose -- presumably flasgger
    # reads them when building the Swagger summary (verify before editing).
    # ------------------------------------------------------------------

    # List users.
    @staticmethod
    @bp.route("/users", methods=["GET"])
    @swag_from(user_query.Api.api_doc)
    def user_query():
        """
        获取用户列表
        """
        return user_query.Api().result

    # Create a user.
    @staticmethod
    @bp.route("/users", methods=["POST"])
    @swag_from(user_create.Api.api_doc)
    def user_create():
        """
        创建用户
        """
        return user_create.Api().result

    # Update a user's information.
    @staticmethod
    @bp.route("/userEdit", methods=["POST"])
    @swag_from(user_update.Api.api_doc)
    def user_update():
        """
        更新用户信息
        """
        return user_update.Api().result

    # Delete a user.
    @staticmethod
    @bp.route("/userDelete", methods=["POST"])
    @swag_from(user_delete.Api.api_doc)
    def user_delete():
        """
        删除用户
        """
        return user_delete.Api().result

    # Create an OAuth2 client.
    @staticmethod
    @bp.route("/client", methods=["POST"])
    @swag_from(client_create.Api.api_doc)
    def client_create():
        """
        创建client
        """
        return client_create.Api().result

    # List OAuth2 clients.
    @staticmethod
    @bp.route("/client", methods=["GET"])
    @swag_from(client_query.Api.api_doc)
    def client_query():
        """
        获取client列表
        """
        return client_query.Api().result