提交 ec3d2c0b3fdba2fa1c5aac6f50b4cebc4138956a

作者 nheweijun
1 个父辈 59938637

2021.12.02 任务控制使用新方式

... ... @@ -79,12 +79,9 @@ def create_app():
79 79
80 80 # 日志
81 81 logging.basicConfig(level=configure.log_level)
82   - log_file = os.path.join(os.path.dirname(os.path.dirname(
83   - os.path.realpath(__file__))), "logs", "log.txt")
84   - handler = logging.FileHandler(
85   - log_file, encoding='UTF-8') # 设置日志字符集和存储路径名字
86   - logging_format = logging.Formatter(
87   - '[%(levelname)s] %(asctime)s %(message)s')
  82 + log_file = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "logs", "log.txt")
  83 + handler = logging.FileHandler(log_file, encoding='UTF-8') # 设置日志字符集和存储路径名字
  84 + logging_format = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s')
88 85 handler.setFormatter(logging_format)
89 86 app.logger.addHandler(handler)
90 87
... ... @@ -97,16 +94,13 @@ def create_app():
97 94 app.register_blueprint(api.bp)
98 95
99 96 # 入库监测线程
100   - @app.before_first_request
101   - def data_entry_process():
102   - StructurePrint().print("start listen")
103   - process = threading.Thread(target=data_entry_center)
104   - process.start()
  97 + # @app.before_first_request
  98 + # def data_entry_process():
  99 + # StructurePrint().print("start listen")
  100 + # process = threading.Thread(target=data_entry_center)
  101 + # process.start()
105 102
106 103 # 不检测https
107 104 os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
108   -
109   -
110   -
111 105 return app
112 106
... ...
... ... @@ -19,6 +19,7 @@ from app.util.component.ZipUtil import ZipUtil
19 19 from app.util.component.StructuredPrint import StructurePrint
20 20 import multiprocessing
21 21 import datetime
  22 +from app.util.component.TaskController import TaskController
22 23
23 24 class Api(ApiTemplate):
24 25
... ... @@ -69,6 +70,9 @@ class Api(ApiTemplate):
69 70
70 71 try:
71 72
  73 + #任务控制,等待执行
  74 + TaskController.wait(task_guid)
  75 +
72 76 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
73 77
74 78 table_names = para.get("table_name").split(",")
... ...
... ... @@ -13,8 +13,13 @@ import json
13 13 import re
14 14 from app.util.component.ApiTemplate import ApiTemplate
15 15 from app.util.component.PGUtil import PGUtil
  16 +from app.util.component.StructuredPrint import StructurePrint
  17 +from sqlalchemy.orm import Session
  18 +import configure
16 19 import datetime
17   -
  20 +import multiprocessing
  21 +from app.util.component.EntryDataVacuate import EntryDataVacuate
  22 +from app.util.component.TaskController import TaskController
18 23
19 24 class Api(ApiTemplate):
20 25
... ... @@ -68,7 +73,11 @@ class Api(ApiTemplate):
68 73 except:
69 74 print("关闭数据库失败!")
70 75 return res
71   -
  76 +
  77 + entry_thread = multiprocessing.Process(target=self.entry,args=(self.para.get("task_guid"),))
  78 + entry_thread.start()
  79 +
  80 +
72 81 # 录入数据后台进程,录入主函数为entry
73 82 # 初始化task
74 83 task = Task(guid=self.para.get("task_guid"),
... ... @@ -81,10 +90,12 @@ class Api(ApiTemplate):
81 90 database_guid=self.para.get("database_guid"),
82 91 catalog_guid=self.para.get("catalog_guid"),
83 92 process="等待入库",
84   - parameter=json.dumps(self.para))
  93 + parameter=json.dumps(self.para),
  94 + task_pid=entry_thread.pid)
85 95 db.session.add(task)
86 96 db.session.commit()
87   -
  97 +
  98 +
