提交 8694e40168465b59dd3afe14e408037707eabeeb

作者 qianyingz
2 个父辈 0d6f564d 3db18ae3
@@ -12,15 +12,12 @@ from app.models import db @@ -12,15 +12,12 @@ from app.models import db
12 from app.modules.auth.oauth2 import config_oauth, myCodeIDToken 12 from app.modules.auth.oauth2 import config_oauth, myCodeIDToken
13 from flasgger import Swagger 13 from flasgger import Swagger
14 import logging 14 import logging
15 -from app.util.component.EntryData import EntryData  
16 -from app.util.component.EntryDataVacuate import EntryDataVacuate  
17 -import threading  
18 from app.util.component.StructuredPrint import StructurePrint 15 from app.util.component.StructuredPrint import StructurePrint
19 from app.util.component.PGUtil import PGUtil 16 from app.util.component.PGUtil import PGUtil
20 import os 17 import os
21 -from app.modules.data.io.data_entry_center import data_entry_center  
22 from app.modules.monitor.schedule import start_schedule 18 from app.modules.monitor.schedule import start_schedule
23 19
  20 +
24 class JSONEncoder(_JSONEncoder): 21 class JSONEncoder(_JSONEncoder):
25 """ 22 """
26 因为decimal不能序列化,增加Flask对decimal类的解析 23 因为decimal不能序列化,增加Flask对decimal类的解析
@@ -94,13 +91,6 @@ def create_app(): @@ -94,13 +91,6 @@ def create_app():
94 for api in find_class(scan, BlueprintApi): 91 for api in find_class(scan, BlueprintApi):
95 app.register_blueprint(api.bp) 92 app.register_blueprint(api.bp)
96 93
97 - # 入库监测线程  
98 - # @app.before_first_request  
99 - # def data_entry_process():  
100 - # StructurePrint().print("start listen")  
101 - # process = threading.Thread(target=data_entry_center)  
102 - # process.start()  
103 -  
104 # 不检测https 94 # 不检测https
105 os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' 95 os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
106 96
@@ -20,9 +20,12 @@ from app.util.component.StructuredPrint import StructurePrint @@ -20,9 +20,12 @@ from app.util.component.StructuredPrint import StructurePrint
20 import multiprocessing 20 import multiprocessing
21 import datetime 21 import datetime
22 from app.util.component.TaskController import TaskController 22 from app.util.component.TaskController import TaskController
  23 +from app.util.component.TaskWriter import TaskWriter
23 24
24 class Api(ApiTemplate): 25 class Api(ApiTemplate):
25 26
  27 + api_name = "数据下载"
  28 +
26 def process(self): 29 def process(self):
27 30
28 res = {} 31 res = {}
@@ -47,7 +50,6 @@ class Api(ApiTemplate): @@ -47,7 +50,6 @@ class Api(ApiTemplate):
47 db.session.add(task) 50 db.session.add(task)
48 db.session.commit() 51 db.session.commit()
49 52
50 -  
51 res["data"] = "下载任务已提交!" 53 res["data"] = "下载任务已提交!"
52 res["result"] = True 54 res["result"] = True
53 55
@@ -58,8 +60,8 @@ class Api(ApiTemplate): @@ -58,8 +60,8 @@ class Api(ApiTemplate):
58 60
59 def download(self,task_guid,para): 61 def download(self,task_guid,para):
60 62
61 - sys_session = None  
62 ds: DataSource = None 63 ds: DataSource = None
  64 + task_writer = None
63 65
64 # 设置编码 66 # 设置编码
65 encoding = para.get("encoding") 67 encoding = para.get("encoding")
@@ -72,12 +74,13 @@ class Api(ApiTemplate): @@ -72,12 +74,13 @@ class Api(ApiTemplate):
72 74
73 #任务控制,等待执行 75 #任务控制,等待执行
74 TaskController.wait(task_guid) 76 TaskController.wait(task_guid)
75 -  
76 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI) 77 + task_writer = TaskWriter(task_guid)
  78 + task_writer.update_task({"state":2,"update_time":datetime.datetime.now(),"process" : "下载中"})
  79 + task_writer.update_process("开始下载...")
77 80
78 table_names = para.get("table_name").split(",") 81 table_names = para.get("table_name").split(",")
79 database_guid = para.get("database_guid") 82 database_guid = para.get("database_guid")
80 - database = sys_session.query(Database).filter_by(guid=database_guid).one_or_none() 83 + database = task_writer.session.query(Database).filter_by(guid=database_guid).one_or_none()
81 if not database: 84 if not database:
82 raise Exception("数据库不存在!") 85 raise Exception("数据库不存在!")
83 86
@@ -89,36 +92,25 @@ class Api(ApiTemplate): @@ -89,36 +92,25 @@ class Api(ApiTemplate):
89 if download_type.__eq__("shp"): 92 if download_type.__eq__("shp"):
90 data = self.download_shp(table_names, ds) 93 data = self.download_shp(table_names, ds)
91 if download_type.__eq__("gdb"): 94 if download_type.__eq__("gdb"):
92 - data = self.download_gdb(sys_session,table_names, ds, database_guid)  
93 -  
94 - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),  
95 - "process" : "下载完成",  
96 - "parameter":data[0]["download_url"]})  
97 - sys_session.commit() 95 + data = self.download_gdb(task_writer.session,table_names, ds, database_guid)
98 96
  97 + task_writer.update_task({"state":1,"update_time":datetime.datetime.now(),"process" : "下载完成","parameter":data[0]["download_url"]})
  98 + task_writer.update_process("下载完成!")
99 99
100 except Exception as e: 100 except Exception as e:
101 try: 101 try:
102 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),  
103 - "process": "下载失败"})  
104 -  
105 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
106 - task_process_guid = uuid.uuid1().__str__()  
107 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
108 - task_guid=task_guid)  
109 - sys_session.add(task_process)  
110 - sys_session.commit() 102 + task_writer.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "下载失败"})
  103 + task_writer.update_process( e.__str__())
111 except Exception as ee: 104 except Exception as ee:
112 - print(traceback.format_exc()) 105 + StructurePrint().print(ee.__str__())
113 raise e 106 raise e
114 finally: 107 finally:
115 try: 108 try:
116 if ds: 109 if ds:
117 ds.Destroy() 110 ds.Destroy()
118 - if sys_session:  
119 - sys_session.close()  
120 - except:  
121 - print(traceback.format_exc()) 111 + task_writer.close()
  112 + except Exception as e:
  113 + StructurePrint().print(e.__str__())
122 114
123 115
124 def download_shp(self,table_names,ds): 116 def download_shp(self,table_names,ds):
@@ -154,10 +146,7 @@ class Api(ApiTemplate): @@ -154,10 +146,7 @@ class Api(ApiTemplate):
154 StructurePrint().print("{}图层已下载{}个对象".format(table_name, count)) 146 StructurePrint().print("{}图层已下载{}个对象".format(table_name, count))
155 147
156 data_source.Destroy() 148 data_source.Destroy()
157 -  
158 -  
159 ZipUtil.create_zip(os.path.join(parent, "file_tmp", table_name+"_"+uuid_) + ".zip", [dirpath]) 149 ZipUtil.create_zip(os.path.join(parent, "file_tmp", table_name+"_"+uuid_) + ".zip", [dirpath])
160 -  
161 return "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(table_name+"_"+uuid_ + ".zip") 150 return "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(table_name+"_"+uuid_ + ".zip")
162 151
163 152
@@ -175,8 +164,7 @@ class Api(ApiTemplate): @@ -175,8 +164,7 @@ class Api(ApiTemplate):
175 gdb_path = os.path.join(parent, "file_tmp", uuid_+".gdb") 164 gdb_path = os.path.join(parent, "file_tmp", uuid_+".gdb")
176 165
177 gdb_ds: DataSource = gdb_driver.CreateDataSource(gdb_path) 166 gdb_ds: DataSource = gdb_driver.CreateDataSource(gdb_path)
178 -  
179 - 167 +
180 for table_name in table_names: 168 for table_name in table_names:
181 169
182 layer: Layer = ds.GetLayerByName(table_name) 170 layer: Layer = ds.GetLayerByName(table_name)
@@ -196,7 +184,6 @@ class Api(ApiTemplate): @@ -196,7 +184,6 @@ class Api(ApiTemplate):
196 # schema = layer.schema 184 # schema = layer.schema
197 pg_layer.CreateFields(schema) 185 pg_layer.CreateFields(schema)
198 186
199 -  
200 # gdb 不支持fid=0的要素,所以识别到后要+1 187 # gdb 不支持fid=0的要素,所以识别到后要+1
201 offset = 0 188 offset = 0
202 f1:Feature = layer.GetNextFeature() 189 f1:Feature = layer.GetNextFeature()
@@ -218,8 +205,6 @@ class Api(ApiTemplate): @@ -218,8 +205,6 @@ class Api(ApiTemplate):
218 205
219 206
220 return data 207 return data
221 -  
222 -  
223 208
224 209
225 api_doc={ 210 api_doc={
@@ -259,4 +244,7 @@ class Api(ApiTemplate): @@ -259,4 +244,7 @@ class Api(ApiTemplate):
259 } 244 }
260 } 245 }
261 } 246 }
262 -}  
  247 +
  248 +}
  249 +
  250 +
