提交 631a4b9bb0a7871b6c4177b861088df89642726c

作者 nheweijun
1 个父辈 e2c9d464

2022.03.03 从header中获得用户

@@ -87,3 +87,6 @@ for pkg in pkgs: @@ -87,3 +87,6 @@ for pkg in pkgs:
87 except Exception as e: 87 except Exception as e:
88 print(traceback.format_exc()) 88 print(traceback.format_exc())
89 pass 89 pass
  90 +
  91 +if __name__ == '__main__':
  92 + print(DES.decode("qnt136jlHF77l8+XaSHtbmMMbPdv7XeBPMyyUODlf0a8D0Lp+LurzZD+P8SdM76w0FrOkqGpLWtehsrYKjMQSQ=="))
@@ -31,6 +31,19 @@ class Api(ApiTemplate): @@ -31,6 +31,19 @@ class Api(ApiTemplate):
31 31
32 if file_path.endswith("shp"): 32 if file_path.endswith("shp"):
33 file_info["type"]="shp" 33 file_info["type"]="shp"
  34 + whole_size = file_info["size"][1]
  35 +
  36 + dbf_path = ".".join([file_path.split(".")[0],"dbf"])
  37 + if os.path.exists(dbf_path):
  38 + whole_size += FileProcess.get_file_size(dbf_path)[1]
  39 +
  40 + shx_path = ".".join([file_path.split(".")[0],"shx"])
  41 + if os.path.exists(shx_path):
  42 + whole_size += FileProcess.get_file_size(shx_path)[1]
  43 +
  44 + whole_size_text = FileProcess.get_text_size(whole_size)
  45 + file_info["size"] = [whole_size_text,whole_size]
  46 +
34 data_list.append(file_info) 47 data_list.append(file_info)
35 elif file_path.endswith("gdb"): 48 elif file_path.endswith("gdb"):
36 file_info["type"]="gdb" 49 file_info["type"]="gdb"
@@ -2,25 +2,31 @@ @@ -2,25 +2,31 @@
2 #createtime: 2021/1/27 2 #createtime: 2021/1/27
3 #email: nheweijun@sina.com 3 #email: nheweijun@sina.com
4 4
5 -from ..models import Table,Columns,DES  
6 - 5 +from ..models import Table,Columns,DES,Database,db
7 6
8 from app.util.component.ApiTemplate import ApiTemplate 7 from app.util.component.ApiTemplate import ApiTemplate
9 from app.util.component.ModelVisitor import ModelVisitor 8 from app.util.component.ModelVisitor import ModelVisitor
10 from app.util.component.PGUtil import PGUtil 9 from app.util.component.PGUtil import PGUtil
11 from osgeo.ogr import DataSource,Layer,Feature 10 from osgeo.ogr import DataSource,Layer,Feature
12 from osgeo.osr import SpatialReference 11 from osgeo.osr import SpatialReference
  12 +import configure
13 class Api(ApiTemplate): 13 class Api(ApiTemplate):
14 api_name = "表信息" 14 api_name = "表信息"
  15 +
15 def process(self): 16 def process(self):
16 res = {} 17 res = {}
17 pg_ds = None 18 pg_ds = None
18 try: 19 try:
  20 +
19 table_guid = self.para.get("guid") 21 table_guid = self.para.get("guid")
  22 +
20 table = Table.query.filter_by(guid=table_guid).one_or_none() 23 table = Table.query.filter_by(guid=table_guid).one_or_none()
21 if not table: 24 if not table:
22 raise Exception("数据不存在!") 25 raise Exception("数据不存在!")
23 - pg_ds :DataSource = PGUtil.open_pg_data_source(0,DES.decode(table.relate_database.sqlalchemy_uri)) 26 +
  27 + uri = DES.decode(table.relate_database.sqlalchemy_uri)
  28 +
  29 + pg_ds :DataSource = PGUtil.open_pg_data_source(0,uri)
24 layer:Layer = pg_ds.GetLayerByName(table.name) 30 layer:Layer = pg_ds.GetLayerByName(table.name)
25 if not layer: 31 if not layer:
26 raise Exception("数据异常!") 32 raise Exception("数据异常!")
@@ -32,7 +38,7 @@ class Api(ApiTemplate): @@ -32,7 +38,7 @@ class Api(ApiTemplate):
32 append_dict["sr_wkt"] = sr.ExportToWkt() 38 append_dict["sr_wkt"] = sr.ExportToWkt()
33 append_dict["sr_proj4"] = sr.ExportToProj4() 39 append_dict["sr_proj4"] = sr.ExportToProj4()
34 40
35 - srid_sql = '''select st_srid({}) from public."{}" limit 1'''.format(layer.GetGeometryColumn(),layer.GetName()) 41 + srid_sql = '''select st_srid("{}") from public."{}" limit 1'''.format(layer.GetGeometryColumn(),layer.GetName())
36 srid_layer:Layer = pg_ds.ExecuteSQL(srid_sql) 42 srid_layer:Layer = pg_ds.ExecuteSQL(srid_sql)
37 if srid_layer: 43 if srid_layer:
38 srid_feature : Feature = srid_layer.GetNextFeature() 44 srid_feature : Feature = srid_layer.GetNextFeature()
@@ -47,7 +53,7 @@ class Api(ApiTemplate): @@ -47,7 +53,7 @@ class Api(ApiTemplate):
47 res["data"]["columns"] = ModelVisitor.objects_to_jsonarray(columns) 53 res["data"]["columns"] = ModelVisitor.objects_to_jsonarray(columns)
48 res["data"].update(append_dict) 54 res["data"].update(append_dict)
49 res["result"] = True 55 res["result"] = True
50 - 56 +
51 except Exception as e: 57 except Exception as e:
52 raise e 58 raise e
53 finally: 59 finally:
@@ -38,6 +38,7 @@ class Api(ApiTemplate): @@ -38,6 +38,7 @@ class Api(ApiTemplate):
38 query_result : ResultProxy = db_session.execute('select * from "{}" limit {} offset {}'.format(table.name,limit,offset)) 38 query_result : ResultProxy = db_session.execute('select * from "{}" limit {} offset {}'.format(table.name,limit,offset))
39 39
40 res["data"]["count"]=PGUtil.get_table_count(table.name,db_session) 40 res["data"]["count"]=PGUtil.get_table_count(table.name,db_session)
  41 + res["data"]["count"] = table.feature_count
