# __init__.py 4.0 KB
from app.util import BlueprintApi
from app.util import BlueprintApi
from flask import Blueprint, render_template, redirect, url_for, request, session, jsonify
from .models import *
from werkzeug.security import gen_salt
import time
from .oauth2 import authorization, require_oauth, generate_user_info
from authlib.oauth2 import OAuth2Error
from authlib.integrations.flask_oauth2 import current_token

def current_user():
    """Return the ``User`` whose id is stored in the session.

    Returns ``None`` when the session holds no ``'id'`` entry.
    """
    if 'id' not in session:
        return None
    return User.query.get(session['id'])


def split_by_crlf(s):
    """Return the non-empty lines of *s* as a list.

    Lines are split with ``str.splitlines``; empty lines are dropped, while
    lines containing only whitespace are kept as-is.
    """
    return list(filter(None, s.splitlines()))


class DataManager(BlueprintApi):
    """Views registered on the ``Auth`` blueprint (URL prefix ``/auth``).

    Covers a session-based login page, OAuth2 client registration, and the
    OAuth2 authorize / token / userinfo endpoints. Every view is a
    ``staticmethod`` registered on ``bp`` at class-definition time.
    """

    bp = Blueprint("Auth", __name__, url_prefix="/auth")

    @staticmethod
    @bp.route('/test', methods=('GET', 'POST'))
    def Test():
        """Return a dict with every ``User`` row under the ``'user'`` key."""
        # NOTE(review): the values are model instances; presumably the app's
        # JSON provider knows how to serialise them -- confirm.
        return {'user': User.query.all()}

    @staticmethod
    @bp.route('/', methods=('GET', 'POST'))
    def test():
        """Log a user in (POST) or render the login page (GET).

        POST: looks the user up by the submitted ``username``; when no such
        user exists one is created with the submitted ``password`` and role
        ``'admin'``. The user's id is stored in the session and the browser
        is redirected to ``/auth``.

        GET: renders ``auth/login.html`` with the current user (if any) and
        the OAuth2 clients owned by that user.
        """
        if request.method == 'POST':
            username = request.form['username']
            password = request.form['password']
            user = User.query.filter_by(username=username).first()
            # NOTE(review): an existing user is logged in without the
            # submitted password being checked, and a new user is created
            # as 'admin' with the password passed through as given --
            # confirm this is intended outside of a demo setup.
            if not user:
                user = User(username=username,
                            password=password, role='admin')
                db.session.add(user)
                db.session.commit()
            session['id'] = user.id
            return redirect('/auth')

        user = current_user()
        clients = (
            OAuth2Client.query.filter_by(user_id=user.id).all() if user else []
        )
        return render_template('auth/login.html', user=user, clients=clients)

    @staticmethod
    @bp.route('/create_client', methods=('GET', 'POST'))
    def create_client():
        """Register a new OAuth2 client owned by the logged-in user.

        Redirects to ``/auth`` when nobody is logged in. GET renders the
        ``auth/create_client.html`` form; POST builds the client from the
        submitted form, persists it and redirects to ``/auth``.
        """
        user = current_user()
        if not user:
            return redirect('/auth')
        if request.method == 'GET':
            return render_template('auth/create_client.html')

        form = request.form
        client = OAuth2Client(client_id=gen_salt(24), user_id=user.id)
        # Mixin doesn't set the issue_at date
        client.client_id_issued_at = int(time.time())

        client_metadata = {
            "client_name": form["client_name"],
            "client_uri": form["client_uri"],
            "grant_types": split_by_crlf(form["grant_type"]),
            "redirect_uris": split_by_crlf(form["redirect_uri"]),
            "response_types": split_by_crlf(form["response_type"]),
            "scope": form["scope"],
            "token_endpoint_auth_method": form["token_endpoint_auth_method"],
        }
        client.set_client_metadata(client_metadata)

        # Decide on the secret from the *submitted* auth method. Reading
        # client.token_endpoint_auth_method before the metadata was set (as
        # this view used to) could not see the submitted value, so clients
        # registered with 'none' were still handed a secret.
        if client_metadata["token_endpoint_auth_method"] == 'none':
            client.client_secret = ''
        else:
            client.client_secret = gen_salt(48)

        db.session.add(client)
        db.session.commit()
        return redirect('/auth')

    @staticmethod
    @bp.route('/authorize', methods=('GET', 'POST'))
    def authorize():
        """Show the consent page (GET) or process the user's decision (POST).

        GET: validates the consent request for the current user and renders
        ``auth/authorize.html``; an ``OAuth2Error`` is returned as JSON built
        from ``error.get_body()``.

        POST: creates the authorization response, granting on behalf of the
        user only when the form carries a truthy ``confirm`` value.
        """
        user = current_user()
        if request.method == 'GET':
            try:
                grant = authorization.validate_consent_request(end_user=user)
            except OAuth2Error as error:
                return jsonify(dict(error.get_body()))
            return render_template('auth/authorize.html', user=user, grant=grant)

        if not user and 'username' in request.form:
            # NOTE(review): the username comes straight from the form with no
            # credential check -- confirm this is acceptable for this server.
            username = request.form.get('username')
            user = User.query.filter_by(username=username).first()

        # Use .get(): a missing 'confirm' field (e.g. an unchecked checkbox)
        # must be treated as a denial instead of aborting on the missing key.
        grant_user = user if request.form.get('confirm') else None
        return authorization.create_authorization_response(grant_user=grant_user)

    @staticmethod
    @bp.route('/token', methods=['POST'])
    def issue_token():
        """Return the response produced by ``authorization.create_token_response()``."""
        return authorization.create_token_response()

    @staticmethod
    @bp.route('/userinfo')
    @require_oauth('profile')
    def api_me():
        """Return, as JSON, the user info generated for the current token.

        The view is wrapped by ``require_oauth('profile')``; the payload is
        ``generate_user_info(current_token.user, current_token.scope)``.
        """
        return jsonify(generate_user_info(current_token.user, current_token.scope))