@@ -18,9 +18,9 @@ from sqlalchemy.orm import Session @@ -18,9 +18,9 @@ from sqlalchemy.orm import Session
18 import configure 18 import configure
19 import datetime 19 import datetime
20 import multiprocessing 20 import multiprocessing
21 -from app.util.component.EntryDataVacuate import EntryDataVacuate 21 +from ..util.EntryDataVacuate import EntryDataVacuate
22 from app.util.component.TaskController import TaskController 22 from app.util.component.TaskController import TaskController
23 - 23 +from app.util.component.TaskWriter import TaskWriter
24 class Api(ApiTemplate): 24 class Api(ApiTemplate):
25 25
26 api_name = "通过meta入库" 26 api_name = "通过meta入库"
@@ -105,29 +105,23 @@ class Api(ApiTemplate): @@ -105,29 +105,23 @@ class Api(ApiTemplate):
105 105
106 def entry(self,task_guid): 106 def entry(self,task_guid):
107 107
108 - sys_session: Session = None 108 + task_writer = None
109 this_task_layer = [] 109 this_task_layer = []
110 try: 110 try:
111 #任务控制,等待执行 111 #任务控制,等待执行
112 TaskController.wait(task_guid) 112 TaskController.wait(task_guid)
113 113
114 - sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
115 - 114 + task_writer = TaskWriter(task_guid)
116 115
117 - task:Task = sys_session.query(Task).filter_by(guid=task_guid).one_or_none() 116 + task:Task = task_writer.session.query(Task).filter_by(guid=task_guid).one_or_none()
118 parameter = json.loads(task.parameter) 117 parameter = json.loads(task.parameter)
119 -  
120 - task.state = 2  
121 - task.process = "入库中"  
122 - sys_session.commit()  
123 - 118 + task_writer.update_task({"state": 2, "prcess": "入库中"})
124 119
125 #处理修改入库信息 120 #处理修改入库信息
126 -  
127 metas: list = json.loads(parameter.get("meta").__str__()) 121 metas: list = json.loads(parameter.get("meta").__str__())
128 parameter["meta"] = metas 122 parameter["meta"] = metas
129 123
130 - database = sys_session.query(Database).filter_by(guid=task.database_guid).one_or_none() 124 + database = task_writer.session.query(Database).filter_by(guid=task.database_guid).one_or_none()
131 pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri)) 125 pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
132 126
133 127
@@ -148,8 +142,8 @@ class Api(ApiTemplate): @@ -148,8 +142,8 @@ class Api(ApiTemplate):
148 task_guid=task.guid, 142 task_guid=task.guid,
149 name=layer_name) 143 name=layer_name)
150 144
151 - sys_session.add(iln)  
152 - sys_session.commit() 145 + task_writer.session.add(iln)
  146 + task_writer.session.commit()
153 this_task_layer.append(layer_name) 147 this_task_layer.append(layer_name)
154 # 修改表名 148 # 修改表名
155 meta["layer"][layer_name_origin] = layer_name 149 meta["layer"][layer_name_origin] = layer_name
@@ -160,23 +154,18 @@ class Api(ApiTemplate): @@ -160,23 +154,18 @@ class Api(ApiTemplate):
160 154
161 #完成后 155 #完成后
162 for ln in this_task_layer: 156 for ln in this_task_layer:
163 - iln = sys_session.query(InsertingLayerName).filter_by(name=ln).one_or_none()  
164 - sys_session.delete(iln) 157 + iln = task_writer.session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
  158 + task_writer.session.delete(iln)
165 159
166 except Exception as e: 160 except Exception as e:
167 - sys_session.query(Task).filter_by(guid=task_guid).update(  
168 - {"state": -1, "process": "入库失败"})  
169 - 161 + task_writer.update_task({"state": -1, "process": "入库失败"})
170 for ln in this_task_layer: 162 for ln in this_task_layer:
171 - iln = sys_session.query(InsertingLayerName).filter_by(name=ln).one_or_none()  
172 - sys_session.delete(iln)  
173 -  
174 - sys_session.commit() 163 + iln = task_writer.session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
  164 + task_writer.session.delete(iln)
175 StructurePrint().print(e.__str__(), "error") 165 StructurePrint().print(e.__str__(), "error")
176 finally: 166 finally:
177 - sys_session.commit()  
178 - if sys_session:  
179 - sys_session.close() 167 + task_writer.session.commit()
  168 + task_writer.close()
180 169
181 170
182 api_doc={ 171 api_doc={
1 -# coding=utf-8  
2 -#author: 4N  
3 -#createtime: 2021/9/14  
4 -#email: nheweijun@sina.com  
5 -  
6 -import configure  
7 -from ..models import InsertingLayerName, Database, Task,DES  
8 -  
9 -from sqlalchemy.orm import Session  
10 -import multiprocessing  
11 -from app.util.component.EntryDataVacuate import EntryDataVacuate  
12 -import json  
13 -from sqlalchemy import distinct  
14 -import uuid  
15 -from osgeo.ogr import DataSource  
16 -from app.util.component.StructuredPrint import StructurePrint  
17 -from app.util.component.PGUtil import PGUtil  
18 -import time  
19 -  
20 -def data_entry_center():  
21 - running_dict = {}  
22 - sys_session: Session = PGUtil.get_db_session(  
23 - configure.SQLALCHEMY_DATABASE_URI)  
24 -  
25 - while True:  
26 -  
27 - try:  
28 - time.sleep(3)  
29 -  
30 - # 已经结束的进程 从监测中删除  
31 - remove_process = []  
32 - for process, layer_names in running_dict.items():  
33 - if not process.is_alive():  
34 - for l in layer_names:  
35 - inserted = sys_session.query(  
36 - InsertingLayerName).filter_by(name=l).one_or_none()  
37 - if inserted:  
38 - sys_session.delete(inserted)  
39 - sys_session.commit()  
40 - remove_process.append(process)  
41 - for process in remove_process:  
42 - running_dict.pop(process)  
43 -  
44 -  
45 - # 入库进程少于阈值,开启入库进程  
46 - inter_size = sys_session.query(  
47 - distinct(InsertingLayerName.task_guid)).count()  
48 -  
49 - if inter_size < configure.entry_data_thread:  
50 - # 锁表  
51 - ready_task: Task = sys_session.query(Task).filter_by(state=0, task_type=1).order_by(  
52 - Task.create_time).with_lockmode("update").limit(1).one_or_none()  
53 - if ready_task:  
54 -  
55 - try:  
56 - parameter = json.loads(ready_task.parameter)  
57 - StructurePrint().print("检测到入库任务")  
58 - ready_task.state = 2  
59 - ready_task.process = "入库中"  
60 - sys_session.commit()  
61 -  
62 - metas: list = json.loads(  
63 - parameter.get("meta").__str__())  
64 - parameter["meta"] = metas  
65 -  
66 - database = sys_session.query(Database).filter_by(  
67 - guid=ready_task.database_guid).one_or_none()  
68 - pg_ds: DataSource = PGUtil.open_pg_data_source(  
69 - 1, DES.decode(database.sqlalchemy_uri))  
70 -  
71 - this_task_layer = []  
72 - for meta in metas:  
73 - overwrite = parameter.get("overwrite", "no")  
74 -  
75 - for layer_name_origin, layer_name in meta.get("layer").items():  
76 - origin_name = layer_name  
77 - no = 1  
78 -  
79 - while (overwrite.__eq__("no") and pg_ds.GetLayerByName(layer_name)) or sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none():  
80 - layer_name = origin_name + "_{}".format(no)  
81 - no += 1  
82 -  
83 - # 添加到正在入库的列表中  
84 - iln = InsertingLayerName(guid=uuid.uuid1().__str__(),  
85 - task_guid=ready_task.guid,  
86 - name=layer_name)  
87 -  
88 - sys_session.add(iln)  
89 - sys_session.commit()  
90 - this_task_layer.append(layer_name)  
91 - # 修改表名  
92 - meta["layer"][layer_name_origin] = layer_name  
93 -  
94 - pg_ds.Destroy()  
95 - entry_data_process = multiprocessing.Process(  
96 - target=EntryDataVacuate().entry, args=(parameter,))  
97 - entry_data_process.start()  
98 -  
99 - pid = entry_data_process.pid  
100 - sys_session.query(Task).filter_by(guid=ready_task.guid).update({"task_pid":pid})  
101 - sys_session.commit()  
102 -  
103 - running_dict[entry_data_process] = this_task_layer  
104 -  
105 - except Exception as e:  
106 - sys_session.query(Task).filter_by(guid=ready_task.guid).update(  
107 - {"state": -1, "process": "入库失败"})  
108 - sys_session.commit()  
109 - StructurePrint().print(e.__str__(), "error")  
110 - else:  
111 - # 解表啊  
112 - sys_session.commit()  
113 - except Exception as e:  
114 - sys_session.commit()  
115 - StructurePrint().print(e.__str__(), "error")  
@@ -13,7 +13,7 @@ import json @@ -13,7 +13,7 @@ import json
13 from app.util.component.ApiTemplate import ApiTemplate 13 from app.util.component.ApiTemplate import ApiTemplate
14 from .get_meta import Api as MetaApi 14 from .get_meta import Api as MetaApi
15 from threading import Thread 15 from threading import Thread
16 -from app.util.component.EntryDataVacuate import EntryDataVacuate 16 +from ..util.EntryDataVacuate import EntryDataVacuate
17 17
18 class Api(ApiTemplate): 18 class Api(ApiTemplate):
19 19
@@ -5,14 +5,11 @@ from osgeo import gdal @@ -5,14 +5,11 @@ from osgeo import gdal
5 import os 5 import os
6 import uuid 6 import uuid
7 import shutil 7 import shutil
8 -import time  
9 from app.modules.data.models import * 8 from app.modules.data.models import *
10 from app.util.component.PGUtil import PGUtil 9 from app.util.component.PGUtil import PGUtil
11 from app.util.component.StructuredPrint import StructurePrint 10 from app.util.component.StructuredPrint import StructurePrint
12 from sqlalchemy.orm import Session 11 from sqlalchemy.orm import Session
13 import configure 12 import configure
14 -import math  
15 -from functools import lru_cache  
16 import traceback 13 import traceback
17 import copy 14 import copy
18 from app.util.component.GeometryAdapter import GeometryAdapter 15 from app.util.component.GeometryAdapter import GeometryAdapter
@@ -19,6 +19,7 @@ from app.util.component.StructuredPrint import StructurePrint @@ -19,6 +19,7 @@ from app.util.component.StructuredPrint import StructurePrint
19 from app.util.component.ApiTemplate import ApiTemplate 19 from app.util.component.ApiTemplate import ApiTemplate
20 from app.util.component.GeometryAdapter import GeometryAdapter 20 from app.util.component.GeometryAdapter import GeometryAdapter
21 from app.util.component.TaskController import TaskController 21 from app.util.component.TaskController import TaskController
  22 +from app.util.component.TaskWriter import TaskWriter
22 import multiprocessing 23 import multiprocessing
23 import configure 24 import configure
24 25
@@ -69,12 +70,17 @@ class Api(ApiTemplate): @@ -69,12 +70,17 @@ class Api(ApiTemplate):
69 data_session=None 70 data_session=None
70 result = {} 71 result = {}
71 sys_session = None 72 sys_session = None
  73 + task_writer = None
  74 +
  75 +
72 db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri)) 76 db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
73 77
74 try: 78 try:
75 79
76 #任务控制,等待执行 80 #任务控制,等待执行
77 TaskController.wait(task_guid) 81 TaskController.wait(task_guid)
  82 + task_writer = TaskWriter(task_guid)
  83 +
