提交 69a22b533257330e16c3c3b0789f0064b58fa274

作者 nheweijun
1 个父辈 81150e15

2022.04.18 修复任务bug

@@ -44,7 +44,6 @@ class Api(ApiTemplate): @@ -44,7 +44,6 @@ class Api(ApiTemplate):
44 file_name=None, 44 file_name=None,
45 process="数据下载中", 45 process="数据下载中",
46 database_guid=self.para.get("database_guid"), 46 database_guid=self.para.get("database_guid"),
47 - # task_pid=download_process.pid  
48 ) 47 )
49 48
50 db.session.add(task) 49 db.session.add(task)
@@ -53,9 +52,6 @@ class Api(ApiTemplate): @@ -53,9 +52,6 @@ class Api(ApiTemplate):
53 download_process = multiprocessing.Process(target=self.download, args=(task_guid,self.para)) 52 download_process = multiprocessing.Process(target=self.download, args=(task_guid,self.para))
54 download_process.start() 53 download_process.start()
55 54
56 - Task.query.filter_by(guid=task_guid).update({"task_pid": download_process.pid})  
57 - db.session.commit()  
58 -  
59 res["data"] = "下载任务已提交!" 55 res["data"] = "下载任务已提交!"
60 56
61 # 提示信息 57 # 提示信息
@@ -84,10 +80,12 @@ class Api(ApiTemplate): @@ -84,10 +80,12 @@ class Api(ApiTemplate):
84 80
85 try: 81 try:
86 82
  83 + task_writer = TaskWriter(task_guid)
  84 + pid = multiprocessing.current_process().pid
  85 + task_writer.update_task({"task_pid":pid})
87 #任务控制,等待执行 86 #任务控制,等待执行
88 87
89 TaskController.wait(task_guid) 88 TaskController.wait(task_guid)
90 - task_writer = TaskWriter(task_guid)  
91 89
92 task_writer.update_task({"state":2,"update_time":datetime.datetime.now(),"process" : "下载中"}) 90 task_writer.update_task({"state":2,"update_time":datetime.datetime.now(),"process" : "下载中"})
93 task_writer.update_process("开始下载...") 91 task_writer.update_process("开始下载...")
@@ -101,9 +101,6 @@ class Api(ApiTemplate): @@ -101,9 +101,6 @@ class Api(ApiTemplate):
101 entry_thread = multiprocessing.Process(target=self.entry,args=(self.para.get("task_guid"),)) 101 entry_thread = multiprocessing.Process(target=self.entry,args=(self.para.get("task_guid"),))
102 entry_thread.start() 102 entry_thread.start()
103 103
104 - Task.query.filter_by(guid=task_guid).update({"task_pid": entry_thread.pid})  
105 - db.session.commit()  
106 -  
107 res["result"] = True 104 res["result"] = True
108 res["msg"] = "数据录入提交成功!" 105 res["msg"] = "数据录入提交成功!"
109 res["data"] = self.para["task_guid"] 106 res["data"] = self.para["task_guid"]
@@ -116,11 +113,13 @@ class Api(ApiTemplate): @@ -116,11 +113,13 @@ class Api(ApiTemplate):
116 task_writer = None 113 task_writer = None
117 this_task_layer = [] 114 this_task_layer = []
118 try: 115 try:
  116 + task_writer = TaskWriter(task_guid)
  117 + pid = multiprocessing.current_process().pid
  118 + task_writer.update_task({"task_pid": pid})
119 119
120 #任务控制,等待执行 120 #任务控制,等待执行
121 - TaskController.wait(task_guid)  
122 121
123 - task_writer = TaskWriter(task_guid) 122 + TaskController.wait(task_guid)
124 123
125 task:Task = task_writer.session.query(Task).filter_by(guid=task_guid).one_or_none() 124 task:Task = task_writer.session.query(Task).filter_by(guid=task_guid).one_or_none()
126 parameter = json.loads(task.parameter) 125 parameter = json.loads(task.parameter)
@@ -43,8 +43,7 @@ class Api(ApiTemplate): @@ -43,8 +43,7 @@ class Api(ApiTemplate):
43 creator=self.para.get("creator"), 43 creator=self.para.get("creator"),
44 file_name=None, 44 file_name=None,
45 database_guid=database.guid, 45 database_guid=database.guid,
46 - process="数据库更新中",  
47 - # task_pid=refresh_process.pid 46 + process="数据库更新中"
48 ) 47 )
49 48
50 db.session.add(task) 49 db.session.add(task)
@@ -53,13 +52,12 @@ class Api(ApiTemplate): @@ -53,13 +52,12 @@ class Api(ApiTemplate):
53 refresh_process = multiprocessing.Process(target=self.table_refresh,args=(database,task_guid,self.para.get("creator"))) 52 refresh_process = multiprocessing.Process(target=self.table_refresh,args=(database,task_guid,self.para.get("creator")))
54 refresh_process.start() 53 refresh_process.start()
55 54
56 - Task.query.filter_by(guid=task_guid).update({"task_pid":refresh_process.pid})  
57 - db.session.commit()  
58 55
59 res["msg"] = "数据库更新已提交!" 56 res["msg"] = "数据库更新已提交!"
60 res["data"] = task_guid 57 res["data"] = task_guid
61 res["result"] = True 58 res["result"] = True
62 59
  60 +
