提交 205d2934828f5bf03f3751c51a0c23d63756940a

作者 nheweijun
1 个父辈 8a3b68e3

2021.8.10 14:44 update table_refresh with task

正在显示 1 个修改的文件 包含 143 行增加75 行删除
... ... @@ -5,7 +5,7 @@
5 5
6 6
7 7 import traceback
8   -from app.models import Table,Database,DES,Columns,db,TableVacuate
  8 +from app.models import Table,Database,DES,Columns,db,TableVacuate,Task,Process
9 9
10 10 from osgeo.ogr import DataSource,FeatureDefn,FieldDefn,Layer
11 11
... ... @@ -20,59 +20,108 @@ from app.util.component.StructuredPrint import StructurePrint
20 20 from app.util.component.ApiTemplate import ApiTemplate
21 21 from app.util.component.GeometryAdapter import GeometryAdapter
22 22
  23 +import multiprocessing
  24 +import configure
  25 +
23 26 class Api(ApiTemplate):
24 27 api_name = "数据刷新"
25 28 def process(self):
26   -
27   -
28 29 res = {}
29   - pg_ds =None
30   - data_session=None
31   - this_time = datetime.datetime.now()
32 30 try:
33   -
  31 +
34 32 database_guid = self.para.get("database_guid")
35 33 database = Database.query.filter_by(guid=database_guid).one_or_none()
36   -
  34 +
37 35 if not database:
38 36 raise Exception("数据库不存在!")
39   - spatial_tables = Table.query.order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(Table.table_type!=0).all()
  37 + # 初始化task
  38 + task_guid = uuid.uuid1().__str__()
  39 +
  40 + # result = self.table_refresh(database)
  41 +
  42 + refresh_process = multiprocessing.Process(target=self.table_refresh,args=(database,task_guid))
  43 + refresh_process.start()
  44 +
  45 + task = Task(guid=task_guid,
  46 + name="数据库{}更新".format(database.alias),
  47 + create_time=datetime.datetime.now(),
  48 + state=0,
  49 + task_type=3,
  50 + creator=self.para.get("creator"),
  51 + file_name=None,
  52 + database_guid=database.guid,
  53 + process="数据库更新中")
  54 +
  55 + db.session.add(task)
  56 + db.session.commit()
  57 + res["msg"] = "数据库更新已提交!"
  58 + res["data"] = task_guid
  59 + res["result"] = True
  60 +
  61 + except Exception as e:
  62 + print(traceback.format_exc())
  63 + raise e
  64 + return res
  65 +
  66 +
  67 + def table_refresh(self,database,task_guid):
  68 +
  69 + pg_ds =None
  70 + data_session=None
  71 + result = {}
  72 + sys_session = None
  73 + try:
  74 + sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  75 +
  76 +
  77 + this_time = datetime.datetime.now()
  78 + database_guid = database.guid
  79 +
  80 + # 已注册空间表
  81 + spatial_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(
  82 + Table.table_type != 0).all()
  83 +
  84 + # 已注册空间表名
40 85 spatial_tables_names = [table.name for table in spatial_tables]
41   - db_tables_names = []
42   -
43 86
  87 + # 实体库datasource
44 88 pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
  89 +
45 90 # 更新空间表
46 91 # 增加表
47   - self.add_spatail_table(database, pg_ds, spatial_tables_names, db_tables_names, this_time)
48   -
  92 + db_tables_names = self.add_spatail_table(database, pg_ds, sys_session,spatial_tables_names, this_time)# 实体库中空间表名
  93 +
49 94 # 删除/修改表
50   - self.edit_spatial_table(pg_ds, spatial_tables, db_tables_names, this_time)
  95 + self.edit_spatial_table(pg_ds, sys_session,spatial_tables, db_tables_names, this_time)
51 96
52   - # 处理抽稀表
53   - self.deal_vacuate_table(pg_ds,database.guid)
  97 + # 空间表处理完毕
  98 + sys_session.commit()
54 99
  100 + # 处理抽稀表
  101 + self.deal_vacuate_table(pg_ds,sys_session, database.guid)
55 102
56 103 # 空间表处理完毕
57   - db.session.commit()
  104 + sys_session.commit()
