# __init__.py 6.6 KB
import decimal

from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder
from flask_cors import CORS
import time
import configure
from app.util import BlueprintApi
from app.util import find_class
from app.models import db,Table,InsertingLayerName,Database,DES,Task

from flasgger import Swagger
# from rtree import index
import logging
from sqlalchemy.orm import Session
import multiprocessing
from app.util.component.EntryData import EntryData
from app.util.component.EntryDataVacuate import EntryDataVacuate
import json
import threading
import traceback
from sqlalchemy import distinct
import uuid
from osgeo.ogr import DataSource
import datetime
from sqlalchemy import or_
from app.util.component.StructuredPrint import StructurePrint
from app.util.component.PGUtil import PGUtil
import os

"""
decimal.Decimal values are not JSON-serializable by default, so Flask's JSON
encoder is extended below to handle them.
"""
class JSONEncoder(_JSONEncoder):
    """JSON encoder that additionally serializes ``decimal.Decimal`` as float."""

    def default(self, o):
        """Return a JSON-serializable stand-in for *o*.

        ``decimal.Decimal`` is converted to ``float``. Every other type is
        delegated to the parent encoder, whose return value (or raised
        exception) is passed through unchanged.
        """
        if isinstance(o, decimal.Decimal):
            return float(o)
        # The parent's result must be returned: dropping it would make every
        # type the parent encoder knows how to convert serialize as ``null``.
        return super().default(o)

class Flask(_Flask):
    """Flask application subclass that swaps in this module's JSONEncoder."""

    # Class-level override of the encoder used when the app serializes JSON.
    json_encoder = JSONEncoder


# idx =None
# url_json_list=None
# sqlite3_connect= None
def create_app():
    """Build and configure the Flask application.

    Sets the database/JSON config, enables CORS and Swagger, creates the
    database tables, attaches a file log handler, registers every blueprint
    found in ``configure.scan_module`` and arranges for the data-entry
    monitor thread to start on the first request.

    Returns:
        The configured ``Flask`` application instance.
    """
    # Basic app settings
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = configure.SQLALCHEMY_DATABASE_URI
    app.config['echo'] = True
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    app.config['JSON_AS_ASCII'] = False
    # app.config['SQLALCHEMY_ECHO'] = True

    # Cross-origin settings
    CORS(app)

    # Swagger settings. Work on a copy so the class-level default config
    # shared by every Swagger instance is not mutated.
    swagger_config = dict(Swagger.DEFAULT_CONFIG)
    swagger_config.update(configure.swagger_configure)
    Swagger(app, config=swagger_config)

    # Create the database tables
    db.init_app(app)
    db.create_all(app=app)

    # Logging
    logging.basicConfig(level=logging.INFO)
    project_root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    log_dir = os.path.join(project_root, "logs")
    # FileHandler does not create missing parent directories.
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, "log.txt")
    handler = logging.FileHandler(log_file, encoding='UTF-8')  # log charset and path
    logging_format = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s')
    handler.setFormatter(logging_format)
    app.logger.addHandler(handler)

    # Register blueprints: look up the BlueprintApi subclasses in each module
    for scan in configure.scan_module:
        for api in find_class(scan, BlueprintApi):
            app.register_blueprint(api.bp)

    # Data-entry monitor thread
    @app.before_first_request
    def data_entry_process():
        StructurePrint.print("start listen")
        # NOTE(review): non-daemon thread running an endless loop; it may keep
        # the interpreter alive at shutdown - confirm this is intended.
        process = threading.Thread(target=data_entry_center)
        process.start()

    return app