63 except Exception as e: 61 except Exception as e:
64 print(traceback.format_exc()) 62 print(traceback.format_exc())
65 raise e 63 raise e
@@ -69,24 +67,24 @@ class Api(ApiTemplate): @@ -69,24 +67,24 @@ class Api(ApiTemplate):
69 def table_refresh(self,database,task_guid,creator): 67 def table_refresh(self,database,task_guid,creator):
70 68
71 pg_ds =None 69 pg_ds =None
72 - sys_ds =None 70 + # sys_ds =None
73 data_session=None 71 data_session=None
74 result = {} 72 result = {}
75 sys_session = None 73 sys_session = None
76 task_writer = None 74 task_writer = None
77 75
78 -  
79 db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri)) 76 db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
80 77
81 try: 78 try:
82 -  
83 - #任务控制,等待执行  
84 - TaskController.wait(task_guid)  
85 task_writer = TaskWriter(task_guid) 79 task_writer = TaskWriter(task_guid)
  80 + pid = multiprocessing.current_process().pid
  81 + task_writer.update_task({"task_pid":pid})
86 82
  83 + # 任务控制,等待执行
  84 + TaskController.wait(task_guid)
87 85
88 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI) 86 sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
89 - sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI) 87 + # sys_ds = PGUtil.open_pg_data_source(0,configure.SQLALCHEMY_DATABASE_URI)
90 # 实体库连接 88 # 实体库连接
91 data_session: Session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri)) 89 data_session: Session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))
92 90
@@ -113,7 +111,6 @@ class Api(ApiTemplate): @@ -113,7 +111,6 @@ class Api(ApiTemplate):
113 # 空间表处理完毕 111 # 空间表处理完毕
114 sys_session.commit() 112 sys_session.commit()
115 113
116 -  
117 # 注册普通表 114 # 注册普通表
118 115
119 116
@@ -173,8 +170,8 @@ class Api(ApiTemplate): @@ -173,8 +170,8 @@ class Api(ApiTemplate):
173 data_session.close() 170 data_session.close()
174 if sys_session: 171 if sys_session:
175 sys_session.close() 172 sys_session.close()
176 - if sys_ds:  
177 - sys_ds.Destroy() 173 + # if sys_ds:
  174 + # sys_ds.Destroy()
178 return result 175 return result
179 176
180 def add_spatail_table(self,database,pg_ds,data_session,sys_session,spatial_tables_names,this_time,db_tuple,creator): 177 def add_spatail_table(self,database,pg_ds,data_session,sys_session,spatial_tables_names,this_time,db_tuple,creator):
@@ -92,9 +92,6 @@ class Api(ApiTemplate): @@ -92,9 +92,6 @@ class Api(ApiTemplate):
92 vacuate_process = multiprocessing.Process(target=self.task,args=(table,task_guid)) 92 vacuate_process = multiprocessing.Process(target=self.task,args=(table,task_guid))
93 vacuate_process.start() 93 vacuate_process.start()
94 94
95 - Task.query.filter_by(guid=task_guid).update({"task_pid": vacuate_process.pid})  
96 - db.session.commit()  
97 -  
98 res["msg"] = "矢量金字塔构建已提交!" 95 res["msg"] = "矢量金字塔构建已提交!"
99 res["data"] = task_guid 96 res["data"] = task_guid
100 res["result"] = True 97 res["result"] = True
@@ -114,11 +111,13 @@ class Api(ApiTemplate): @@ -114,11 +111,13 @@ class Api(ApiTemplate):
114 vacuate_process = None 111 vacuate_process = None
115 try: 112 try:
116 113
  114 + task_writer = TaskWriter(task_guid)
  115 + pid = multiprocessing.current_process().pid
  116 + task_writer.update_task({"task_pid":pid})
  117 +
117 #任务控制,等待执行 118 #任务控制,等待执行
118 TaskController.wait(task_guid) 119 TaskController.wait(task_guid)
119 120
120 - task_writer = TaskWriter(task_guid)  
121 -  
122 task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()}) 121 task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
123 task_writer.update_task({"state":2,"process":"构建中"}) 122 task_writer.update_task({"state":2,"process":"构建中"})
124 task_writer.update_process("开始构建...") 123 task_writer.update_process("开始构建...")
@@ -98,9 +98,6 @@ class Api(ApiTemplate): @@ -98,9 +98,6 @@ class Api(ApiTemplate):
98 vacuate_process = multiprocessing.Process(target=self.task,args=(table,task_guid,grids)) 98 vacuate_process = multiprocessing.Process(target=self.task,args=(table,task_guid,grids))
99 vacuate_process.start() 99 vacuate_process.start()
100 100
101 - Task.query.filter_by(guid=task_guid).update({"task_pid": vacuate_process.pid})  
102 - db.session.commit()  
103 -  
104 res["msg"] = "矢量金字塔构建已提交!" 101 res["msg"] = "矢量金字塔构建已提交!"
105 res["data"] = task_guid 102 res["data"] = task_guid
106 res["result"] = True 103 res["result"] = True
@@ -122,9 +119,13 @@ class Api(ApiTemplate): @@ -122,9 +119,13 @@ class Api(ApiTemplate):
122 119
123 try: 120 try:
124 121
  122 +
  123 + task_writer = TaskWriter(task_guid)
  124 + pid = multiprocessing.current_process().pid
  125 + task_writer.update_task({"task_pid":pid})
  126 +
125 #任务控制,等待执行 127 #任务控制,等待执行
126 TaskController.wait(task_guid) 128 TaskController.wait(task_guid)
127 - task_writer = TaskWriter(task_guid)  
128 129
129 task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()}) 130 task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
130 task_writer.update_task({"state":2,"process":"构建中"}) 131 task_writer.update_task({"state":2,"process":"构建中"})
注册登录 后发表评论