database_test.py 3.6 KB
# coding=utf-8
# author:        4N
# createtime:    2020/6/22
# email:         nheweijun@sina.com
from contextlib import closing

from sqlalchemy import create_engine

from ..models import Database,db,DES
from app.models import AESHelper

from sqlalchemy.orm import sessionmaker
from app.util.component.ApiTemplate import ApiTemplate
class Api(ApiTemplate):
    api_name = "数据库连接测试"
    def process(self):
        res = {}
        res["result"] = False
        try:

    
            host = self.para.get("host")
            port = self.para.get("port")
            user = self.para.get("user")
            passwd = self.para.get("passwd")
            database = self.para.get("database")

            encryption = int(self.para.get("encryption", "0"))
            if encryption:
                #passwd = DES.decode(passwd)
                passwd = AESHelper.decode(passwd)

            sqlalchemy_uri = "postgresql://{}:{}@{}:{}/{}".format(user,passwd,host,port,database)

            engine = create_engine(sqlalchemy_uri, connect_args={'connect_timeout': 2})
            with closing(engine.connect()):
                pass
            #判断数据库是否存在
            datab = db.session.query(Database).filter_by(alias=self.para.get("alias")).one_or_none()
            #真实的数据库
            connectsrt = "hostaddr={} port={} dbname='{}' user='{}' password='{}'".format(host, port, database, user,
                                                                                          passwd)
            real_database = db.session.query(Database).filter_by(connectstr=DES.encode(connectsrt)).all()
    
            if datab:
                res["msg"] = "数据库已存在,请修改别名!"
                return res

            elif real_database:
                res["msg"] = "数据库连接已存在,请修改数据库连接!"
                return res

            elif not self.check_space(sqlalchemy_uri):
                res["msg"] = "数据不是空间数据库!"
                return res
            else:
                res["result"] = True
                res["msg"] = "测试连接成功"
        except:
            raise Exception("测试连接失败!")
        return res
    
    
    def check_space(self,sqlachemy_uri):
        system_session = None
        check = True
        try:
            test_sql = "select st_geometryfromtext('POINT(1 1)')"
            engine = create_engine(sqlachemy_uri)
            system_session = sessionmaker(bind=engine)()
            system_session.execute(test_sql).fetchone()
        except:
            check = False
        finally:
            if system_session:
                system_session.close()
    
        return check
    
    api_doc={
    "tags":["数据库接口"],
    "parameters":[
        {"name": "host",
         "in": "formData",
         "type": "string", "required": "true"},
        {"name": "port",
         "in": "formData",
         "type": "string", "required": "true"},
        {"name": "user",
         "in": "formData",
         "type": "string", "required": "true"},
        {"name": "passwd",
         "in": "formData",
         "type": "string", "required": "true"},
        {"name": "database",
         "in": "formData",
         "type": "string", "required": "true"},
        {"name": "encryption",
         "in": "formData",
         "type": "int", "description": "密码是否加密", "enum": [0, 1]},

    ],
    "responses":{
        200:{
            "schema":{
                "properties":{
                }
            }
            }
        }
}