58 105
59   -
60 106 # 注册普通表
61 107 # 实体库连接
62 108 data_session: Session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
63   -
64   - # 空间表
65   - spatial_tables = Table.query.order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(Table.table_type!=0).all()
  109 +
  110 + # 处理后空间表
  111 + spatial_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(
  112 + Table.table_type != 0).all()
  113 + # 处理后空间表名
66 114 spatial_tables_names = [table.name for table in spatial_tables]
67   -
68   -
69   - #原有普通表
70   - common_tables = Table.query.order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(Table.table_type==0).all()
  115 +
  116 + # 原有普通表
  117 + common_tables = sys_session.query(Table).order_by(Table.create_time.desc()).filter_by(database_guid=database_guid).filter(
  118 + Table.table_type == 0).all()
  119 + # 原有普通表 名
71 120 origin_common_tables_name = [table.name for table in common_tables]
72   -
73   -
  121 +
74 122 # 现有普通表
75   - real_common_tables_name =[]
  123 + real_common_tables_name = []
  124 +
76 125 # 只注册public中的表
77 126 common_result = data_session.execute(
78 127 "select relname as tabname from pg_class c where relkind = 'r' and relnamespace=2200 and relname not like 'pg_%' and relname not like 'sql_%' order by relname").fetchall()
... ... @@ -80,36 +129,56 @@ class Api(ApiTemplate):
80 129 table_name = re[0]
81 130 if table_name not in spatial_tables_names and (not table_name.__contains__("_vacuate_")):
82 131 real_common_tables_name.append(table_name)
83   -
  132 +
84 133 # 增加新普通表
85 134
86   - self.add_common_table(data_session, database_guid, real_common_tables_name, origin_common_tables_name,
87   - this_time)
  135 + self.add_common_table(data_session, sys_session, database_guid, real_common_tables_name, origin_common_tables_name,
  136 + this_time)
88 137
89   -
90   - #删除、修改普通表
91   - self.edit_common_table(data_session, database_guid, real_common_tables_name, origin_common_tables_name,
92   - this_time)
  138 + # 删除、修改普通表
  139 + self.edit_common_table(data_session,sys_session, database_guid, real_common_tables_name, origin_common_tables_name,
  140 + this_time)