41 42
42 pkey = PGUtil.get_pkey(table.name,db_session) 43 pkey = PGUtil.get_pkey(table.name,db_session)
43 44
1 -# coding=utf-8  
2 -#author: 4N  
3 -#createtime: 2021/10/11  
4 -#email: nheweijun@sina.com  
5 -  
6 -  
7 -from ..models import InsertingLayerName,Columns  
8 -from sqlalchemy.orm import Session  
9 -  
10 -from app.util.component.EntryDataVacuate import EntryDataVacuate  
11 -import json  
12 -from sqlalchemy import distinct  
13 -  
14 -import time  
15 -  
16 -from app.util.component.SQLUtil import SQLUtil  
17 -  
18 -import datetime  
19 -  
20 -from ..models import Table, Database, DES,Task,db,TableVacuate,Process  
21 -  
22 -from app.util.component.StructurePrint import StructurePrint  
23 -  
24 -from osgeo.ogr import DataSource,Layer,Geometry  
25 -  
26 -from app.util.component.VacuateConf import VacuateConf  
27 -from app.util.component.GeometryAdapter import GeometryAdapter  
28 -  
29 -import traceback  
30 -from osgeo.ogr import DataSource,Layer,FeatureDefn,FieldDefn,Feature  
31 -from osgeo import gdal,ogr  
32 -import os  
33 -import uuid  
34 -import configure  
35 -  
36 -from app.util.component.PGUtil import PGUtil  
37 -from app.util.component.ZipUtil import ZipUtil  
38 -import multiprocessing  
39 -  
40 -  
41 -def task_consumer():  
42 -  
43 - running_dict = {}  
44 - sys_session: Session = PGUtil.get_db_session(  
45 - configure.SQLALCHEMY_DATABASE_URI)  
46 -  
47 - while True:  
48 -  
49 - try:  
50 - time.sleep(3)  
51 -  
52 - # 已经结束的进程 从监测中删除  
53 - remove_process = []  
54 -  
55 - for process, layer_names in running_dict.items():  
56 - if not process.is_alive():  
57 - for l in layer_names:  
58 - inserted = sys_session.query(  
59 - InsertingLayerName).filter_by(name=l).one_or_none()  
60 - if inserted:  
61 - sys_session.delete(inserted)  
62 - sys_session.commit()  
63 - remove_process.append(process)  
64 - for process in remove_process:  
65 - running_dict.pop(process)  
66 -  
67 -  
68 - # 入库进程少于阈值,开启入库进程  
69 -  
70 - inter_size = sys_session.query(  
71 - distinct(InsertingLayerName.task_guid)).count()  
72 -  
73 - if inter_size < configure.entry_data_thread:  
74 - # 锁表啊  
75 - ready_task: Task = sys_session.query(Task).filter_by(state=0).order_by(  
76 - Task.create_time).with_lockmode("update").limit(1).one_or_none()  
77 - if ready_task:  
78 -  
79 - if ready_task.task_type == 1:  
80 - task_entry_data(ready_task,sys_session,running_dict)  
81 - elif ready_task.task_type == 2:  
82 - task_table_refresh()  
83 - elif ready_task.task_type == 3:  
84 - task_vacuate()  
85 - elif ready_task.task_type == 4:  
86 - task_download()  
87 -  
88 - else:  
89 - # 解表啊  
90 - sys_session.commit()  
91 - except Exception as e:  
92 - sys_session.commit()  
93 - StructurePrint().print(e.__str__(), "error")  
94 -  
95 -  
96 -  
97 -def task_download(para,task_guid):  
98 - sys_session = None  
99 - ds: DataSource = None  
100 -  
101 - # 设置编码  
102 - encoding = para.get("encoding")  
103 - if encoding:  
104 - gdal.SetConfigOption("SHAPE_ENCODING", encoding)  
105 - else:  
106 - gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")  
107 -  
108 - try:  
109 -  
110 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
111 -  
112 - table_names = para.get("table_name").split(",")  
113 - database_guid = para.get("database_guid")  
114 - database = sys_session.query(Database).filter_by(guid=database_guid).one_or_none()  
115 - if not database:  
116 - raise Exception("数据库不存在!")  
117 -  
118 - ds: DataSource = PGUtil.open_pg_data_source(0, DES.decode(database.sqlalchemy_uri))  
119 -  
120 - download_type = para.get("download_type")  
121 -  
122 - data = None  
123 - if download_type.__eq__("shp"):  
124 - data = download_shp(table_names, ds)  
125 - if download_type.__eq__("gdb"):  
126 - data = download_gdb(sys_session, table_names, ds, database_guid)  
127 -  
128 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": 1, "update_time": datetime.datetime.now(),  
129 - "process": "下载完成",  
130 - "parameter": data[0]["download_url"]})  
131 - sys_session.commit()  
132 -  
133 -  
134 - except Exception as e:  
135 - try:  
136 - sys_session.query(Task).filter_by(guid=task_guid).update(  
137 - {"state": -1, "update_time": datetime.datetime.now(),  
138 - "process": "下载失败"})  
139 -  
140 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
141 - task_process_guid = uuid.uuid1().__str__()  
142 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
143 - task_guid=task_guid)  
144 - sys_session.add(task_process)  
145 - sys_session.commit()  
146 - except Exception as ee:  
147 - print(traceback.format_exc())  
148 - raise e  
149 - finally:  
150 - try:  
151 - if ds:  
152 - ds.Destroy()  
153 - if sys_session:  
154 - sys_session.close()  
155 - except:  
156 - print(traceback.format_exc())  
157 -  
158 -  
159 -def download_shp(table_names, ds):  
160 - data = []  
161 - for table_name in table_names:  
162 - url = download_one(ds, table_name)  
163 - data.append({"name": table_name, "download_url": url})  
164 - return data  
165 -  
166 -  
167 -def download_one( ds, table_name):  
168 - layer: Layer = ds.GetLayerByName(table_name)  
169 - driver = ogr.GetDriverByName("ESRI Shapefile")  
170 - uuid_ = uuid.uuid1().__str__()  
171 - parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))  
172 - dirpath = os.path.join(parent, "file_tmp", uuid_)  
173 - os.makedirs(dirpath)  
174 - data_source: DataSource = driver.CreateDataSource(dirpath + "/{}.shp".format(table_name))  
175 - # data_source.CopyLayer(layer, table_name)  
176 -  
177 - fid = layer.GetFIDColumn()  
178 - pg_layer: Layer = data_source.CreateLayer(table_name, layer.GetSpatialRef(), layer.GetGeomType())  
179 - schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]  
180 -  
181 - pg_layer.CreateFields(schema)  
182 - layer.ResetReading()  
183 - for feature in layer:  
184 - pg_layer.CreateFeature(feature)  
185 -  
186 - data_source.Destroy()  
187 -  
188 - ZipUtil.create_zip(os.path.join(parent, "file_tmp", table_name + "_" + uuid_) + ".zip", [dirpath])  
189 -  
190 - return "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(table_name + "_" + uuid_ + ".zip")  
191 -  
192 -  
193 -def download_gdb( sys_session, table_names, ds, database_guid):  
194 - ogr.RegisterAll()  
195 - data = []  
196 - gdal.UseExceptions()  
197 - gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")  
198 -  
199 - # 创建一个gdb datasource  
200 - gdb_driver = ogr.GetDriverByName('FileGDB')  
201 - uuid_ = uuid.uuid1().__str__()  
202 - parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))  
203 - gdb_path = os.path.join(parent, "file_tmp", uuid_ + ".gdb")  
204 -  
205 - gdb_ds: DataSource = gdb_driver.CreateDataSource(gdb_path)  
206 -  
207 - for table_name in table_names:  
208 -  
209 - layer: Layer = ds.GetLayerByName(table_name)  
210 - table = sys_session.query(Table).filter_by(name=table_name, database_guid=database_guid).one_or_none()  
211 - feature_defn: FeatureDefn = layer.GetLayerDefn()  
212 -  
213 - for i in range(feature_defn.GetFieldCount()):  
214 - field_defn: FieldDefn = feature_defn.GetFieldDefn(i)  
215 - field_alias = sys_session.query(Columns).filter_by(table_guid=table.guid,  
216 - name=field_defn.GetName()).one_or_none().alias  
217 - field_defn.SetAlternativeName(field_alias)  
218 -  
219 - table_alias = table.alias  
220 -  
221 -  
222 - fid = layer.GetFIDColumn()  
223 - pg_layer: Layer = gdb_ds.CreateLayer(table_name, layer.GetSpatialRef(), layer.GetGeomType(),  
224 - ["LAYER_ALIAS={}".format(table_alias)])  
225 - schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]  
226 - # schema = layer.schema  
227 - pg_layer.CreateFields(schema)  
228 -  
229 - # gdb 不支持fid=0的要素,所以识别到后要+1  
230 - offset = 0  
231 - f1: Feature = layer.GetNextFeature()  
232 - if f1:  
233 - if f1.GetFID().__eq__(0):  
234 - offset = 1  
235 - layer.ResetReading()  
236 - for feature in layer:  
237 - feature.SetFID(feature.GetFID() + offset)  
238 - pg_layer.CreateFeature(feature)  
239 -  
240 -  
241 -  
242 - gdb_ds.Destroy()  
243 - ZipUtil.create_zip(gdb_path + ".zip", [gdb_path])  
244 - data.append({"name": ",".join(table_names),  
245 - "download_url": "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(  
246 - uuid_ + ".gdb" + ".zip")})  
247 -  
248 - return data  
249 -  
250 -  
251 -def task_entry_data(ready_task,sys_session,running_dict):  
252 - try:  
253 - parameter = json.loads(ready_task.parameter)  
254 - StructurePrint().print("检测到入库任务")  
255 - ready_task.state = 2  
256 - ready_task.process = "入库中"  
257 - sys_session.commit()  
258 -  
259 - metas: list = json.loads(  
260 - parameter.get("meta").__str__())  
261 - parameter["meta"] = metas  
262 -  
263 - database = sys_session.query(Database).filter_by(  
264 - guid=ready_task.database_guid).one_or_none()  
265 - pg_ds: DataSource = PGUtil.open_pg_data_source(  
266 - 1, DES.decode(database.sqlalchemy_uri))  
267 -  
268 - this_task_layer = []  
269 - for meta in metas:  
270 - overwrite = parameter.get("overwrite", "no")  
271 -  
272 - for layer_name_origin, layer_name in meta.get("layer").items():  
273 - origin_name = layer_name  
274 - no = 1  
275 -  
276 - while (overwrite.__eq__("no") and pg_ds.GetLayerByName(layer_name)) or sys_session.query(  
277 - InsertingLayerName).filter_by(name=layer_name).one_or_none():  
278 - layer_name = origin_name + "_{}".format(no)  
279 - no += 1  
280 -  
281 - # 添加到正在入库的列表中  
282 - iln = InsertingLayerName(guid=uuid.uuid1().__str__(),  
283 - task_guid=ready_task.guid,  
284 - name=layer_name)  
285 -  
286 - sys_session.add(iln)  
287 - sys_session.commit()  
288 - this_task_layer.append(layer_name)  
289 - # 修改表名  
290 - meta["layer"][layer_name_origin] = layer_name  
291 -  
292 - pg_ds.Destroy()  
293 - entry_data_process = multiprocessing.Process(  
294 - target=EntryDataVacuate().entry, args=(parameter,))  
295 - entry_data_process.start()  
296 - running_dict[entry_data_process] = this_task_layer  
297 - except Exception as e:  
298 - sys_session.query(Task).filter_by(guid=ready_task.guid).update(  
299 - {"state": -1, "process": "入库失败"})  
300 - sys_session.commit()  
301 - StructurePrint().print(e.__str__(), "error")  
302 -  
303 -  
304 -def task_table_refresh(database,task_guid):  
305 - pg_ds =None  
306 - sys_ds =None  
307 - data_session=None  
308 - result = {}  
309 - sys_session = None  
310 - db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))  
311 -  
312 - try:  
313 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
314 - sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI)  
315 -  
316 - this_time = datetime.datetime.now()  
317 - database_guid = database.guid  
318 -  
319 - # 已注册空间表  
320 - spatial_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(  
321 - Table.table_type != 0).all()  
322 -  
323 - # 已注册空间表名  
324 - spatial_tables_names = [table.name for table in spatial_tables]  
325 -  
326 - # 实体库datasource  
327 - pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))  
328 -  
329 - # 更新空间表  
330 - # 增加表  
331 - db_tables_names = add_spatail_table(database, pg_ds, sys_session,spatial_tables_names, this_time,db_tuple)# 实体库中空间表名  
332 -  
333 - # 删除/修改表  
334 - edit_spatial_table(pg_ds, sys_session,spatial_tables, db_tables_names, this_time,db_tuple)  
335 -  
336 - # 空间表处理完毕  
337 - sys_session.commit()  
338 -  
339 -  
340 - # 空间表处理完毕  
341 - sys_session.commit()  
342 -  
343 - # 注册普通表  
344 - # 实体库连接  
345 - data_session: Session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))  
346 -  
347 - # 处理后空间表  
348 - spatial_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(  
349 - Table.table_type != 0).all()  
350 - # 处理后空间表名  
351 - spatial_tables_names = [table.name for table in spatial_tables]  
352 -  
353 - # 原有普通表  
354 - common_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(  
355 - Table.table_type == 0).all()  
356 - # 原有普通表 名  
357 - origin_common_tables_name = [table.name for table in common_tables]  
358 -  
359 - # 现有普通表  
360 - real_common_tables_name = []  
361 -  
362 - # 只注册public中的表  
363 - common_result = data_session.execute(  
364 - "select relname as tabname from pg_class c where relkind = 'r' and relnamespace=2200 and relname not like 'pg_%' and relname not like 'sql_%' order by relname").fetchall()  
365 - for re in common_result:  
366 - table_name = re[0]  
367 - if table_name not in spatial_tables_names and (not table_name.__contains__("_vacuate_")):  
368 - real_common_tables_name.append(table_name)  
369 -  
370 - # 增加新普通表  
371 -  
372 - add_common_table(data_session, sys_session, database_guid, real_common_tables_name, origin_common_tables_name,  
373 - this_time,db_tuple)  
374 -  
375 - # 删除、修改普通表  
376 - edit_common_table(data_session,sys_session, database_guid, real_common_tables_name, origin_common_tables_name,  
377 - this_time,db_tuple)  
378 -  
379 - sys_session.commit()  
380 - result["data"] = "刷新数据成功!"  
381 - result["state"] = 1  
382 - sys_session.query(Task).filter_by(guid=task_guid).update(  
383 - {"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})  
384 - sys_session.commit()  
385 -  
386 - except Exception as e:  
387 - try:  
388 - print(traceback.format_exc())  
389 - sys_session.query(Task).filter_by(guid=task_guid).update(  
390 - {"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})  
391 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
392 - task_process_guid = uuid.uuid1().__str__()  
393 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
394 - task_guid=task_guid)  
395 - sys_session.add(task_process)  
396 - sys_session.commit()  
397 - except Exception as ee:  
398 - print(traceback.format_exc())  
399 - finally:  
400 - if pg_ds:  
401 - pg_ds.Destroy()  
402 - if data_session:  
403 - data_session.close()  
404 - if sys_session:  
405 - sys_session.close()  
406 - if sys_ds:  
407 - sys_ds.Destroy()  
408 - return result  
409 -  
410 -def add_spatail_table(database,pg_ds,sys_session,spatial_tables_names,this_time,db_tuple):  
411 - '''  
412 - 注册新增空间表  
413 - :param database:  
414 - :param pg_ds:  
415 - :param spatial_tables_names: 已注册空间表名  
416 - :param this_time:  
417 - :return: 实体库中空间表名  
418 - '''  
419 -  
420 - db_tables_names=[]  
421 -  
422 - for i in range(pg_ds.GetLayerCount()):  
423 - layer: Layer = pg_ds.GetLayer(i)  
424 - geom_column = layer.GetGeometryColumn()  
425 - db_tables_names.append(layer.GetName())  
426 - if not geom_column:  
427 - continue  
428 - if layer.GetName() not in spatial_tables_names:  
429 - l_name = layer.GetName()  
430 -  
431 - try:  
432 - # 只注册public的空间表,其他表空间的表名会有.  
433 - if layer.GetName().__contains__("."):  
434 - continue  
435 - # 略过抽稀表  
436 - if layer.GetName().__contains__("_vacuate_"):  
437 - continue  
438 -  
439 - # 没有权限的表跳过  
440 - if not PGUtil.check_table_privilege(l_name, "SELECT", db_tuple[0], pg_ds):  
441 - StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], l_name), "warn")  
442 - continue  
443 -  
444 -  
445 - # 范围统计和数量统计以100w为界限  
446 - query_count_layer: Layer = pg_ds.ExecuteSQL(  
447 - '''SELECT reltuples::bigint AS ec FROM pg_class WHERE oid = 'public."{}"'::regclass'''.format(  
448 - l_name))  
449 -  
450 - feature_count = query_count_layer.GetFeature(0).GetField("ec")  
451 - # 要素少于100w可以精确统计  
452 - if feature_count < 1000000:  
453 - feature_count = layer.GetFeatureCount()  
454 - ext = layer.GetExtent()  
455 - else:  
456 - query_ext_layer: Layer = pg_ds.ExecuteSQL(  
457 - "select geometry(ST_EstimatedExtent('public', '{}','{}'))".format(l_name,  
458 - layer.GetGeometryColumn()))  
459 - ext = query_ext_layer.GetExtent()  
460 - if ext[0] < 360:  
461 - ext = [round(e, 6) for e in ext]  
462 - else:  
463 - ext = [round(e, 2) for e in ext]  
464 - extent = "{},{},{},{}".format(ext[0], ext[1], ext[2], ext[3])  
465 -  
466 - StructurePrint().print("空间表增加!")  
467 -  
468 - geom_type = GeometryAdapter.get_geometry_type(layer)  
469 - except:  
470 - StructurePrint().print("表{}注册失败!".format(l_name), "warn")  
471 - continue  
472 -  
473 - table_guid = uuid.uuid1().__str__()  
474 - table = Table(guid=table_guid,  
475 - database_guid=database.guid,  
476 - # alias=layer.GetName(),  
477 - name=layer.GetName(), create_time=this_time, update_time=this_time,  
478 - table_type=GeometryAdapter.get_table_type(geom_type),  
479 - extent=extent,  
480 - feature_count=feature_count  
481 - )  
482 - sys_session.add(table)  
483 - feature_defn: FeatureDefn = layer.GetLayerDefn()  
484 -  
485 - for i in range(feature_defn.GetFieldCount()):  
486 - field_defn: FieldDefn = feature_defn.GetFieldDefn(i)  
487 - field_name = field_defn.GetName()  
488 - field_alias = field_name if field_defn.GetAlternativeName() is None or field_defn.GetAlternativeName().__eq__(  
489 - "") else field_defn.GetAlternativeName()  
490 - column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,  
491 - name=field_name, alias=field_alias, create_time=this_time, update_time=this_time)  
492 - sys_session.add(column)  
493 - return db_tables_names  
494 -  
495 -def deal_vacuate_table(sys_ds,sys_session,database_guid):  
496 -  
497 -  
498 - for i in range(sys_ds.GetLayerCount()):  
499 - layer: Layer = sys_ds.GetLayer(i)  
500 - geom_column = layer.GetGeometryColumn()  
501 -  
502 - if not geom_column:  
503 - continue  
504 -  
505 -  
506 -  
507 - if layer.GetName().__contains__("_vacuate_"):  
508 - l_name = layer.GetName()  
509 -  
510 - base_layer_name = l_name.split("_vacuate_")[0].split("_")[1]  
511 -  
512 - level = l_name.split("_")[-2]  
513 -  
514 - pixel_distance_str: str ="0"  
515 - try:  
516 - pixel_distance_str: str = l_name.split("_")[-1]  
517 - if pixel_distance_str.startswith("0"):  
518 - pixel_distance_str = "0.{}".format(pixel_distance_str)  
519 - except:  
520 - pass  
521 -  
522 - base_table =sys_session.query(Table).filter_by(name=base_layer_name,database_guid=database_guid).one_or_none()  
523 - if base_table:  
524 - if not sys_session.query(TableVacuate).filter_by(table_guid=base_table.guid,name=l_name).one_or_none():  
525 - table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),  
526 - table_guid=base_table.guid,  
527 - level=level,  
528 - name=l_name,  
529 - pixel_distance=float(pixel_distance_str))  
530 - sys_session.add(table_vacuate)  
531 -  
532 - sys_session.query(Table).filter_by(guid=base_table.guid).update({"is_vacuate": 1})  
533 - else:  
534 - kk=1  
535 -  
536 -  
537 -  
538 -def edit_spatial_table(pg_ds,sys_session,spatial_tables,db_tables_names,this_time,db_tuple):  
539 -  
540 -  
541 -  
542 - for table in spatial_tables:  
543 -  
544 - # 删除表  
545 - if table.name not in db_tables_names:  
546 - StructurePrint().print("空间表减少!")  
547 - sys_session.delete(table)  
548 - # 修改表  
549 - else:  
550 - layer: Layer = pg_ds.GetLayerByName(table.name)  
551 - l_name = layer.GetName()  
552 -  
553 - # 只注册public的空间表,其他表空间的表名会有.  
554 - if layer.GetName().__contains__("."):  
555 - continue  
556 -  
557 - if layer.GetName().__contains__("_vacuate_"):  
558 - continue  
559 -  
560 - # 没有权限的表跳过  
561 - if not PGUtil.check_table_privilege(l_name, "SELECT", db_tuple[0], pg_ds):  
562 - StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], l_name), "warn")  
563 - sys_session.delete(table)  
564 - continue  
565 -  
566 - columns = table.relate_columns  
567 - columns_names = [column.name for column in columns]  
568 - feature_defn: FeatureDefn = layer.GetLayerDefn()  
569 - db_columns_names = []  
570 -  
571 - # 增加列  
572 - for i in range(feature_defn.GetFieldCount()):  
573 - field_defn: FieldDefn = feature_defn.GetFieldDefn(i)  
574 - field_name = field_defn.GetName()  
575 - db_columns_names.append(field_name)  
576 -  
577 - if field_name not in columns_names:  
578 - StructurePrint().print("{}空间表属性增加!".format(table.name))  
579 - field_alias = field_name if field_defn.GetAlternativeName() is None or field_defn.GetAlternativeName().__eq__(  
580 - "") else field_defn.GetAlternativeName()  
581 - column = Columns(guid=uuid.uuid1().__str__(), table_guid=table.guid,  
582 - name=field_name, alias=field_alias, create_time=this_time,  
583 - update_time=this_time)  
584 - sys_session.add(column)  
585 -  
586 - # 删除列  
587 - for column in columns:  
588 - if column.name not in db_columns_names:  
589 - StructurePrint().print("{}空间表属性减少!".format(table.name))  
590 - sys_session.delete(column)  
591 -  
592 - # 范围统计和数量统计以100w为界限  
593 - query_count_layer: Layer = pg_ds.ExecuteSQL(  
594 - '''SELECT reltuples::bigint AS ec FROM pg_class WHERE oid = 'public."{}"'::regclass'''.format(  
595 - l_name))  
596 - feature_count = query_count_layer.GetFeature(0).GetField("ec")  
597 - # 要素少于100w可以精确统计  
598 - if feature_count < 1000000:  
599 - feature_count = layer.GetFeatureCount()  
600 - ext = layer.GetExtent()  
601 -  
602 - else:  
603 - query_ext_layer: Layer = pg_ds.ExecuteSQL(  
604 - "select geometry(ST_EstimatedExtent('public', '{}','{}'))".format(l_name,  
605 - layer.GetGeometryColumn()))  
606 - ext = query_ext_layer.GetExtent()  
607 - if ext[0] < 360:  
608 - ext = [round(e, 6) for e in ext]  
609 - else:  
610 - ext = [round(e, 2) for e in ext]  
611 - extent = "{},{},{},{}".format(ext[0], ext[1], ext[2], ext[3])  
612 -  
613 - # 修改要素量  
614 - if not table.feature_count.__eq__(feature_count):  
615 - StructurePrint().print("{}空间表要素!".format(table.name))  
616 - sys_session.query(Table).filter_by(guid=table.guid).update({"feature_count": feature_count,  
617 - "extent": extent})  
618 -  
619 -  
620 -def add_common_table(data_session,sys_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time,db_tuple):  
621 - for table_name in real_common_tables_name:  
622 - if table_name not in origin_common_tables_name:  
623 - StructurePrint().print("{}非空间表增加!".format(table_name))  
624 - table_guid = uuid.uuid1().__str__()  
625 -  
626 -  
627 - # 没有权限的表跳过  
628 - if not SQLUtil.check_table_privilege(table_name, "SELECT", db_tuple[0], data_session):  
629 - StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], table_name), "warn")  
630 - continue  
631 -  
632 - count = data_session.execute('select count(*) from "{}"'.format(table_name)).fetchone()[0]  
633 -  
634 - table = Table(guid=table_guid,  
635 - database_guid=database_guid,  
636 - name=table_name, create_time=this_time, update_time=this_time,  
637 - table_type=0,  
638 - feature_count=count  
639 - )  
640 -  
641 - sys_session.add(table)  
642 -  
643 - sql = '''  
644 - SELECT  
645 - a.attnum,  
646 - a.attname AS field  
647 - FROM  
648 - pg_class c,  
649 - pg_attribute a,  
650 - pg_type t  
651 - WHERE  
652 - c.relname = '{}'  
653 - and a.attnum > 0  
654 - and a.attrelid = c.oid  
655 - and a.atttypid = t.oid  
656 - ORDER BY a.attnum  
657 - '''.format(table_name)  
658 -  
659 - cols = data_session.execute(sql).fetchall()  
660 - for col in cols:  
661 - column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,  
662 - name=col[1], create_time=this_time, update_time=this_time)  
663 - sys_session.add(column)  
664 -  
665 - # 删除不存在的表  
666 - for n in origin_common_tables_name:  
667 - if n not in real_common_tables_name:  
668 - tables = Table.query.filter_by(name=n).filter_by(database_guid=database_guid).all()  
669 - for table in tables:  
670 - sys_session.delete(table)  
671 -  
672 -def edit_common_table(data_session,sys_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time,db_tuple):  
673 - for table_name in origin_common_tables_name:  
674 - tables = sys_session.query(Table).filter_by(name=table_name).filter_by(database_guid=database_guid).all()  
675 - for table in tables:  
676 - if table_name not in real_common_tables_name:  
677 - StructurePrint().print("{}非空间表减少!".format(table_name))  
678 - sys_session.delete(table)  
679 - # 修改表  
680 - else:  
681 -  
682 - # 没有权限的表删除  
683 - if not SQLUtil.check_table_privilege(table_name, "SELECT", db_tuple[0], data_session):  
684 - StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], table_name), "warn")  
685 - sys_session.delete(table)  
686 - continue  
687 -  
688 - columns = table.relate_columns  
689 - columns_names = [column.name for column in columns]  
690 -  
691 - sql = '''  
692 - SELECT  
693 - a.attnum,  
694 - a.attname AS field  
695 - FROM  
696 - pg_class c,  
697 - pg_attribute a,  
698 - pg_type t  
699 - WHERE  
700 - c.relname = '{}'  
701 - and a.attnum > 0  
702 - and a.attrelid = c.oid  
703 - and a.atttypid = t.oid  
704 - ORDER BY a.attnum  
705 - '''.format(table_name)  
706 -  
707 - cols = data_session.execute(sql).fetchall()  
708 - real_cols_name = [col[1] for col in cols]  
709 -  
710 - # 属性增加  
711 - for col in real_cols_name:  
712 - if col not in columns_names:  
713 - StructurePrint().print("{}表要素属性增加!".format(table_name))  
714 - column = Columns(guid=uuid.uuid1().__str__(), table_guid=table.guid,  
715 - name=col, create_time=this_time, update_time=this_time)  
716 - sys_session.add(column)  
717 -  
718 - # 属性减少  
719 - for column in columns:  
720 - if column.name not in real_cols_name:  
721 - StructurePrint().print("{}表要素属性减少!".format(table_name))  
722 - sys_session.delete(column)  
723 -  
724 - # 修改要素量  
725 - # sql = 'select count(*) from "{}"'.format(table_name)  
726 - # count = data_session.execute(sql).fetchone()[0]  
727 -  
728 - count = SQLUtil.get_table_count(table_name,data_session)  
729 -  
730 - if not table.feature_count.__eq__(count):  
731 - StructurePrint().print("{}表要素变化!".format(table_name))  
732 - sys_session.query(Table).filter_by(guid=table.guid).update({"feature_count": count})  
733 -  
734 -  
735 -def task_vacuate(table,task_guid):  
736 -  
737 - sys_session = None  
738 - pg_session = None  
739 - pg_ds = None  
740 - vacuate_process = None  
741 - try:  
742 - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)  
743 - sys_session.query(Table).filter_by(guid=table.guid).update(  
744 - {"is_vacuate": 2, "update_time": datetime.datetime.now()})  
745 - sys_session.commit()  
746 -  
747 - database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()  
748 - pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))  
749 -  
750 - pg_ds: DataSource = PGUtil.open_pg_data_source(0, DES.decode(database.sqlalchemy_uri))  
751 -  
752 - # 删除原有数据  
753 - tvs = sys_session.query(TableVacuate).filter_by(table_guid=table.guid).all()  
754 - for tv in tvs:  
755 - sys_session.delete(tv)  
756 -  
757 -  
758 - # 创建抽稀过程  
759 - options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name, pg_session)),  
760 - "PRECISION=NO"]  
761 -  
762 - layer = pg_ds.GetLayerByName(table.name)  
763 -  
764 - vacuate_process: VacuateProcess = VacuateProcess(layer, table.guid, options, database.sqlalchemy_uri)  
765 -  
766 - for feature in layer:  
767 - geo = feature.GetGeometryRef()  
768 - # 插入抽稀图层  
769 - if geo is not None:  
770 - vacuate_process.vacuate(geo, feature)  
771 -  
772 - vacuate_process.set_vacuate_count()  
773 -  
774 - # 新增  
775 - if configure.VACUATE_DB_URI:  
776 - user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI)  
777 - else:  
778 - user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))  
779 - connectstr = "hostaddr={} port={} dbname='{}' user='{}' password='{}'".format(host, port, datab, user,  
780 - passwd)  
781 - for l in range(vacuate_process.max_level):  
782 - lev = vacuate_process.t_grid_size.index(vacuate_process.this_gridsize[l])  
783 -  
784 - table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),  
785 - table_guid=table.guid,  
786 - level=lev,  
787 - name=vacuate_process.vacuate_layers[l].GetName(),  
788 - pixel_distance=vacuate_process.this_gridsize[l],  
789 - connectstr=DES.encode(connectstr))  
790 - sys_session.add(table_vacuate)  
791 -  
792 - sys_session.query(Task).filter_by(guid=task_guid).update({"state": 1, "update_time": datetime.datetime.now(),  
793 - "process": "精化完成"})  
794 - sys_session.query(Table).filter_by(guid=table.guid).update(  
795 - {"is_vacuate": 1, "update_time": datetime.datetime.now()})  
796 - sys_session.commit()  
797 -  
798 - except Exception as e:  
799 - try:  
800 - sys_session.query(Task).filter_by(guid=task_guid).update(  
801 - {"state": -1, "update_time": datetime.datetime.now(),  
802 - "process": "精化失败"})  
803 - sys_session.query(Table).filter_by(guid=table.guid).update(  
804 - {"is_vacuate": 0, "update_time": datetime.datetime.now()})  
805 -  
806 - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())  
807 - task_process_guid = uuid.uuid1().__str__()  
808 - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),  
809 - task_guid=task_guid)  
810 - sys_session.add(task_process)  
811 - sys_session.commit()  
812 - if vacuate_process:  
813 - vacuate_process.rollback()  
814 -  
815 - print(traceback.format_exc())  
816 - except Exception as ee:  
817 - print(traceback.format_exc())  
818 - finally:  
819 - if vacuate_process:  
820 - vacuate_process.end()  
821 - if sys_session:  
822 - sys_session.close()  
823 - if pg_session:  
824 - pg_session.close()  
825 - if pg_ds:  
826 - pg_ds.Destroy()  
827 -  
828 -  
829 -class VacuateProcess:  
830 -  
831 - max_level=0  
832 - fill_dict={}  
833 - vacuate_layers={}  
834 - vacuate_layers_gridsize={}  
835 - pg_ds_dict = {}  
836 - # 图层要素大于5W才抽稀  
837 - least_vacuate_count = VacuateConf.least_vacuate_count  
838 -  
839 - extent=[]  
840 - is_spatial=False  
841 -  
842 - lonlat_gridsize = VacuateConf.lonlat_gridsize  
843 - project_gridsize = VacuateConf.project_gridsize  
844 -  
845 - # 该抽稀过程使用的grid_size  
846 - t_grid_size = []  
847 -  
848 - # 该抽稀过程的抽稀网格  
849 - this_gridsize=[]  
850 -  
851 -  
852 - def __init__(self,layer:Layer,table_guid, options,sqlalchemy_uri):  
853 -  
854 - #是空间图层才初始化  
855 - if layer.GetExtent()[0] > 0 or layer.GetExtent()[0] < 0:  
856 -  
857 - self.is_spatial=True  
858 -  
859 - # 判断需要抽稀多少级  
860 -  
861 - lc = layer.GetFeatureCount()  
862 - extent = layer.GetExtent()  
863 - self.extent=extent  
864 -  
865 - #判断疏密程度  
866 - p_x = (extent[1]-extent[0])/10.0  
867 - p_y = (extent[3] - extent[2]) / 10.0  
868 - fill_precent=0  
869 - StructurePrint().print("判断疏密")  
870 - for ix in range(10):  
871 - for iy in range(10):  
872 - grid_extent = [extent[0]+ix*p_x,extent[0]+ix*p_x+p_x,extent[2]+iy*p_y,extent[2]+iy*p_y+p_y]  
873 - poly = GeometryAdapter.envelop_2_polygon(grid_extent)  
874 -  
875 - layer.SetSpatialFilter(None)  
876 - layer.SetSpatialFilter(poly)  
877 - layer.ResetReading()  
878 - if layer.GetNextFeature():  
879 - fill_precent += 1  
880 -  
881 - print(fill_precent)  
882 - StructurePrint().print("判断疏密结束")  
883 -  
884 - layer.SetSpatialFilter(None)  
885 - layer.ResetReading()  
886 - # 固有疏密程度  
887 - original_density=8  
888 -  
889 -  
890 - # 额外一层  
891 - # self.this_gridsize.append(0.000075)  
892 - # self.max_level += 1  
893 - ######  
894 -  
895 - if extent[0]>180:  
896 - self.t_grid_size=self.project_gridsize  
897 - else:  
898 - self.t_grid_size = self.lonlat_gridsize  
899 -  
900 - for grid_size in self.t_grid_size:  
901 - # 最少抽稀个数  
902 - if lc > self.least_vacuate_count:  
903 - # 网格数至少大于  
904 - if ((extent[1] - extent[0]) * (extent[3] - extent[2])) / (grid_size**2)>self.least_vacuate_count:  
905 - # 要素数量大于网格数量  
906 - # 要考虑图层的疏密程度,original_density*(100.0/fill_precent) 为疏密指数  
907 - if lc * original_density * (100.0/fill_precent)>((extent[1] - extent[0])*(extent[3] - extent[2]))/(grid_size**2) :  
908 - print(grid_size)  
909 - self.this_gridsize.append(grid_size)  
910 - self.max_level += 1  
911 -  
912 -  
913 -  
914 - # 创建抽稀ds  
915 - for l in range(self.max_level):  
916 - # pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, DES.decode(sqlalchemy_uri))  
917 - if configure.VACUATE_DB_URI:  
918 - pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, configure.VACUATE_DB_URI)  
919 - else:  
920 - pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, DES.decode(sqlalchemy_uri))  
921 - pg_ds_l.StartTransaction()  
922 - self.pg_ds_dict[l] = pg_ds_l  
923 -  
924 - # 生成抽稀图层  
925 - options = options[1:]  
926 - options.append("OVERWRITE=yes")  
927 - options.append("LAUNDER=no")  
928 -  
929 - schema = layer.schema  
930 - # 增加统计字段  
931 - schema.append(ogr.FieldDefn("_dcigrid_count_", ogr.OFTInteger))  
932 - schema.append(ogr.FieldDefn("_dcigrid_name_", ogr.OFTString))  
933 -  
934 - for l in range(self.max_level):  
935 - this_grid_len = self.this_gridsize[l]  
936 -  
937 - self.vacuate_layers_gridsize[l] = this_grid_len  
938 -  
939 - pg = self.pg_ds_dict[l]  
940 -  
941 - grid_name = str(this_grid_len)  
942 - if this_grid_len<1:  
943 - grid_name = str(this_grid_len).split(".")[-1]  
944 - if this_grid_len.__eq__(0.00008):  
945 - grid_name = "00008"  
946 -  
947 - # 抽稀图层是点面混合的  
948 - # 抽稀表有固定的命名规则  
949 - # 抽稀表一定要覆盖  
950 -  
951 -  
952 - print("{}:{}".format(self.t_grid_size.index(this_grid_len),this_grid_len))  
953 -  
954 -  
955 - v_ln = "z{}_vacuate_{}_{}".format(table_guid, self.t_grid_size.index(this_grid_len), grid_name)  
956 - vl = pg.CreateLayer(v_ln, layer.GetSpatialRef(),ogr.wkbUnknown, options)  
957 - # 抽稀表需要属性  
958 - vl.CreateFields(schema)  
959 - self.vacuate_layers[l] = vl  
960 -  
961 - else:  
962 - pass  
963 -  
964 -  
965 - def vacuate(self,g,feature):  
966 -  
967 - if self.is_spatial:  
968 -  
969 - # 插入到所有抽稀图层中  
970 - for level in range(self.max_level):  
971 -  
972 - center: Geometry = g.Centroid()  
973 -  
974 - extent = g.GetEnvelope()  
975 - long_extent= extent[1]-extent[0]  
976 - lat_extent = extent[3]-extent[2]  
977 -  
978 - this_grid_len =self.vacuate_layers_gridsize[level]  
979 - #超大的直接加入  
980 - # if long_extent > 10*this_grid_len or lat_extent >10*this_grid_len:  
981 - # vacuate_layer: Layer = self.vacuate_layers.get(level)  
982 - # feat = ogr.Feature(vacuate_layer.GetLayerDefn())  
983 - # feat.SetGeometry(g)  
984 - # vacuate_layer.CreateFeature(feat)  
985 - # else:  
986 -  
987 - row = int((center.GetY() - self.extent[2]) / this_grid_len)  
988 - col = int((center.GetX() - self.extent[0]) / this_grid_len)  
989 - key = "{}.{}.{}".format(level, row, col)  
990 -  
991 - if not self.fill_dict.get(key):  
992 - self.fill_dict[key] = 0  
993 - if self.fill_dict[key] == 0:  
994 -  
995 - vacuate_layer: Layer = self.vacuate_layers.get(level)  
996 - feat = ogr.Feature(vacuate_layer.GetLayerDefn())  
997 - # 如果图形比网格小,直接存储其中心点  
998 - if this_grid_len>long_extent and this_grid_len>lat_extent:  
999 - feat.SetGeometry(center)  
1000 - else:  
1001 - feat.SetGeometry(g)  
1002 -  
1003 - # 复制旧feature属性  
1004 - field_dict = feature.items()  
1005 - for field_name in field_dict:  
1006 - feat.SetField(field_name, field_dict[field_name])  
1007 - feat.SetField("_dcigrid_name_",".".join(key.split(".")[1:]))  
1008 -  
1009 - vacuate_layer.CreateFeature(feat)  
1010 - self.fill_dict[key] += 1  
1011 - #超大的还有机会  
1012 - elif (long_extent > 10*this_grid_len or lat_extent >10*this_grid_len) and self.fill_dict[key]<5:  
1013 - vacuate_layer: Layer = self.vacuate_layers.get(level)  
1014 - feat = ogr.Feature(vacuate_layer.GetLayerDefn())  
1015 - feat.SetGeometry(g)  
1016 -  
1017 - # 复制旧feature属性  
1018 - field_dict = feature.items()  
1019 - for field_name in field_dict:  
1020 - feat.SetField(field_name, field_dict[field_name])  
1021 - feat.SetField("_dcigrid_name_",".".join(key.split(".")[1:]))  
1022 -  
1023 - vacuate_layer.CreateFeature(feat)  
1024 - self.fill_dict[key] += 1  
1025 - else:  
1026 - self.fill_dict[key] += 1  
1027 -  
1028 - def set_vacuate_count(self):  
1029 - if self.is_spatial:  
1030 - # 插入到所有抽稀图层中  
1031 - for level in range(self.max_level):  
1032 - vacuate_layer: Layer = self.vacuate_layers.get(level)  
1033 - for feat in vacuate_layer:  
1034 - key = "{}.{}".format(level,feat.GetField("_dcigrid_name_"))  
1035 - feat.SetField("_dcigrid_count_",self.fill_dict.get(key))  
1036 - vacuate_layer.SetFeature(feat)  
1037 -  
1038 - def end(self):  
1039 - for pg in self.pg_ds_dict.values():  
1040 - pg.Destroy()  
1041 -  
1042 - def rollback(self):  
1043 - for pg in self.pg_ds_dict.values():  
1044 - pg.RollbackTransaction() 1 +# coding=utf-8
  2 +#author: 4N
  3 +#createtime: 2021/10/11
  4 +#email: nheweijun@sina.com
  5 +
  6 +
  7 +from ..models import InsertingLayerName,Columns
  8 +from sqlalchemy.orm import Session
  9 +import json
  10 +from sqlalchemy import distinct
  11 +
  12 +import time
  13 +
  14 +from app.util.component.SQLUtil import SQLUtil
  15 +
  16 +import datetime
  17 +
  18 +from ..models import Table, Database, DES,Task,db,TableVacuate,Process
  19 +
  20 +from app.util.component.StructurePrint import StructurePrint
  21 +
  22 +from osgeo.ogr import DataSource,Layer,Geometry
  23 +
  24 +from app.util.component.VacuateConf import VacuateConf
  25 +from app.util.component.GeometryAdapter import GeometryAdapter
  26 +
  27 +import traceback
  28 +from osgeo.ogr import DataSource,Layer,FeatureDefn,FieldDefn,Feature
  29 +from osgeo import gdal,ogr
  30 +import os
  31 +import uuid
  32 +import configure
  33 +
  34 +from app.util.component.PGUtil import PGUtil
  35 +from app.util.component.ZipUtil import ZipUtil
  36 +import multiprocessing
  37 +
  38 +
  39 +def task_consumer():
  40 +
  41 + running_dict = {}
  42 + sys_session: Session = PGUtil.get_db_session(
  43 + configure.SQLALCHEMY_DATABASE_URI)
  44 +
  45 + while True:
  46 +
  47 + try:
  48 + time.sleep(3)
  49 +
  50 + # 已经结束的进程 从监测中删除
  51 + remove_process = []
  52 +
  53 + for process, layer_names in running_dict.items():
  54 + if not process.is_alive():
  55 + for l in layer_names:
  56 + inserted = sys_session.query(
  57 + InsertingLayerName).filter_by(name=l).one_or_none()
  58 + if inserted:
  59 + sys_session.delete(inserted)
  60 + sys_session.commit()
  61 + remove_process.append(process)
  62 + for process in remove_process:
  63 + running_dict.pop(process)
  64 +
  65 +
  66 + # 入库进程少于阈值,开启入库进程
  67 +
  68 + inter_size = sys_session.query(
  69 + distinct(InsertingLayerName.task_guid)).count()
  70 +
  71 + if inter_size < configure.entry_data_thread:
  72 + # 锁表啊
  73 + ready_task: Task = sys_session.query(Task).filter_by(state=0).order_by(
  74 + Task.create_time).with_lockmode("update").limit(1).one_or_none()
  75 + if ready_task:
  76 +
  77 + if ready_task.task_type == 1:
  78 + task_entry_data(ready_task,sys_session,running_dict)
  79 + elif ready_task.task_type == 2:
  80 + task_table_refresh()
  81 + elif ready_task.task_type == 3:
  82 + task_vacuate()
  83 + elif ready_task.task_type == 4:
  84 + task_download()
  85 +
  86 + else:
  87 + # 解表啊
  88 + sys_session.commit()
  89 + except Exception as e:
  90 + sys_session.commit()
  91 + StructurePrint().print(e.__str__(), "error")
  92 +
  93 +
  94 +
  95 +def task_download(para,task_guid):
  96 + sys_session = None
  97 + ds: DataSource = None
  98 +
  99 + # 设置编码
  100 + encoding = para.get("encoding")
  101 + if encoding:
  102 + gdal.SetConfigOption("SHAPE_ENCODING", encoding)
  103 + else:
  104 + gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")
  105 +
  106 + try:
  107 +
  108 + sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  109 +
  110 + table_names = para.get("table_name").split(",")
  111 + database_guid = para.get("database_guid")
  112 + database = sys_session.query(Database).filter_by(guid=database_guid).one_or_none()
  113 + if not database:
  114 + raise Exception("数据库不存在!")
  115 +
  116 + ds: DataSource = PGUtil.open_pg_data_source(0, DES.decode(database.sqlalchemy_uri))
  117 +
  118 + download_type = para.get("download_type")
  119 +
  120 + data = None
  121 + if download_type.__eq__("shp"):
  122 + data = download_shp(table_names, ds)
  123 + if download_type.__eq__("gdb"):
  124 + data = download_gdb(sys_session, table_names, ds, database_guid)
  125 +
  126 + sys_session.query(Task).filter_by(guid=task_guid).update({"state": 1, "update_time": datetime.datetime.now(),
  127 + "process": "下载完成",
  128 + "parameter": data[0]["download_url"]})
  129 + sys_session.commit()
  130 +
  131 +
  132 + except Exception as e:
  133 + try:
  134 + sys_session.query(Task).filter_by(guid=task_guid).update(
  135 + {"state": -1, "update_time": datetime.datetime.now(),
  136 + "process": "下载失败"})
  137 +
  138 + message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
  139 + task_process_guid = uuid.uuid1().__str__()
  140 + task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
  141 + task_guid=task_guid)
  142 + sys_session.add(task_process)
  143 + sys_session.commit()
  144 + except Exception as ee:
  145 + print(traceback.format_exc())
  146 + raise e
  147 + finally:
  148 + try:
  149 + if ds:
  150 + ds.Destroy()
  151 + if sys_session:
  152 + sys_session.close()
  153 + except:
  154 + print(traceback.format_exc())
  155 +
  156 +
  157 +def download_shp(table_names, ds):
  158 + data = []
  159 + for table_name in table_names:
  160 + url = download_one(ds, table_name)
  161 + data.append({"name": table_name, "download_url": url})
  162 + return data
  163 +
  164 +
  165 +def download_one( ds, table_name):
  166 + layer: Layer = ds.GetLayerByName(table_name)
  167 + driver = ogr.GetDriverByName("ESRI Shapefile")
  168 + uuid_ = uuid.uuid1().__str__()
  169 + parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
  170 + dirpath = os.path.join(parent, "file_tmp", uuid_)
  171 + os.makedirs(dirpath)
  172 + data_source: DataSource = driver.CreateDataSource(dirpath + "/{}.shp".format(table_name))
  173 + # data_source.CopyLayer(layer, table_name)
  174 +
  175 + fid = layer.GetFIDColumn()
  176 + pg_layer: Layer = data_source.CreateLayer(table_name, layer.GetSpatialRef(), layer.GetGeomType())
  177 + schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]
  178 +
  179 + pg_layer.CreateFields(schema)
  180 + layer.ResetReading()
  181 + for feature in layer:
  182 + pg_layer.CreateFeature(feature)
  183 +
  184 + data_source.Destroy()
  185 +
  186 + ZipUtil.create_zip(os.path.join(parent, "file_tmp", table_name + "_" + uuid_) + ".zip", [dirpath])
  187 +
  188 + return "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(table_name + "_" + uuid_ + ".zip")
  189 +
  190 +
  191 +def download_gdb( sys_session, table_names, ds, database_guid):
  192 + ogr.RegisterAll()
  193 + data = []
  194 + gdal.UseExceptions()
  195 + gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
  196 +
  197 + # 创建一个gdb datasource
  198 + gdb_driver = ogr.GetDriverByName('FileGDB')
  199 + uuid_ = uuid.uuid1().__str__()
  200 + parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
  201 + gdb_path = os.path.join(parent, "file_tmp", uuid_ + ".gdb")
  202 +
  203 + gdb_ds: DataSource = gdb_driver.CreateDataSource(gdb_path)
  204 +
  205 + for table_name in table_names:
  206 +
  207 + layer: Layer = ds.GetLayerByName(table_name)
  208 + table = sys_session.query(Table).filter_by(name=table_name, database_guid=database_guid).one_or_none()
  209 + feature_defn: FeatureDefn = layer.GetLayerDefn()
  210 +
  211 + for i in range(feature_defn.GetFieldCount()):
  212 + field_defn: FieldDefn = feature_defn.GetFieldDefn(i)
  213 + field_alias = sys_session.query(Columns).filter_by(table_guid=table.guid,
  214 + name=field_defn.GetName()).one_or_none().alias
  215 + field_defn.SetAlternativeName(field_alias)
  216 +
  217 + table_alias = table.alias
  218 +
  219 +
  220 + fid = layer.GetFIDColumn()
  221 + pg_layer: Layer = gdb_ds.CreateLayer(table_name, layer.GetSpatialRef(), layer.GetGeomType(),
  222 + ["LAYER_ALIAS={}".format(table_alias)])
  223 + schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]
  224 + # schema = layer.schema
  225 + pg_layer.CreateFields(schema)
  226 +
  227 + # gdb 不支持fid=0的要素,所以识别到后要+1
  228 + offset = 0
  229 + f1: Feature = layer.GetNextFeature()
  230 + if f1:
  231 + if f1.GetFID().__eq__(0):
  232 + offset = 1
  233 + layer.ResetReading()
  234 + for feature in layer:
  235 + feature.SetFID(feature.GetFID() + offset)
  236 + pg_layer.CreateFeature(feature)
  237 +
  238 +
  239 +
  240 + gdb_ds.Destroy()
  241 + ZipUtil.create_zip(gdb_path + ".zip", [gdb_path])
  242 + data.append({"name": ",".join(table_names),
  243 + "download_url": "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(
  244 + uuid_ + ".gdb" + ".zip")})
  245 +
  246 + return data
  247 +
  248 +
  249 +def task_entry_data(ready_task,sys_session,running_dict):
  250 + try:
  251 + parameter = json.loads(ready_task.parameter)
  252 + StructurePrint().print("检测到入库任务")
  253 + ready_task.state = 2
  254 + ready_task.process = "入库中"
  255 + sys_session.commit()
  256 +
  257 + metas: list = json.loads(
  258 + parameter.get("meta").__str__())
  259 + parameter["meta"] = metas
  260 +
  261 + database = sys_session.query(Database).filter_by(
  262 + guid=ready_task.database_guid).one_or_none()
  263 + pg_ds: DataSource = PGUtil.open_pg_data_source(
  264 + 1, DES.decode(database.sqlalchemy_uri))
  265 +
  266 + this_task_layer = []
  267 + for meta in metas:
  268 + overwrite = parameter.get("overwrite", "no")
  269 +
  270 + for layer_name_origin, layer_name in meta.get("layer").items():
  271 + origin_name = layer_name
  272 + no = 1
  273 +
  274 + while (overwrite.__eq__("no") and pg_ds.GetLayerByName(layer_name)) or sys_session.query(
  275 + InsertingLayerName).filter_by(name=layer_name).one_or_none():
  276 + layer_name = origin_name + "_{}".format(no)
  277 + no += 1
  278 +
  279 + # 添加到正在入库的列表中
  280 + iln = InsertingLayerName(guid=uuid.uuid1().__str__(),
  281 + task_guid=ready_task.guid,
  282 + name=layer_name)
  283 +
  284 + sys_session.add(iln)
  285 + sys_session.commit()
  286 + this_task_layer.append(layer_name)
  287 + # 修改表名
  288 + meta["layer"][layer_name_origin] = layer_name
  289 +
  290 + pg_ds.Destroy()
  291 + entry_data_process = multiprocessing.Process(
  292 + target=EntryDataVacuate().entry, args=(parameter,))
  293 + entry_data_process.start()
  294 + running_dict[entry_data_process] = this_task_layer
  295 + except Exception as e:
  296 + sys_session.query(Task).filter_by(guid=ready_task.guid).update(
  297 + {"state": -1, "process": "入库失败"})
  298 + sys_session.commit()
  299 + StructurePrint().print(e.__str__(), "error")
  300 +
  301 +
  302 +def task_table_refresh(database,task_guid):
  303 + pg_ds =None
  304 + sys_ds =None
  305 + data_session=None
  306 + result = {}
  307 + sys_session = None
  308 + db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
  309 +
  310 + try:
  311 + sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  312 + sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI)
  313 +
  314 + this_time = datetime.datetime.now()
  315 + database_guid = database.guid
  316 +
  317 + # 已注册空间表
  318 + spatial_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(
  319 + Table.table_type != 0).all()
  320 +
  321 + # 已注册空间表名
  322 + spatial_tables_names = [table.name for table in spatial_tables]
  323 +
  324 + # 实体库datasource
  325 + pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
  326 +
  327 + # 更新空间表
  328 + # 增加表
  329 + db_tables_names = add_spatail_table(database, pg_ds, sys_session,spatial_tables_names, this_time,db_tuple)# 实体库中空间表名
  330 +
  331 + # 删除/修改表
  332 + edit_spatial_table(pg_ds, sys_session,spatial_tables, db_tables_names, this_time,db_tuple)
  333 +
  334 + # 空间表处理完毕
  335 + sys_session.commit()
  336 +
  337 +
  338 + # 空间表处理完毕
  339 + sys_session.commit()
  340 +
  341 + # 注册普通表
  342 + # 实体库连接
  343 + data_session: Session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
  344 +
  345 + # 处理后空间表
  346 + spatial_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(
  347 + Table.table_type != 0).all()
  348 + # 处理后空间表名
  349 + spatial_tables_names = [table.name for table in spatial_tables]
  350 +
  351 + # 原有普通表
  352 + common_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(
  353 + Table.table_type == 0).all()
  354 + # 原有普通表 名
  355 + origin_common_tables_name = [table.name for table in common_tables]
  356 +
  357 + # 现有普通表
  358 + real_common_tables_name = []
  359 +
  360 + # 只注册public中的表
  361 + common_result = data_session.execute(
  362 + "select relname as tabname from pg_class c where relkind = 'r' and relnamespace=2200 and relname not like 'pg_%' and relname not like 'sql_%' order by relname").fetchall()
  363 + for re in common_result:
  364 + table_name = re[0]
  365 + if table_name not in spatial_tables_names and (not table_name.__contains__("_vacuate_")):
  366 + real_common_tables_name.append(table_name)
  367 +
  368 + # 增加新普通表
  369 +
  370 + add_common_table(data_session, sys_session, database_guid, real_common_tables_name, origin_common_tables_name,
  371 + this_time,db_tuple)
  372 +
  373 + # 删除、修改普通表
  374 + edit_common_table(data_session,sys_session, database_guid, real_common_tables_name, origin_common_tables_name,
  375 + this_time,db_tuple)
  376 +
  377 + sys_session.commit()
  378 + result["data"] = "刷新数据成功!"
  379 + result["state"] = 1
  380 + sys_session.query(Task).filter_by(guid=task_guid).update(
  381 + {"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})
  382 + sys_session.commit()
  383 +
  384 + except Exception as e:
  385 + try:
  386 + print(traceback.format_exc())
  387 + sys_session.query(Task).filter_by(guid=task_guid).update(
  388 + {"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})
  389 + message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
  390 + task_process_guid = uuid.uuid1().__str__()
  391 + task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
  392 + task_guid=task_guid)
  393 + sys_session.add(task_process)
  394 + sys_session.commit()
  395 + except Exception as ee:
  396 + print(traceback.format_exc())
  397 + finally:
  398 + if pg_ds:
  399 + pg_ds.Destroy()
  400 + if data_session:
  401 + data_session.close()
  402 + if sys_session:
  403 + sys_session.close()
  404 + if sys_ds:
  405 + sys_ds.Destroy()
  406 + return result
  407 +
  408 +def add_spatail_table(database,pg_ds,sys_session,spatial_tables_names,this_time,db_tuple):
  409 + '''
  410 + 注册新增空间表
  411 + :param database:
  412 + :param pg_ds:
  413 + :param spatial_tables_names: 已注册空间表名
  414 + :param this_time:
  415 + :return: 实体库中空间表名
  416 + '''
  417 +
  418 + db_tables_names=[]
  419 +
  420 + for i in range(pg_ds.GetLayerCount()):
  421 + layer: Layer = pg_ds.GetLayer(i)
  422 + geom_column = layer.GetGeometryColumn()
  423 + db_tables_names.append(layer.GetName())
  424 + if not geom_column:
  425 + continue
  426 + if layer.GetName() not in spatial_tables_names:
  427 + l_name = layer.GetName()
  428 +
  429 + try:
  430 + # 只注册public的空间表,其他表空间的表名会有.
  431 + if layer.GetName().__contains__("."):
  432 + continue
  433 + # 略过抽稀表
  434 + if layer.GetName().__contains__("_vacuate_"):
  435 + continue
  436 +
  437 + # 没有权限的表跳过
  438 + if not PGUtil.check_table_privilege(l_name, "SELECT", db_tuple[0], pg_ds):
  439 + StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], l_name), "warn")
  440 + continue
  441 +
  442 +
  443 + # 范围统计和数量统计以100w为界限
  444 + query_count_layer: Layer = pg_ds.ExecuteSQL(
  445 + '''SELECT reltuples::bigint AS ec FROM pg_class WHERE oid = 'public."{}"'::regclass'''.format(
  446 + l_name))
  447 +
  448 + feature_count = query_count_layer.GetFeature(0).GetField("ec")
  449 + # 要素少于100w可以精确统计
  450 + if feature_count < 1000000:
  451 + feature_count = layer.GetFeatureCount()
  452 + ext = layer.GetExtent()
  453 + else:
  454 + query_ext_layer: Layer = pg_ds.ExecuteSQL(
  455 + "select geometry(ST_EstimatedExtent('public', '{}','{}'))".format(l_name,
  456 + layer.GetGeometryColumn()))
  457 + ext = query_ext_layer.GetExtent()
  458 + if ext[0] < 360:
  459 + ext = [round(e, 6) for e in ext]
  460 + else:
  461 + ext = [round(e, 2) for e in ext]
  462 + extent = "{},{},{},{}".format(ext[0], ext[1], ext[2], ext[3])
  463 +
  464 + StructurePrint().print("空间表增加!")
  465 +
  466 + geom_type = GeometryAdapter.get_geometry_type(layer)
  467 + except:
  468 + StructurePrint().print("表{}注册失败!".format(l_name), "warn")
  469 + continue
  470 +
  471 + table_guid = uuid.uuid1().__str__()
  472 + table = Table(guid=table_guid,
  473 + database_guid=database.guid,
  474 + # alias=layer.GetName(),
  475 + name=layer.GetName(), create_time=this_time, update_time=this_time,
  476 + table_type=GeometryAdapter.get_table_type(geom_type),
  477 + extent=extent,
  478 + feature_count=feature_count
  479 + )
  480 + sys_session.add(table)
  481 + feature_defn: FeatureDefn = layer.GetLayerDefn()
  482 +
  483 + for i in range(feature_defn.GetFieldCount()):
  484 + field_defn: FieldDefn = feature_defn.GetFieldDefn(i)
  485 + field_name = field_defn.GetName()
  486 + field_alias = field_name if field_defn.GetAlternativeName() is None or field_defn.GetAlternativeName().__eq__(
  487 + "") else field_defn.GetAlternativeName()
  488 + column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,
  489 + name=field_name, alias=field_alias, create_time=this_time, update_time=this_time)
  490 + sys_session.add(column)
  491 + return db_tables_names
  492 +
  493 +def deal_vacuate_table(sys_ds,sys_session,database_guid):
  494 +
  495 +
  496 + for i in range(sys_ds.GetLayerCount()):
  497 + layer: Layer = sys_ds.GetLayer(i)
  498 + geom_column = layer.GetGeometryColumn()
  499 +
  500 + if not geom_column:
  501 + continue
  502 +
  503 +
  504 +
  505 + if layer.GetName().__contains__("_vacuate_"):
  506 + l_name = layer.GetName()
  507 +
  508 + base_layer_name = l_name.split("_vacuate_")[0].split("_")[1]
  509 +
  510 + level = l_name.split("_")[-2]
  511 +
  512 + pixel_distance_str: str ="0"
  513 + try:
  514 + pixel_distance_str: str = l_name.split("_")[-1]
  515 + if pixel_distance_str.startswith("0"):
  516 + pixel_distance_str = "0.{}".format(pixel_distance_str)
  517 + except:
  518 + pass
  519 +
  520 + base_table =sys_session.query(Table).filter_by(name=base_layer_name,database_guid=database_guid).one_or_none()
  521 + if base_table:
  522 + if not sys_session.query(TableVacuate).filter_by(table_guid=base_table.guid,name=l_name).one_or_none():
  523 + table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),
  524 + table_guid=base_table.guid,
  525 + level=level,
  526 + name=l_name,
  527 + pixel_distance=float(pixel_distance_str))
  528 + sys_session.add(table_vacuate)
  529 +
  530 + sys_session.query(Table).filter_by(guid=base_table.guid).update({"is_vacuate": 1})
  531 + else:
  532 + kk=1
  533 +
  534 +
  535 +
  536 +def edit_spatial_table(pg_ds,sys_session,spatial_tables,db_tables_names,this_time,db_tuple):
  537 +
  538 +
  539 +
  540 + for table in spatial_tables:
  541 +
  542 + # 删除表
  543 + if table.name not in db_tables_names:
  544 + StructurePrint().print("空间表减少!")
  545 + sys_session.delete(table)
  546 + # 修改表
  547 + else:
  548 + layer: Layer = pg_ds.GetLayerByName(table.name)
  549 + l_name = layer.GetName()
  550 +
  551 + # 只注册public的空间表,其他表空间的表名会有.
  552 + if layer.GetName().__contains__("."):
  553 + continue
  554 +
  555 + if layer.GetName().__contains__("_vacuate_"):
  556 + continue
  557 +
  558 + # 没有权限的表跳过
  559 + if not PGUtil.check_table_privilege(l_name, "SELECT", db_tuple[0], pg_ds):
  560 + StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], l_name), "warn")
  561 + sys_session.delete(table)
  562 + continue
  563 +
  564 + columns = table.relate_columns
  565 + columns_names = [column.name for column in columns]
  566 + feature_defn: FeatureDefn = layer.GetLayerDefn()
  567 + db_columns_names = []
  568 +
  569 + # 增加列
  570 + for i in range(feature_defn.GetFieldCount()):
  571 + field_defn: FieldDefn = feature_defn.GetFieldDefn(i)
  572 + field_name = field_defn.GetName()
  573 + db_columns_names.append(field_name)
  574 +
  575 + if field_name not in columns_names:
  576 + StructurePrint().print("{}空间表属性增加!".format(table.name))
  577 + field_alias = field_name if field_defn.GetAlternativeName() is None or field_defn.GetAlternativeName().__eq__(
  578 + "") else field_defn.GetAlternativeName()
  579 + column = Columns(guid=uuid.uuid1().__str__(), table_guid=table.guid,
  580 + name=field_name, alias=field_alias, create_time=this_time,
  581 + update_time=this_time)
  582 + sys_session.add(column)
  583 +
  584 + # 删除列
  585 + for column in columns:
  586 + if column.name not in db_columns_names:
  587 + StructurePrint().print("{}空间表属性减少!".format(table.name))
  588 + sys_session.delete(column)
  589 +
  590 + # 范围统计和数量统计以100w为界限
  591 + query_count_layer: Layer = pg_ds.ExecuteSQL(
  592 + '''SELECT reltuples::bigint AS ec FROM pg_class WHERE oid = 'public."{}"'::regclass'''.format(
  593 + l_name))
  594 + feature_count = query_count_layer.GetFeature(0).GetField("ec")
  595 + # 要素少于100w可以精确统计
  596 + if feature_count < 1000000:
  597 + feature_count = layer.GetFeatureCount()
  598 + ext = layer.GetExtent()
  599 +
  600 + else:
  601 + query_ext_layer: Layer = pg_ds.ExecuteSQL(
  602 + "select geometry(ST_EstimatedExtent('public', '{}','{}'))".format(l_name,
  603 + layer.GetGeometryColumn()))
  604 + ext = query_ext_layer.GetExtent()
  605 + if ext[0] < 360:
  606 + ext = [round(e, 6) for e in ext]
  607 + else:
  608 + ext = [round(e, 2) for e in ext]
  609 + extent = "{},{},{},{}".format(ext[0], ext[1], ext[2], ext[3])
  610 +
  611 + # 修改要素量
  612 + if not table.feature_count.__eq__(feature_count):
  613 + StructurePrint().print("{}空间表要素!".format(table.name))
  614 + sys_session.query(Table).filter_by(guid=table.guid).update({"feature_count": feature_count,
  615 + "extent": extent})
  616 +
  617 +
  618 +def add_common_table(data_session,sys_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time,db_tuple):
  619 + for table_name in real_common_tables_name:
  620 + if table_name not in origin_common_tables_name:
  621 + StructurePrint().print("{}非空间表增加!".format(table_name))
  622 + table_guid = uuid.uuid1().__str__()
  623 +
  624 +
  625 + # 没有权限的表跳过
  626 + if not SQLUtil.check_table_privilege(table_name, "SELECT", db_tuple[0], data_session):
  627 + StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], table_name), "warn")
  628 + continue
  629 +
  630 + count = data_session.execute('select count(*) from "{}"'.format(table_name)).fetchone()[0]
  631 +
  632 + table = Table(guid=table_guid,
  633 + database_guid=database_guid,
  634 + name=table_name, create_time=this_time, update_time=this_time,
  635 + table_type=0,
  636 + feature_count=count
  637 + )
  638 +
  639 + sys_session.add(table)
  640 +
  641 + sql = '''
  642 + SELECT
  643 + a.attnum,
  644 + a.attname AS field
  645 + FROM
  646 + pg_class c,
  647 + pg_attribute a,
  648 + pg_type t
  649 + WHERE
  650 + c.relname = '{}'
  651 + and a.attnum > 0
  652 + and a.attrelid = c.oid
  653 + and a.atttypid = t.oid
  654 + ORDER BY a.attnum
  655 + '''.format(table_name)
  656 +
  657 + cols = data_session.execute(sql).fetchall()
  658 + for col in cols:
  659 + column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,
  660 + name=col[1], create_time=this_time, update_time=this_time)
  661 + sys_session.add(column)
  662 +
  663 + # 删除不存在的表
  664 + for n in origin_common_tables_name:
  665 + if n not in real_common_tables_name:
  666 + tables = Table.query.filter_by(name=n).filter_by(database_guid=database_guid).all()
  667 + for table in tables:
  668 + sys_session.delete(table)
  669 +
  670 +def edit_common_table(data_session,sys_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time,db_tuple):
  671 + for table_name in origin_common_tables_name:
  672 + tables = sys_session.query(Table).filter_by(name=table_name).filter_by(database_guid=database_guid).all()
  673 + for table in tables:
  674 + if table_name not in real_common_tables_name:
  675 + StructurePrint().print("{}非空间表减少!".format(table_name))
  676 + sys_session.delete(table)
  677 + # 修改表
  678 + else:
  679 +
  680 + # 没有权限的表删除
  681 + if not SQLUtil.check_table_privilege(table_name, "SELECT", db_tuple[0], data_session):
  682 + StructurePrint().print("用户{}对表{}没有select权限!".format(db_tuple[0], table_name), "warn")
  683 + sys_session.delete(table)
  684 + continue
  685 +
  686 + columns = table.relate_columns
  687 + columns_names = [column.name for column in columns]
  688 +
  689 + sql = '''
  690 + SELECT
  691 + a.attnum,
  692 + a.attname AS field
  693 + FROM
  694 + pg_class c,
  695 + pg_attribute a,
  696 + pg_type t
  697 + WHERE
  698 + c.relname = '{}'
  699 + and a.attnum > 0
  700 + and a.attrelid = c.oid
  701 + and a.atttypid = t.oid
  702 + ORDER BY a.attnum
  703 + '''.format(table_name)
  704 +
  705 + cols = data_session.execute(sql).fetchall()
  706 + real_cols_name = [col[1] for col in cols]
  707 +
  708 + # 属性增加
  709 + for col in real_cols_name:
  710 + if col not in columns_names:
  711 + StructurePrint().print("{}表要素属性增加!".format(table_name))
  712 + column = Columns(guid=uuid.uuid1().__str__(), table_guid=table.guid,
  713 + name=col, create_time=this_time, update_time=this_time)
  714 + sys_session.add(column)
  715 +
  716 + # 属性减少
  717 + for column in columns:
  718 + if column.name not in real_cols_name:
  719 + StructurePrint().print("{}表要素属性减少!".format(table_name))
  720 + sys_session.delete(column)
  721 +
  722 + # 修改要素量
  723 + # sql = 'select count(*) from "{}"'.format(table_name)
  724 + # count = data_session.execute(sql).fetchone()[0]
  725 +
  726 + count = SQLUtil.get_table_count(table_name,data_session)
  727 +
  728 + if not table.feature_count.__eq__(count):
  729 + StructurePrint().print("{}表要素变化!".format(table_name))
  730 + sys_session.query(Table).filter_by(guid=table.guid).update({"feature_count": count})
  731 +
  732 +
  733 +def task_vacuate(table,task_guid):
  734 +
  735 + sys_session = None
  736 + pg_session = None
  737 + pg_ds = None
  738 + vacuate_process = None
  739 + try:
  740 + sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  741 + sys_session.query(Table).filter_by(guid=table.guid).update(
  742 + {"is_vacuate": 2, "update_time": datetime.datetime.now()})
  743 + sys_session.commit()
  744 +
  745 + database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
  746 + pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
  747 +
  748 + pg_ds: DataSource = PGUtil.open_pg_data_source(0, DES.decode(database.sqlalchemy_uri))
  749 +
  750 + # 删除原有数据
  751 + tvs = sys_session.query(TableVacuate).filter_by(table_guid=table.guid).all()
  752 + for tv in tvs:
  753 + sys_session.delete(tv)
  754 +
  755 +
  756 + # 创建抽稀过程
  757 + options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name, pg_session)),
  758 + "PRECISION=NO"]
  759 +
  760 + layer = pg_ds.GetLayerByName(table.name)
  761 +
  762 + vacuate_process: VacuateProcess = VacuateProcess(layer, table.guid, options, database.sqlalchemy_uri)
  763 +
  764 + for feature in layer:
  765 + geo = feature.GetGeometryRef()
  766 + # 插入抽稀图层
  767 + if geo is not None:
  768 + vacuate_process.vacuate(geo, feature)
  769 +
  770 + vacuate_process.set_vacuate_count()
  771 +
  772 + # 新增
  773 + if configure.VACUATE_DB_URI:
  774 + user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI)
  775 + else:
  776 + user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
  777 + connectstr = "hostaddr={} port={} dbname='{}' user='{}' password='{}'".format(host, port, datab, user,
  778 + passwd)
  779 + for l in range(vacuate_process.max_level):
  780 + lev = vacuate_process.t_grid_size.index(vacuate_process.this_gridsize[l])
  781 +
  782 + table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),
  783 + table_guid=table.guid,
  784 + level=lev,
  785 + name=vacuate_process.vacuate_layers[l].GetName(),
  786 + pixel_distance=vacuate_process.this_gridsize[l],
  787 + connectstr=DES.encode(connectstr))
  788 + sys_session.add(table_vacuate)
  789 +
  790 + sys_session.query(Task).filter_by(guid=task_guid).update({"state": 1, "update_time": datetime.datetime.now(),
  791 + "process": "精化完成"})
  792 + sys_session.query(Table).filter_by(guid=table.guid).update(
  793 + {"is_vacuate": 1, "update_time": datetime.datetime.now()})
  794 + sys_session.commit()
  795 +
  796 + except Exception as e:
  797 + try:
  798 + sys_session.query(Task).filter_by(guid=task_guid).update(
  799 + {"state": -1, "update_time": datetime.datetime.now(),
  800 + "process": "精化失败"})
  801 + sys_session.query(Table).filter_by(guid=table.guid).update(
  802 + {"is_vacuate": 0, "update_time": datetime.datetime.now()})
  803 +
  804 + message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
  805 + task_process_guid = uuid.uuid1().__str__()
  806 + task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
  807 + task_guid=task_guid)
  808 + sys_session.add(task_process)
  809 + sys_session.commit()
  810 + if vacuate_process:
  811 + vacuate_process.rollback()
  812 +
  813 + print(traceback.format_exc())
  814 + except Exception as ee:
  815 + print(traceback.format_exc())
  816 + finally:
  817 + if vacuate_process:
  818 + vacuate_process.end()
  819 + if sys_session:
  820 + sys_session.close()
  821 + if pg_session:
  822 + pg_session.close()
  823 + if pg_ds:
  824 + pg_ds.Destroy()
  825 +
  826 +
  827 +class VacuateProcess:
  828 +
  829 + max_level=0
  830 + fill_dict={}
  831 + vacuate_layers={}
  832 + vacuate_layers_gridsize={}
  833 + pg_ds_dict = {}
  834 + # 图层要素大于5W才抽稀
  835 + least_vacuate_count = VacuateConf.least_vacuate_count
  836 +
  837 + extent=[]
  838 + is_spatial=False
  839 +
  840 + lonlat_gridsize = VacuateConf.lonlat_gridsize
  841 + project_gridsize = VacuateConf.project_gridsize
  842 +
  843 + # 该抽稀过程使用的grid_size
  844 + t_grid_size = []
  845 +
  846 + # 该抽稀过程的抽稀网格
  847 + this_gridsize=[]
  848 +
  849 +
  850 + def __init__(self,layer:Layer,table_guid, options,sqlalchemy_uri):
  851 +
  852 + #是空间图层才初始化
  853 + if layer.GetExtent()[0] > 0 or layer.GetExtent()[0] < 0:
  854 +
  855 + self.is_spatial=True
  856 +
  857 + # 判断需要抽稀多少级
  858 +
  859 + lc = layer.GetFeatureCount()
  860 + extent = layer.GetExtent()
  861 + self.extent=extent
  862 +
  863 + #判断疏密程度
  864 + p_x = (extent[1]-extent[0])/10.0
  865 + p_y = (extent[3] - extent[2]) / 10.0
  866 + fill_precent=0
  867 + StructurePrint().print("判断疏密")
  868 + for ix in range(10):
  869 + for iy in range(10):
  870 + grid_extent = [extent[0]+ix*p_x,extent[0]+ix*p_x+p_x,extent[2]+iy*p_y,extent[2]+iy*p_y+p_y]
  871 + poly = GeometryAdapter.envelop_2_polygon(grid_extent)
  872 +
  873 + layer.SetSpatialFilter(None)
  874 + layer.SetSpatialFilter(poly)
  875 + layer.ResetReading()
  876 + if layer.GetNextFeature():
  877 + fill_precent += 1
  878 +
  879 + print(fill_precent)
  880 + StructurePrint().print("判断疏密结束")
  881 +
  882 + layer.SetSpatialFilter(None)
  883 + layer.ResetReading()
  884 + # 固有疏密程度
  885 + original_density=8
  886 +
  887 +
  888 + # 额外一层
  889 + # self.this_gridsize.append(0.000075)
  890 + # self.max_level += 1
  891 + ######
  892 +
  893 + if extent[0]>180:
  894 + self.t_grid_size=self.project_gridsize
  895 + else:
  896 + self.t_grid_size = self.lonlat_gridsize
  897 +
  898 + for grid_size in self.t_grid_size:
  899 + # 最少抽稀个数
  900 + if lc > self.least_vacuate_count:
  901 + # 网格数至少大于
  902 + if ((extent[1] - extent[0]) * (extent[3] - extent[2])) / (grid_size**2)>self.least_vacuate_count:
  903 + # 要素数量大于网格数量
  904 + # 要考虑图层的疏密程度,original_density*(100.0/fill_precent) 为疏密指数
  905 + if lc * original_density * (100.0/fill_precent)>((extent[1] - extent[0])*(extent[3] - extent[2]))/(grid_size**2) :
  906 + print(grid_size)
  907 + self.this_gridsize.append(grid_size)
  908 + self.max_level += 1
  909 +
  910 +
  911 +
  912 + # 创建抽稀ds
  913 + for l in range(self.max_level):
  914 + # pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, DES.decode(sqlalchemy_uri))
  915 + if configure.VACUATE_DB_URI:
  916 + pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, configure.VACUATE_DB_URI)
  917 + else:
  918 + pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, DES.decode(sqlalchemy_uri))
  919 + pg_ds_l.StartTransaction()
  920 + self.pg_ds_dict[l] = pg_ds_l
  921 +
  922 + # 生成抽稀图层
  923 + options = options[1:]
  924 + options.append("OVERWRITE=yes")
  925 + options.append("LAUNDER=no")
  926 +
  927 + schema = layer.schema
  928 + # 增加统计字段
  929 + schema.append(ogr.FieldDefn("_dcigrid_count_", ogr.OFTInteger))
  930 + schema.append(ogr.FieldDefn("_dcigrid_name_", ogr.OFTString))
  931 +
  932 + for l in range(self.max_level):
  933 + this_grid_len = self.this_gridsize[l]
  934 +
  935 + self.vacuate_layers_gridsize[l] = this_grid_len
  936 +
  937 + pg = self.pg_ds_dict[l]
  938 +
  939 + grid_name = str(this_grid_len)
  940 + if this_grid_len<1:
  941 + grid_name = str(this_grid_len).split(".")[-1]
  942 + if this_grid_len.__eq__(0.00008):
  943 + grid_name = "00008"
  944 +
  945 + # 抽稀图层是点面混合的
  946 + # 抽稀表有固定的命名规则
  947 + # 抽稀表一定要覆盖
  948 +
  949 +
  950 + print("{}:{}".format(self.t_grid_size.index(this_grid_len),this_grid_len))
  951 +
  952 +
  953 + v_ln = "z{}_vacuate_{}_{}".format(table_guid, self.t_grid_size.index(this_grid_len), grid_name)
  954 + vl = pg.CreateLayer(v_ln, layer.GetSpatialRef(),ogr.wkbUnknown, options)
  955 + # 抽稀表需要属性
  956 + vl.CreateFields(schema)
  957 + self.vacuate_layers[l] = vl
  958 +
  959 + else:
  960 + pass
  961 +
  962 +
  963 + def vacuate(self,g,feature):
  964 +
  965 + if self.is_spatial:
  966 +
  967 + # 插入到所有抽稀图层中
  968 + for level in range(self.max_level):
  969 +
  970 + center: Geometry = g.Centroid()
  971 +
  972 + extent = g.GetEnvelope()
  973 + long_extent= extent[1]-extent[0]
  974 + lat_extent = extent[3]-extent[2]
  975 +
  976 + this_grid_len =self.vacuate_layers_gridsize[level]
  977 + #超大的直接加入
  978 + # if long_extent > 10*this_grid_len or lat_extent >10*this_grid_len:
  979 + # vacuate_layer: Layer = self.vacuate_layers.get(level)
  980 + # feat = ogr.Feature(vacuate_layer.GetLayerDefn())
  981 + # feat.SetGeometry(g)
  982 + # vacuate_layer.CreateFeature(feat)
  983 + # else:
  984 +
  985 + row = int((center.GetY() - self.extent[2]) / this_grid_len)
  986 + col = int((center.GetX() - self.extent[0]) / this_grid_len)
  987 + key = "{}.{}.{}".format(level, row, col)
  988 +
  989 + if not self.fill_dict.get(key):
  990 + self.fill_dict[key] = 0
  991 + if self.fill_dict[key] == 0:
  992 +
  993 + vacuate_layer: Layer = self.vacuate_layers.get(level)
  994 + feat = ogr.Feature(vacuate_layer.GetLayerDefn())
  995 + # 如果图形比网格小,直接存储其中心点
  996 + if this_grid_len>long_extent and this_grid_len>lat_extent:
  997 + feat.SetGeometry(center)
  998 + else:
  999 + feat.SetGeometry(g)
  1000 +
  1001 + # 复制旧feature属性
  1002 + field_dict = feature.items()
  1003 + for field_name in field_dict:
  1004 + feat.SetField(field_name, field_dict[field_name])
  1005 + feat.SetField("_dcigrid_name_",".".join(key.split(".")[1:]))
  1006 +
  1007 + vacuate_layer.CreateFeature(feat)
  1008 + self.fill_dict[key] += 1
  1009 + #超大的还有机会
  1010 + elif (long_extent > 10*this_grid_len or lat_extent >10*this_grid_len) and self.fill_dict[key]<5:
  1011 + vacuate_layer: Layer = self.vacuate_layers.get(level)
  1012 + feat = ogr.Feature(vacuate_layer.GetLayerDefn())
  1013 + feat.SetGeometry(g)
  1014 +
  1015 + # 复制旧feature属性
  1016 + field_dict = feature.items()
  1017 + for field_name in field_dict:
  1018 + feat.SetField(field_name, field_dict[field_name])
  1019 + feat.SetField("_dcigrid_name_",".".join(key.split(".")[1:]))
  1020 +
  1021 + vacuate_layer.CreateFeature(feat)
  1022 + self.fill_dict[key] += 1
  1023 + else:
  1024 + self.fill_dict[key] += 1
  1025 +
  1026 + def set_vacuate_count(self):
  1027 + if self.is_spatial:
  1028 + # 插入到所有抽稀图层中
  1029 + for level in range(self.max_level):
  1030 + vacuate_layer: Layer = self.vacuate_layers.get(level)
  1031 + for feat in vacuate_layer:
  1032 + key = "{}.{}".format(level,feat.GetField("_dcigrid_name_"))
  1033 + feat.SetField("_dcigrid_count_",self.fill_dict.get(key))
  1034 + vacuate_layer.SetFeature(feat)
  1035 +
  1036 + def end(self):
  1037 + for pg in self.pg_ds_dict.values():
  1038 + pg.Destroy()
  1039 +
  1040 + def rollback(self):
  1041 + for pg in self.pg_ds_dict.values():
  1042 + pg.RollbackTransaction()