78 84
79 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI) 85 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
80 sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI) 86 sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI)
@@ -145,24 +151,19 @@ class Api(ApiTemplate): @@ -145,24 +151,19 @@ class Api(ApiTemplate):
145 sys_session.commit() 151 sys_session.commit()
146 result["data"] = "刷新数据成功!" 152 result["data"] = "刷新数据成功!"
147 result["state"] = 1 153 result["state"] = 1
148 - sys_session.query(Task).filter_by(guid=task_guid).update(  
149 - {"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})  
150 - sys_session.commit() 154 +
  155 + task_writer.update_task({"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})
  156 +
151 157
152 except Exception as e: 158 except Exception as e:
153 try: 159 try:
154 - print(traceback.format_exc())  
155 - sys_session.query(Task).filter_by(guid=task_guid).update(  
156 - {"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})  
157 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
158 - task_process_guid = uuid.uuid1().__str__()  
159 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
160 - task_guid=task_guid)  
161 - sys_session.add(task_process)  
162 - sys_session.commit() 160 + StructurePrint().print(traceback.format_exc())
  161 + task_writer.update_task({"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})
  162 + task_writer.update_process(e.__str__())
163 except Exception as ee: 163 except Exception as ee:
164 - print(traceback.format_exc()) 164 + StructurePrint().print(traceback.format_exc())
165 finally: 165 finally:
  166 + task_writer.close()
166 if pg_ds: 167 if pg_ds:
167 pg_ds.Destroy() 168 pg_ds.Destroy()
168 if data_session: 169 if data_session:
@@ -18,6 +18,7 @@ from app.util.component.VacuateConf import VacuateConf @@ -18,6 +18,7 @@ from app.util.component.VacuateConf import VacuateConf
18 from app.util.component.GeometryAdapter import GeometryAdapter 18 from app.util.component.GeometryAdapter import GeometryAdapter
19 19
20 from app.util.component.TaskController import TaskController 20 from app.util.component.TaskController import TaskController
  21 +from app.util.component.TaskWriter import TaskWriter
21 from osgeo.ogr import DataSource,Layer,Geometry 22 from osgeo.ogr import DataSource,Layer,Geometry
22 from osgeo import ogr 23 from osgeo import ogr
23 24
@@ -100,7 +101,7 @@ class Api(ApiTemplate): @@ -100,7 +101,7 @@ class Api(ApiTemplate):
100 101
101 def task(self,table,task_guid): 102 def task(self,table,task_guid):
102 103
103 - sys_session = None 104 + task_writer = None
104 pg_session = None 105 pg_session = None
105 pg_ds = None 106 pg_ds = None
106 vacuate_process = None 107 vacuate_process = None
@@ -109,25 +110,22 @@ class Api(ApiTemplate): @@ -109,25 +110,22 @@ class Api(ApiTemplate):
109 #任务控制,等待执行 110 #任务控制,等待执行
110 TaskController.wait(task_guid) 111 TaskController.wait(task_guid)
111 112
  113 + task_writer = TaskWriter(task_guid)
112 114
113 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
114 - sys_session.query(Table).filter_by(guid=table.guid).update({"is_vacuate": 2, "update_time": datetime.datetime.now()})  
115 - sys_session.query(Task).filter_by(guid=task_guid).update({"state":2,"process":"精华中"}) 115 + task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
  116 + task_writer.update_task({"state":2,"process":"精华中"})
116 117
117 - database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none() 118 + database = task_writer.session.query(Database).filter_by(guid=table.database_guid).one_or_none()
118 database_sqlalchemy_uri = str(database.sqlalchemy_uri) 119 database_sqlalchemy_uri = str(database.sqlalchemy_uri)
  120 +
119 pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri)) 121 pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
120 pg_ds :DataSource= PGUtil.open_pg_data_source(0,DES.decode(database.sqlalchemy_uri)) 122 pg_ds :DataSource= PGUtil.open_pg_data_source(0,DES.decode(database.sqlalchemy_uri))
121 123
122 -  
123 #删除原有数据 124 #删除原有数据
124 - tvs = sys_session.query(TableVacuate).filter_by(table_guid=table.guid).all() 125 + tvs = task_writer.session.query(TableVacuate).filter_by(table_guid=table.guid).all()
125 for tv in tvs : 126 for tv in tvs :
126 - sys_session.delete(tv)  
127 -  
128 - sys_session.commit()  
129 - #长时间的连接,导致后续的session超时,现在先断开  
130 - sys_session.close() 127 + task_writer.session.delete(tv)
  128 + task_writer.session.commit()
131 129
132 130
133 # 创建抽稀过程 131 # 创建抽稀过程
@@ -150,9 +148,6 @@ class Api(ApiTemplate): @@ -150,9 +148,6 @@ class Api(ApiTemplate):
150 148
151 vacuate_process.set_vacuate_count() 149 vacuate_process.set_vacuate_count()
152 150
153 - # 重连  
154 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
155 -  
156 #新增 151 #新增
157 if configure.VACUATE_DB_URI: 152 if configure.VACUATE_DB_URI:
158 user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI) 153 user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI)
@@ -169,41 +164,30 @@ class Api(ApiTemplate): @@ -169,41 +164,30 @@ class Api(ApiTemplate):
169 name=vacuate_process.vacuate_layers[l].GetName(), 164 name=vacuate_process.vacuate_layers[l].GetName(),
170 pixel_distance=vacuate_process.this_gridsize[l], 165 pixel_distance=vacuate_process.this_gridsize[l],
171 connectstr=DES.encode(connectstr)) 166 connectstr=DES.encode(connectstr))
172 - sys_session.add(table_vacuate) 167 + task_writer.session.add(table_vacuate)
173 168
174 - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),  
175 - "process": "精化完成"})  
176 - sys_session.query(Table).filter_by(guid=table.guid).update(  
177 - {"is_vacuate": 1, "update_time": datetime.datetime.now()})  
178 - sys_session.commit() 169 + task_writer.update_task({"state":1,"update_time":datetime.datetime.now(),"process": "精化完成"})
  170 + task_writer.update_table(table.guid, {"is_vacuate": 1, "update_time": datetime.datetime.now()})
179 171
180 except Exception as e: 172 except Exception as e:
181 try: 173 try:
182 - if not sys_session.is_active:  
183 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
184 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),  
185 - "process": "精化失败"})  
186 - sys_session.query(Table).filter_by(guid=table.guid).update(  
187 - {"is_vacuate": 0, "update_time": datetime.datetime.now()})  
188 -  
189 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
190 - task_process_guid = uuid.uuid1().__str__()  
191 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
192 - task_guid=task_guid)  
193 - sys_session.add(task_process)  
194 - sys_session.commit() 174 + task_writer.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "精化失败"})
  175 + task_writer.update_table(table.guid, {"is_vacuate": 0, "update_time": datetime.datetime.now()})
  176 + task_writer.update_process( e.__str__())
  177 +
  178 + task_writer.session.commit()
  179 +
195 if vacuate_process: 180 if vacuate_process:
196 vacuate_process.rollback() 181 vacuate_process.rollback()
197 182
198 - print(traceback.format_exc()) 183 + StructurePrint().print(traceback.format_exc())
199 except Exception as ee: 184 except Exception as ee:
200 - print(traceback.format_exc()) 185 + StructurePrint().print(traceback.format_exc())
201 finally: 186 finally:
202 try: 187 try:
  188 + task_writer.close()
203 if vacuate_process: 189 if vacuate_process:
204 vacuate_process.end() 190 vacuate_process.end()
205 - if sys_session:  
206 - sys_session.close()  
207 if pg_session: 191 if pg_session:
208 pg_session.close() 192 pg_session.close()
209 if pg_ds: 193 if pg_ds:
@@ -233,15 +217,6 @@ class Api(ApiTemplate): @@ -233,15 +217,6 @@ class Api(ApiTemplate):
233 } 217 }
234 } 218 }
235 219
236 -class SysSession:  
237 - def __init__(self):  
238 - self.session : Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
239 -  
240 - def close(self):  
241 - try:  
242 - self.session.close()  
243 - except:  
244 - pass  
245 220
246 class VacuateProcess: 221 class VacuateProcess:
247 222
@@ -5,10 +5,9 @@ @@ -5,10 +5,9 @@
5 5
6 import datetime 6 import datetime
7 import traceback 7 import traceback
8 -from ..models import Table, Database, DES,Task,db,TableVacuate 8 +from ..models import Table, Database, DES,Task,db,TableVacuate,Process
9 from app.util.component.ApiTemplate import ApiTemplate 9 from app.util.component.ApiTemplate import ApiTemplate
10 from app.util.component.PGUtil import PGUtil 10 from app.util.component.PGUtil import PGUtil
11 -from app.util.component.EntryDataVacuate import Process  
12 from app.util.component.StructuredPrint import StructurePrint 11 from app.util.component.StructuredPrint import StructurePrint
13 import multiprocessing 12 import multiprocessing
14 import uuid 13 import uuid
@@ -18,6 +17,7 @@ from osgeo import ogr @@ -18,6 +17,7 @@ from osgeo import ogr
18 17
19 from app.util.component.VacuateConf import VacuateConf 18 from app.util.component.VacuateConf import VacuateConf
20 from app.util.component.TaskController import TaskController 19 from app.util.component.TaskController import TaskController
  20 +from app.util.component.TaskWriter import TaskWriter
21 21
22 class Api(ApiTemplate): 22 class Api(ApiTemplate):
23 api_name = "单独抽稀" 23 api_name = "单独抽稀"
@@ -100,7 +100,7 @@ class Api(ApiTemplate): @@ -100,7 +100,7 @@ class Api(ApiTemplate):
100 100
101 def task(self,table,task_guid,grids): 101 def task(self,table,task_guid,grids):
102 102
103 - sys_session = None 103 + task_write = None
104 pg_session = None 104 pg_session = None
105 pg_ds = None 105 pg_ds = None
106 vacuate_process = None 106 vacuate_process = None
@@ -110,27 +110,23 @@ class Api(ApiTemplate): @@ -110,27 +110,23 @@ class Api(ApiTemplate):
110 110
111 #任务控制,等待执行 111 #任务控制,等待执行
112 TaskController.wait(task_guid) 112 TaskController.wait(task_guid)
  113 + task_write = TaskWriter(task_guid)