def data_entry_center():
    """Run the endless monitor loop that schedules data-entry (import) tasks.

    Every 3 seconds the loop:

    1. removes finished import processes from its bookkeeping and deletes the
       ``InsertingLayerName`` rows that reserved their layer names;
    2. if fewer than ``configure.entry_data_thread`` tasks are currently
       importing, locks the oldest pending task (``state=0, task_type=1``)
       and starts an import process for it.

    Any exception is logged and the loop continues; the session is rolled
    back first so that a failed transaction cannot break later iterations.
    This function never returns.
    """
    # Maps each running import process to the layer names it reserved.
    running_dict = {}
    sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)

    while True:
        try:
            time.sleep(3)

            # Processes that have ended are removed from monitoring.
            _release_finished_processes(sys_session, running_dict)

            # Only start a new import while below the configured threshold.
            inter_size = sys_session.query(distinct(InsertingLayerName.task_guid)).count()
            if inter_size >= configure.entry_data_thread:
                continue

            # Lock the candidate task row (legacy SQLAlchemy lock-mode API).
            ready_task: Task = (
                sys_session.query(Task)
                .filter_by(state=0, task_type=1)
                .order_by(Task.create_time)
                .with_lockmode("update")
                .limit(1)
                .one_or_none()
            )
            if not ready_task:
                # Nothing to do: end the transaction to release the lock.
                sys_session.commit()
                continue

            _start_entry_task(sys_session, running_dict, ready_task)
        except Exception as e:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; committing here could raise again and kill the
            # monitor thread. Rolling back also releases any row lock.
            sys_session.rollback()
            StructurePrint.print(str(e), "error")


def _release_finished_processes(sys_session, running_dict):
    """Forget finished processes and delete the layer-name rows they reserved."""
    finished = [process for process in running_dict if not process.is_alive()]
    for process in finished:
        for layer_name in running_dict[process]:
            inserted = sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none()
            if inserted:
                sys_session.delete(inserted)
        sys_session.commit()
        running_dict.pop(process)


def _reserve_layer_names(sys_session, pg_ds, task_guid, parameter, metas):
    """Choose a free target name for every layer and record it as in progress.

    A numeric suffix (``_1``, ``_2``, ...) is appended while the name is
    already being imported by another task, or - when overwriting is disabled -
    already exists in the target data source. ``meta["layer"]`` is rewritten
    in place with the chosen names.

    Returns:
        The list of reserved layer names.
    """
    overwrite = parameter.get("overwrite", "no")
    reserved = []
    for meta in metas:
        for layer_name_origin, layer_name in meta.get("layer").items():
            origin_name = layer_name
            no = 1
            # Plain ``==`` rather than ``__eq__``: the dunder returns a truthy
            # NotImplemented when ``overwrite`` is not a string.
            while ((overwrite == "no" and pg_ds.GetLayerByName(layer_name))
                   or sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none()):
                layer_name = origin_name + "_{}".format(no)
                no += 1

            # Add to the list of layers currently being imported.
            iln = InsertingLayerName(guid=str(uuid.uuid1()),
                                     task_guid=task_guid,
                                     name=layer_name)
            sys_session.add(iln)
            sys_session.commit()
            reserved.append(layer_name)
            # Point the task parameters at the final table name.
            meta["layer"][layer_name_origin] = layer_name
    return reserved


def _start_entry_task(sys_session, running_dict, ready_task):
    """Mark *ready_task* as running and launch its import process.

    On any failure the task is marked as failed (``state=-1``), the layer
    names already reserved for it are released, and the error is logged.
    """
    # Read the key up front so the failure handler does not need to reload
    # the (possibly expired) task object.
    task_guid = ready_task.guid
    try:
        parameter = json.loads(ready_task.parameter)
        StructurePrint.print("检测到入库任务")
        ready_task.state = 2
        ready_task.process = "入库中"
        sys_session.commit()

        metas: list = json.loads(str(parameter.get("meta")))
        parameter["meta"] = metas

        database = sys_session.query(Database).filter_by(guid=ready_task.database_guid).one_or_none()
        pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
        try:
            this_task_layer = _reserve_layer_names(sys_session, pg_ds, task_guid, parameter, metas)
        finally:
            # Always release the data source, even if reservation failed.
            pg_ds.Destroy()

        entry_data_process = multiprocessing.Process(target=EntryDataVacuate().entry, args=(parameter,))
        entry_data_process.start()
        running_dict[entry_data_process] = this_task_layer
    except Exception as e:
        # Reset the session first: it may be in a failed state.
        sys_session.rollback()
        # No process was registered for this task, so nothing else will ever
        # free the names it reserved; leaving them would permanently count
        # against the concurrent-import threshold.
        sys_session.query(InsertingLayerName).filter_by(task_guid=task_guid).delete(
            synchronize_session=False)
        sys_session.query(Task).filter_by(guid=task_guid).update(
            {"state": -1, "process": "入库失败"})
        sys_session.commit()
        StructurePrint.print(str(e), "error")