@@ -8,7 +8,7 @@ import socket @@ -8,7 +8,7 @@ import socket
8 8
9 9
10 from app.util.component.ApiTemplate import ApiTemplate 10 from app.util.component.ApiTemplate import ApiTemplate
11 -from app.util.component.StructurePrint import StructurePrint 11 +
12 class Api(ApiTemplate): 12 class Api(ApiTemplate):
13 13
14 def process(self): 14 def process(self):
@@ -8,7 +8,7 @@ from app.util.component.ApiTemplate import ApiTemplate @@ -8,7 +8,7 @@ from app.util.component.ApiTemplate import ApiTemplate
8 import requests 8 import requests
9 from app.util.component.UserCheck import UserCheck 9 from app.util.component.UserCheck import UserCheck
10 from ..util.ServiceType import ServiceType 10 from ..util.ServiceType import ServiceType
11 - 11 +import json
12 class Api(ApiTemplate): 12 class Api(ApiTemplate):
13 13
14 api_name = "修改ImageService服务" 14 api_name = "修改ImageService服务"
@@ -29,6 +29,11 @@ class Api(ApiTemplate): @@ -29,6 +29,11 @@ class Api(ApiTemplate):
29 UserCheck.verify(user_req.json().get("data").get("service").get("creator")) 29 UserCheck.verify(user_req.json().get("data").get("service").get("creator"))
30 30
31 url = "{}/API/Service/Edit".format(self.para.get("url")) 31 url = "{}/API/Service/Edit".format(self.para.get("url"))
  32 +
  33 + # 转换Null值
  34 + for key in self.para.keys():
  35 + if self.para[key] is None:
  36 + self.para[key] = ""
