__init__.py
6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import decimal
from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder
from flask_cors import CORS
import time
import configure
from app.util import BlueprintApi
from app.util import find_class
from app.models import db,Table,InsertingLayerName,Database,DES,Task
from flasgger import Swagger
# from rtree import index
import logging
from sqlalchemy.orm import Session
import multiprocessing
from app.util.component.EntryData import EntryData
from app.util.component.EntryDataVacuate import EntryDataVacuate
import json
import threading
import traceback
from sqlalchemy import distinct
import uuid
from osgeo.ogr import DataSource
import datetime
from sqlalchemy import or_
from app.util.component.StructuredPrint import StructurePrint
from app.util.component.PGUtil import PGUtil
import os
"""
Decimal values cannot be JSON-serialized by default, so Flask's JSON encoder is extended to handle decimal.Decimal.
"""
class JSONEncoder(_JSONEncoder):
    """JSON encoder that additionally serializes ``decimal.Decimal`` values."""

    def default(self, o):
        """Return a JSON-serializable representation of *o*.

        Args:
            o: The object the encoder could not serialize natively.

        Returns:
            ``float(o)`` when *o* is a ``decimal.Decimal``; otherwise whatever
            the parent encoder's ``default`` returns.

        Raises:
            TypeError: Propagated from the parent encoder when *o* is of an
                unsupported type.
        """
        if isinstance(o, decimal.Decimal):
            return float(o)
        # The parent's result must be returned: without the ``return`` every
        # non-Decimal object reaching this method was encoded as ``null``.
        return super().default(o)
class Flask(_Flask):
    """Flask application class that uses the Decimal-aware ``JSONEncoder``."""

    # Encoder class picked up by Flask when it serializes JSON responses.
    json_encoder = JSONEncoder
# idx =None
# url_json_list=None
# sqlite3_connect= None
def create_app():
    """Build and configure the Flask application.

    Configures the database URI, CORS, Swagger, the SQLAlchemy models and a
    file log handler, registers the blueprint of every ``BlueprintApi``
    subclass found in the modules listed in ``configure.scan_module``, and
    installs a hook that starts the data-entry monitor thread on the first
    request.

    Returns:
        The configured ``Flask`` application instance.
    """
    # Basic application settings.
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = configure.SQLALCHEMY_DATABASE_URI
    app.config['echo'] = True
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    app.config['JSON_AS_ASCII'] = False

    # Cross-origin resource sharing.
    CORS(app)

    # Swagger settings. Update a copy so the class-level default
    # configuration shared by every Swagger instance is not mutated.
    swagger_config = dict(Swagger.DEFAULT_CONFIG)
    swagger_config.update(configure.swagger_configure)
    Swagger(app, config=swagger_config)

    # Create the database tables.
    db.init_app(app)
    db.create_all(app=app)

    # Logging: write to <project root>/logs/log.txt.
    logging.basicConfig(level=logging.INFO)
    project_root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    log_dir = os.path.join(project_root, "logs")
    # FileHandler does not create missing directories, so make sure it exists.
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, "log.txt")
    handler = logging.FileHandler(log_file, encoding='UTF-8')
    logging_format = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s')
    handler.setFormatter(logging_format)
    app.logger.addHandler(handler)

    # Register the blueprint of every BlueprintApi subclass that is found.
    for scan in configure.scan_module:
        for api in find_class(scan, BlueprintApi):
            app.register_blueprint(api.bp)

    # Data-entry monitor thread, started lazily on the first request.
    @app.before_first_request
    def data_entry_process():
        """Start the background thread that watches for data-entry tasks."""
        StructurePrint.print("start listen")
        monitor_thread = threading.Thread(target=data_entry_center)
        monitor_thread.start()

    return app