113 114
114 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
115 - sys_session.query(Table).filter_by(guid=table.guid).update(  
116 - {"is_vacuate": 2, "update_time": datetime.datetime.now()})  
117 - sys_session.query(Task).filter_by(guid=task_guid).update({"state":2,"process":"精华中"}) 115 + task_write.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
  116 + task_write.update_task({"state":2,"process":"精华中"})
118 117
119 -  
120 - database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none() 118 + database = task_write.session.query(Database).filter_by(guid=table.database_guid).one_or_none()
121 database_sqlalchemy_uri = str(database.sqlalchemy_uri) 119 database_sqlalchemy_uri = str(database.sqlalchemy_uri)
122 pg_session = PGUtil.get_db_session(DES.decode(database_sqlalchemy_uri)) 120 pg_session = PGUtil.get_db_session(DES.decode(database_sqlalchemy_uri))
123 pg_ds :DataSource= PGUtil.open_pg_data_source(1,DES.decode(database_sqlalchemy_uri)) 121 pg_ds :DataSource= PGUtil.open_pg_data_source(1,DES.decode(database_sqlalchemy_uri))
124 122
125 #删除原有数据 123 #删除原有数据
126 for grid in grids: 124 for grid in grids:
127 - tvs = sys_session.query(TableVacuate).filter_by(pixel_distance=grid,table_guid=table.guid).all() 125 + tvs = task_write.session.query(TableVacuate).filter_by(pixel_distance=grid,table_guid=table.guid).all()
128 for tv in tvs : 126 for tv in tvs :
129 - sys_session.delete(tv) 127 + task_write.session.delete(tv)
  128 + task_write.session.commit()
130 129
131 - sys_session.commit()  
132 - #长时间的连接,导致后续的session超时,现在先断开  
133 - sys_session.close()  
134 130
135 # 创建抽稀过程 131 # 创建抽稀过程
136 options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name,pg_session)), 132 options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name,pg_session)),
@@ -152,8 +148,6 @@ class Api(ApiTemplate): @@ -152,8 +148,6 @@ class Api(ApiTemplate):
152 148
153 vacuate_process.set_vacuate_count() 149 vacuate_process.set_vacuate_count()
154 150
155 - #重新连接  
156 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
157 151
158 #新增 152 #新增
159 if configure.VACUATE_DB_URI: 153 if configure.VACUATE_DB_URI:
@@ -172,41 +166,26 @@ class Api(ApiTemplate): @@ -172,41 +166,26 @@ class Api(ApiTemplate):
172 name=layer_name, 166 name=layer_name,
173 pixel_distance=vacuate_process.this_gridsize[l], 167 pixel_distance=vacuate_process.this_gridsize[l],
174 connectstr=DES.encode(connectstr)) 168 connectstr=DES.encode(connectstr))
175 - sys_session.add(table_vacuate) 169 + task_write.session.add(table_vacuate)
  170 + task_write.update_task({"state":1,"update_time":datetime.datetime.now(),"process" : "精化完成"})
  171 + task_write.update_table(table.guid,{"is_vacuate": 1, "update_time": datetime.datetime.now()})
176 172
177 - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),  
178 - "process" : "精化完成"})  
179 - sys_session.query(Table).filter_by(guid=table.guid).update(  
180 - {"is_vacuate": 1, "update_time": datetime.datetime.now()})  
181 - sys_session.commit()  
182 173
183 except Exception as e: 174 except Exception as e:
184 try: 175 try:
185 - if not sys_session.is_active:  
186 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
187 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),  
188 - "process": "精化失败"})  
189 - sys_session.query(Table).filter_by(guid=table.guid).update(  
190 - {"is_vacuate": origin_vacuate, "update_time": datetime.datetime.now()})  
191 -  
192 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
193 - task_process_guid = uuid.uuid1().__str__()  
194 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
195 - task_guid=task_guid)  
196 - sys_session.add(task_process)  
197 - sys_session.commit() 176 + task_write.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "精化失败"})
  177 + task_write.update_table(table.guid, {"is_vacuate": origin_vacuate, "update_time": datetime.datetime.now()})
  178 + task_write.update_process( e.__str__())
198 if vacuate_process: 179 if vacuate_process:
199 vacuate_process.rollback() 180 vacuate_process.rollback()
200 -  
201 print(traceback.format_exc()) 181 print(traceback.format_exc())
202 except Exception as ee: 182 except Exception as ee:
203 print(traceback.format_exc()) 183 print(traceback.format_exc())
204 finally: 184 finally:
205 try: 185 try:
  186 + task_write.close()
206 if vacuate_process: 187 if vacuate_process:
207 vacuate_process.end() 188 vacuate_process.end()
208 - if sys_session:  
209 - sys_session.close()  
210 if pg_session: 189 if pg_session:
211 pg_session.close() 190 pg_session.close()
212 if pg_ds: 191 if pg_ds:
@@ -4,7 +4,8 @@ @@ -4,7 +4,8 @@
4 #email: nheweijun@sina.com 4 #email: nheweijun@sina.com
5 5
6 from app.models import db 6 from app.models import db
7 -from sqlalchemy import Column, Integer, String, ForeignKey, Text, DateTime, Time,Float,Binary 7 +from sqlalchemy import Column, Integer, String, ForeignKey, Text, DateTime, Time,Float,Binary,Sequence
  8 +from sqlalchemy import Sequence
8 from sqlalchemy.orm import relationship 9 from sqlalchemy.orm import relationship
9 import base64 10 import base64
10 from pyDes import * 11 from pyDes import *
@@ -29,6 +29,7 @@ class Api(ApiTemplate): @@ -29,6 +29,7 @@ class Api(ApiTemplate):
29 exec_result = os.popen('taskkill.exe /pid:' + str(pid)) 29 exec_result = os.popen('taskkill.exe /pid:' + str(pid))
30 30
31 else: 31 else:
  32 + #分布式下,很难吧,还需要记录机器,分布式是不可行的
32 os.kill(pid,signal.SIGILL) 33 os.kill(pid,signal.SIGILL)
33 except Exception as e: 34 except Exception as e:
34 StructurePrint.print("Kill task 失败") 35 StructurePrint.print("Kill task 失败")
@@ -144,7 +144,10 @@ class EntryDataVacuate: @@ -144,7 +144,10 @@ class EntryDataVacuate:
144 144
145 145
146 for i in range(ds.GetLayerCount()): 146 for i in range(ds.GetLayerCount()):
  147 +
147 layer: Layer = ds.GetLayer(i) 148 layer: Layer = ds.GetLayer(i)
  149 + if layer.GetName().lower() not in meta.get("layer").keys():
  150 + continue
148 is_success, new_layer_name = self.entry_one_layer(layer,this_task,meta) 151 is_success, new_layer_name = self.entry_one_layer(layer,this_task,meta)
149 new_layer_names.append(new_layer_name) 152 new_layer_names.append(new_layer_name)
150 is_successes.append(is_success) 153 is_successes.append(is_success)
@@ -330,7 +333,12 @@ class ThisTask: @@ -330,7 +333,12 @@ class ThisTask:
330 is_vacuate=is_vacuate 333 is_vacuate=is_vacuate
331 ) 334 )
332 # 删除遗留业务数据 335 # 删除遗留业务数据
333 - history_table = self.sys_session.query(Table).filter_by(name=new_layer_name,database_guid=self.database.guid).all() 336 + try:
  337 + history_table = self.sys_session.query(Table).filter_by(name=new_layer_name,database_guid=self.database.guid).all()
  338 + except:
  339 + self.sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  340 + history_table = self.sys_session.query(Table).filter_by(name=new_layer_name,database_guid=self.database.guid).all()
  341 +
334 if history_table: 342 if history_table:
335 for ht in history_table: 343 for ht in history_table:
336 self.sys_session.delete(ht) 344 self.sys_session.delete(ht)
@@ -21,6 +21,7 @@ from osgeo.gdal import * @@ -21,6 +21,7 @@ from osgeo.gdal import *
21 import traceback 21 import traceback
22 import os 22 import os
23 from app.util.component.TaskController import TaskController 23 from app.util.component.TaskController import TaskController
  24 +from app.util.component.TaskWriter import TaskWriter
24 25
25 class Api(ApiTemplate): 26 class Api(ApiTemplate):
26 27
@@ -76,21 +77,17 @@ class Api(ApiTemplate): @@ -76,21 +77,17 @@ class Api(ApiTemplate):
76 return res 77 return res
77 78
78 def build_pyramid_task(self,image_guid,task_guid,data_servers,path): 79 def build_pyramid_task(self,image_guid,task_guid,data_servers,path):
79 - sys_session = None  
80 - try:  
81 80
  81 + task_writer = None
82 82
  83 + try:
83 #任务控制,等待执行 84 #任务控制,等待执行
84 TaskController.wait(task_guid) 85 TaskController.wait(task_guid)
  86 + task_writer = TaskWriter(task_guid)
85 87
86 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
87 #进入创建金字塔的状态 88 #进入创建金字塔的状态
88 - sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": -1})  
89 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": 2})  
90 -  
91 - sys_session.commit()  
92 - sys_session.close()  
93 - 89 + task_guid.sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": -1})
  90 + task_writer.update_task({"state": 2})
94 91
95 #所有数据节点的影像都要建金字塔 92 #所有数据节点的影像都要建金字塔
96 update_size = None 93 update_size = None
@@ -109,11 +106,9 @@ class Api(ApiTemplate): @@ -109,11 +106,9 @@ class Api(ApiTemplate):
109 update_size = json.loads(thrift_connect.client.getInfo(path)).get("size") 106 update_size = json.loads(thrift_connect.client.getInfo(path)).get("size")
110 thrift_connect.close() 107 thrift_connect.close()
111 108
112 - #重新连接,防止session超时  
113 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI) 109 + task_writer.update_task({"state":1,"update_time":datetime.datetime.now()})
114 110
115 - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now()})  
116 - image:Image = sys_session.query(Image).filter_by(guid=image_guid).one_or_none() 111 + image:Image = task_writer.session.query(Image).filter_by(guid=image_guid).one_or_none()
117 112
118 if not overview_count: 113 if not overview_count:
119 overview_count = 0 114 overview_count = 0
@@ -128,20 +123,12 @@ class Api(ApiTemplate): @@ -128,20 +123,12 @@ class Api(ApiTemplate):
128 image.overview_count = overview_count 123 image.overview_count = overview_count
129 if update_size: 124 if update_size:
130 image.size = update_size 125 image.size = update_size
131 -  
132 except: 126 except:
133 - sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": 0})  
134 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now()}) 127 + task_writer.session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": 0})
  128 + task_writer.update_task({"state": -1,"update_time":datetime.datetime.now()})
