database_test.py
3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# coding=utf-8
# author: 4N
# createtime: 2020/6/22
# email: nheweijun@sina.com
from contextlib import closing
from sqlalchemy import create_engine
from ..models import Database,db,DES
from app.models import AESHelper
from sqlalchemy.orm import sessionmaker
from app.util.component.ApiTemplate import ApiTemplate
class Api(ApiTemplate):
    """Endpoint that tests a PostgreSQL connection before it is registered.

    The request parameters are read from ``self.para`` (provided by
    ``ApiTemplate``; presumably the parsed form data -- confirm against the
    base class).
    """

    api_name = "数据库连接测试"

    def process(self):
        """Validate the submitted connection settings.

        Steps, in order:
          1. Optionally decrypt the password (``encryption`` flag).
          2. Open and immediately close a connection to prove reachability.
          3. Reject the request if the alias or the exact connection string
             is already stored in the ``Database`` table.
          4. Reject the request if the target is not a spatial database
             (see ``check_space``).

        Returns:
            dict: ``{"result": bool, "msg": str}``.

        Raises:
            Exception: with a generic "connection test failed" message when
                any step raises; the original error is chained as the cause.
        """
        # Local import: keeps this block self-contained without touching the
        # module-level import list.
        from urllib.parse import quote

        res = {"result": False}
        try:
            host = self.para.get("host")
            port = self.para.get("port")
            user = self.para.get("user")
            passwd = self.para.get("passwd")
            database = self.para.get("database")
            encryption = int(self.para.get("encryption", "0"))
            if encryption:
                passwd = AESHelper.decode(passwd)

            # Percent-encode the credentials so characters such as '@', ':'
            # or '/' in a user name / password do not corrupt the URL.
            sqlalchemy_uri = "postgresql://{}:{}@{}:{}/{}".format(
                quote(user, safe=""),
                quote(passwd, safe=""),
                host,
                port,
                database,
            )

            # Short connect timeout so an unreachable host fails fast.
            engine = create_engine(
                sqlalchemy_uri, connect_args={'connect_timeout': 2}
            )
            try:
                with closing(engine.connect()):
                    pass
            finally:
                # Release the pooled connection(s); otherwise each request
                # would leave an open pool behind until garbage collection.
                engine.dispose()

            # Is a database with this alias already registered?
            datab = (
                db.session.query(Database)
                .filter_by(alias=self.para.get("alias"))
                .one_or_none()
            )

            # Is the same physical connection already registered?  The stored
            # connection string uses the raw (unquoted) credentials.
            connectsrt = (
                "hostaddr={} port={} dbname='{}' user='{}' password='{}'"
            ).format(host, port, database, user, passwd)
            real_database = (
                db.session.query(Database)
                .filter_by(connectstr=DES.encode(connectsrt))
                .all()
            )

            if datab:
                res["msg"] = "数据库已存在,请修改别名!"
                return res
            elif real_database:
                res["msg"] = "数据库连接已存在,请修改数据库连接!"
                return res
            elif not self.check_space(sqlalchemy_uri):
                res["msg"] = "数据不是空间数据库!"
                return res
            else:
                res["result"] = True
                res["msg"] = "测试连接成功"
        except Exception as err:
            # Same exception type and message as before, but the root cause
            # stays attached for debugging.
            raise Exception("测试连接失败!") from err
        return res

    def check_space(self, sqlachemy_uri):
        """Return True if the database can run a spatial geometry function.

        Executes ``st_geometryfromtext`` against the given URI; any failure
        (connection error, missing function, ...) is reported as ``False``.

        Args:
            sqlachemy_uri: SQLAlchemy connection URL of the database to probe.

        Returns:
            bool: True when the probe query succeeds, False otherwise.
        """
        system_session = None
        engine = None
        check = True
        try:
            test_sql = "select st_geometryfromtext('POINT(1 1)')"
            engine = create_engine(sqlachemy_uri)
            system_session = sessionmaker(bind=engine)()
            system_session.execute(test_sql).fetchone()
        except Exception:
            # Best-effort probe: any error means "not a spatial database".
            check = False
        finally:
            if system_session is not None:
                system_session.close()
            if engine is not None:
                # Dispose of the throwaway engine so its pool does not leak.
                engine.dispose()
        return check

    # API documentation for this endpoint.
    # NOTE(review): process() also reads an "alias" parameter that is not
    # declared below -- confirm whether it should be documented here.
    api_doc = {
        "tags": ["数据库接口"],
        "parameters": [
            {"name": "host",
             "in": "formData",
             "type": "string", "required": "true"},
            {"name": "port",
             "in": "formData",
             "type": "string", "required": "true"},
            {"name": "user",
             "in": "formData",
             "type": "string", "required": "true"},
            {"name": "passwd",
             "in": "formData",
             "type": "string", "required": "true"},
            {"name": "database",
             "in": "formData",
             "type": "string", "required": "true"},
            {"name": "encryption",
             "in": "formData",
             "type": "int", "description": "密码是否加密", "enum": [0, 1]},
        ],
        "responses": {
            200: {
                "schema": {
                    "properties": {
                    }
                }
            }
        }
    }