def data_entry_center():
    """Run the data-entry monitor loop forever.

    Every three seconds the loop:

    1. releases the layer names reserved by worker processes that have
       finished, and
    2. if fewer than ``configure.entry_data_thread`` tasks are currently
       importing, locks the oldest pending task (``state=0, task_type=1``)
       and starts a worker process for it.

    Any exception is logged and the loop continues with the next iteration.
    """
    # Maps each running worker process to the layer names it reserved.
    running_dict = {}
    sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
    while True:
        try:
            time.sleep(3)
            _release_finished_processes(sys_session, running_dict)

            # Start a new worker only while the number of importing tasks is
            # below the configured threshold.
            inter_size = sys_session.query(distinct(InsertingLayerName.task_guid)).count()
            if inter_size >= configure.entry_data_thread:
                continue

            # Lock the candidate row (SELECT ... FOR UPDATE).
            # ``with_for_update`` replaces the deprecated ``with_lockmode("update")``.
            ready_task: Task = (
                sys_session.query(Task)
                .filter_by(state=0, task_type=1)
                .order_by(Task.create_time)
                .with_for_update()
                .limit(1)
                .one_or_none()
            )
            if not ready_task:
                # Nothing to do: end the transaction to release the lock.
                sys_session.commit()
                continue

            _start_entry_task(sys_session, running_dict, ready_task)
        except Exception as e:
            # The session may be in a failed state, in which case commit()
            # would raise again; roll back so the next iteration starts clean.
            try:
                sys_session.rollback()
            except Exception as rollback_error:
                StructurePrint.print(rollback_error.__str__(), "error")
            StructurePrint.print(e.__str__(), "error")


def _release_finished_processes(sys_session, running_dict):
    """Forget finished worker processes and delete the layer names they reserved."""
    finished = [process for process in running_dict if not process.is_alive()]
    for process in finished:
        for layer_name in running_dict[process]:
            inserted = sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none()
            if inserted:
                sys_session.delete(inserted)
                sys_session.commit()
        running_dict.pop(process)


def _reserve_layer_names(sys_session, pg_ds, task_guid, metas, overwrite):
    """Choose a free name for every layer in *metas* and record it as importing.

    ``meta["layer"]`` is rewritten in place with the chosen names.

    Returns:
        The list of layer names reserved for the task.
    """
    reserved = []
    for meta in metas:
        for layer_name_origin, layer_name in meta.get("layer").items():
            origin_name = layer_name
            no = 1
            # A name is unavailable when it already exists in the target
            # database (unless overwriting is allowed) or when another task
            # is currently importing into it; append _1, _2, ... until free.
            while ((overwrite == "no" and pg_ds.GetLayerByName(layer_name))
                   or sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none()):
                layer_name = origin_name + "_{}".format(no)
                no += 1
            # Add the name to the list of layers currently being imported.
            iln = InsertingLayerName(guid=uuid.uuid1().__str__(),
                                     task_guid=task_guid,
                                     name=layer_name)
            sys_session.add(iln)
            sys_session.commit()
            reserved.append(layer_name)
            # Replace the requested table name with the one actually used.
            meta["layer"][layer_name_origin] = layer_name
    return reserved


def _start_entry_task(sys_session, running_dict, ready_task):
    """Mark *ready_task* as running and start a worker process for it.

    On any failure the task is marked as failed (``state=-1``) and the layer
    names already reserved for it are deleted, so a task that never started
    does not keep counting toward the concurrency threshold.
    """
    # Read the guid up front so the failure handler does not depend on the
    # state of the ORM object.
    task_guid = ready_task.guid
    try:
        parameter = json.loads(ready_task.parameter)
        StructurePrint.print("检测到入库任务")
        ready_task.state = 2
        ready_task.process = "入库中"
        # Committing also releases the row lock taken by the caller.
        sys_session.commit()

        metas: list = json.loads(str(parameter.get("meta")))
        parameter["meta"] = metas
        database = sys_session.query(Database).filter_by(guid=ready_task.database_guid).one_or_none()
        pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
        try:
            this_task_layer = _reserve_layer_names(
                sys_session, pg_ds, task_guid, metas, parameter.get("overwrite", "no"))
        finally:
            # Close the data source before the worker starts, also on failure.
            pg_ds.Destroy()

        entry_data_process = multiprocessing.Process(target=EntryDataVacuate().entry, args=(parameter,))
        entry_data_process.start()
        running_dict[entry_data_process] = this_task_layer
    except Exception as e:
        # Discard whatever is pending in the (possibly failed) transaction.
        sys_session.rollback()
        # No process is tracked for this task, so its reserved names would
        # otherwise never be removed.
        sys_session.query(InsertingLayerName).filter_by(task_guid=task_guid).delete(
            synchronize_session=False)
        sys_session.query(Task).filter_by(guid=task_guid).update(
            {"state": -1, "process": "入库失败"})
        sys_session.commit()
        StructurePrint.print(e.__str__(), "error")