135 finally: 129 finally:
136 - sys_session.commit()  
137 - if sys_session:  
138 - try:  
139 - sys_session.close()  
140 - except:  
141 - pass  
142 -  
143 -  
144 - 130 + task_writer.session.commit()
  131 + task_writer.close()
145 132
146 def buildOverview(self,path): 133 def buildOverview(self,path):
147 image: Dataset = gdal.Open(path, 1) 134 image: Dataset = gdal.Open(path, 1)
@@ -10,8 +10,9 @@ import time @@ -10,8 +10,9 @@ import time
10 from app.modules.service.models import ImageService 10 from app.modules.service.models import ImageService
11 from app.modules.service.models import TileScheme,Service 11 from app.modules.service.models import TileScheme,Service
12 from app.util.component.ModelVisitor import ModelVisitor 12 from app.util.component.ModelVisitor import ModelVisitor
13 - 13 +from app.util.component.StructuredPrint import StructurePrint
14 import json 14 import json
  15 +import traceback
15 16
16 class Cache: 17 class Cache:
17 18
@@ -29,17 +30,23 @@ class Cache: @@ -29,17 +30,23 @@ class Cache:
29 GLOBAL_DIC["zookeeper_updatetime"] = time.time() 30 GLOBAL_DIC["zookeeper_updatetime"] = time.time()
30 else: 31 else:
31 if not zoo.connected: 32 if not zoo.connected:
32 - zoo.start() 33 + try:
  34 + zoo.start()
  35 + except:
  36 + pass
33 37
34 # 更新zoo 38 # 更新zoo
35 if not GLOBAL_DIC.get("zookeeper_updatetime"): 39 if not GLOBAL_DIC.get("zookeeper_updatetime"):
36 GLOBAL_DIC["zookeeper_updatetime"] = time.time() 40 GLOBAL_DIC["zookeeper_updatetime"] = time.time()
37 if time.time() - GLOBAL_DIC["zookeeper_updatetime"] > 15: 41 if time.time() - GLOBAL_DIC["zookeeper_updatetime"] > 15:
38 - #释放 42 + #释放,高并发下可行吗,线程安全问题
39 try: 43 try:
40 zoo.stop() 44 zoo.stop()
41 - except:  
42 - pass 45 + zoo.close()
  46 + del zoo
  47 + except Exception as e:
  48 + StructurePrint().print("关闭zoo失败")
  49 + StructurePrint().print(e.__str__())
43 zoo: KazooClient = KazooClient(hosts=configure.zookeeper, timeout=1) 50 zoo: KazooClient = KazooClient(hosts=configure.zookeeper, timeout=1)
44 zoo.start() 51 zoo.start()
45 GLOBAL_DIC["zookeeper"] = zoo 52 GLOBAL_DIC["zookeeper"] = zoo
@@ -8,6 +8,8 @@ from osgeo import gdal,ogr @@ -8,6 +8,8 @@ from osgeo import gdal,ogr
8 import uuid 8 import uuid
9 import time 9 import time
10 import os 10 import os
  11 +import json
  12 +
11 13
12 def get_info_from_sqlachemy_uri(uri): 14 def get_info_from_sqlachemy_uri(uri):
13 parts = uri.split(":") 15 parts = uri.split(":")
@@ -39,121 +41,158 @@ def open_pg_data_source(iswrite, uri): @@ -39,121 +41,158 @@ def open_pg_data_source(iswrite, uri):
39 raise Exception("打开数据源失败!") 41 raise Exception("打开数据源失败!")
40 return ds 42 return ds
41 43
42 -def move(geo:Geometry,offx,offy):  
43 44
44 - g = geo.GetGeometryRef(0)  
45 - num = g.GetPointCount() 45 +def move_digui(coords,offx,offy):
  46 + if isinstance(coords[0],list):
  47 + for coord in coords:
  48 + move_digui(coord, offx, offy)
  49 + else :
  50 + coords[0] += offx
  51 + coords[1] += offy
46 52
47 - coor = []  
48 - try:  
49 - for j in range(num):  
50 - point = g.GetPoint(j)  
51 - x = point[0]  
52 - y = point[1]  
53 - x += offx  
54 - y += offy  
55 - coor.append([x, y])  
56 - if num==1:  
57 - point = ogr.Geometry(ogr.wkbPoint)  
58 - point.AddPoint(coor[0][0], coor[0][1])  
59 - return point  
60 - elif coor[0].__eq__(coor[-1]):  
61 - ring = ogr.Geometry(ogr.wkbLinearRing)  
62 - for co in coor:  
63 - ring.AddPoint(co[0], co[1])  
64 - poly = ogr.Geometry(ogr.wkbPolygon)  
65 - poly.AddGeometry(ring)  
66 - return poly  
67 - else :  
68 - line = ogr.Geometry(ogr.wkbLineString)  
69 - for co in coor:  
70 - line.AddPoint(co[0], co[1])  
71 - return line  
72 - except:  
73 - return None 53 +def move(geo,offx,offy):
74 54
75 55
76 -def copydata(): 56 + geojson_str = geo.ExportToJson()
  57 + geojson = json.loads(geojson_str)
77 58
  59 + coords = geojson["coordinates"]
  60 + move_digui(coords, offx, offy)
  61 + og = ogr.CreateGeometryFromJson(json.dumps(geojson))
78 62
79 - bound = "" 63 + return og
80 64
81 - work_dir =r"E:\D2\桌面\FSSD_RES_PY_FWM"  
82 65
83 - data_path=r"E:\D2\桌面\FSSD_RES_PY_FWM\fs4326.shp" 66 +def copydata(bound,input,output):
  67 + work_dir = os.path.dirname(os.path.abspath(__file__))
84 68
85 - base_name="fs900w" 69 + shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
  70 + bound_ds : DataSource = shp_driver.Open(bound, 0)
  71 + bound_layer :Layer = bound_ds.GetLayer(0)
86 72
87 - driver: Driver = ogr.GetDriverByName("ESRI Shapefile")  
88 - ds: DataSource = driver.Open(data_path, 1) 73 + bf = bound_layer.GetNextFeature()
  74 + bound_geom = bf.GetGeometryRef()
89 75
90 - bound_ds : DataSource = driver.Open(bound, 1)  
91 - bound_layer :Layer = bound_ds.GetLayer(0)  
92 - bound_geom = bound_layer.GetNextFeature().GetGeometryRef()  
93 76
94 77
95 - if not ds: 78 + gdb_driver: Driver = ogr.GetDriverByName("FileGDB")
  79 + # gdb_ds: DataSource = gdb_driver.Open(gdb, 0)
  80 + # layer: Layer = gdb_ds.GetLayerByName(layer_name)
  81 +
  82 + input_ds: DataSource = shp_driver.Open(input, 0)
  83 + layer: Layer = input_ds.GetLayer(0)
  84 +
  85 + if not layer:
96 raise Exception("打开数据失败!") 86 raise Exception("打开数据失败!")
97 87
98 - layer: Layer = ds.GetLayer(0) 88 +
  89 + output_ds: DataSource = gdb_driver.CreateDataSource(output)
99 90
100 91
  92 + gdb_layer:Layer = output_ds.CreateLayer(layer.GetName(),layer.GetSpatialRef(),layer.GetGeomType())
101 93
102 - schema = layer.schema  
103 - schema = [s for s in schema if not s.GetName().lower().__eq__("objectid")]  
104 - for s in schema :  
105 - if s.GetName().lower().__eq__("objectid") and not s.GetTypeName().__eq__("Interger"):  
106 - s.SetType(ogr.OFTInteger) 94 + gdb_layer.CreateFields(layer.schema)
107 95
108 96
109 97
110 - gdb_driver:Driver = ogr.GetDriverByName("FileGDB")  
111 - gdb_ds: DataSource = gdb_driver.CreateDataSource(os.path.join(work_dir,"{}.gdb".format(base_name)))  
112 - gdb_layer:Layer = gdb_ds.CreateLayer(base_name, layer.GetSpatialRef(),layer.GetGeomType())  
113 98
114 - # gdb_layer.CreateFields(schema) 99 + #
  100 + bound_exent = bound_layer.GetExtent()
115 101
116 - print(gdb_layer.GetFIDColumn()) 102 + layer_extent = layer.GetExtent()
  103 +
  104 + xxrange=layer_extent[1]-layer_extent[0]
  105 + yrange = layer_extent[3]-layer_extent[2]
  106 +
  107 +
  108 + left = (int((layer_extent[0] - bound_exent[0]) / xxrange) + 1) * (-1)
  109 + right = int((bound_exent[1] - layer_extent[1]) / xxrange) + 1
  110 + up = int((bound_exent[3] - layer_extent[3]) / yrange) + 1
  111 + down = (int((layer_extent[2] - bound_exent[2]) / yrange) + 1) * (-1)
117 112
118 - extent = layer.GetExtent()  
119 - xxrange=extent[1]-extent[0]  
120 - yrange = extent[3]-extent[2]  
121 - feature_defn: FeatureDefn = layer.GetLayerDefn()  
122 113
123 begin = time.time() 114 begin = time.time()
124 - count=0  
125 - work_dir =os.path.dirname(os.path.abspath(__file__)) 115 + count=1
  116 +