32 response:requests.Response = requests.post(url,data=self.para) 37 response:requests.Response = requests.post(url,data=self.para)
33 if response.status_code == 200: 38 if response.status_code == 200:
34 if not response.json().get("result"): 39 if not response.json().get("result"):
@@ -39,7 +39,7 @@ class Api(ApiTemplate): @@ -39,7 +39,7 @@ class Api(ApiTemplate):
39 try: 39 try:
40 40
41 this_time = datetime.datetime.now() 41 this_time = datetime.datetime.now()
42 - service_guid = uuid.uuid1().__str__() 42 + service_guid = uuid.uuid1().__str__ida()
43 map_service_guid = uuid.uuid1().__str__() 43 map_service_guid = uuid.uuid1().__str__()
44 44
45 # 逻辑是,先调用引擎接口,引擎说可以,才做持久化 45 # 逻辑是,先调用引擎接口,引擎说可以,才做持久化
@@ -4,7 +4,6 @@ @@ -4,7 +4,6 @@
4 #email: nheweijun@sina.com 4 #email: nheweijun@sina.com
5 5
6 6
7 -  
8 import os 7 import os
9 8
10 from app.util.component.ApiTemplate import ApiTemplate 9 from app.util.component.ApiTemplate import ApiTemplate
@@ -2,11 +2,11 @@ @@ -2,11 +2,11 @@
2 #author: 4N 2 #author: 4N
3 #createtime: 2021/5/18 3 #createtime: 2021/5/18
4 #email: nheweijun@sina.com 4 #email: nheweijun@sina.com
5 -from app.util.component.ParameterUtil import ParameterUtil,StructurePrint  
6 -from flask import current_app 5 +from app.util.component.ParameterUtil import ParameterUtil
  6 +from flask import current_app,request