93 141
94   - db.session.commit()
95   - res["msg"] = "刷新数据成功!"
96   - res["result"] = True
  142 + sys_session.commit()
  143 + result["data"] = "刷新数据成功!"
  144 + result["state"] = 1
  145 + sys_session.query(Task).filter_by(guid=task_guid).update(
  146 + {"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})
  147 + sys_session.commit()
97 148
98 149 except Exception as e:
99   - raise e
100   -
  150 + try:
  151 + print(traceback.format_exc())
  152 + sys_session.query(Task).filter_by(guid=task_guid).update(
  153 + {"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})
  154 + message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
  155 + task_process_guid = uuid.uuid1().__str__()
  156 + task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
  157 + task_guid=task_guid)
  158 + sys_session.add(task_process)
  159 + sys_session.commit()
  160 + except Exception as ee:
  161 + print(traceback.format_exc())
101 162 finally:
102 163 if pg_ds:
103 164 pg_ds.Destroy()
104 165 if data_session:
105 166 data_session.close()
106   -
107   - return res
108   -
109   -
110   - def add_spatail_table(self,database,pg_ds,spatial_tables_names,db_tables_names,this_time):
111   - # 更新空间表
112   - # 增加表
  167 + if sys_session:
  168 + sys_session.close()
  169 + return result
  170 +
  171 + def add_spatail_table(self,database,pg_ds,sys_session,spatial_tables_names,this_time):
  172 + '''
  173 + 注册新增空间表
  174 + :param database:
  175 + :param pg_ds:
  176 + :param spatial_tables_names: 已注册空间表名
  177 + :param this_time:
  178 + :return: 实体库中空间表名
  179 + '''
  180 +
  181 + db_tables_names=[]
113 182
114 183 for i in range(pg_ds.GetLayerCount()):
115 184 layer: Layer = pg_ds.GetLayer(i)
... ... @@ -123,7 +192,7 @@ class Api(ApiTemplate):
123 192 # 只注册public的空间表,其他表空间的表名会有.
124 193 if layer.GetName().__contains__("."):
125 194 continue
126   -
  195 + # 略过抽稀表
127 196 if layer.GetName().__contains__("_vacuate_"):
128 197 continue
129 198
... ... @@ -152,14 +221,13 @@ class Api(ApiTemplate):
152 221 table_guid = uuid.uuid1().__str__()
153 222 table = Table(guid=table_guid,
154 223 database_guid=database.guid,
155   - alias=layer.GetName(),
  224 + # alias=layer.GetName(),
156 225 name=layer.GetName(), create_time=this_time, update_time=this_time,
157 226 table_type=GeometryAdapter.get_table_type(layer.GetGeomType()),
158 227 extent=extent,
159 228 feature_count=feature_count
160 229 )
161   - db.session.add(table)
162   -
  230 + sys_session.add(table)
163 231 feature_defn: FeatureDefn = layer.GetLayerDefn()
164 232
165 233 for i in range(feature_defn.GetFieldCount()):
... ... @@ -169,9 +237,10 @@ class Api(ApiTemplate):
169 237 "") else field_defn.GetAlternativeName()
170 238 column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,
171 239 name=field_name, alias=field_alias, create_time=this_time, update_time=this_time)
172   - db.session.add(column)
  240 + sys_session.add(column)
  241 + return db_tables_names
173 242
174   - def deal_vacuate_table(self,pg_ds,database_guid):
  243 + def deal_vacuate_table(self,pg_ds,sys_session,database_guid):
175 244
176 245
177 246 for i in range(pg_ds.GetLayerCount()):
... ... @@ -198,30 +267,30 @@ class Api(ApiTemplate):
198 267 except:
199 268 pass
200 269
201   - base_table =Table.query.filter_by(name=base_layer_name,database_guid=database_guid).one_or_none()
  270 + base_table =sys_session.query(Table).filter_by(name=base_layer_name,database_guid=database_guid).one_or_none()
202 271 if base_table:
203   - if not TableVacuate.query.filter_by(table_guid=base_table.guid,name=l_name).one_or_none():
  272 + if not sys_session.query(TableVacuate).filter_by(table_guid=base_table.guid,name=l_name).one_or_none():
204 273 table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),
205 274 table_guid=base_table.guid,
206 275 level=level,
207 276 name=l_name,
208 277 pixel_distance=float(pixel_distance_str))
209   - db.session.add(table_vacuate)
210   - Table.query.filter_by(guid=base_table.guid).update({"is_vacuate": 1})
  278 + sys_session.add(table_vacuate)
  279 +
  280 + sys_session.query(Table).filter_by(guid=base_table.guid).update({"is_vacuate": 1})
211 281 else:
212 282 kk=1
213 283
214 284
215 285
216   -
217   - def edit_spatial_table(self,pg_ds,spatial_tables,db_tables_names,this_time):
  286 + def edit_spatial_table(self,pg_ds,sys_session,spatial_tables,db_tables_names,this_time):
218 287
219 288 for table in spatial_tables:
220 289
221 290 # 删除表
222 291 if table.name not in db_tables_names:
223 292 StructurePrint.print("空间表减少!")
224   - db.session.delete(table)
  293 + sys_session.delete(table)
225 294 # 修改表
226 295 else:
227 296 layer: Layer = pg_ds.GetLayerByName(table.name)
... ... @@ -252,15 +321,13 @@ class Api(ApiTemplate):
252 321 column = Columns(guid=uuid.uuid1().__str__(), table_guid=table.guid,
253 322 name=field_name, alias=field_alias, create_time=this_time,
254 323 update_time=this_time)
255   - db.session.add(column)
  324 + sys_session.add(column)
256 325
257 326 # 删除列
258 327 for column in columns:
259 328 if column.name not in db_columns_names:
260 329 StructurePrint.print("{}空间表属性减少!".format(table.name))
261   - db.session.delete(column)
262   -
263   -
  330 + sys_session.delete(column)