126 117
127 for f in layer: 118 for f in layer:
128 119
129 - if count%10000==0:  
130 - print(count)  
131 - with open(os.path.join(work_dir, "copy.txt"), "w") as fi:  
132 - fi.write("已完成{}".format(count))  
133 - g:Geometry=f.GetGeometryRef()  
134 - new_f:Feature = copy.copy(f)  
135 - for xt in range(3):  
136 - for yt in range(3):  
137 - out_g = move(g,xxrange*xt,yrange*yt)  
138 - new_f.SetGeometry(out_g)  
139 - new_f.SetFID(count)  
140 - dd:FieldDefn=new_f.GetField("OBJECTID")  
141 - # new_f.UnsetField("OBJECTID")  
142 - gdb_layer.CreateFeature(new_f)  
143 - count += 1 120 + new_f = copy.copy(f)
  121 + for xt in range(left,right+1,1):
  122 + for yt in range(down,up+1,1):
  123 + out_g = move(f.GetGeometryRef(),xxrange*xt,yrange*yt)
  124 +
  125 + if out_g.Intersect(bound_geom):
  126 + out_g = out_g.Intersection(bound_geom)
  127 + new_f.SetGeometry(out_g)
  128 + new_f.SetFID(count)
  129 + gdb_layer.CreateFeature(new_f)
  130 + count += 1
  131 + if count % 10000 == 0:
  132 + # print(count)
  133 + with open(os.path.join(work_dir, "copy.txt"), "w") as fi:
  134 + fi.write("已完成{}".format(count))
144 135
145 print(time.time()-begin) 136 print(time.time()-begin)
146 137
147 - gdb_ds.Destroy() 138 + input_ds.Destroy()
148 139
  140 +def envelop_2_polygon(env):
  141 + ring = ogr.Geometry(ogr.wkbLinearRing)
  142 + ring.AddPoint(env[0], env[2])
  143 + ring.AddPoint(env[0], env[3])
  144 + ring.AddPoint(env[1], env[3])
  145 + ring.AddPoint(env[1], env[2])
  146 + ring.AddPoint(env[0], env[2])
  147 + # Create polygon
  148 + poly = ogr.Geometry(ogr.wkbPolygon)
  149 + poly.AddGeometry(ring)
  150 + return poly
149 151
  152 +def clip():
150 153
151 -if __name__ == '__main__':  
152 - bound_shp = ""  
153 - gdb = ""  
154 - layer_name = "" 154 + input = r"E:\Data\copy\x.shp"
  155 + output = r"E:\Data\copy\xianorigin.shp"
  156 + gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")
  157 + shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
155 158
156 - copydata() 159 + input_ds: DataSource = shp_driver.Open(input, 0)
  160 + layer: Layer = input_ds.GetLayer(0)
  161 +
  162 + bound_geo = envelop_2_polygon([112.84,112.952,22.914,22.985])
  163 +
  164 + if not layer:
  165 + raise Exception("打开数据失败!")
  166 +
  167 + output_ds: DataSource = shp_driver.CreateDataSource(output)
  168 + #
  169 + # out_layer: Layer = output_ds.CreateLayer(layer.GetName(), layer.GetSpatialRef(), layer.GetGeomType())
  170 + # out_layer.CreateFields(layer.schema)
  171 + for f in layer:
  172 + new_f:Feature = copy.copy(f)
  173 + out_g = f.GetGeometryRef()
  174 + if out_g.Intersect(bound_geo):
  175 + out_g = out_g.Intersection(bound_geo)
  176 + new_f.SetGeometry(out_g)
  177 + new_f.SetField("ttt",1)
  178 + # new_f.UnsetField("OBJECTID")
  179 + # out_layer.CreateFeature(new_f)
  180 +
  181 + # output_ds.Destroy()
  182 +
  183 +
  184 +
  185 +if __name__ == '__main__':
  186 + # bound_shp = "E:\Data\广东边界\gdsample.shp"
  187 + # input = r"E:\Data\copy\origin.shp"
  188 + # output = r"E:\Data\copy\re.shp"
  189 + # copydata(bound_shp,input,output)
  190 + #
  191 + # bound_shp = "/root/CopyData/gdsample.shp"
  192 + # input = r"/root/CopyData/origin.shp"
  193 + # output = r"/root/CopyData/re.gdb"
  194 + # copydata(bound_shp,input,output)
  195 + clip()