7 import traceback 7 import traceback
8 import json 8 import json
9 - 9 +from app.modules.auth.models import OAuth2Token,User,db
10 10
11 class ApiTemplate: 11 class ApiTemplate:
12 #模板方法 12 #模板方法
@@ -15,14 +15,23 @@ class ApiTemplate: @@ -15,14 +15,23 @@ class ApiTemplate:
15 api_name="[未命名]" 15 api_name="[未命名]"
16 def __init__(self): 16 def __init__(self):
17 pass 17 pass
  18 +
18 def get_para(self): 19 def get_para(self):
19 self.para = ParameterUtil.get_parameter() 20 self.para = ParameterUtil.get_parameter()
  21 + token = request.headers.get('Authorization')
  22 + if token:
  23 + token = token.split(" ")[-1]
  24 + user = User.query.join(OAuth2Token).filter(OAuth2Token.access_token == token).one_or_none()
  25 + if user:
  26 + self.para["creator"] = user.username
  27 +
20 def para_check(self): 28 def para_check(self):
21 pass 29 pass
22 def process(self): 30 def process(self):
23 pass 31 pass
24 def log(self): 32 def log(self):
25 - current_app.logger.info("{}调用了:{},参数为:{}".format(self.para.get("apiuser","4N"),self.api_name,json.dumps(self.para,ensure_ascii=False))) 33 + ip = request.remote_addr
  34 + current_app.logger.info("[{}][{}]调用了:{},参数为:{}".format(ip,self.para.get("creator","非法用户"),self.api_name,json.dumps(self.para,ensure_ascii=False)))
