data_entry_center.py
4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# coding=utf-8
#author: 4N
#createtime: 2021/9/14
#email: nheweijun@sina.com
import configure
from app.models import InsertingLayerName, Database, DES, Task
from sqlalchemy.orm import Session
import multiprocessing
from app.util.component.EntryDataVacuate import EntryDataVacuate
import json
from sqlalchemy import distinct
import uuid
from osgeo.ogr import DataSource
from app.util.component.StructuredPrint import StructurePrint
from app.util.component.PGUtil import PGUtil
import time
def data_entry_center():
    """Run the data-entry scheduling loop; this function never returns.

    Every three seconds the loop:

    1. Stops tracking entry processes that are no longer alive and deletes
       the ``InsertingLayerName`` rows that were reserved for them.
    2. Counts the distinct ``task_guid`` values in ``InsertingLayerName``;
       when that count is below ``configure.entry_data_thread`` it selects
       the oldest ``Task`` with ``state=0`` and ``task_type=1`` under a
       ``FOR UPDATE`` lock and starts an entry process for it.

    Any exception raised during an iteration is logged through
    ``StructurePrint`` and the loop continues with the next iteration.
    """
    # Maps each started multiprocessing.Process to the list of layer names
    # that were reserved for it in InsertingLayerName.
    running_dict = {}
    sys_session: Session = PGUtil.get_db_session(
        configure.SQLALCHEMY_DATABASE_URI)
    while True:
        try:
            time.sleep(3)
            # Processes that have finished are removed from monitoring.
            _release_finished_processes(sys_session, running_dict)
            # Fewer running entry tasks than the threshold: start another.
            inter_size = sys_session.query(
                distinct(InsertingLayerName.task_guid)).count()
            if inter_size < configure.entry_data_thread:
                # SELECT ... FOR UPDATE; the lock is held until the next
                # commit or rollback on this session. with_for_update()
                # replaces the deprecated with_lockmode("update").
                ready_task: Task = (
                    sys_session.query(Task)
                    .filter_by(state=0, task_type=1)
                    .order_by(Task.create_time)
                    .with_for_update()
                    .limit(1)
                    .one_or_none()
                )
                if ready_task:
                    _start_entry_task(sys_session, ready_task, running_dict)
                else:
                    # No task found: end the transaction to release the lock.
                    sys_session.commit()
        except Exception as e:
            # A session whose flush/commit failed cannot commit again until
            # it is rolled back; committing here could raise out of the
            # handler and terminate the scheduler loop.
            try:
                sys_session.rollback()
            except Exception as rollback_error:
                StructurePrint.print(str(rollback_error), "error")
            StructurePrint.print(str(e), "error")


def _release_finished_processes(sys_session, running_dict):
    """Drop exited processes from running_dict and delete their reserved rows."""
    # Iterate over a snapshot so entries can be popped while looping.
    for process, layer_names in list(running_dict.items()):
        if process.is_alive():
            continue
        for reserved_name in layer_names:
            rows = sys_session.query(
                InsertingLayerName).filter_by(name=reserved_name).all()
            for row in rows:
                sys_session.delete(row)
        sys_session.commit()
        running_dict.pop(process)


def _reserve_layer_name(sys_session, pg_ds, task_guid, layer_name, overwrite):
    """Return a free layer name derived from layer_name and record it as reserved."""
    origin_name = layer_name
    no = 1
    while _layer_name_taken(sys_session, pg_ds, layer_name, overwrite):
        layer_name = origin_name + "_{}".format(no)
        no += 1
    # Add the name to the list of layers currently being entered.
    iln = InsertingLayerName(guid=str(uuid.uuid1()),
                             task_guid=task_guid,
                             name=layer_name)
    sys_session.add(iln)
    sys_session.commit()
    return layer_name


def _layer_name_taken(sys_session, pg_ds, layer_name, overwrite):
    """Tell whether layer_name collides with an existing or reserved layer."""
    # Existing layers in the target data source only block the name when
    # overwriting is disabled. The explicit None check avoids depending on
    # the truthiness of the returned layer object.
    if overwrite == "no" and pg_ds.GetLayerByName(layer_name) is not None:
        return True
    reserved = sys_session.query(
        InsertingLayerName).filter_by(name=layer_name).first()
    return reserved is not None


def _start_entry_task(sys_session, ready_task, running_dict):
    """Mark ready_task as running, reserve its layer names and start its process.

    On any failure the error is logged, the task is set to ``state=-1`` with
    process text "入库失败", and the ``InsertingLayerName`` rows already
    reserved for the task are deleted.
    """
    # Read the guid up front so the failure path does not need to reload an
    # expired instance after a rollback.
    task_guid = ready_task.guid
    try:
        parameter = json.loads(ready_task.parameter)
        StructurePrint.print("检测到入库任务")
        ready_task.state = 2
        ready_task.process = "入库中"
        # This commit also releases the FOR UPDATE lock taken by the caller.
        sys_session.commit()

        metas: list = json.loads(str(parameter.get("meta")))
        parameter["meta"] = metas
        overwrite = parameter.get("overwrite", "no")

        database = sys_session.query(Database).filter_by(
            guid=ready_task.database_guid).one_or_none()
        if database is None:
            raise LookupError(
                "database {} not found".format(ready_task.database_guid))

        this_task_layer = []
        pg_ds: DataSource = PGUtil.open_pg_data_source(
            1, DES.decode(database.sqlalchemy_uri))
        try:
            for meta in metas:
                for layer_name_origin, layer_name in meta.get("layer").items():
                    layer_name = _reserve_layer_name(
                        sys_session, pg_ds, task_guid, layer_name, overwrite)
                    this_task_layer.append(layer_name)
                    # Rewrite the target table name (existing key, so the
                    # dict size is unchanged while iterating).
                    meta["layer"][layer_name_origin] = layer_name
        finally:
            # Release the data source even when name reservation fails.
            if pg_ds is not None:
                pg_ds.Destroy()

        entry_data_process = multiprocessing.Process(
            target=EntryDataVacuate().entry, args=(parameter,))
        entry_data_process.start()
        running_dict[entry_data_process] = this_task_layer
    except Exception as e:
        StructurePrint.print(str(e), "error")
        # Discard any failed/pending transaction state before writing the
        # failure status.
        sys_session.rollback()
        sys_session.query(Task).filter_by(guid=task_guid).update(
            {"state": -1, "process": "入库失败"})
        # The process never got registered in running_dict, so nothing else
        # would release these reservations.
        stale_rows = sys_session.query(
            InsertingLayerName).filter_by(task_guid=task_guid).all()
        for row in stale_rows:
            sys_session.delete(row)
        sys_session.commit()