__init__.py
8.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# coding=utf-8
import datetime
import importlib
import inspect
import json
import pkgutil
from osgeo import ogr
import configure
import math
from flask import request
import zipfile
import os
from flask import current_app
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker,session
# 蓝图的父类,用作标识蓝图
class BlueprintApi():
pass
# 递归查找某个模块下特定父类的子类
def find_class(modname, class_name):
module = importlib.import_module(modname)
path = getattr(module, "__path__", None)
basename = module.__name__ + "."
for _importer, modname, ispkg in pkgutil.iter_modules(path):
modname = basename + modname
if ispkg:
for name, obj in inspect.getmembers(importlib.import_module(modname), inspect.isclass):
if obj.__base__ == class_name:
yield obj
for i in find_class(modname, class_name):
yield i
#
#
# # 从sqlachemy_uri解析出相关信息
# def get_info_from_sqlachemy_uri(uri):
# parts = uri.split(":")
# user = parts[1][2:]
#
# password_list = parts[2].split("@")
# if password_list.__len__()>2:
# password = "@".join(password_list[:-1])
# else:
# password = parts[2].split("@")[0]
# host = parts[2].split("@")[-1]
# port = parts[3].split("/")[0]
# database = parts[3].split("/")[1]
# return user, password, host, port, database
#
#
# # 获取请求参数
# def get_parameter():
# parameter = {}
# parameter.update(request.args.items())
# parameter.update(request.form.items())
# try:
# request_json = request.get_json()
# if json:
# parameter.update(request_json)
# except:
# pass
# for key in parameter.keys():
# if parameter.get(key) == "":
# parameter[key] = None
# return parameter
#
#
# def togeojson(dict):
# geojson = {}
# geojson["type"] = "Feature"
# geojson["geometry"] = json.loads(dict.pop("geojson"))
# del dict["geojson"]
# if dict.get("wkb_geometry"):
# del dict["wkb_geometry"]
# geojson["properties"] = dict
# return geojson
#
#
# def to_feature_collection(result):
# geojsonresult = [togeojson(x) for x in result]
# data = {}
# data["type"] = "FeatureCollection"
# data["features"] = geojsonresult
# return data
#
#
# def getType(layer):
# origin = layer.GetGeomType() # type:str
# if origin == 1 or origin == 4:
# return "point"
# elif origin == 2 or origin == 5:
# return "linestring"
# elif origin == 3 or origin == 6:
# return "polygon"
# return str(origin)
#
#
# def objects_to_jsonarray(object):
# result = []
# for o in object:
# result.append(object_to_json(o))
# return result
#
#
# def object_to_json(object):
# info = {}
# if object:
# info = object.__dict__
# try:
# del info['_sa_instance_state']
# except:
# pass
# for key in info:
# if isinstance(info[key], datetime.datetime):
# info[key] = info[key].strftime('%Y-%m-%d %H:%M:%S')
# if isinstance(info[key], datetime.time):
# info[key] = info[key].strftime('%H:%M:%S')
# return info
#
#
# def create_zip(path, filepaths: list):
# """
# 创建ZIP文件 并写入文件 (支持大文件夹)
# :param path: 创建的ZIP文件路径
# :param filepaths:
# :return:
# """
# try:
# with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as zip:
# for filepath in filepaths:
# if os.path.isdir(filepath):
# pre_len = len(os.path.dirname(filepath))
# for parent, dirnames, filenames in os.walk(filepath):
# for filename in filenames:
# pathfile = os.path.join(parent, filename)
# if pathfile != path: # 避免将新创建的zip写入该zip文件
# arcname = pathfile[pre_len:].strip(os.path.sep) # 相对路径 不进行则会生成目录树
# zip.write(pathfile, arcname)
# elif os.path.isfile(filepath):
# zip.write(filepath, os.path.basename(filepath))
# else:
# print("创建zip文件对象失败!原因:未识别到路径")
# return "创建zip文件对象失败!原因:未识别到路径"
# return True
#
# except Exception as e:
# print("创建zip文件对象失败!原因:%s" % str(e))
# return "创建zip文件对象失败!原因:%s" % str(e)
#
# def unzip(store_file):
# #处理zip
# store_path = os.path.dirname(store_file)
# zip_file = zipfile.ZipFile(store_file, 'r')
#
# #是否要重命名文件夹
# rename_path = False
# rename_path_origin=None
# rename_path_after =None
# for names in zip_file.namelist():
# #zipfile解压中文文件会有编码问题,需要解压后重命名
# try:
# name_t = names.encode('cp437').decode('gbk')
# except:
# name_t = names.encode('utf-8').decode('utf-8')
#
# zip_file.extract(names,store_path,)
#
# os.chdir(store_path) # 切换到目标目录
#
# if names.__contains__("/"):
# pat = names.split("/")
# pat2 = name_t.split("/")[-1]
# pat[-1] = pat2
# name_tt = "/".join(pat)
# os.rename(names, name_tt)
# if not names[-1].__eq__("/"):
# rename_path = True
# rename_path_origin = names.split("/")[:-1]
# rename_path_after = name_t.split("/")[:-1]
# else:
#
# os.rename(names, name_t) # 重命名文件
# # 重命名文件夹
# if rename_path:
# for i in range(len(rename_path_origin),0,-1):
# origin = "/".join(rename_path_origin[:i])
# rename_path_origin[i-1] = rename_path_after[i-1]
# after = "/".join(rename_path_origin[:i])
# os.rename(origin, after)
#
# os.chdir(os.path.dirname(store_path))
# zip_file.close()
#
#
#
# file_for_open = None
# encoding = None
# for root, dirs, files in os.walk(store_path):
# for fn in files:
# if fn.endswith("shp"):
# file_for_open = os.path.join(root, fn)
# if fn.endswith("cpg"):
# with open(os.path.join(root, fn)) as fd:
# encoding = fd.readline().strip()
# if fn.endswith("gdb"):
# print(fn)
# file_for_open =root
#
# return file_for_open,encoding
#
# def open_pg_data_source(iswrite,uri=configure.SQLALCHEMY_DATABASE_URI):
# """
# # 获取PostGIS数据源
# :return:
# """
# db_conn_tuple = get_info_from_sqlachemy_uri(uri)
# fn = "PG: user=%s password=%s host=%s port=%s dbname=%s " % db_conn_tuple
# driver = ogr.GetDriverByName("PostgreSQL")
# if driver is None:
# raise Exception("打开PostgreSQL驱动失败,可能是当前GDAL未支持PostgreSQL驱动!")
# ds = driver.Open(fn, iswrite)
# if ds is None:
# raise Exception("打开数据源失败!")
# return ds
#
#
# def is_chinese(string):
# """
# 检查整个字符串是否包含中文
# :param string: 需要检查的字符串
# :return: bool
# """
# for ch in string:
# if u'\u4e00' <= ch <= u'\u9fff':
# return True
#
# return False
#
#
# def get_db_session(db_url=configure.SQLALCHEMY_DATABASE_URI,autocommit=False) -> session:
# engine = create_engine(db_url)
# system_session = sessionmaker(bind=engine,autocommit=autocommit)()
# return system_session
#
#
# class Queue:
# def __init__(self):
# self.items = []
# def init(self,items):
# self.items = items
# def empty(self):
# return self.items == []
# def append(self, item):
# self.items.insert(0,item)
# def pop(self):
# return self.items.pop()
# def size(self):
# return len(self.items)
# def __str__(self):
# return self.items.__str__()
#
# def structured_print(mes,type="info"):
# message = "[{}] {} {}".format(type.upper(),datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), mes)
# print(message)
#
#
#
# if __name__ == '__main__':
# print(get_info_from_sqlachemy_uri("postgresql://postgres:CDchinadci@2020@10.48.0.50:5432/dcidms"))