26 # 接口模板 35 # 接口模板
27 @property 36 @property
28 def result(self): 37 def result(self):
@@ -37,5 +46,3 @@ class ApiTemplate: @@ -37,5 +46,3 @@ class ApiTemplate:
37 res["msg"]=e.__str__() 46 res["msg"]=e.__str__()
38 current_app.logger.error(e.__str__()+":"+ traceback.format_exc()) 47 current_app.logger.error(e.__str__()+":"+ traceback.format_exc())
39 return res 48 return res
40 -  
41 -  
@@ -8,7 +8,7 @@ import uuid @@ -8,7 +8,7 @@ import uuid
8 import os 8 import os
9 import stat 9 import stat
10 import platform 10 import platform
11 -from app.util.component.StructurePrint import StructurePrint 11 +
12 class FileProcess: 12 class FileProcess:
13 @classmethod 13 @classmethod
14 def save(cls,parent): 14 def save(cls,parent):
@@ -24,7 +24,7 @@ class FileProcess: @@ -24,7 +24,7 @@ class FileProcess:
24 filename = file.filename.split('"')[0] 24 filename = file.filename.split('"')[0]
25 25
26 store_file = os.path.join(dir_path, filename) 26 store_file = os.path.join(dir_path, filename)
27 - StructurePrint().print(store_file) 27 +
28 28
29 file.save(store_file) 29 file.save(store_file)
30 if platform.system().lower() == 'linux': 30 if platform.system().lower() == 'linux':
@@ -5,6 +5,7 @@ @@ -5,6 +5,7 @@
5 from osgeo import ogr 5 from osgeo import ogr
6 from sqlalchemy import create_engine 6 from sqlalchemy import create_engine
7 from sqlalchemy.orm import sessionmaker,Session 7 from sqlalchemy.orm import sessionmaker,Session
  8 +from app.util.component.StructurePrint import StructurePrint