157 196
158 197
159 198
1 -# coding=utf-8  
2 -#author: 4N  
3 -#createtime: 2021/6/11  
4 -#email: nheweijun@sina.com  
5 -import copy  
6 -from osgeo.ogr import *  
7 -from osgeo import gdal,ogr  
8 -import uuid  
9 -import time  
10 -import os  
11 -import json  
12 -  
13 -  
14 -def get_info_from_sqlachemy_uri(uri):  
15 - parts = uri.split(":")  
16 - user = parts[1][2:]  
17 -  
18 - password_list = parts[2].split("@")  
19 - if password_list.__len__() > 2:  
20 - password = "@".join(password_list[:-1])  
21 - else:  
22 - password = parts[2].split("@")[0]  
23 - host = parts[2].split("@")[-1]  
24 - port = parts[3].split("/")[0]  
25 - database = parts[3].split("/")[1]  
26 -  
27 - return user, password, host, port, database  
28 -  
29 -def open_pg_data_source(iswrite, uri):  
30 - """  
31 - # 获取PostGIS数据源  
32 - :return:  
33 - """  
34 - db_conn_tuple = get_info_from_sqlachemy_uri(uri)  
35 - fn = "PG: user=%s password=%s host=%s port=%s dbname=%s " % db_conn_tuple  
36 - driver = ogr.GetDriverByName("PostgreSQL")  
37 - if driver is None:  
38 - raise Exception("打开PostgreSQL驱动失败,可能是当前GDAL未支持PostgreSQL驱动!")  
39 - ds = driver.Open(fn, iswrite)  
40 - if ds is None:  
41 - raise Exception("打开数据源失败!")  
42 - return ds  
43 -  
44 -  
45 -def move_digui(coords,offx,offy):  
46 - if isinstance(coords[0],list):  
47 - for coord in coords:  
48 - move_digui(coord, offx, offy)  
49 - else :  
50 - coords[0] += offx  
51 - coords[1] += offy  
52 -  
53 -def move(geo,offx,offy):  
54 -  
55 -  
56 - geojson_str = geo.ExportToJson()  
57 - geojson = json.loads(geojson_str)  
58 -  
59 - coords = geojson["coordinates"]  
60 - move_digui(coords, offx, offy)  
61 - og = ogr.CreateGeometryFromJson(json.dumps(geojson))  
62 -  
63 - return og  
64 -  
65 -  
66 -def copydata(bound,input,output):  
67 - work_dir = os.path.dirname(os.path.abspath(__file__))  
68 -  
69 - shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")  
70 - bound_ds : DataSource = shp_driver.Open(bound, 0)  
71 - bound_layer :Layer = bound_ds.GetLayer(0)  
72 -  
73 - bf = bound_layer.GetNextFeature()  
74 - bound_geom = bf.GetGeometryRef()  
75 -  
76 -  
77 -  
78 - gdb_driver: Driver = ogr.GetDriverByName("FileGDB")  
79 - # gdb_ds: DataSource = gdb_driver.Open(gdb, 0)  
80 - # layer: Layer = gdb_ds.GetLayerByName(layer_name)  
81 -  
82 - input_ds: DataSource = shp_driver.Open(input, 0)  
83 - layer: Layer = input_ds.GetLayer(0)  
84 -  
85 - if not layer:  
86 - raise Exception("打开数据失败!")  
87 -  
88 -  
89 - output_ds: DataSource = gdb_driver.CreateDataSource(output)  
90 -  
91 -  
92 - gdb_layer:Layer = output_ds.CreateLayer(layer.GetName(),layer.GetSpatialRef(),layer.GetGeomType())  
93 -  
94 - gdb_layer.CreateFields(layer.schema)  
95 -  
96 -  
97 -  
98 -  
99 - #  
100 - bound_exent = bound_layer.GetExtent()  
101 -  
102 - layer_extent = layer.GetExtent()  
103 -  
104 - xxrange=layer_extent[1]-layer_extent[0]  
105 - yrange = layer_extent[3]-layer_extent[2]  
106 -  
107 -  
108 - left = (int((layer_extent[0] - bound_exent[0]) / xxrange) + 1) * (-1)  
109 - right = int((bound_exent[1] - layer_extent[1]) / xxrange) + 1  
110 - up = int((bound_exent[3] - layer_extent[3]) / yrange) + 1  
111 - down = (int((layer_extent[2] - bound_exent[2]) / yrange) + 1) * (-1)  
112 -  
113 -  
114 - begin = time.time()  
115 - count=1  
116 -  
117 -  
118 - for f in layer:  
119 -  
120 - new_f = copy.copy(f)  
121 - for xt in range(left,right+1,1):  
122 - for yt in range(down,up+1,1):  
123 - out_g = move(f.GetGeometryRef(),xxrange*xt,yrange*yt)  
124 -  
125 - if out_g.Intersect(bound_geom):  
126 - out_g = out_g.Intersection(bound_geom)  
127 - new_f.SetGeometry(out_g)  
128 - new_f.SetFID(count)  
129 - gdb_layer.CreateFeature(new_f)  
130 - count += 1  
131 - if count % 10000 == 0:  
132 - # print(count)  
133 - with open(os.path.join(work_dir, "copy.txt"), "w") as fi:  
134 - fi.write("已完成{}".format(count))  
135 -  
136 - print(time.time()-begin)  
137 -  
138 - input_ds.Destroy()  
139 -  
140 -def envelop_2_polygon(env):  
141 - ring = ogr.Geometry(ogr.wkbLinearRing)  
142 - ring.AddPoint(env[0], env[2])  
143 - ring.AddPoint(env[0], env[3])  
144 - ring.AddPoint(env[1], env[3])  
145 - ring.AddPoint(env[1], env[2])  
146 - ring.AddPoint(env[0], env[2])  
147 - # Create polygon  
148 - poly = ogr.Geometry(ogr.wkbPolygon)  
149 - poly.AddGeometry(ring)  
150 - return poly  
151 -  
152 -def clip():  
153 -  
154 - input = r"E:\Data\copy\x.shp"  
155 - output = r"E:\Data\copy\xianorigin.shp"  
156 - gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")  
157 - shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")  
158 -  
159 - input_ds: DataSource = shp_driver.Open(input, 0)  
160 - layer: Layer = input_ds.GetLayer(0)  
161 -  
162 - bound_geo = envelop_2_polygon([112.84,112.952,22.914,22.985])  
163 -  
164 - if not layer:  
165 - raise Exception("打开数据失败!")  
166 -  
167 - output_ds: DataSource = shp_driver.CreateDataSource(output)  
168 - #  
169 - # out_layer: Layer = output_ds.CreateLayer(layer.GetName(), layer.GetSpatialRef(), layer.GetGeomType())  
170 - # out_layer.CreateFields(layer.schema)  
171 - for f in layer:  
172 - new_f:Feature = copy.copy(f)  
173 - out_g = f.GetGeometryRef()  
174 - if out_g.Intersect(bound_geo):  
175 - out_g = out_g.Intersection(bound_geo)  
176 - new_f.SetGeometry(out_g)  
177 - new_f.SetField("ttt",1)  
178 - # new_f.UnsetField("OBJECTID")  
179 - # out_layer.CreateFeature(new_f)  
180 -  
181 - # output_ds.Destroy()  
182 -  
183 -  
184 -  
185 -if __name__ == '__main__':  
186 - # bound_shp = "E:\Data\广东边界\gdsample.shp"  
187 - # input = r"E:\Data\copy\origin.shp"  
188 - # output = r"E:\Data\copy\re.shp"  
189 - # copydata(bound_shp,input,output)  
190 - #  
191 - # bound_shp = "/root/CopyData/gdsample.shp"  
192 - # input = r"/root/CopyData/origin.shp"  
193 - # output = r"/root/CopyData/re.gdb"  
194 - # copydata(bound_shp,input,output)  
195 - clip()  
196 -  
197 -  
198 -  
199 -  
200 -  
201 -  
202 -  
203 -  
204 -  
205 -  
206 -  
1 -  
2 -from osgeo.ogr import *  
3 -from osgeo import ogr  
4 -from osgeo import gdal  
5 -import os  
6 -import uuid  
7 -import shutil  
8 -import time  
9 -from app.modules.data.models import *  
10 -from app.util.component.PGUtil import PGUtil  
11 -from app.util.component.StructuredPrint import StructurePrint  
12 -from sqlalchemy.orm import Session  
13 -import configure  
14 -import copy  
15 -import datetime  
16 -class EntryData:  
17 -  
18 - def entry(self,parameter):  
19 - meta:dict = parameter.get("meta")  
20 -  
21 - #设置编码  
22 - encoding = parameter.get("encoding")  
23 - if encoding:  
24 - gdal.SetConfigOption("SHAPE_ENCODING",encoding)  
25 - else:  
26 - gdal.SetConfigOption("SHAPE_ENCODING", "GBK")  
27 -  
28 - #如果包含cpg文件,优先使用cpg文件中声明的编码  
29 - encoding_cpg = meta.get("encoding")  
30 - if encoding_cpg:  
31 - gdal.SetConfigOption("SHAPE_ENCODING", encoding_cpg)  
32 -  
33 -  
34 - # 初始化任务  
35 - this_task = ThisTask(parameter)  
36 - data_path = meta.get("data_path")  
37 -  
38 - try:  
39 - this_task.write_process("入库任务初始化...")  
40 -  
41 - this_task.update({"process": "入库中"})  
42 - is_success = None  
43 - if not data_path:  
44 - raise Exception("数据错误!")  
45 - # 分为shp和gdb 2种录入形式  
46 -  
47 - # 开始事务  
48 - this_task.pg_ds.StartTransaction()  
49 - if data_path.endswith("shp"):  
50 - is_success,new_layer_name = self.entry_shp(data_path,this_task)  
51 -  
52 - if data_path.endswith("gdb"):  
53 - is_success,new_layer_names = self.entry_gdb(data_path,this_task)  
54 -  
55 - this_task.write_process("数据入库结束。")  
56 -  
57 - if is_success:  
58 - # 更新任务为成功任务  
59 - this_task.pg_ds.CommitTransaction()  
60 - this_task.update({"state": 1,"process":"入库完成","update_time": datetime.datetime.now()})  
61 - else:  
62 - # 更新任务为失败任务  
63 - this_task.update({"state": -1, "process": "入库失败", "update_time": datetime.datetime.now()})  
64 - # rollback  
65 - this_task.pg_ds.RollbackTransaction()  
66 - except Exception as e:  
67 - this_task.write_process("{} 任务结束!".format(e.__str__()))  
68 - this_task.update({"state": -1, "process": "入库失败", "update_time": datetime.datetime.now()})  
69 - StructurePrint().print(e.__str__(),"ERROR")  
70 - # rollback  
71 - this_task.pg_ds.RollbackTransaction()  
72 - finally:  
73 - this_task.end()  
74 - try:  
75 - file_tmp_path = os.path.join(data_path.split("file_tmp")[0],"file_tmp")  
76 - dir_path = os.path.dirname(data_path)  
77 - i=0  
78 - while not os.path.dirname(dir_path).__eq__(file_tmp_path) and i<30:  
79 - dir_path = os.path.dirname(dir_path)  
80 - i+=1  
81 - if i<30:  
82 - shutil.rmtree(dir_path,True)  
83 - StructurePrint().print("删除文件成功!")  
84 - else:  
85 - raise Exception("找不到文件!")  
86 -  
87 - except Exception as e:  
88 - StructurePrint().print(e.__str__(), "ERROR")  
89 - StructurePrint().print("删除文件失败!","ERROR")  
90 -  
91 - def entry_shp(self,data_path,this_task):  
92 - '''  
93 - 录入shp  
94 - :param data_path:  
95 - :return:  
96 - '''  
97 -  
98 - driver: Driver = ogr.GetDriverByName("ESRI Shapefile")  
99 - ds: DataSource = driver.Open(data_path, 1)  
100 - if not ds:  
101 - raise Exception("打开数据失败!")  
102 - layer: Layer = ds.GetLayer(0)  
103 -  
104 - return self.entry_one_layer(layer, this_task)  
105 -  
106 - def entry_gdb(self,data_path,this_task):  
107 - '''  
108 - 录入gdb  
109 - :param data_path:  
110 - :return:  
111 - '''  
112 -  
113 - is_successes = []  
114 - new_layer_names=[]  
115 - driver: Driver = ogr.GetDriverByName("OpenFileGDB")  
116 - ds: DataSource = driver.Open(data_path, 0)  
117 - if not ds:  
118 - raise Exception("打开数据失败!")  
119 -  
120 -  
121 - for i in range(ds.GetLayerCount()):  
122 - layer: Layer = ds.GetLayer(i)  
123 - is_success, new_layer_name = self.entry_one_layer(layer,this_task)  
124 - new_layer_names.append(new_layer_name)  
125 - is_successes.append(is_success)  
126 -  
127 - if is_successes.__contains__(False):  
128 - return False,new_layer_names  
129 - else:  
130 - return True,new_layer_names  
131 -  
132 - def entry_one_layer(self,layer: Layer,this_task):  
133 -  
134 - # this_task.pg_ds.StartTransaction()  
135 - new_layer_name = None  
136 -  
137 -  
138 - try:  
139 - # 图层设置  
140 - parameter = this_task.parameter  
141 -  
142 - meta: dict = parameter.get("meta")  
143 - overwrite = parameter.get("overwrite") if parameter.get("overwrite") is not None and parameter.get("overwrite")=="yes" else "no"  
144 - geom_name = parameter.get("geom_name") if parameter.get("geom_name") is not None else "geom"  
145 - fid = parameter.get("fid") if parameter.get("fid") is not None else "fid"  
146 - options = ["OVERWRITE={}".format(overwrite), "FID={}".format(fid), "GEOMETRY_NAME={}".format(geom_name),"PRECISION=NO"]  
147 -  
148 -  
149 -  
150 - # # 中文名处理  
151 - # chinese = is_chinese(new_layer_name)  
152 - # if chinese:  
153 - # new_layer_name = new_layer_name.__hash__()  
154 -  
155 - # 将线/面转多线多面,dmap只支持多线面  
156 - geom_type = self.change_geom_type(layer.GetGeomType())  
157 -  
158 - # 更改图层名  
159 - change_name = False  
160 - origin_name = layer.GetName().lower()  
161 -  
162 - # 新图层名  
163 - new_layer_name: str = meta.get("layer").get(origin_name)  
164 - origin_name = new_layer_name  
165 - no = 1  
166 - while overwrite.__eq__("no") and this_task.pg_ds.GetLayerByName(new_layer_name) :  
167 - change_name=True  
168 - new_layer_name = origin_name+"_{}".format(no)  
169 - no+=1  
170 -  
171 - if change_name:  
172 - this_task.write_process("{}图层已存在,更名为{}入库".format(origin_name, new_layer_name))  
173 -  
174 -  
175 - this_task.write_process("{}图层正在入库...".format(new_layer_name))  
176 -  
177 - pg_layer: Layer = this_task.pg_ds.CreateLayer(new_layer_name, layer.GetSpatialRef(), geom_type, options)  
178 -  
179 - # 复制原图层的属性  
180 - # 去掉fid的属性  
181 - schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]  
182 -  
183 - pg_layer.CreateFields(schema)  
184 -  
185 - count =0  
186 - this_time = time.time()  
187 - for feature in layer:  
188 - count+=1  
189 - if count%10000==0:  
190 - StructurePrint().print("{}图层已入库{}个对象".format(new_layer_name,count))  
191 - # print(time.time()-this_time)  
192 - this_time=time.time()  
193 - geo :Geometry = feature.GetGeometryRef()  
194 - # 如果是空对象不录入  
195 - if geo is not None:  
196 - if geo.IsEmpty():  
197 - this_task.write_process("FID:{}要素的空间字段为空,跳过该要素!".format(feature.GetFID()))  
198 - StructurePrint().print("FID:{}要素的空间字段为空,跳过该要素!".format(feature.GetFID()),"WARN")  
199 - continue  
200 - out_feature: Feature = copy.copy(feature)  
201 -  
202 - if geo is not None:  
203 - out_geom:Geometry = self.change_geom(geo, geom_type)  
204 - out_feature.SetGeometry(out_geom)  
205 -  
206 - pg_layer.CreateFeature(out_feature)  
207 -  
208 - # 注册图层信息  
209 - this_task.register_table(pg_layer,new_layer_name,overwrite,parameter.get("creator"))  
210 - this_task.write_process("{}图层入库成功。".format(new_layer_name))  
211 - except Exception as e:  
212 - # this_task.pg_ds.RollbackTransaction()  
213 - this_task.write_process("{}入库失败,数据回滚!原因:{}".format(new_layer_name,e.__str__()))  
214 - StructurePrint().print("{}入库失败,数据回滚!原因:{}".format(new_layer_name,e.__str__()), "error")  
215 - return False,new_layer_name  
216 - # finally:  
217 -  
218 - return True,new_layer_name  
219 -  
220 - def entry_feature(self):  
221 - pass  
222 -  
223 -  
224 - def change_geom_type(self,raw):  
225 - if raw.__eq__(-2147483646):  
226 - return 5  
227 - if raw.__eq__(-2147483645):  
228 - return 6  
229 - if raw==2 or raw ==3:  
230 - return raw+3  
231 - if raw==4:  
232 - return 1  
233 - return raw  
234 -  
235 - def get_table_type(self,raw):  
236 - if raw==4 or raw ==5 or raw ==6:  
237 - return raw-3  
238 - return raw  
239 -  
240 - def change_geom(self,geo:Geometry,geom_type):  
241 - '''  
242 - 转换空间对象的类型,以适应dmap只支持Multi类型  
243 - :param geo:  
244 - :param geom_type:  
245 - :return: 转换后的空间对象  
246 - '''  
247 -  
248 -  
249 - # Point = 1,  
250 - # LineString = 2,  
251 - # Polygon = 3,  
252 - # MultiPoint = 4,  
253 - # MultiLineString = 5,  
254 - # MultiPolygon = 6,  
255 - # GeometryCollection = 7,  
256 - # CircularString = 8,  
257 - # CompoundCurve = 9,  
258 - # CurvePolygon = 10,  
259 - # MultiCurve = 11,  
260 - # MultiSurface = 12,  
261 - # PolyhedralSurface = 15,  
262 - # LinearRing = 101,  
263 -  
264 - # MultiPointZ = -2147483644  
265 - # MultiPolygonZ = -2147483643  
266 - # MultiLineStringZ = -2147483642  
267 -  
268 -  
269 - # PointZ = -2147483647  
270 - # LINESTRINGZ=-2147483646  
271 - # POLYGONZ=-2147483645  
272 -  
273 - if geom_type==5 or geom_type.__eq__(-2147483646):  
274 - return ogr.ForceToMultiLineString(geo)  
275 - if geom_type==6 or geom_type.__eq__(-2147483645):  
276 - return ogr.ForceToMultiPolygon(geo)  
277 - if geom_type==1:  
278 - # 多点转单点,会有问题,只拿了第一个点  
279 - xy = geo.GetPoint()  
280 - point = ogr.Geometry(ogr.wkbPoint)  
281 - point.AddPoint(xy[0], xy[1])  
282 - return point  
283 - return geo  
284 -  
285 - def write_task_process(self,session, task_guid, message):  
286 - '''  
287 - 写详细过程  
288 - :param session:  
289 - :param task_guid:  
290 - :param message:  
291 - :return:  
292 - '''  
293 -  
294 - task_process_guid = uuid.uuid1().__str__()  
295 - task_process = Process(guid=task_process_guid,  
296 - message=message,  
297 - time=datetime.datetime.now(),  
298 - task_guid=task_guid)  
299 - session.add(task_process)  
300 -  
301 -  
302 - def data_check(self):  
303 - pass  
304 -  
305 -class ThisTask:  
306 -  
307 - def __init__(self, parameter):  
308 - try:  
309 - self.sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
310 - except Exception as e:  
311 - raise Exception("打开数据库失败!")  
312 - self.parameter = parameter  
313 -  
314 - # # 初始化task  
315 - # task = Task(guid=parameter.get("task_guid"), name=parameter.get("task_name"),create_time=datetime.datetime.now(),state=0)  
316 - # self.sys_session.add(task)  
317 - # self.sys_session.commit()  
318 -  
319 - self.task = self.sys_session.query(Task).filter_by(guid=parameter.get("task_guid"))  
320 -  
321 - self.database = self.sys_session.query(Database).filter_by(  
322 - guid=parameter.get("database_guid")).one_or_none()  
323 -  
324 - self.catalog_guid = parameter.get("catalog_guid")  
325 -  
326 - self.pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(self.database.sqlalchemy_uri))  
327 -  
328 - if self.pg_ds is None:  
329 - raise Exception("打开系统数据库失败!")  
330 -  
331 - def update(self, update_dict):  
332 - self.task.update(update_dict)  
333 - self.sys_session.commit()  
334 -  
335 - def write_process(self, message):  
336 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), message)  
337 - task_process_guid = uuid.uuid1().__str__()  
338 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
339 - task_guid=self.parameter.get("task_guid"))  
340 - self.sys_session.add(task_process)  
341 - self.sys_session.commit()  
342 -  
343 - def register_table(self, layer: Layer, new_layer_name, overwrite, creator):  
344 - '''  
345 - 注册表  
346 - :param layer: 图层  
347 - :param new_layer_name: 图层名  
348 - :return: 表名  
349 - '''  
350 -  
351 - # 在覆盖模式下,删除原有记录  
352 - if overwrite.__eq__("yes"):  
353 - old_table = self.sys_session.query(Table).filter_by(name=new_layer_name).one_or_none()  
354 - if old_table:  
355 - self.sys_session.delete(old_table)  
356 - self.sys_session.commit()  
357 -  
358 - this_time = datetime.datetime.now()  
359 -  
360 - ext = layer.GetExtent()  
361 - if ext[0] < 360:  
362 - ext = [round(e, 6) for e in ext]  
363 - else:  
364 - ext = [round(e, 2) for e in ext]  
365 -  
366 - # extent = "西:{},东:{},南:{},北:{}".format(ext[0],ext[1],ext[2],ext[3])  
367 - extent = "{},{},{},{}".format(ext[0], ext[1], ext[2], ext[3])  
368 -  
369 - # 其他相同库也更新  
370 - connectstr = self.database.connectstr  
371 - databs = self.sys_session.query(Database).filter_by(connectstr=connectstr).all()  
372 - databs_guid = [d.guid for d in databs]  
373 - for d_guid in databs_guid:  
374 - table_guid = uuid.uuid1().__str__()  
375 - table = Table(guid=table_guid,  
376 - database_guid=d_guid,  
377 - # alias=new_layer_name,  
378 - creator=creator,  
379 - name=new_layer_name, create_time=this_time, update_time=this_time,  
380 - catalog_guid=self.catalog_guid, table_type=self.get_table_type(layer.GetGeomType()),  
381 - extent=extent,  
382 - feature_count=layer.GetFeatureCount()  
383 - )  
384 - self.sys_session.add(table)  
385 -  
386 - feature_defn: FeatureDefn = layer.GetLayerDefn()  
387 -  
388 - for i in range(feature_defn.GetFieldCount()):  
389 - field_defn: FieldDefn = feature_defn.GetFieldDefn(i)  
390 - field_name = field_defn.GetName().lower()  
391 - field_alias = field_name if field_defn.GetAlternativeName() is None or field_defn.GetAlternativeName().__eq__(  
392 - "") else field_defn.GetAlternativeName()  
393 - column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,  
394 - name=field_name, alias=field_alias, create_time=this_time, update_time=this_time)  
395 - self.sys_session.add(column)  
396 -  
397 - self.sys_session.commit()  
398 -  
399 - def end(self):  
400 - if self.sys_session:  
401 - self.sys_session.close()  
402 - if self.pg_ds:  
403 - self.pg_ds.Destroy()  
404 -  
405 - def get_table_type(self,raw):  
406 - if raw==4 or raw ==5 or raw ==6:  
407 - return raw-3  
408 - return raw  
@@ -52,6 +52,15 @@ class TaskController: @@ -52,6 +52,15 @@ class TaskController:
52 @classmethod 52 @classmethod
53 def wait(cls,task_guid): 53 def wait(cls,task_guid):
54 sys_session= PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI) 54 sys_session= PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  55 + #三秒检测一次是否可以执行任务
  56 + wait_time = 0
