data_entry_simple.py
3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# coding=utf-8
#author: 4N
#createtime: 2021/1/27
#email: nheweijun@sina.com
from osgeo.ogr import *
import uuid
import time
from ..models import *
import datetime
import json
from app.util.component.ApiTemplate import ApiTemplate
from .get_meta import Api as MetaApi
from threading import Thread
from .util.EntryDataVacuate import EntryDataVacuate
class Api(ApiTemplate):
api_name = "Simple入库"
def process(self):
data_path = self.para.get("data_path")
database_guid = self.para.get("database_guid")
is_task = self.para.get("is_task")
self.para["overwrite"] = "yes"
self.para["task_guid"] = uuid.uuid1().__str__()
self.para["task_time"] = time.time()
self.para["task_name"] = "入库测试接口"
self.para["creator"] = "4N"
#返回结果
res={}
try:
meta_api = MetaApi()
meta_api.para["data_path"] = data_path
meta_list = json.loads(meta_api.process())["data"]
if is_task:
task = Task(guid=self.para.get("task_guid"),
name=self.para.get("task_name"),
create_time=datetime.datetime.now(),
state=0,
task_type=1,
creator=self.para.get("creator"),
file_name=meta_list[0].get("filename"),
database_guid=self.para.get("database_guid"),
catalog_guid=self.para.get("catalog_guid"),
process="等待入库",
parameter=json.dumps(self.para))
db.session.add(task)
db.session.commit()
res["result"] = True
res["msg"] = "数据录入提交成功!"
res["data"] = self.para["task_guid"]
else:
start = time.time()
self.para["meta"] = meta_list
task = Task(guid=self.para.get("task_guid"),
name=self.para.get("task_name"),
create_time=datetime.datetime.now(),
state=0,
task_type=-1,
creator=self.para.get("creator"),
file_name=meta_list[0].get("filename"),
database_guid=self.para.get("database_guid"),
catalog_guid=self.para.get("catalog_guid"),
process="等待入库",
parameter=json.dumps(self.para))
db.session.add(task)
db.session.commit()
entry_data_thread = Thread(
target=EntryDataVacuate().entry, args=(self.para,))
entry_data_thread.start()
entry_data_thread.join()
res["result"] = True
res["msg"] = "数据入库成功!"
res["data"] = "耗时{}秒".format(time.time()-start)
except Exception as e:
raise e
return res
api_doc={
"tags":["IO接口"],
"parameters":[
{"name": "data_path",
"in": "formData",
"type": "string",
"description": "服务器数据路径"},
{"name": "database_guid",
"in": "formData",
"type": "string",
"description": "数据库guid"},
{"name": "is_task",
"in": "formData",
"type": "boolean",
"description": "是否形成任务"}
],
"responses":{
200:{
"schema":{
"properties":{
}
}
}
}
}