8 class PGUtil: 9 class PGUtil:
9 10
10 @classmethod 11 @classmethod
@@ -25,6 +26,7 @@ class PGUtil: @@ -25,6 +26,7 @@ class PGUtil:
25 26
26 @classmethod 27 @classmethod
27 def get_info_from_sqlachemy_uri(cls,uri): 28 def get_info_from_sqlachemy_uri(cls,uri):
  29 +
28 parts = uri.split(":") 30 parts = uri.split(":")
29 user = parts[1][2:] 31 user = parts[1][2:]
30 32
@@ -126,7 +128,7 @@ class PGUtil: @@ -126,7 +128,7 @@ class PGUtil:
126 layer = pg_ds.GetLayerByName(table_name) 128 layer = pg_ds.GetLayerByName(table_name)
127 if not layer: 129 if not layer:
128 return None 130 return None
129 - srid_sql = '''select st_srid({}) from public."{}" limit 1'''.format(layer.GetGeometryColumn(), layer.GetName()) 131 + srid_sql = '''select st_srid("{}") from public."{}" limit 1'''.format(layer.GetGeometryColumn(), layer.GetName())
130 srid_layer = pg_ds.ExecuteSQL(srid_sql) 132 srid_layer = pg_ds.ExecuteSQL(srid_sql)
131 srid_feature = srid_layer.GetNextFeature() 133 srid_feature = srid_layer.GetNextFeature()
132 if srid_feature: 134 if srid_feature:
@@ -145,7 +147,8 @@ class PGUtil: @@ -145,7 +147,8 @@ class PGUtil:
145 pass 147 pass
146 148
147 if __name__ == '__main__': 149 if __name__ == '__main__':
148 - session:Session = PGUtil.get_db_session("postgresql://postgres:chinadci@172.26.60.100:5432/template1")  
149 - result = session.execute("SELECT datname FROM pg_database;")  
150 - for re in result:  
151 - print(re)  
  150 + # session:Session = PGUtil.get_db_session("postgresql://postgres:chinadci@172.26.60.100:5432/template1")
  151 + # result = session.execute("SELECT datname FROM pg_database;")
  152 + # for re in result:
  153 + # print(re)
  154 + print(PGUtil.get_info_from_sqlachemy_uri("postgresql://postgres:chinadci@123@10.101.21.63:5432/postgres"))
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 5
6 from flask import request 6 from flask import request
7 import json 7 import json
8 -from .StructurePrint import StructurePrint 8 +
9 9
10 class ParameterUtil: 10 class ParameterUtil:
11 11
@@ -52,7 +52,7 @@ class Migration(): @@ -52,7 +52,7 @@ class Migration():
52 52
53 53
54 def process(self): 54 def process(self):
55 - # self.dmap_dms_migration() 55 + self.dmap_dms_migration()
56 self.dmap_migration() 56 self.dmap_migration()
57 self.dmap_server_migration() 57 self.dmap_server_migration()
58 58
@@ -130,6 +130,7 @@ class Migration(): @@ -130,6 +130,7 @@ class Migration():
130 dmap_feat_all.SetField("node", 1) 130 dmap_feat_all.SetField("node", 1)
131 dmap_feat_all.SetField("overview", "http://{}/DMap/Services/{}/mapserver/WmsService?REQUEST=GetThumbnail".format(self.dmap_engine,ser["service"])) 131 dmap_feat_all.SetField("overview", "http://{}/DMap/Services/{}/mapserver/WmsService?REQUEST=GetThumbnail".format(self.dmap_engine,ser["service"]))
132 dmap_feat_all.SetField("create_time",ser["sctime"] ) 132 dmap_feat_all.SetField("create_time",ser["sctime"] )
  133 + ser["updatetime"] = ser["updatetime"] if ser["updatetime"] is not None else ser["sctime"]