88 99 res["result"] = True
89 100 res["msg"] = "数据录入提交成功!"
90 101 res["data"] = self.para["task_guid"]
... ... @@ -92,6 +103,81 @@ class Api(ApiTemplate):
92 103 raise e
93 104 return res
94 105
  106 + def entry(self,task_guid):
  107 +
  108 + sys_session: Session = None
  109 + this_task_layer = []
  110 + try:
  111 + #任务控制,等待执行
  112 + TaskController.wait(task_guid)
  113 +
  114 + sys_session: Session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  115 +
  116 +
  117 + task:Task = sys_session.query(Task).filter_by(guid=task_guid).one_or_none()
  118 + parameter = json.loads(task.parameter)
  119 +
  120 + task.state = 2
  121 + task.process = "入库中"
  122 + sys_session.commit()
  123 +
  124 +
  125 + #处理修改入库信息
  126 +
  127 + metas: list = json.loads(parameter.get("meta").__str__())
  128 + parameter["meta"] = metas
  129 +
  130 + database = sys_session.query(Database).filter_by(guid=task.database_guid).one_or_none()
  131 + pg_ds: DataSource = PGUtil.open_pg_data_source(1, DES.decode(database.sqlalchemy_uri))
  132 +
  133 +
  134 + for meta in metas:
  135 + overwrite = parameter.get("overwrite", "no")
  136 +
  137 + for layer_name_origin, layer_name in meta.get("layer").items():
  138 + origin_name = layer_name
  139 + no = 1
  140 +
  141 + while (overwrite.__eq__("no") and pg_ds.GetLayerByName(layer_name)) or sys_session.query(
  142 + InsertingLayerName).filter_by(name=layer_name).one_or_none():
  143 + layer_name = origin_name + "_{}".format(no)
  144 + no += 1
  145 +
  146 + # 添加到正在入库的列表中
  147 + iln = InsertingLayerName(guid=uuid.uuid1().__str__(),
  148 + task_guid=task.guid,
  149 + name=layer_name)
  150 +
  151 + sys_session.add(iln)
  152 + sys_session.commit()
  153 + this_task_layer.append(layer_name)
  154 + # 修改表名
  155 + meta["layer"][layer_name_origin] = layer_name
  156 + pg_ds.Destroy()
  157 +
  158 + #入库
  159 + EntryDataVacuate().entry(parameter)
  160 +
  161 + #完成后
  162 + for ln in this_task_layer:
  163 + iln = sys_session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
  164 + sys_session.delete(iln)
  165 +
  166 + except Exception as e:
  167 + sys_session.query(Task).filter_by(guid=task_guid).update(
  168 + {"state": -1, "process": "入库失败"})
  169 +
  170 + for ln in this_task_layer:
  171 + iln = sys_session.query(InsertingLayerName).filter_by(name=ln).one_or_none()
  172 + sys_session.delete(iln)
  173 +
  174 + sys_session.commit()
  175 + StructurePrint().print(e.__str__(), "error")
  176 + finally:
  177 + sys_session.commit()
  178 + if sys_session:
  179 + sys_session.close()
  180 +
95 181
96 182 api_doc={
97 183 "tags":["IO接口"],
... ...
... ... @@ -18,7 +18,7 @@ from app.util.component.PGUtil import PGUtil
18 18 from app.util.component.StructuredPrint import StructurePrint
19 19 from app.util.component.ApiTemplate import ApiTemplate
20 20 from app.util.component.GeometryAdapter import GeometryAdapter
21   -
  21 +from app.util.component.TaskController import TaskController
22 22 import multiprocessing
23 23 import configure
24 24
... ... @@ -72,6 +72,10 @@ class Api(ApiTemplate):
72 72 db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
73 73
74 74 try:
  75 +
  76 + #任务控制,等待执行
  77 + TaskController.wait(task_guid)
  78 +
75 79 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
76 80 sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI)
77 81
... ...
... ... @@ -17,7 +17,7 @@ from app.util.component.PGUtil import PGUtil
17 17 from app.util.component.VacuateConf import VacuateConf
18 18 from app.util.component.GeometryAdapter import GeometryAdapter
19 19
20   -
  20 +from app.util.component.TaskController import TaskController
21 21 from osgeo.ogr import DataSource,Layer,Geometry
22 22 from osgeo import ogr
23 23
... ... @@ -105,15 +105,18 @@ class Api(ApiTemplate):
105 105 pg_ds = None
106 106 vacuate_process = None
107 107 try:
108   - sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
109 108
110   - sys_session.query(Table).filter_by(guid=table.guid).update({"is_vacuate": 2, "update_time": datetime.datetime.now()})
  109 + #任务控制,等待执行
  110 + TaskController.wait(task_guid)
  111 +
111 112
  113 + sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  114 + sys_session.query(Table).filter_by(guid=table.guid).update({"is_vacuate": 2, "update_time": datetime.datetime.now()})
  115 + sys_session.query(Task).filter_by(guid=task_guid).update({"state":2,"process":"精华中"})
112 116
113 117 database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
114 118 database_sqlalchemy_uri = str(database.sqlalchemy_uri)
115 119 pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
116   -
117 120 pg_ds :DataSource= PGUtil.open_pg_data_source(0,DES.decode(database.sqlalchemy_uri))
118 121
119 122
... ... @@ -126,6 +129,7 @@ class Api(ApiTemplate):
126 129 #长时间的连接,导致后续的session超时,现在先断开
127 130 sys_session.close()
128 131
  132 +
129 133 # 创建抽稀过程
130 134 options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name,pg_session)),
131 135 "PRECISION=NO"]
... ...
... ... @@ -17,6 +17,7 @@ from osgeo.ogr import DataSource,Layer,Geometry
17 17 from osgeo import ogr
18 18
19 19 from app.util.component.VacuateConf import VacuateConf
  20 +from app.util.component.TaskController import TaskController
