__init__.py
4.0 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from app.util import BlueprintApi
from app.util import BlueprintApi
from flask import Blueprint, render_template, redirect, url_for, request, session, jsonify
from .models import *
from werkzeug.security import gen_salt
import time
from .oauth2 import authorization, require_oauth, generate_user_info
from authlib.oauth2 import OAuth2Error
from authlib.integrations.flask_oauth2 import current_token
def current_user():
if 'id' in session:
uid = session['id']
return User.query.get(uid)
return None
def split_by_crlf(s):
return [v for v in s.splitlines() if v]
class DataManager(BlueprintApi):
bp = Blueprint("Auth", __name__, url_prefix="/auth")
@staticmethod
@bp.route('/test', methods=('GET', 'POST'))
def Test():
res = {}
try:
res['user'] = User.query.all()
except Exception as e:
raise e
return res
@staticmethod
@bp.route('/', methods=('GET', 'POST'))
def Login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if not user:
user = User(username=username,
password=password, role='admin')
db.session.add(user)
db.session.commit()
session['id'] = user.id
return redirect('/auth')
user = current_user()
if user:
clients = OAuth2Client.query.filter_by(user_id=user.id).all()
else:
clients = []
return render_template('auth/login.html', user=user, clients=clients)
@staticmethod
@bp.route('/create_client', methods=('GET', 'POST'))
def create_client():
user = current_user()
if not user:
return redirect('/auth')
if request.method == 'GET':
return render_template('auth/create_client.html')
form = request.form
client_id = gen_salt(24)
client = OAuth2Client(client_id=client_id, user_id=user.id)
# Mixin doesn't set the issue_at date
client.client_id_issued_at = int(time.time())
if client.token_endpoint_auth_method == 'none':
client.client_secret = ''
else:
client.client_secret = gen_salt(48)
client_metadata = {
"client_name": form["client_name"],
"client_uri": form["client_uri"],
"grant_types": split_by_crlf(form["grant_type"]),
"redirect_uris": split_by_crlf(form["redirect_uri"]),
"response_types": split_by_crlf(form["response_type"]),
"scope": form["scope"],
"token_endpoint_auth_method": form["token_endpoint_auth_method"]
}
client.set_client_metadata(client_metadata)
db.session.add(client)
db.session.commit()
return redirect('/auth')
@staticmethod
@bp.route('/authorize', methods=('GET', 'POST'))
def authorize():
user = current_user()
if request.method == 'GET':
try:
grant = authorization.validate_consent_request(end_user=user)
except OAuth2Error as error:
return jsonify(dict(error.get_body()))
return render_template('auth/authorize.html', user=user, grant=grant)
if not user and 'username' in request.form:
username = request.form.get('username')
user = User.query.filter_by(username=username).first()
if request.form['confirm']:
grant_user = user
else:
grant_user = None
return authorization.create_authorization_response(grant_user=grant_user)
@staticmethod
@bp.route('/token', methods=['POST'])
def issue_token():
return authorization.create_token_response()
@staticmethod
@bp.route('/userinfo')
@require_oauth('profile')
def api_me():
return jsonify(generate_user_info(current_token.user, current_token.scope))