133 dmap_feat_all.SetField("update_time",ser["updatetime"] ) 134 dmap_feat_all.SetField("update_time",ser["updatetime"] )
134 dmap_feat_all.SetField("description", ser["abstract"]) 135 dmap_feat_all.SetField("description", ser["abstract"])
135 dmapmanager_service_all.CreateFeature(dmap_feat_all) 136 dmapmanager_service_all.CreateFeature(dmap_feat_all)
@@ -527,7 +528,7 @@ class Migration(): @@ -527,7 +528,7 @@ class Migration():
527 layer = pg_ds.GetLayerByName(table_name) 528 layer = pg_ds.GetLayerByName(table_name)
528 if not layer: 529 if not layer:
529 return None 530 return None
530 - srid_sql = '''select st_srid({}) from public."{}" limit 1'''.format(layer.GetGeometryColumn(), layer.GetName()) 531 + srid_sql = '''select st_srid("{}") from public."{}" limit 1'''.format(layer.GetGeometryColumn(), layer.GetName())
531 srid_layer = pg_ds.ExecuteSQL(srid_sql) 532 srid_layer = pg_ds.ExecuteSQL(srid_sql)
532 srid_feature = srid_layer.GetNextFeature() 533 srid_feature = srid_layer.GetNextFeature()
533 if srid_feature: 534 if srid_feature:
@@ -7,4 +7,4 @@ docker rm $dn @@ -7,4 +7,4 @@ docker rm $dn
7 7
8 curPath=$(readlink -f "$(dirname "$0")") 8 curPath=$(readlink -f "$(dirname "$0")")
9 9
10 -docker run -d --name $dn -v $curPath:/usr/src/app -w /usr/src/app dci/dmapmanager:4.1-gdb python3 ./migration.py 10 +docker run -d --name $dn -v $curPath:/usr/src/app -w /usr/src/app dci/dmapmanager:4.0 python3 ./migration.py
1 flask==1.1.2 1 flask==1.1.2
2 SQLAlchemy==1.3.17 2 SQLAlchemy==1.3.17
3 Flask-SQLAlchemy==2.4.3 3 Flask-SQLAlchemy==2.4.3
4 -gevent==20.9.0  
5 -gunicorn==20.0.4  
6 flask_cors==3.0.8 4 flask_cors==3.0.8
7 flasgger==0.9.5 5 flasgger==0.9.5
8 #GDAL==3.2.1 6 #GDAL==3.2.1
9 psycopg2-binary==2.8.5 7 psycopg2-binary==2.8.5
10 pyDes==2.0.1 8 pyDes==2.0.1
11 gevent-websocket==0.10.1 9 gevent-websocket==0.10.1
12 -opencv-python==4.5.1.48 10 +opencv-python-headless==4.5.1.48
13 mod_wsgi==4.8.0 11 mod_wsgi==4.8.0
14 thrift==0.13.0 12 thrift==0.13.0
15 Authlib==0.13 13 Authlib==0.13
注册登录 后发表评论