20 21
21 22 class Api(ApiTemplate):
22 23 api_name = "单独抽稀"
... ... @@ -106,16 +107,19 @@ class Api(ApiTemplate):
106 107 origin_vacuate = table.is_vacuate
107 108
108 109 try:
  110 +
  111 + #任务控制,等待执行
  112 + TaskController.wait(task_guid)
  113 +
109 114 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
110 115 sys_session.query(Table).filter_by(guid=table.guid).update(
111 116 {"is_vacuate": 2, "update_time": datetime.datetime.now()})
  117 + sys_session.query(Task).filter_by(guid=task_guid).update({"state":2,"process":"精华中"})
112 118
113   - database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
114 119
  120 + database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
115 121 database_sqlalchemy_uri = str(database.sqlalchemy_uri)
116   -
117 122 pg_session = PGUtil.get_db_session(DES.decode(database_sqlalchemy_uri))
118   -
119 123 pg_ds :DataSource= PGUtil.open_pg_data_source(1,DES.decode(database_sqlalchemy_uri))
120 124
121 125 #删除原有数据
... ...
... ... @@ -103,6 +103,10 @@ class Task(db.Model):
103 103 process = Column(Text)
104 104 create_time = Column(DateTime)
105 105 update_time = Column(DateTime)
  106 + #0等待
  107 + #1完成
  108 + #-1失败
  109 + #2正在进行中
106 110 state = Column(Integer)
107 111 #数据源外键
108 112 database_guid = Column(String(256), ForeignKey('dmap_database.guid'))
... ...
... ... @@ -16,10 +16,11 @@ import configure
16 16 from app.util.component.PGUtil import PGUtil
17 17 from .util.Cache import Cache
18 18 import json
19   -from osgeo import gdal,osr
  19 +from osgeo import gdal
20 20 from osgeo.gdal import *
21 21 import traceback
22 22 import os
  23 +from app.util.component.TaskController import TaskController
23 24
24 25 class Api(ApiTemplate):
25 26
... ... @@ -77,6 +78,11 @@ class Api(ApiTemplate):
77 78 def build_pyramid_task(self,image_guid,task_guid,data_servers,path):
78 79 sys_session = None
79 80 try:
  81 +
  82 +
  83 + #任务控制,等待执行
  84 + TaskController.wait(task_guid)
  85 +
80 86 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
81 87 #进入创建金字塔的状态
82 88 sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": -1})
... ...
  1 +# coding=utf-8
  2 +#author: 4N
  3 +#createtime: 2021/12/2
  4 +#email: nheweijun@sina.com
  5 +
  6 +from app.modules.data.models import Task
  7 +from app.util.component.StructuredPrint import StructurePrint
  8 +import configure
  9 +import time
  10 +from app.util.component.PGUtil import PGUtil
  11 +
  12 +class TaskController:
  13 + '''
  14 + 此类是面向切面的类,控制各种任务是否可以进入执行状态
  15 + '''
  16 +
  17 + @classmethod
  18 + def pass_check(cls,sys_session,task_guid):
  19 +
  20 + # 任务类型
  21 + # 1:入库任务
  22 + # 2:抽稀任务
  23 + # 3:数据库刷新任务
  24 + # 4:数据下载任务
  25 + # 5:影像金字塔任务
  26 +
  27 + task = sys_session.query(Task).filter_by(guid=task_guid).one_or_none()
  28 +
  29 + check = True
  30 + if task.task_type==1:
  31 + #控制正在运行的任务数量
  32 + running_count = sys_session.query(Task).filter(Task.state == 2).filter(Task.task_type == 1).count()
  33 + first_task = sys_session.query(Task).filter(Task.state == 0).filter(Task.task_type == 1).order_by(Task.create_time).first()
  34 + if (running_count > configure.entry_data_thread) or (not first_task.guid.__eq__(task_guid)):
  35 + StructurePrint().print("等待入库")
  36 + check = False
  37 + if task.task_type==2:
  38 + running_count = sys_session.query(Task).filter(Task.state == 2).filter(Task.task_type == 2).count()
  39 + first_task = sys_session.query(Task).filter(Task.state == 0).filter(Task.task_type == 2).order_by(Task.create_time).first()
  40 + if (running_count > configure.entry_data_thread) or (not first_task.guid.__eq__(task_guid)):
  41 + StructurePrint().print("等待精化")
  42 + check = False
  43 + if task.task_type==3:
  44 + pass
  45 + if task.task_type==4:
  46 + pass
  47 + if task.task_type==5:
  48 + pass
  49 +
  50 + return check
  51 +
  52 + @classmethod
  53 + def wait(cls,task_guid):
  54 + sys_session= PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
  55 + while not cls.pass_check(sys_session,task_guid):
  56 + time.sleep(3)
  57 + sys_session.close()
\ No newline at end of file
... ...
注册登录 后发表评论