data_entry_center.py 4.9 KB
# coding=utf-8
#author:        4N
#createtime:    2021/9/14
#email:         nheweijun@sina.com

import configure
from ..models import InsertingLayerName, Database, Task,DES

from sqlalchemy.orm import Session
import multiprocessing
from app.util.component.EntryDataVacuate import EntryDataVacuate
import json
from sqlalchemy import distinct
import uuid
from osgeo.ogr import DataSource
from app.util.component.StructuredPrint import StructurePrint
from app.util.component.PGUtil import PGUtil
import time

def data_entry_center():
    running_dict = {}
    sys_session: Session = PGUtil.get_db_session(
        configure.SQLALCHEMY_DATABASE_URI)

    while True:

        try:
            time.sleep(3)

            # 已经结束的进程 从监测中删除
            remove_process = []
            for process, layer_names in running_dict.items():
                if not process.is_alive():
                    for l in layer_names:
                        inserted = sys_session.query(
                            InsertingLayerName).filter_by(name=l).one_or_none()
                        if inserted:
                            sys_session.delete(inserted)
                    sys_session.commit()
                    remove_process.append(process)
            for process in remove_process:
                running_dict.pop(process)


            # 入库进程少于阈值,开启入库进程
            inter_size = sys_session.query(
                distinct(InsertingLayerName.task_guid)).count()

            if inter_size < configure.entry_data_thread:
                # 锁表
                ready_task: Task = sys_session.query(Task).filter_by(state=0, task_type=1).order_by(
                    Task.create_time).with_lockmode("update").limit(1).one_or_none()
                if ready_task:

                    try:
                        parameter = json.loads(ready_task.parameter)
                        StructurePrint().print("检测到入库任务")
                        ready_task.state = 2
                        ready_task.process = "入库中"
                        sys_session.commit()

                        metas: list = json.loads(
                            parameter.get("meta").__str__())
                        parameter["meta"] = metas

                        database = sys_session.query(Database).filter_by(
                            guid=ready_task.database_guid).one_or_none()
                        pg_ds: DataSource = PGUtil.open_pg_data_source(
                            1, DES.decode(database.sqlalchemy_uri))

                        this_task_layer = []
                        for meta in metas:
                            overwrite = parameter.get("overwrite", "no")

                            for layer_name_origin, layer_name in meta.get("layer").items():
                                origin_name = layer_name
                                no = 1

                                while (overwrite.__eq__("no") and pg_ds.GetLayerByName(layer_name)) or sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none():
                                    layer_name = origin_name + "_{}".format(no)
                                    no += 1

                                # 添加到正在入库的列表中
                                iln = InsertingLayerName(guid=uuid.uuid1().__str__(),
                                                         task_guid=ready_task.guid,
                                                         name=layer_name)

                                sys_session.add(iln)
                                sys_session.commit()
                                this_task_layer.append(layer_name)
                                # 修改表名
                                meta["layer"][layer_name_origin] = layer_name

                        pg_ds.Destroy()
                        entry_data_process = multiprocessing.Process(
                            target=EntryDataVacuate().entry, args=(parameter,))
                        entry_data_process.start()

                        pid = entry_data_process.pid
                        sys_session.query(Task).filter_by(guid=ready_task.guid).update({"task_pid":pid})
                        sys_session.commit()

                        running_dict[entry_data_process] = this_task_layer

                    except Exception as e:
                        sys_session.query(Task).filter_by(guid=ready_task.guid).update(
                            {"state": -1, "process": "入库失败"})
                        sys_session.commit()
                        StructurePrint().print(e.__str__(), "error")
                else:
                    # 解表啊
                    sys_session.commit()
        except Exception as e:
            sys_session.commit()
            StructurePrint().print(e.__str__(), "error")