提交 377a8e2043ea003aae593e85ad3fd67e4e3807c8

作者 nheweijun
1 个父辈 13263e89

2021.12.03 优化任务管理

正在显示 29 个修改的文件 包含 331 行增加1014 行删除
... ... @@ -12,13 +12,10 @@ from app.models import db
12 12 from app.modules.auth.oauth2 import config_oauth, myCodeIDToken
13 13 from flasgger import Swagger
14 14 import logging
15   -from app.util.component.EntryData import EntryData
16   -from app.util.component.EntryDataVacuate import EntryDataVacuate
17   -import threading
18 15 from app.util.component.StructuredPrint import StructurePrint
19 16 from app.util.component.PGUtil import PGUtil
20 17 import os
21   -from app.modules.data.io.data_entry_center import data_entry_center
  18 +
22 19
23 20 class JSONEncoder(_JSONEncoder):
24 21 """
... ... @@ -93,13 +90,6 @@ def create_app():
93 90 for api in find_class(scan, BlueprintApi):
94 91 app.register_blueprint(api.bp)
95 92
96   - # 入库监测线程
97   - # @app.before_first_request
98   - # def data_entry_process():
99   - # StructurePrint().print("start listen")
100   - # process = threading.Thread(target=data_entry_center)
101   - # process.start()
102   -
103 93 # 不检测https
104 94 os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
105 95 return app
... ...
  1 +GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]
\ No newline at end of file
... ...
  1 +GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]
\ No newline at end of file
... ...
... ... @@ -20,9 +20,12 @@ from app.util.component.StructuredPrint import StructurePrint
20 20 import multiprocessing
21 21 import datetime
22 22 from app.util.component.TaskController import TaskController
  23 +from app.util.component.TaskWriter import TaskWriter
23 24
24 25 class Api(ApiTemplate):
25 26
  27 + api_name = "数据下载"
  28 +
26 29 def process(self):
27 30
28 31 res = {}
... ... @@ -47,7 +50,6 @@ class Api(ApiTemplate):
47 50 db.session.add(task)
48 51 db.session.commit()
49 52
50   -
51 53 res["data"] = "下载任务已提交!"
52 54 res["result"] = True
53 55
... ... @@ -58,8 +60,8 @@ class Api(ApiTemplate):
58 60
59 61 def download(self,task_guid,para):
60 62
61   - sys_session = None
62 63 ds: DataSource = None
  64 + task_writer = None
63 65
64 66 # 设置编码
65 67 encoding = para.get("encoding")
... ... @@ -72,12 +74,13 @@ class Api(ApiTemplate):
72 74
73 75 #任务控制,等待执行
74 76 TaskController.wait(task_guid)
75   -
76   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  77 + task_writer = TaskWriter(task_guid)
  78 + task_writer.update_task({"state":2,"update_time":datetime.datetime.now(),"process" : "下载中"})
  79 + task_writer.update_process("开始下载...")
77 80
78 81 table_names = para.get("table_name").split(",")
79 82 database_guid = para.get("database_guid")
80   - database = sys_session.query(Database).filter_by(guid=database_guid).one_or_none()
  83 + database = task_writer.session.query(Database).filter_by(guid=database_guid).one_or_none()
81 84 if not database:
82 85 raise Exception("数据库不存在!")
83 86
... ... @@ -89,36 +92,25 @@ class Api(ApiTemplate):
89 92 if download_type.__eq__("shp"):
90 93 data = self.download_shp(table_names, ds)
91 94 if download_type.__eq__("gdb"):
92   - data = self.download_gdb(sys_session,table_names, ds, database_guid)
93   -
94   - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),
95   - "process" : "下载完成",
96   - "parameter":data[0]["download_url"]})
97   - sys_session.commit()
  95 + data = self.download_gdb(task_writer.session,table_names, ds, database_guid)
98 96
  97 + task_writer.update_task({"state":1,"update_time":datetime.datetime.now(),"process" : "下载完成","parameter":data[0]["download_url"]})
  98 + task_writer.update_process("下载完成!")
99 99
100 100 except Exception as e:
101 101 try:
102   - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),
103   - "process": "下载失败"})
104   -
105   - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
106   - task_process_guid = uuid.uuid1().__str__()
107   - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
108   - task_guid=task_guid)
109   - sys_session.add(task_process)
110   - sys_session.commit()
  102 + task_writer.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "下载失败"})
  103 + task_writer.update_process( e.__str__())
111 104 except Exception as ee:
112   - print(traceback.format_exc())
  105 + StructurePrint().print(ee.__str__())
113 106 raise e
114 107 finally:
115 108 try:
116 109 if ds:
117 110 ds.Destroy()
118   - if sys_session:
119   - sys_session.close()
120   - except:
121   - print(traceback.format_exc())
  111 + task_writer.close()
  112 + except Exception as e:
  113 + StructurePrint().print(e.__str__())
122 114
123 115
124 116 def download_shp(self,table_names,ds):
... ... @@ -154,10 +146,7 @@ class Api(ApiTemplate):
154 146 StructurePrint().print("{}图层已下载{}个对象".format(table_name, count))
155 147
156 148 data_source.Destroy()
157   -
158   -
159 149 ZipUtil.create_zip(os.path.join(parent, "file_tmp", table_name+"_"+uuid_) + ".zip", [dirpath])
160   -
161 150 return "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(table_name+"_"+uuid_ + ".zip")
162 151
163 152
... ... @@ -175,8 +164,7 @@ class Api(ApiTemplate):
175 164 gdb_path = os.path.join(parent, "file_tmp", uuid_+".gdb")
176 165
177 166 gdb_ds: DataSource = gdb_driver.CreateDataSource(gdb_path)
178   -
179   -
  167 +
180 168 for table_name in table_names:
181 169
182 170 layer: Layer = ds.GetLayerByName(table_name)
... ... @@ -196,7 +184,6 @@ class Api(ApiTemplate):
196 184 # schema = layer.schema
197 185 pg_layer.CreateFields(schema)
198 186
199   -
200 187 # gdb 不支持fid=0的要素,所以识别到后要+1
201 188 offset = 0
202 189 f1:Feature = layer.GetNextFeature()
... ... @@ -218,8 +205,6 @@ class Api(ApiTemplate):
218 205
219 206
220 207 return data
221   -
222   -
223 208
224 209
225 210 api_doc={
... ... @@ -259,4 +244,7 @@ class Api(ApiTemplate):
259 244 }
260 245 }
261 246 }
262   -}
\ No newline at end of file
  247 +
  248 +}
  249 +
  250 +
... ...
... ... @@ -18,9 +18,9 @@ from sqlalchemy.orm import Session
18 18 import configure
19 19 import datetime
20 20 import multiprocessing
21   -from app.util.component.EntryDataVacuate import EntryDataVacuate
  21 +from ..util.EntryDataVacuate import EntryDataVacuate
22 22 from app.util.component.TaskController import TaskController
23   -
  23 +from app.util.component.TaskWriter import TaskWriter
24 24 class Api(ApiTemplate):
25 25
26 26 api_name = "通过meta入库"
... ... @@ -105,29 +105,23 @@ class Api(ApiTemplate):
105 105
106 106 def entry(self,task_guid):
107 107
108   - sys_session: Session = None
  108 + task_writer = None
109 109 this_task_layer = []
110 110 try:
111 111 #任务控制,等待执行
112 112 TaskController.wait(task_guid)
113 113
114   - sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
115   -
  114 + task_writer = TaskWriter(task_guid)
116 115
117   - task:Task = sys_session.query(Task).filter_by(guid=task_guid).one_or_none()
  116 + task:Task = task_writer.session.query(Task).filter_by(guid=task_guid).one_or_none()
118 117 parameter = json.loads(task.parameter)
119   -
120   - task.state = 2
121   - task.process = "入库中"
122   - sys_session.commit()
123   -
  118 + task_writer.update_task({"state": 2, "prcess": "入库中"})
124 119
125 120 #处理修改入库信息
126   -
127 121 metas: list = json.loads(parameter.get("meta").__str__())
128 122 parameter["meta"] = metas
129 123
130   - database = sys_session.query(Database).filter_by(guid=task.database_guid).one_or_none()
  124 + database = task_writer.session.query(Database).filter_by(guid=task.database_guid).one_or_none()
131 125 pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
132 126
133 127
... ... @@ -148,8 +142,8 @@ class Api(ApiTemplate):
148 142 task_guid=task.guid,
149 143 name=layer_name)
150 144
151   - sys_session.add(iln)
152   - sys_session.commit()
  145 + task_writer.session.add(iln)
  146 + task_writer.session.commit()
153 147 this_task_layer.append(layer_name)
154 148 # 修改表名
155 149 meta["layer"][layer_name_origin] = layer_name
... ... @@ -160,23 +154,18 @@ class Api(ApiTemplate):
160 154
161 155 #完成后
162 156 for ln in this_task_layer:
163   - iln = sys_session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
164   - sys_session.delete(iln)
  157 + iln = task_writer.session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
  158 + task_writer.session.delete(iln)
165 159
166 160 except Exception as e:
167   - sys_session.query(Task).filter_by(guid=task_guid).update(
168   - {"state": -1, "process": "入库失败"})
169   -
  161 + task_writer.update_task({"state": -1, "process": "入库失败"})
170 162 for ln in this_task_layer:
171   - iln = sys_session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
172   - sys_session.delete(iln)
173   -
174   - sys_session.commit()
  163 + iln = task_writer.session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
  164 + task_writer.session.delete(iln)
175 165 StructurePrint().print(e.__str__(), "error")
176 166 finally:
177   - sys_session.commit()
178   - if sys_session:
179   - sys_session.close()
  167 + task_writer.session.commit()
  168 + task_writer.close()
180 169
181 170
182 171 api_doc={
... ...
1   -# coding=utf-8
2   -#author: 4N
3   -#createtime: 2021/9/14
4   -#email: nheweijun@sina.com
5   -
6   -import configure
7   -from ..models import InsertingLayerName, Database, Task,DES
8   -
9   -from sqlalchemy.orm import Session
10   -import multiprocessing
11   -from app.util.component.EntryDataVacuate import EntryDataVacuate
12   -import json
13   -from sqlalchemy import distinct
14   -import uuid
15   -from osgeo.ogr import DataSource
16   -from app.util.component.StructuredPrint import StructurePrint
17   -from app.util.component.PGUtil import PGUtil
18   -import time
19   -
20   -def data_entry_center():
21   - running_dict = {}
22   - sys_session: Session = PGUtil.get_db_session(
23   - configure.SQLALCHEMY_DATABASE_URI)
24   -
25   - while True:
26   -
27   - try:
28   - time.sleep(3)
29   -
30   - # 已经结束的进程 从监测中删除
31   - remove_process = []
32   - for process, layer_names in running_dict.items():
33   - if not process.is_alive():
34   - for l in layer_names:
35   - inserted = sys_session.query(
36   - InsertingLayerName).filter_by(name=l).one_or_none()
37   - if inserted:
38   - sys_session.delete(inserted)
39   - sys_session.commit()
40   - remove_process.append(process)
41   - for process in remove_process:
42   - running_dict.pop(process)
43   -
44   -
45   - # 入库进程少于阈值,开启入库进程
46   - inter_size = sys_session.query(
47   - distinct(InsertingLayerName.task_guid)).count()
48   -
49   - if inter_size < configure.entry_data_thread:
50   - # 锁表
51   - ready_task: Task = sys_session.query(Task).filter_by(state=0, task_type=1).order_by(
52   - Task.create_time).with_lockmode("update").limit(1).one_or_none()
53   - if ready_task:
54   -
55   - try:
56   - parameter = json.loads(ready_task.parameter)
57   - StructurePrint().print("检测到入库任务")
58   - ready_task.state = 2
59   - ready_task.process = "入库中"
60   - sys_session.commit()
61   -
62   - metas: list = json.loads(
63   - parameter.get("meta").__str__())
64   - parameter["meta"] = metas
65   -
66   - database = sys_session.query(Database).filter_by(
67   - guid=ready_task.database_guid).one_or_none()
68   - pg_ds: DataSource = PGUtil.open_pg_data_source(
69   - 1, DES.decode(database.sqlalchemy_uri))
70   -
71   - this_task_layer = []
72   - for meta in metas:
73   - overwrite = parameter.get("overwrite", "no")
74   -
75   - for layer_name_origin, layer_name in meta.get("layer").items():
76   - origin_name = layer_name
77   - no = 1
78   -
79   - while (overwrite.__eq__("no") and pg_ds.GetLayerByName(layer_name)) or sys_session.query(InsertingLayerName).filter_by(name=layer_name).one_or_none():
80   - layer_name = origin_name + "_{}".format(no)
81   - no += 1
82   -
83   - # 添加到正在入库的列表中
84   - iln = InsertingLayerName(guid=uuid.uuid1().__str__(),
85   - task_guid=ready_task.guid,
86   - name=layer_name)
87   -
88   - sys_session.add(iln)
89   - sys_session.commit()
90   - this_task_layer.append(layer_name)
91   - # 修改表名
92   - meta["layer"][layer_name_origin] = layer_name
93   -
94   - pg_ds.Destroy()
95   - entry_data_process = multiprocessing.Process(
96   - target=EntryDataVacuate().entry, args=(parameter,))
97   - entry_data_process.start()
98   -
99   - pid = entry_data_process.pid
100   - sys_session.query(Task).filter_by(guid=ready_task.guid).update({"task_pid":pid})
101   - sys_session.commit()
102   -
103   - running_dict[entry_data_process] = this_task_layer
104   -
105   - except Exception as e:
106   - sys_session.query(Task).filter_by(guid=ready_task.guid).update(
107   - {"state": -1, "process": "入库失败"})
108   - sys_session.commit()
109   - StructurePrint().print(e.__str__(), "error")
110   - else:
111   - # 解表啊
112   - sys_session.commit()
113   - except Exception as e:
114   - sys_session.commit()
115   - StructurePrint().print(e.__str__(), "error")
\ No newline at end of file
... ... @@ -13,7 +13,7 @@ import json
13 13 from app.util.component.ApiTemplate import ApiTemplate
14 14 from .get_meta import Api as MetaApi
15 15 from threading import Thread
16   -from app.util.component.EntryDataVacuate import EntryDataVacuate
  16 +from ..util.EntryDataVacuate import EntryDataVacuate
17 17
18 18 class Api(ApiTemplate):
19 19
... ...
... ... @@ -5,14 +5,11 @@ from osgeo import gdal
5 5 import os
6 6 import uuid
7 7 import shutil
8   -import time
9 8 from app.modules.data.models import *
10 9 from app.util.component.PGUtil import PGUtil
11 10 from app.util.component.StructuredPrint import StructurePrint
12 11 from sqlalchemy.orm import Session
13 12 import configure
14   -import math
15   -from functools import lru_cache
16 13 import traceback
17 14 import copy
18 15 from app.util.component.GeometryAdapter import GeometryAdapter
... ...
... ... @@ -19,6 +19,7 @@ from app.util.component.StructuredPrint import StructurePrint
19 19 from app.util.component.ApiTemplate import ApiTemplate
20 20 from app.util.component.GeometryAdapter import GeometryAdapter
21 21 from app.util.component.TaskController import TaskController
  22 +from app.util.component.TaskWriter import TaskWriter
22 23 import multiprocessing
23 24 import configure
24 25
... ... @@ -69,12 +70,17 @@ class Api(ApiTemplate):
69 70 data_session=None
70 71 result = {}
71 72 sys_session = None
  73 + task_writer = None
  74 +
  75 +
72 76 db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
73 77
74 78 try:
75 79
76 80 #任务控制,等待执行
77 81 TaskController.wait(task_guid)
  82 + task_writer = TaskWriter(task_guid)
  83 +
78 84
79 85 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
80 86 sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI)
... ... @@ -145,24 +151,19 @@ class Api(ApiTemplate):
145 151 sys_session.commit()
146 152 result["data"] = "刷新数据成功!"
147 153 result["state"] = 1
148   - sys_session.query(Task).filter_by(guid=task_guid).update(
149   - {"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})
150   - sys_session.commit()
  154 +
  155 + task_writer.update_task({"state": 1, "update_time": datetime.datetime.now(),"process":"更新成功"})
  156 +
151 157
152 158 except Exception as e:
153 159 try:
154   - print(traceback.format_exc())
155   - sys_session.query(Task).filter_by(guid=task_guid).update(
156   - {"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})
157   - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
158   - task_process_guid = uuid.uuid1().__str__()
159   - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
160   - task_guid=task_guid)
161   - sys_session.add(task_process)
162   - sys_session.commit()
  160 + StructurePrint().print(traceback.format_exc())
  161 + task_writer.update_task({"state": -1, "update_time": datetime.datetime.now(),"process":"更新失败"})
  162 + task_writer.update_process(e.__str__())
163 163 except Exception as ee:
164   - print(traceback.format_exc())
  164 + StructurePrint().print(traceback.format_exc())
165 165 finally:
  166 + task_writer.close()
166 167 if pg_ds:
167 168 pg_ds.Destroy()
168 169 if data_session:
... ...
... ... @@ -18,6 +18,7 @@ from app.util.component.VacuateConf import VacuateConf
18 18 from app.util.component.GeometryAdapter import GeometryAdapter
19 19
20 20 from app.util.component.TaskController import TaskController
  21 +from app.util.component.TaskWriter import TaskWriter
21 22 from osgeo.ogr import DataSource,Layer,Geometry
22 23 from osgeo import ogr
23 24
... ... @@ -100,7 +101,7 @@ class Api(ApiTemplate):
100 101
101 102 def task(self,table,task_guid):
102 103
103   - sys_session = None
  104 + task_writer = None
104 105 pg_session = None
105 106 pg_ds = None
106 107 vacuate_process = None
... ... @@ -109,25 +110,22 @@ class Api(ApiTemplate):
109 110 #任务控制,等待执行
110 111 TaskController.wait(task_guid)
111 112
  113 + task_writer = TaskWriter(task_guid)
112 114
113   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
114   - sys_session.query(Table).filter_by(guid=table.guid).update({"is_vacuate": 2, "update_time": datetime.datetime.now()})
115   - sys_session.query(Task).filter_by(guid=task_guid).update({"state":2,"process":"精华中"})
  115 + task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
  116 + task_writer.update_task({"state":2,"process":"精华中"})
116 117
117   - database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
  118 + database = task_writer.session.query(Database).filter_by(guid=table.database_guid).one_or_none()
118 119 database_sqlalchemy_uri = str(database.sqlalchemy_uri)
  120 +
119 121 pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
120 122 pg_ds :DataSource= PGUtil.open_pg_data_source(0,DES.decode(database.sqlalchemy_uri))
121 123
122   -
123 124 #删除原有数据
124   - tvs = sys_session.query(TableVacuate).filter_by(table_guid=table.guid).all()
  125 + tvs = task_writer.session.query(TableVacuate).filter_by(table_guid=table.guid).all()
125 126 for tv in tvs :
126   - sys_session.delete(tv)
127   -
128   - sys_session.commit()
129   - #长时间的连接,导致后续的session超时,现在先断开
130   - sys_session.close()
  127 + task_writer.session.delete(tv)
  128 + task_writer.session.commit()
131 129
132 130
133 131 # 创建抽稀过程
... ... @@ -150,9 +148,6 @@ class Api(ApiTemplate):
150 148
151 149 vacuate_process.set_vacuate_count()
152 150
153   - # 重连
154   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
155   -
156 151 #新增
157 152 if configure.VACUATE_DB_URI:
158 153 user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI)
... ... @@ -169,41 +164,30 @@ class Api(ApiTemplate):
169 164 name=vacuate_process.vacuate_layers[l].GetName(),
170 165 pixel_distance=vacuate_process.this_gridsize[l],
171 166 connectstr=DES.encode(connectstr))
172   - sys_session.add(table_vacuate)
  167 + task_writer.session.add(table_vacuate)
173 168
174   - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),
175   - "process": "精化完成"})
176   - sys_session.query(Table).filter_by(guid=table.guid).update(
177   - {"is_vacuate": 1, "update_time": datetime.datetime.now()})
178   - sys_session.commit()
  169 + task_writer.update_task({"state":1,"update_time":datetime.datetime.now(),"process": "精化完成"})
  170 + task_writer.update_table(table.guid, {"is_vacuate": 1, "update_time": datetime.datetime.now()})
179 171
180 172 except Exception as e:
181 173 try:
182   - if not sys_session.is_active:
183   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
184   - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),
185   - "process": "精化失败"})
186   - sys_session.query(Table).filter_by(guid=table.guid).update(
187   - {"is_vacuate": 0, "update_time": datetime.datetime.now()})
188   -
189   - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
190   - task_process_guid = uuid.uuid1().__str__()
191   - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
192   - task_guid=task_guid)
193   - sys_session.add(task_process)
194   - sys_session.commit()
  174 + task_writer.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "精化失败"})
  175 + task_writer.update_table(table.guid, {"is_vacuate": 0, "update_time": datetime.datetime.now()})
  176 + task_writer.update_process( e.__str__())
  177 +
  178 + task_writer.session.commit()
  179 +
195 180 if vacuate_process:
196 181 vacuate_process.rollback()
197 182
198   - print(traceback.format_exc())
  183 + StructurePrint().print(traceback.format_exc())
199 184 except Exception as ee:
200   - print(traceback.format_exc())
  185 + StructurePrint().print(traceback.format_exc())
201 186 finally:
202 187 try:
  188 + task_writer.close()
203 189 if vacuate_process:
204 190 vacuate_process.end()
205   - if sys_session:
206   - sys_session.close()
207 191 if pg_session:
208 192 pg_session.close()
209 193 if pg_ds:
... ... @@ -233,15 +217,6 @@ class Api(ApiTemplate):
233 217 }
234 218 }
235 219
236   -class SysSession:
237   - def __init__(self):
238   - self.session : Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
239   -
240   - def close(self):
241   - try:
242   - self.session.close()
243   - except:
244   - pass
245 220
246 221 class VacuateProcess:
247 222
... ...
... ... @@ -5,10 +5,9 @@
5 5
6 6 import datetime
7 7 import traceback
8   -from ..models import Table, Database, DES,Task,db,TableVacuate
  8 +from ..models import Table, Database, DES,Task,db,TableVacuate,Process
9 9 from app.util.component.ApiTemplate import ApiTemplate
10 10 from app.util.component.PGUtil import PGUtil
11   -from app.util.component.EntryDataVacuate import Process
12 11 from app.util.component.StructuredPrint import StructurePrint
13 12 import multiprocessing
14 13 import uuid
... ... @@ -18,6 +17,7 @@ from osgeo import ogr
18 17
19 18 from app.util.component.VacuateConf import VacuateConf
20 19 from app.util.component.TaskController import TaskController
  20 +from app.util.component.TaskWriter import TaskWriter
21 21
22 22 class Api(ApiTemplate):
23 23 api_name = "单独抽稀"
... ... @@ -100,7 +100,7 @@ class Api(ApiTemplate):
100 100
101 101 def task(self,table,task_guid,grids):
102 102
103   - sys_session = None
  103 + task_write = None
104 104 pg_session = None
105 105 pg_ds = None
106 106 vacuate_process = None
... ... @@ -110,27 +110,23 @@ class Api(ApiTemplate):
110 110
111 111 #任务控制,等待执行
112 112 TaskController.wait(task_guid)
  113 + task_write = TaskWriter(task_guid)
113 114
114   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
115   - sys_session.query(Table).filter_by(guid=table.guid).update(
116   - {"is_vacuate": 2, "update_time": datetime.datetime.now()})
117   - sys_session.query(Task).filter_by(guid=task_guid).update({"state":2,"process":"精华中"})
  115 + task_write.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
  116 + task_write.update_task({"state":2,"process":"精华中"})
118 117
119   -
120   - database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
  118 + database = task_write.session.query(Database).filter_by(guid=table.database_guid).one_or_none()
121 119 database_sqlalchemy_uri = str(database.sqlalchemy_uri)
122 120 pg_session = PGUtil.get_db_session(DES.decode(database_sqlalchemy_uri))
123 121 pg_ds :DataSource= PGUtil.open_pg_data_source(1,DES.decode(database_sqlalchemy_uri))
124 122
125 123 #删除原有数据
126 124 for grid in grids:
127   - tvs = sys_session.query(TableVacuate).filter_by(pixel_distance=grid,table_guid=table.guid).all()
  125 + tvs = task_write.session.query(TableVacuate).filter_by(pixel_distance=grid,table_guid=table.guid).all()
128 126 for tv in tvs :
129   - sys_session.delete(tv)
  127 + task_write.session.delete(tv)
  128 + task_write.session.commit()
130 129
131   - sys_session.commit()
132   - #长时间的连接,导致后续的session超时,现在先断开
133   - sys_session.close()
134 130
135 131 # 创建抽稀过程
136 132 options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name,pg_session)),
... ... @@ -152,8 +148,6 @@ class Api(ApiTemplate):
152 148
153 149 vacuate_process.set_vacuate_count()
154 150
155   - #重新连接
156   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
157 151
158 152 #新增
159 153 if configure.VACUATE_DB_URI:
... ... @@ -172,41 +166,26 @@ class Api(ApiTemplate):
172 166 name=layer_name,
173 167 pixel_distance=vacuate_process.this_gridsize[l],
174 168 connectstr=DES.encode(connectstr))
175   - sys_session.add(table_vacuate)
  169 + task_write.session.add(table_vacuate)
  170 + task_write.update_task({"state":1,"update_time":datetime.datetime.now(),"process" : "精化完成"})
  171 + task_write.update_table(table.guid,{"is_vacuate": 1, "update_time": datetime.datetime.now()})
176 172
177   - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),
178   - "process" : "精化完成"})
179   - sys_session.query(Table).filter_by(guid=table.guid).update(
180   - {"is_vacuate": 1, "update_time": datetime.datetime.now()})
181   - sys_session.commit()
182 173
183 174 except Exception as e:
184 175 try:
185   - if not sys_session.is_active:
186   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
187   - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),
188   - "process": "精化失败"})
189   - sys_session.query(Table).filter_by(guid=table.guid).update(
190   - {"is_vacuate": origin_vacuate, "update_time": datetime.datetime.now()})
191   -
192   - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
193   - task_process_guid = uuid.uuid1().__str__()
194   - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
195   - task_guid=task_guid)
196   - sys_session.add(task_process)
197   - sys_session.commit()
  176 + task_write.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "精化失败"})
  177 + task_write.update_table(table.guid, {"is_vacuate": origin_vacuate, "update_time": datetime.datetime.now()})
  178 + task_write.update_process( e.__str__())
198 179 if vacuate_process:
199 180 vacuate_process.rollback()
200   -
201 181 print(traceback.format_exc())
202 182 except Exception as ee:
203 183 print(traceback.format_exc())
204 184 finally:
205 185 try:
  186 + task_write.close()
206 187 if vacuate_process:
207 188 vacuate_process.end()
208   - if sys_session:
209   - sys_session.close()
210 189 if pg_session:
211 190 pg_session.close()
212 191 if pg_ds:
... ...
... ... @@ -4,7 +4,8 @@
4 4 #email: nheweijun@sina.com
5 5
6 6 from app.models import db
7   -from sqlalchemy import Column, Integer, String, ForeignKey, Text, DateTime, Time,Float,Binary
  7 +from sqlalchemy import Column, Integer, String, ForeignKey, Text, DateTime, Time,Float,Binary,Sequence
  8 +from sqlalchemy import Sequence
8 9 from sqlalchemy.orm import relationship
9 10 import base64
10 11 from pyDes import *
... ...
... ... @@ -29,6 +29,7 @@ class Api(ApiTemplate):
29 29 exec_result = os.popen('taskkill.exe /pid:' + str(pid))
30 30
31 31 else:
  32 + #分布式下,很难吧,还需要记录机器,分布式是不可行的
32 33 os.kill(pid,signal.SIGILL)
33 34 except Exception as e:
34 35 StructurePrint.print("Kill task 失败")
... ...
... ... @@ -144,7 +144,10 @@ class EntryDataVacuate:
144 144
145 145
146 146 for i in range(ds.GetLayerCount()):
  147 +
147 148 layer: Layer = ds.GetLayer(i)
  149 + if layer.GetName().lower() not in meta.get("layer").keys():
  150 + continue
148 151 is_success, new_layer_name = self.entry_one_layer(layer,this_task,meta)
149 152 new_layer_names.append(new_layer_name)
150 153 is_successes.append(is_success)
... ... @@ -330,7 +333,12 @@ class ThisTask:
330 333 is_vacuate=is_vacuate
331 334 )
332 335 # 删除遗留业务数据
333   - history_table = self.sys_session.query(Table).filter_by(name=new_layer_name,database_guid=self.database.guid).all()
  336 + try:
  337 + history_table = self.sys_session.query(Table).filter_by(name=new_layer_name,database_guid=self.database.guid).all()
  338 + except:
  339 + self.sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  340 + history_table = self.sys_session.query(Table).filter_by(name=new_layer_name,database_guid=self.database.guid).all()
  341 +
334 342 if history_table:
335 343 for ht in history_table:
336 344 self.sys_session.delete(ht)
... ...
... ... @@ -21,6 +21,7 @@ from osgeo.gdal import *
21 21 import traceback
22 22 import os
23 23 from app.util.component.TaskController import TaskController
  24 +from app.util.component.TaskWriter import TaskWriter
24 25
25 26 class Api(ApiTemplate):
26 27
... ... @@ -76,21 +77,17 @@ class Api(ApiTemplate):
76 77 return res
77 78
78 79 def build_pyramid_task(self,image_guid,task_guid,data_servers,path):
79   - sys_session = None
80   - try:
81 80
  81 + task_writer = None
82 82
  83 + try:
83 84 #任务控制,等待执行
84 85 TaskController.wait(task_guid)
  86 + task_writer = TaskWriter(task_guid)
85 87
86   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
87 88 #进入创建金字塔的状态
88   - sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": -1})
89   - sys_session.query(Task).filter_by(guid=task_guid).update({"state": 2})
90   -
91   - sys_session.commit()
92   - sys_session.close()
93   -
  89 + task_guid.sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": -1})
  90 + task_writer.update_task({"state": 2})
94 91
95 92 #所有数据节点的影像都要建金字塔
96 93 update_size = None
... ... @@ -109,11 +106,9 @@ class Api(ApiTemplate):
109 106 update_size = json.loads(thrift_connect.client.getInfo(path)).get("size")
110 107 thrift_connect.close()
111 108
112   - #重新连接,防止session超时
113   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  109 + task_writer.update_task({"state":1,"update_time":datetime.datetime.now()})
114 110
115   - sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now()})
116   - image:Image = sys_session.query(Image).filter_by(guid=image_guid).one_or_none()
  111 + image:Image = task_writer.session.query(Image).filter_by(guid=image_guid).one_or_none()
117 112
118 113 if not overview_count:
119 114 overview_count = 0
... ... @@ -128,20 +123,12 @@ class Api(ApiTemplate):
128 123 image.overview_count = overview_count
129 124 if update_size:
130 125 image.size = update_size
131   -
132 126 except:
133   - sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": 0})
134   - sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now()})
  127 + task_writer.session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": 0})
  128 + task_writer.update_task({"state": -1,"update_time":datetime.datetime.now()})
135 129 finally:
136   - sys_session.commit()
137   - if sys_session:
138   - try:
139   - sys_session.close()
140   - except:
141   - pass
142   -
143   -
144   -
  130 + task_writer.session.commit()
  131 + task_writer.close()
145 132
146 133 def buildOverview(self,path):
147 134 image: Dataset = gdal.Open(path, 1)
... ...
... ... @@ -10,8 +10,9 @@ import time
10 10 from app.modules.service.models import ImageService
11 11 from app.modules.service.models import TileScheme,Service
12 12 from app.util.component.ModelVisitor import ModelVisitor
13   -
  13 +from app.util.component.StructuredPrint import StructurePrint
14 14 import json
  15 +import traceback
15 16
16 17 class Cache:
17 18
... ... @@ -29,17 +30,23 @@ class Cache:
29 30 GLOBAL_DIC["zookeeper_updatetime"] = time.time()
30 31 else:
31 32 if not zoo.connected:
32   - zoo.start()
  33 + try:
  34 + zoo.start()
  35 + except:
  36 + pass
33 37
34 38 # 更新zoo
35 39 if not GLOBAL_DIC.get("zookeeper_updatetime"):
36 40 GLOBAL_DIC["zookeeper_updatetime"] = time.time()
37 41 if time.time() - GLOBAL_DIC["zookeeper_updatetime"] > 15:
38   - #释放
  42 + #释放,高并发下可行吗,线程安全问题
39 43 try:
40 44 zoo.stop()
41   - except:
42   - pass
  45 + zoo.close()
  46 + del zoo
  47 + except Exception as e:
  48 + StructurePrint().print("关闭zoo失败")
  49 + StructurePrint().print(e.__str__())
43 50 zoo: KazooClient = KazooClient(hosts=configure.zookeeper, timeout=1)
44 51 zoo.start()
45 52 GLOBAL_DIC["zookeeper"] = zoo
... ...
... ... @@ -8,6 +8,8 @@ from osgeo import gdal,ogr
8 8 import uuid
9 9 import time
10 10 import os
  11 +import json
  12 +
11 13
12 14 def get_info_from_sqlachemy_uri(uri):
13 15 parts = uri.split(":")
... ... @@ -39,121 +41,158 @@ def open_pg_data_source(iswrite, uri):
39 41 raise Exception("打开数据源失败!")
40 42 return ds
41 43
42   -def move(geo:Geometry,offx,offy):
43 44
44   - g = geo.GetGeometryRef(0)
45   - num = g.GetPointCount()
  45 +def move_digui(coords,offx,offy):
  46 + if isinstance(coords[0],list):
  47 + for coord in coords:
  48 + move_digui(coord, offx, offy)
  49 + else :
  50 + coords[0] += offx
  51 + coords[1] += offy
46 52
47   - coor = []
48   - try:
49   - for j in range(num):
50   - point = g.GetPoint(j)
51   - x = point[0]
52   - y = point[1]
53   - x += offx
54   - y += offy
55   - coor.append([x, y])
56   - if num==1:
57   - point = ogr.Geometry(ogr.wkbPoint)
58   - point.AddPoint(coor[0][0], coor[0][1])
59   - return point
60   - elif coor[0].__eq__(coor[-1]):
61   - ring = ogr.Geometry(ogr.wkbLinearRing)
62   - for co in coor:
63   - ring.AddPoint(co[0], co[1])
64   - poly = ogr.Geometry(ogr.wkbPolygon)
65   - poly.AddGeometry(ring)
66   - return poly
67   - else :
68   - line = ogr.Geometry(ogr.wkbLineString)
69   - for co in coor:
70   - line.AddPoint(co[0], co[1])
71   - return line
72   - except:
73   - return None
  53 +def move(geo,offx,offy):
74 54
75 55
76   -def copydata():
  56 + geojson_str = geo.ExportToJson()
  57 + geojson = json.loads(geojson_str)
77 58
  59 + coords = geojson["coordinates"]
  60 + move_digui(coords, offx, offy)
  61 + og = ogr.CreateGeometryFromJson(json.dumps(geojson))
78 62
79   - bound = ""
  63 + return og
80 64
81   - work_dir =r"E:\D2\桌面\FSSD_RES_PY_FWM"
82 65
83   - data_path=r"E:\D2\桌面\FSSD_RES_PY_FWM\fs4326.shp"
  66 +def copydata(bound,input,output):
  67 + work_dir = os.path.dirname(os.path.abspath(__file__))
84 68
85   - base_name="fs900w"
  69 + shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
  70 + bound_ds : DataSource = shp_driver.Open(bound, 0)
  71 + bound_layer :Layer = bound_ds.GetLayer(0)
86 72
87   - driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
88   - ds: DataSource = driver.Open(data_path, 1)
  73 + bf = bound_layer.GetNextFeature()
  74 + bound_geom = bf.GetGeometryRef()
89 75
90   - bound_ds : DataSource = driver.Open(bound, 1)
91   - bound_layer :Layer = bound_ds.GetLayer(0)
92   - bound_geom = bound_layer.GetNextFeature().GetGeometryRef()
93 76
94 77
95   - if not ds:
  78 + gdb_driver: Driver = ogr.GetDriverByName("FileGDB")
  79 + # gdb_ds: DataSource = gdb_driver.Open(gdb, 0)
  80 + # layer: Layer = gdb_ds.GetLayerByName(layer_name)
  81 +
  82 + input_ds: DataSource = shp_driver.Open(input, 0)
  83 + layer: Layer = input_ds.GetLayer(0)
  84 +
  85 + if not layer:
96 86 raise Exception("打开数据失败!")
97 87
98   - layer: Layer = ds.GetLayer(0)
  88 +
  89 + output_ds: DataSource = gdb_driver.CreateDataSource(output)
99 90
100 91
  92 + gdb_layer:Layer = output_ds.CreateLayer(layer.GetName(),layer.GetSpatialRef(),layer.GetGeomType())
101 93
102   - schema = layer.schema
103   - schema = [s for s in schema if not s.GetName().lower().__eq__("objectid")]
104   - for s in schema :
105   - if s.GetName().lower().__eq__("objectid") and not s.GetTypeName().__eq__("Interger"):
106   - s.SetType(ogr.OFTInteger)
  94 + gdb_layer.CreateFields(layer.schema)
107 95
108 96
109 97
110   - gdb_driver:Driver = ogr.GetDriverByName("FileGDB")
111   - gdb_ds: DataSource = gdb_driver.CreateDataSource(os.path.join(work_dir,"{}.gdb".format(base_name)))
112   - gdb_layer:Layer = gdb_ds.CreateLayer(base_name, layer.GetSpatialRef(),layer.GetGeomType())
113 98
114   - # gdb_layer.CreateFields(schema)
  99 + #
  100 + bound_exent = bound_layer.GetExtent()
115 101
116   - print(gdb_layer.GetFIDColumn())
  102 + layer_extent = layer.GetExtent()
  103 +
  104 + xxrange=layer_extent[1]-layer_extent[0]
  105 + yrange = layer_extent[3]-layer_extent[2]
  106 +
  107 +
  108 + left = (int((layer_extent[0] - bound_exent[0]) / xxrange) + 1) * (-1)
  109 + right = int((bound_exent[1] - layer_extent[1]) / xxrange) + 1
  110 + up = int((bound_exent[3] - layer_extent[3]) / yrange) + 1
  111 + down = (int((layer_extent[2] - bound_exent[2]) / yrange) + 1) * (-1)
117 112
118   - extent = layer.GetExtent()
119   - xxrange=extent[1]-extent[0]
120   - yrange = extent[3]-extent[2]
121   - feature_defn: FeatureDefn = layer.GetLayerDefn()
122 113
123 114 begin = time.time()
124   - count=0
125   - work_dir =os.path.dirname(os.path.abspath(__file__))
  115 + count=1
  116 +
126 117
127 118 for f in layer:
128 119
129   - if count%10000==0:
130   - print(count)
131   - with open(os.path.join(work_dir, "copy.txt"), "w") as fi:
132   - fi.write("已完成{}".format(count))
133   - g:Geometry=f.GetGeometryRef()
134   - new_f:Feature = copy.copy(f)
135   - for xt in range(3):
136   - for yt in range(3):
137   - out_g = move(g,xxrange*xt,yrange*yt)
138   - new_f.SetGeometry(out_g)
139   - new_f.SetFID(count)
140   - dd:FieldDefn=new_f.GetField("OBJECTID")
141   - # new_f.UnsetField("OBJECTID")
142   - gdb_layer.CreateFeature(new_f)
143   - count += 1
  120 + new_f = copy.copy(f)
  121 + for xt in range(left,right+1,1):
  122 + for yt in range(down,up+1,1):
  123 + out_g = move(f.GetGeometryRef(),xxrange*xt,yrange*yt)
  124 +
  125 + if out_g.Intersect(bound_geom):
  126 + out_g = out_g.Intersection(bound_geom)
  127 + new_f.SetGeometry(out_g)
  128 + new_f.SetFID(count)
  129 + gdb_layer.CreateFeature(new_f)
  130 + count += 1
  131 + if count % 10000 == 0:
  132 + # print(count)
  133 + with open(os.path.join(work_dir, "copy.txt"), "w") as fi:
  134 + fi.write("已完成{}".format(count))
144 135
145 136 print(time.time()-begin)
146 137
147   - gdb_ds.Destroy()
  138 + input_ds.Destroy()
148 139
  140 +def envelop_2_polygon(env):
  141 + ring = ogr.Geometry(ogr.wkbLinearRing)
  142 + ring.AddPoint(env[0], env[2])
  143 + ring.AddPoint(env[0], env[3])
  144 + ring.AddPoint(env[1], env[3])
  145 + ring.AddPoint(env[1], env[2])
  146 + ring.AddPoint(env[0], env[2])
  147 + # Create polygon
  148 + poly = ogr.Geometry(ogr.wkbPolygon)
  149 + poly.AddGeometry(ring)
  150 + return poly
149 151
  152 +def clip():
150 153
151   -if __name__ == '__main__':
152   - bound_shp = ""
153   - gdb = ""
154   - layer_name = ""
  154 + input = r"E:\Data\copy\x.shp"
  155 + output = r"E:\Data\copy\xianorigin.shp"
  156 + gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")
  157 + shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
155 158
156   - copydata()
  159 + input_ds: DataSource = shp_driver.Open(input, 0)
  160 + layer: Layer = input_ds.GetLayer(0)
  161 +
  162 + bound_geo = envelop_2_polygon([112.84,112.952,22.914,22.985])
  163 +
  164 + if not layer:
  165 + raise Exception("打开数据失败!")
  166 +
  167 + output_ds: DataSource = shp_driver.CreateDataSource(output)
  168 + #
  169 + # out_layer: Layer = output_ds.CreateLayer(layer.GetName(), layer.GetSpatialRef(), layer.GetGeomType())
  170 + # out_layer.CreateFields(layer.schema)
  171 + for f in layer:
  172 + new_f:Feature = copy.copy(f)
  173 + out_g = f.GetGeometryRef()
  174 + if out_g.Intersect(bound_geo):
  175 + out_g = out_g.Intersection(bound_geo)
  176 + new_f.SetGeometry(out_g)
  177 + new_f.SetField("ttt",1)
  178 + # new_f.UnsetField("OBJECTID")
  179 + # out_layer.CreateFeature(new_f)
  180 +
  181 + # output_ds.Destroy()
  182 +
  183 +
  184 +
  185 +if __name__ == '__main__':
  186 + # bound_shp = "E:\Data\广东边界\gdsample.shp"
  187 + # input = r"E:\Data\copy\origin.shp"
  188 + # output = r"E:\Data\copy\re.shp"
  189 + # copydata(bound_shp,input,output)
  190 + #
  191 + # bound_shp = "/root/CopyData/gdsample.shp"
  192 + # input = r"/root/CopyData/origin.shp"
  193 + # output = r"/root/CopyData/re.gdb"
  194 + # copydata(bound_shp,input,output)
  195 + clip()
157 196
158 197
159 198
... ...
1   -# coding=utf-8
2   -#author: 4N
3   -#createtime: 2021/6/11
4   -#email: nheweijun@sina.com
5   -import copy
6   -from osgeo.ogr import *
7   -from osgeo import gdal,ogr
8   -import uuid
9   -import time
10   -import os
11   -import json
12   -
13   -
14   -def get_info_from_sqlachemy_uri(uri):
15   - parts = uri.split(":")
16   - user = parts[1][2:]
17   -
18   - password_list = parts[2].split("@")
19   - if password_list.__len__() > 2:
20   - password = "@".join(password_list[:-1])
21   - else:
22   - password = parts[2].split("@")[0]
23   - host = parts[2].split("@")[-1]
24   - port = parts[3].split("/")[0]
25   - database = parts[3].split("/")[1]
26   -
27   - return user, password, host, port, database
28   -
29   -def open_pg_data_source(iswrite, uri):
30   - """
31   - # 获取PostGIS数据源
32   - :return:
33   - """
34   - db_conn_tuple = get_info_from_sqlachemy_uri(uri)
35   - fn = "PG: user=%s password=%s host=%s port=%s dbname=%s " % db_conn_tuple
36   - driver = ogr.GetDriverByName("PostgreSQL")
37   - if driver is None:
38   - raise Exception("打开PostgreSQL驱动失败,可能是当前GDAL未支持PostgreSQL驱动!")
39   - ds = driver.Open(fn, iswrite)
40   - if ds is None:
41   - raise Exception("打开数据源失败!")
42   - return ds
43   -
44   -
45   -def move_digui(coords,offx,offy):
46   - if isinstance(coords[0],list):
47   - for coord in coords:
48   - move_digui(coord, offx, offy)
49   - else :
50   - coords[0] += offx
51   - coords[1] += offy
52   -
53   -def move(geo,offx,offy):
54   -
55   -
56   - geojson_str = geo.ExportToJson()
57   - geojson = json.loads(geojson_str)
58   -
59   - coords = geojson["coordinates"]
60   - move_digui(coords, offx, offy)
61   - og = ogr.CreateGeometryFromJson(json.dumps(geojson))
62   -
63   - return og
64   -
65   -
66   -def copydata(bound,input,output):
67   - work_dir = os.path.dirname(os.path.abspath(__file__))
68   -
69   - shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
70   - bound_ds : DataSource = shp_driver.Open(bound, 0)
71   - bound_layer :Layer = bound_ds.GetLayer(0)
72   -
73   - bf = bound_layer.GetNextFeature()
74   - bound_geom = bf.GetGeometryRef()
75   -
76   -
77   -
78   - gdb_driver: Driver = ogr.GetDriverByName("FileGDB")
79   - # gdb_ds: DataSource = gdb_driver.Open(gdb, 0)
80   - # layer: Layer = gdb_ds.GetLayerByName(layer_name)
81   -
82   - input_ds: DataSource = shp_driver.Open(input, 0)
83   - layer: Layer = input_ds.GetLayer(0)
84   -
85   - if not layer:
86   - raise Exception("打开数据失败!")
87   -
88   -
89   - output_ds: DataSource = gdb_driver.CreateDataSource(output)
90   -
91   -
92   - gdb_layer:Layer = output_ds.CreateLayer(layer.GetName(),layer.GetSpatialRef(),layer.GetGeomType())
93   -
94   - gdb_layer.CreateFields(layer.schema)
95   -
96   -
97   -
98   -
99   - #
100   - bound_exent = bound_layer.GetExtent()
101   -
102   - layer_extent = layer.GetExtent()
103   -
104   - xxrange=layer_extent[1]-layer_extent[0]
105   - yrange = layer_extent[3]-layer_extent[2]
106   -
107   -
108   - left = (int((layer_extent[0] - bound_exent[0]) / xxrange) + 1) * (-1)
109   - right = int((bound_exent[1] - layer_extent[1]) / xxrange) + 1
110   - up = int((bound_exent[3] - layer_extent[3]) / yrange) + 1
111   - down = (int((layer_extent[2] - bound_exent[2]) / yrange) + 1) * (-1)
112   -
113   -
114   - begin = time.time()
115   - count=1
116   -
117   -
118   - for f in layer:
119   -
120   - new_f = copy.copy(f)
121   - for xt in range(left,right+1,1):
122   - for yt in range(down,up+1,1):
123   - out_g = move(f.GetGeometryRef(),xxrange*xt,yrange*yt)
124   -
125   - if out_g.Intersect(bound_geom):
126   - out_g = out_g.Intersection(bound_geom)
127   - new_f.SetGeometry(out_g)
128   - new_f.SetFID(count)
129   - gdb_layer.CreateFeature(new_f)
130   - count += 1
131   - if count % 10000 == 0:
132   - # print(count)
133   - with open(os.path.join(work_dir, "copy.txt"), "w") as fi:
134   - fi.write("已完成{}".format(count))
135   -
136   - print(time.time()-begin)
137   -
138   - input_ds.Destroy()
139   -
140   -def envelop_2_polygon(env):
141   - ring = ogr.Geometry(ogr.wkbLinearRing)
142   - ring.AddPoint(env[0], env[2])
143   - ring.AddPoint(env[0], env[3])
144   - ring.AddPoint(env[1], env[3])
145   - ring.AddPoint(env[1], env[2])
146   - ring.AddPoint(env[0], env[2])
147   - # Create polygon
148   - poly = ogr.Geometry(ogr.wkbPolygon)
149   - poly.AddGeometry(ring)
150   - return poly
151   -
152   -def clip():
153   -
154   - input = r"E:\Data\copy\x.shp"
155   - output = r"E:\Data\copy\xianorigin.shp"
156   - gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")
157   - shp_driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
158   -
159   - input_ds: DataSource = shp_driver.Open(input, 0)
160   - layer: Layer = input_ds.GetLayer(0)
161   -
162   - bound_geo = envelop_2_polygon([112.84,112.952,22.914,22.985])
163   -
164   - if not layer:
165   - raise Exception("打开数据失败!")
166   -
167   - output_ds: DataSource = shp_driver.CreateDataSource(output)
168   - #
169   - # out_layer: Layer = output_ds.CreateLayer(layer.GetName(), layer.GetSpatialRef(), layer.GetGeomType())
170   - # out_layer.CreateFields(layer.schema)
171   - for f in layer:
172   - new_f:Feature = copy.copy(f)
173   - out_g = f.GetGeometryRef()
174   - if out_g.Intersect(bound_geo):
175   - out_g = out_g.Intersection(bound_geo)
176   - new_f.SetGeometry(out_g)
177   - new_f.SetField("ttt",1)
178   - # new_f.UnsetField("OBJECTID")
179   - # out_layer.CreateFeature(new_f)
180   -
181   - # output_ds.Destroy()
182   -
183   -
184   -
185   -if __name__ == '__main__':
186   - # bound_shp = "E:\Data\广东边界\gdsample.shp"
187   - # input = r"E:\Data\copy\origin.shp"
188   - # output = r"E:\Data\copy\re.shp"
189   - # copydata(bound_shp,input,output)
190   - #
191   - # bound_shp = "/root/CopyData/gdsample.shp"
192   - # input = r"/root/CopyData/origin.shp"
193   - # output = r"/root/CopyData/re.gdb"
194   - # copydata(bound_shp,input,output)
195   - clip()
196   -
197   -
198   -
199   -
200   -
201   -
202   -
203   -
204   -
205   -
206   -
1   -
2   -from osgeo.ogr import *
3   -from osgeo import ogr
4   -from osgeo import gdal
5   -import os
6   -import uuid
7   -import shutil
8   -import time
9   -from app.modules.data.models import *
10   -from app.util.component.PGUtil import PGUtil
11   -from app.util.component.StructuredPrint import StructurePrint
12   -from sqlalchemy.orm import Session
13   -import configure
14   -import copy
15   -import datetime
16   -class EntryData:
17   -
18   - def entry(self,parameter):
19   - meta:dict = parameter.get("meta")
20   -
21   - #设置编码
22   - encoding = parameter.get("encoding")
23   - if encoding:
24   - gdal.SetConfigOption("SHAPE_ENCODING",encoding)
25   - else:
26   - gdal.SetConfigOption("SHAPE_ENCODING", "GBK")
27   -
28   - #如果包含cpg文件,优先使用cpg文件中声明的编码
29   - encoding_cpg = meta.get("encoding")
30   - if encoding_cpg:
31   - gdal.SetConfigOption("SHAPE_ENCODING", encoding_cpg)
32   -
33   -
34   - # 初始化任务
35   - this_task = ThisTask(parameter)
36   - data_path = meta.get("data_path")
37   -
38   - try:
39   - this_task.write_process("入库任务初始化...")
40   -
41   - this_task.update({"process": "入库中"})
42   - is_success = None
43   - if not data_path:
44   - raise Exception("数据错误!")
45   - # 分为shp和gdb 2种录入形式
46   -
47   - # 开始事务
48   - this_task.pg_ds.StartTransaction()
49   - if data_path.endswith("shp"):
50   - is_success,new_layer_name = self.entry_shp(data_path,this_task)
51   -
52   - if data_path.endswith("gdb"):
53   - is_success,new_layer_names = self.entry_gdb(data_path,this_task)
54   -
55   - this_task.write_process("数据入库结束。")
56   -
57   - if is_success:
58   - # 更新任务为成功任务
59   - this_task.pg_ds.CommitTransaction()
60   - this_task.update({"state": 1,"process":"入库完成","update_time": datetime.datetime.now()})
61   - else:
62   - # 更新任务为失败任务
63   - this_task.update({"state": -1, "process": "入库失败", "update_time": datetime.datetime.now()})
64   - # rollback
65   - this_task.pg_ds.RollbackTransaction()
66   - except Exception as e:
67   - this_task.write_process("{} 任务结束!".format(e.__str__()))
68   - this_task.update({"state": -1, "process": "入库失败", "update_time": datetime.datetime.now()})
69   - StructurePrint().print(e.__str__(),"ERROR")
70   - # rollback
71   - this_task.pg_ds.RollbackTransaction()
72   - finally:
73   - this_task.end()
74   - try:
75   - file_tmp_path = os.path.join(data_path.split("file_tmp")[0],"file_tmp")
76   - dir_path = os.path.dirname(data_path)
77   - i=0
78   - while not os.path.dirname(dir_path).__eq__(file_tmp_path) and i<30:
79   - dir_path = os.path.dirname(dir_path)
80   - i+=1
81   - if i<30:
82   - shutil.rmtree(dir_path,True)
83   - StructurePrint().print("删除文件成功!")
84   - else:
85   - raise Exception("找不到文件!")
86   -
87   - except Exception as e:
88   - StructurePrint().print(e.__str__(), "ERROR")
89   - StructurePrint().print("删除文件失败!","ERROR")
90   -
91   - def entry_shp(self,data_path,this_task):
92   - '''
93   - 录入shp
94   - :param data_path:
95   - :return:
96   - '''
97   -
98   - driver: Driver = ogr.GetDriverByName("ESRI Shapefile")
99   - ds: DataSource = driver.Open(data_path, 1)
100   - if not ds:
101   - raise Exception("打开数据失败!")
102   - layer: Layer = ds.GetLayer(0)
103   -
104   - return self.entry_one_layer(layer, this_task)
105   -
106   - def entry_gdb(self,data_path,this_task):
107   - '''
108   - 录入gdb
109   - :param data_path:
110   - :return:
111   - '''
112   -
113   - is_successes = []
114   - new_layer_names=[]
115   - driver: Driver = ogr.GetDriverByName("OpenFileGDB")
116   - ds: DataSource = driver.Open(data_path, 0)
117   - if not ds:
118   - raise Exception("打开数据失败!")
119   -
120   -
121   - for i in range(ds.GetLayerCount()):
122   - layer: Layer = ds.GetLayer(i)
123   - is_success, new_layer_name = self.entry_one_layer(layer,this_task)
124   - new_layer_names.append(new_layer_name)
125   - is_successes.append(is_success)
126   -
127   - if is_successes.__contains__(False):
128   - return False,new_layer_names
129   - else:
130   - return True,new_layer_names
131   -
132   - def entry_one_layer(self,layer: Layer,this_task):
133   -
134   - # this_task.pg_ds.StartTransaction()
135   - new_layer_name = None
136   -
137   -
138   - try:
139   - # 图层设置
140   - parameter = this_task.parameter
141   -
142   - meta: dict = parameter.get("meta")
143   - overwrite = parameter.get("overwrite") if parameter.get("overwrite") is not None and parameter.get("overwrite")=="yes" else "no"
144   - geom_name = parameter.get("geom_name") if parameter.get("geom_name") is not None else "geom"
145   - fid = parameter.get("fid") if parameter.get("fid") is not None else "fid"
146   - options = ["OVERWRITE={}".format(overwrite), "FID={}".format(fid), "GEOMETRY_NAME={}".format(geom_name),"PRECISION=NO"]
147   -
148   -
149   -
150   - # # 中文名处理
151   - # chinese = is_chinese(new_layer_name)
152   - # if chinese:
153   - # new_layer_name = new_layer_name.__hash__()
154   -
155   - # 将线/面转多线多面,dmap只支持多线面
156   - geom_type = self.change_geom_type(layer.GetGeomType())
157   -
158   - # 更改图层名
159   - change_name = False
160   - origin_name = layer.GetName().lower()
161   -
162   - # 新图层名
163   - new_layer_name: str = meta.get("layer").get(origin_name)
164   - origin_name = new_layer_name
165   - no = 1
166   - while overwrite.__eq__("no") and this_task.pg_ds.GetLayerByName(new_layer_name) :
167   - change_name=True
168   - new_layer_name = origin_name+"_{}".format(no)
169   - no+=1
170   -
171   - if change_name:
172   - this_task.write_process("{}图层已存在,更名为{}入库".format(origin_name, new_layer_name))
173   -
174   -
175   - this_task.write_process("{}图层正在入库...".format(new_layer_name))
176   -
177   - pg_layer: Layer = this_task.pg_ds.CreateLayer(new_layer_name, layer.GetSpatialRef(), geom_type, options)
178   -
179   - # 复制原图层的属性
180   - # 去掉fid的属性
181   - schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]
182   -
183   - pg_layer.CreateFields(schema)
184   -
185   - count =0
186   - this_time = time.time()
187   - for feature in layer:
188   - count+=1
189   - if count%10000==0:
190   - StructurePrint().print("{}图层已入库{}个对象".format(new_layer_name,count))
191   - # print(time.time()-this_time)
192   - this_time=time.time()
193   - geo :Geometry = feature.GetGeometryRef()
194   - # 如果是空对象不录入
195   - if geo is not None:
196   - if geo.IsEmpty():
197   - this_task.write_process("FID:{}要素的空间字段为空,跳过该要素!".format(feature.GetFID()))
198   - StructurePrint().print("FID:{}要素的空间字段为空,跳过该要素!".format(feature.GetFID()),"WARN")
199   - continue
200   - out_feature: Feature = copy.copy(feature)
201   -
202   - if geo is not None:
203   - out_geom:Geometry = self.change_geom(geo, geom_type)
204   - out_feature.SetGeometry(out_geom)
205   -
206   - pg_layer.CreateFeature(out_feature)
207   -
208   - # 注册图层信息
209   - this_task.register_table(pg_layer,new_layer_name,overwrite,parameter.get("creator"))
210   - this_task.write_process("{}图层入库成功。".format(new_layer_name))
211   - except Exception as e:
212   - # this_task.pg_ds.RollbackTransaction()
213   - this_task.write_process("{}入库失败,数据回滚!原因:{}".format(new_layer_name,e.__str__()))
214   - StructurePrint().print("{}入库失败,数据回滚!原因:{}".format(new_layer_name,e.__str__()), "error")
215   - return False,new_layer_name
216   - # finally:
217   -
218   - return True,new_layer_name
219   -
220   - def entry_feature(self):
221   - pass
222   -
223   -
224   - def change_geom_type(self,raw):
225   - if raw.__eq__(-2147483646):
226   - return 5
227   - if raw.__eq__(-2147483645):
228   - return 6
229   - if raw==2 or raw ==3:
230   - return raw+3
231   - if raw==4:
232   - return 1
233   - return raw
234   -
235   - def get_table_type(self,raw):
236   - if raw==4 or raw ==5 or raw ==6:
237   - return raw-3
238   - return raw
239   -
240   - def change_geom(self,geo:Geometry,geom_type):
241   - '''
242   - 转换空间对象的类型,以适应dmap只支持Multi类型
243   - :param geo:
244   - :param geom_type:
245   - :return: 转换后的空间对象
246   - '''
247   -
248   -
249   - # Point = 1,
250   - # LineString = 2,
251   - # Polygon = 3,
252   - # MultiPoint = 4,
253   - # MultiLineString = 5,
254   - # MultiPolygon = 6,
255   - # GeometryCollection = 7,
256   - # CircularString = 8,
257   - # CompoundCurve = 9,
258   - # CurvePolygon = 10,
259   - # MultiCurve = 11,
260   - # MultiSurface = 12,
261   - # PolyhedralSurface = 15,
262   - # LinearRing = 101,
263   -
264   - # MultiPointZ = -2147483644
265   - # MultiPolygonZ = -2147483643
266   - # MultiLineStringZ = -2147483642
267   -
268   -
269   - # PointZ = -2147483647
270   - # LINESTRINGZ=-2147483646
271   - # POLYGONZ=-2147483645
272   -
273   - if geom_type==5 or geom_type.__eq__(-2147483646):
274   - return ogr.ForceToMultiLineString(geo)
275   - if geom_type==6 or geom_type.__eq__(-2147483645):
276   - return ogr.ForceToMultiPolygon(geo)
277   - if geom_type==1:
278   - # 多点转单点,会有问题,只拿了第一个点
279   - xy = geo.GetPoint()
280   - point = ogr.Geometry(ogr.wkbPoint)
281   - point.AddPoint(xy[0], xy[1])
282   - return point
283   - return geo
284   -
285   - def write_task_process(self,session, task_guid, message):
286   - '''
287   - 写详细过程
288   - :param session:
289   - :param task_guid:
290   - :param message:
291   - :return:
292   - '''
293   -
294   - task_process_guid = uuid.uuid1().__str__()
295   - task_process = Process(guid=task_process_guid,
296   - message=message,
297   - time=datetime.datetime.now(),
298   - task_guid=task_guid)
299   - session.add(task_process)
300   -
301   -
302   - def data_check(self):
303   - pass
304   -
305   -class ThisTask:
306   -
307   - def __init__(self, parameter):
308   - try:
309   - self.sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
310   - except Exception as e:
311   - raise Exception("打开数据库失败!")
312   - self.parameter = parameter
313   -
314   - # # 初始化task
315   - # task = Task(guid=parameter.get("task_guid"), name=parameter.get("task_name"),create_time=datetime.datetime.now(),state=0)
316   - # self.sys_session.add(task)
317   - # self.sys_session.commit()
318   -
319   - self.task = self.sys_session.query(Task).filter_by(guid=parameter.get("task_guid"))
320   -
321   - self.database = self.sys_session.query(Database).filter_by(
322   - guid=parameter.get("database_guid")).one_or_none()
323   -
324   - self.catalog_guid = parameter.get("catalog_guid")
325   -
326   - self.pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(self.database.sqlalchemy_uri))
327   -
328   - if self.pg_ds is None:
329   - raise Exception("打开系统数据库失败!")
330   -
331   - def update(self, update_dict):
332   - self.task.update(update_dict)
333   - self.sys_session.commit()
334   -
335   - def write_process(self, message):
336   - message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), message)
337   - task_process_guid = uuid.uuid1().__str__()
338   - task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
339   - task_guid=self.parameter.get("task_guid"))
340   - self.sys_session.add(task_process)
341   - self.sys_session.commit()
342   -
343   - def register_table(self, layer: Layer, new_layer_name, overwrite, creator):
344   - '''
345   - 注册表
346   - :param layer: 图层
347   - :param new_layer_name: 图层名
348   - :return: 表名
349   - '''
350   -
351   - # 在覆盖模式下,删除原有记录
352   - if overwrite.__eq__("yes"):
353   - old_table = self.sys_session.query(Table).filter_by(name=new_layer_name).one_or_none()
354   - if old_table:
355   - self.sys_session.delete(old_table)
356   - self.sys_session.commit()
357   -
358   - this_time = datetime.datetime.now()
359   -
360   - ext = layer.GetExtent()
361   - if ext[0] < 360:
362   - ext = [round(e, 6) for e in ext]
363   - else:
364   - ext = [round(e, 2) for e in ext]
365   -
366   - # extent = "西:{},东:{},南:{},北:{}".format(ext[0],ext[1],ext[2],ext[3])
367   - extent = "{},{},{},{}".format(ext[0], ext[1], ext[2], ext[3])
368   -
369   - # 其他相同库也更新
370   - connectstr = self.database.connectstr
371   - databs = self.sys_session.query(Database).filter_by(connectstr=connectstr).all()
372   - databs_guid = [d.guid for d in databs]
373   - for d_guid in databs_guid:
374   - table_guid = uuid.uuid1().__str__()
375   - table = Table(guid=table_guid,
376   - database_guid=d_guid,
377   - # alias=new_layer_name,
378   - creator=creator,
379   - name=new_layer_name, create_time=this_time, update_time=this_time,
380   - catalog_guid=self.catalog_guid, table_type=self.get_table_type(layer.GetGeomType()),
381   - extent=extent,
382   - feature_count=layer.GetFeatureCount()
383   - )
384   - self.sys_session.add(table)
385   -
386   - feature_defn: FeatureDefn = layer.GetLayerDefn()
387   -
388   - for i in range(feature_defn.GetFieldCount()):
389   - field_defn: FieldDefn = feature_defn.GetFieldDefn(i)
390   - field_name = field_defn.GetName().lower()
391   - field_alias = field_name if field_defn.GetAlternativeName() is None or field_defn.GetAlternativeName().__eq__(
392   - "") else field_defn.GetAlternativeName()
393   - column = Columns(guid=uuid.uuid1().__str__(), table_guid=table_guid,
394   - name=field_name, alias=field_alias, create_time=this_time, update_time=this_time)
395   - self.sys_session.add(column)
396   -
397   - self.sys_session.commit()
398   -
399   - def end(self):
400   - if self.sys_session:
401   - self.sys_session.close()
402   - if self.pg_ds:
403   - self.pg_ds.Destroy()
404   -
405   - def get_table_type(self,raw):
406   - if raw==4 or raw ==5 or raw ==6:
407   - return raw-3
408   - return raw
\ No newline at end of file
... ... @@ -52,6 +52,15 @@ class TaskController:
52 52 @classmethod
53 53 def wait(cls,task_guid):
54 54 sys_session= PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  55 + #三秒检测一次是否可以执行任务
  56 + wait_time = 0
55 57 while not cls.pass_check(sys_session,task_guid):
56 58 time.sleep(3)
  59 + wait_time += 3
  60 + if wait_time > 21600:
  61 + try:
  62 + sys_session.close()
  63 + except Exception as e:
  64 + pass
  65 + raise Exception("任务等待超时")
57 66 sys_session.close()
\ No newline at end of file
... ...
  1 +# coding=utf-8
  2 +#author: 4N
  3 +#createtime: 2021/12/3
  4 +#email: nheweijun@sina.com
  5 +
  6 +import datetime
  7 +import uuid
  8 +from app.modules.data.models import Process,Task,Table
  9 +from app.modules.service.models import Image
  10 +import configure
  11 +from app.util.component.PGUtil import PGUtil
  12 +
  13 +def check_session(fun):
  14 + '''
  15 + 检查session的装饰器
  16 + :param fun:
  17 + :return:
  18 + '''
  19 +
  20 + def wrap(self,*args, **kwargs):
  21 + if not self.sys_session.is_active:
  22 + self.sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  23 + fun(self,*args, **kwargs)
  24 +
  25 + return wrap
  26 +
  27 +class TaskWriter:
  28 +
  29 + def __init__(self,task_guid):
  30 + self.sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  31 + self.task_guid = task_guid
  32 +
  33 + @check_session
  34 + def update_task(self,update_info,commit=True):
  35 + self.sys_session.query(Task).filter_by(guid=self.task_guid).update(update_info)
  36 + if commit:
  37 + self.sys_session.commit()
  38 +
  39 + @check_session
  40 + def update_process(self,msg,commit=True):
  41 + message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), msg)
  42 + task_process_guid = uuid.uuid1().__str__()
  43 + task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
  44 + task_guid=self.task_guid)
  45 +
  46 + self.sys_session.add(task_process)
  47 + if commit:
  48 + self.sys_session.commit()
  49 +
  50 + @check_session
  51 + def update_table(self,table_guid,update_info,commit=True):
  52 + self.sys_session.query(Table).filter_by(guid=table_guid).update(update_info)
  53 + if commit:
  54 + self.sys_session.commit()
  55 +
  56 + @check_session
  57 + def update_image(self,image_guid,update_info,commit=True):
  58 + self.sys_session.query(Image).filter_by(guid=image_guid).update(update_info)
  59 + if commit:
  60 + self.sys_session.commit()
  61 +
  62 + def close(self):
  63 + try:
  64 + self.sys_session.commit()
  65 + self.sys_session.close()
  66 + except:
  67 + pass
  68 +
  69 + @property
  70 + def session(self):
  71 + if not self.sys_session.is_active:
  72 + self.sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  73 + return self.sys_session
\ No newline at end of file
... ...
注册登录 后发表评论