264 331
265 332 # 范围统计和数量统计以100w为界限
266 333 query_count_layer: Layer = pg_ds.ExecuteSQL(
... ... @@ -271,6 +338,7 @@ class Api(ApiTemplate):
271 338 if feature_count < 1000000:
272 339 feature_count = layer.GetFeatureCount()
273 340 ext = layer.GetExtent()
  341 +
274 342 else:
275 343 query_ext_layer: Layer = pg_ds.ExecuteSQL(
276 344 "select geometry(ST_EstimatedExtent('public', '{}','{}'))".format(l_name,
... ... @@ -285,11 +353,11 @@ class Api(ApiTemplate):
285 353 # 修改要素量
286 354 if not table.feature_count.__eq__(feature_count):
287 355 StructurePrint.print("{}空间表要素!".format(table.name))
288   - Table.query.filter_by(guid=table.guid).update({"feature_count": feature_count,
  356 + sys_session.query(Table).filter_by(guid=table.guid).update({"feature_count": feature_count,
289 357 "extent": extent})
290 358
291 359
292   - def add_common_table(self,data_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time):
  360 + def add_common_table(self,data_session,sys_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time):
293 361 for table_name in real_common_tables_name:
294 362 if table_name not in origin_common_tables_name:
295 363 StructurePrint.print("{}非空间表增加!".format(table_name))
... ... @@ -303,7 +371,7 @@ class Api(ApiTemplate):
303 371 feature_count=count
304 372 )
305 373
306   - db.session.add(table)
  374 + sys_session.add(table)
307 375
308 376 sql = '''
309 377 SELECT
... ... @@ -325,22 +393,22 @@ class Api(ApiTemplate):
325 393 for col in cols:
326 394 column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,
327 395 name=col[1], create_time=this_time, update_time=this_time)
328   - db.session.add(column)
  396 + sys_session.add(column)
329 397
330 398 # 删除不存在的表
331 399 for n in origin_common_tables_name:
332 400 if n not in real_common_tables_name:
333 401 tables = Table.query.filter_by(name=n).filter_by(database_guid=database_guid).all()
334 402 for table in tables:
335   - db.session.delete(table)
  403 + sys_session.delete(table)
336 404
337   - def edit_common_table(self,data_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time):
  405 + def edit_common_table(self,data_session,sys_session,database_guid,real_common_tables_name,origin_common_tables_name,this_time):
338 406 for table_name in origin_common_tables_name:
339   - tables = Table.query.filter_by(name=table_name).filter_by(database_guid=database_guid).all()
  407 + tables = sys_session.query(Table).filter_by(name=table_name).filter_by(database_guid=database_guid).all()
340 408 for table in tables:
341 409 if table_name not in real_common_tables_name:
342 410 StructurePrint.print("{}非空间表减少!".format(table_name))
343   - db.session.delete(table)
  411 + sys_session.delete(table)
344 412 # 修改表
345 413 else:
346 414 columns = table.relate_columns
... ... @@ -371,13 +439,13 @@ class Api(ApiTemplate):
371 439 StructurePrint.print("{}表要素属性增加!".format(table_name))
372 440 column = Columns(guid=uuid.uuid1().__str__(), table_guid=table.guid,
373 441 name=col, create_time=this_time, update_time=this_time)
374   - db.session.add(column)
  442 + sys_session.add(column)
375 443
376 444 # 属性减少
377 445 for column in columns:
378 446 if column.name not in real_cols_name:
379 447 StructurePrint.print("{}表要素属性减少!".format(table_name))
380   - db.session.delete(column)
  448 + sys_session.delete(column)
381 449
382 450 # 修改要素量
383 451 sql = 'select count(*) from "{}"'.format(table_name)
... ... @@ -385,7 +453,7 @@ class Api(ApiTemplate):
385 453 count = data_session.execute(sql).fetchone()[0]
386 454 if not table.feature_count.__eq__(count):
387 455 StructurePrint.print("{}表要素变化!".format(table_name))
388   - Table.query.filter_by(guid=table.guid).update({"feature_count": count})
  456 + sys_session.query(Table).filter_by(guid=table.guid).update({"feature_count": count})
389 457
390 458
391 459
... ...
注册登录 后发表评论