55 while not cls.pass_check(sys_session,task_guid): 57 while not cls.pass_check(sys_session,task_guid):
56 time.sleep(3) 58 time.sleep(3)
  59 + wait_time += 3
  60 + if wait_time > 21600:
  61 + try:
  62 + sys_session.close()
  63 + except Exception as e:
  64 + pass
  65 + raise Exception("任务等待超时")
57 sys_session.close() 66 sys_session.close()
  1 +# coding=utf-8
  2 +#author: 4N
  3 +#createtime: 2021/12/3
  4 +#email: nheweijun@sina.com
  5 +
  6 +import datetime
  7 +import uuid
  8 +from app.modules.data.models import Process,Task,Table
  9 +from app.modules.service.models import Image
  10 +import configure
  11 +from app.util.component.PGUtil import PGUtil
  12 +
  13 +def check_session(fun):
  14 + '''
  15 + 检查session的装饰器
  16 + :param fun:
  17 + :return:
  18 + '''
  19 +
  20 + def wrap(self,*args, **kwargs):
  21 + if not self.sys_session.is_active:
  22 + self.sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  23 + fun(self,*args, **kwargs)
  24 +
  25 + return wrap
  26 +
  27 +class TaskWriter:
  28 +
  29 + def __init__(self,task_guid):
  30 + self.sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  31 + self.task_guid = task_guid
  32 +
  33 + @check_session
  34 + def update_task(self,update_info,commit=True):
  35 + self.sys_session.query(Task).filter_by(guid=self.task_guid).update(update_info)
  36 + if commit:
  37 + self.sys_session.commit()
  38 +
  39 + @check_session
  40 + def update_process(self,msg,commit=True):
  41 + message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), msg)
  42 + task_process_guid = uuid.uuid1().__str__()
  43 + task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
  44 + task_guid=self.task_guid)
  45 +
  46 + self.sys_session.add(task_process)
  47 + if commit:
  48 + self.sys_session.commit()
  49 +
  50 + @check_session
  51 + def update_table(self,table_guid,update_info,commit=True):
  52 + self.sys_session.query(Table).filter_by(guid=table_guid).update(update_info)
  53 + if commit:
  54 + self.sys_session.commit()
  55 +
  56 + @check_session
  57 + def update_image(self,image_guid,update_info,commit=True):
  58 + self.sys_session.query(Image).filter_by(guid=image_guid).update(update_info)
  59 + if commit:
  60 + self.sys_session.commit()
  61 +
  62 + def close(self):
  63 + try:
  64 + self.sys_session.commit()
  65 + self.sys_session.close()
  66 + except:
  67 + pass
  68 +
  69 + @property
  70 + def session(self):
  71 + if not self.sys_session.is_active:
  72 + self.sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  73 + return self.sys_session
注册登录 后发表评论