__init__.py 4.4 KB
from app.util import BlueprintApi
from app.util import BlueprintApi
from flask import Blueprint, render_template, redirect, url_for, request, session, jsonify
from flask_sqlalchemy import sqlalchemy
from sqlalchemy import and_
from .models import *
from werkzeug.security import gen_salt
import time
from .oauth2 import authorization, require_oauth, generate_user_info
from authlib.oauth2 import OAuth2Error
from authlib.integrations.flask_oauth2 import current_token


def current_user():
    if 'id' in session:
        uid = session['id']
        return User.query.get(uid)
    return None


def split_by_crlf(s):
    return [v for v in s.splitlines() if v]


class DataManager(BlueprintApi):
    bp = Blueprint("Auth", __name__, url_prefix="/auth")

    # @staticmethod
    # @bp.route('/test', methods=('GET', 'POST'))
    # def Test():
    #     res = {}
    #     try:
    #         res['user'] = User.query.all()
    #     except Exception as e:
    #         raise e
    #     return res

    # @staticmethod
    # @bp.route('/login', methods=('GET', 'POST'))
    # def Login():
    #     if request.method == 'POST':
    #         username = request.form['username']
    #         password = request.form['password']
    #         user = User.query.filter_by(username=username).first()
    #         if not user:
    #             user = User(username=username,
    #                         password=password, role='admin')
    #             db.session.add(user)
    #             db.session.commit()
    #         session['id'] = user.id
    #         return redirect('/auth/authorize')
    #     user = current_user()
    #     if user:
    #         clients = OAuth2Client.query.filter_by(user_id=user.id).all()
    #     else:
    #         clients = []
    #     return render_template('auth/authorize.html', user=user, clients=clients)

    # @staticmethod
    # @bp.route('/create_client', methods=('GET', 'POST'))
    # def create_client():
    #     user = current_user()
    #     if not user:
    #         return redirect('/auth/login')
    #     if request.method == 'GET':
    #         return render_template('auth/create_client.html')
    #     form = request.form
    #     client_id = gen_salt(24)
    #     client = OAuth2Client(client_id=client_id, user_id=user.id)
    #     # Mixin doesn't set the issue_at date
    #     client.client_id_issued_at = int(time.time())
    #     if client.token_endpoint_auth_method == 'none':
    #         client.client_secret = ''
    #     else:
    #         client.client_secret = gen_salt(48)
    #     client_metadata = {
    #         "client_name": form["client_name"],
    #         "client_uri": form["client_uri"],
    #         "grant_types": split_by_crlf(form["grant_type"]),
    #         "redirect_uris": split_by_crlf(form["redirect_uri"]),
    #         "response_types": split_by_crlf(form["response_type"]),
    #         "scope": form["scope"],
    #         "token_endpoint_auth_method": form["token_endpoint_auth_method"]
    #     }
    #     client.set_client_metadata(client_metadata)
    #     db.session.add(client)
    #     db.session.commit()
    #     return redirect('/auth/login')

    @staticmethod
    @bp.route('/authorize', methods=('GET', 'POST'))
    def authorize():
        user = current_user()
        if request.method == 'GET':
            try:
                grant = authorization.validate_consent_request(end_user=user)
            except OAuth2Error as error:
                return jsonify(dict(error.get_body()))
            return render_template('auth/authorize.html', user=user, grant=grant)
            # return render_template('auth/login1.html', user=user, grant=grant)
        if not user and 'username' in request.form:
            username = request.form.get('username')
            password = request.form.get('password')
            user = User.query.filter_by(
                username=username, password=password).first()
        grant_user = user
        # if request.form['confirm']:
        #     grant_user = user
        # else:
        #     grant_user = None
        return authorization.create_authorization_response(grant_user=grant_user)

    @staticmethod
    @bp.route('/token', methods=['POST'])
    def issue_token():
        return authorization.create_token_response()

    @staticmethod
    @bp.route('/userinfo')
    @require_oauth('profile')
    def api_me():
        return jsonify(generate_user_info(current_token.user, current_token.scope))