# __init__.py 8.5 KB
# coding=utf-8

import datetime
import importlib
import inspect
import json
import pkgutil

from osgeo import ogr
import configure
import math
from flask import request
import zipfile
import os
from flask import current_app
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker,session

# Parent class for blueprints; it exists only as a marker to identify them.
class BlueprintApi:
    """Marker base class for blueprints.

    It defines no attributes or methods of its own; subclassing it is what
    tags a class as a blueprint.
    """

# Recursively find, under a package, the classes whose direct base is a given parent class.
def find_class(modname, class_name):
    """Yield classes found in the sub-packages of ``modname`` that derive directly from ``class_name``.

    Every sub-package of ``modname`` is imported, the classes visible in its
    namespace (defined there or imported into it) are inspected, and the
    search then recurses into that sub-package. Plain (non-package)
    sub-modules are not imported or inspected.

    :param modname: dotted name of the package to search.
    :param class_name: the parent class (a class object, despite the name);
        a class matches only when its first direct base (``__base__``)
        equals it.
    :return: a generator of matching class objects. The same class may be
        yielded more than once if it is visible in several sub-packages.
    :raises ImportError: if ``modname`` or one of its sub-packages cannot
        be imported.
    """
    module = importlib.import_module(modname)
    path = getattr(module, "__path__", None)
    if path is None:
        # A plain module has no sub-packages. Passing None on to
        # pkgutil.iter_modules would list every top-level module on
        # sys.path instead, leading to bogus imports such as
        # "<modname>.<unrelated package>".
        return

    prefix = module.__name__ + "."
    for _importer, child_name, ispkg in pkgutil.iter_modules(path):
        if not ispkg:
            continue
        qualified_name = prefix + child_name
        package = importlib.import_module(qualified_name)
        for _name, obj in inspect.getmembers(package, inspect.isclass):
            # Direct-subclass check only: deeper descendants are not matched.
            if obj.__base__ == class_name:
                yield obj
        yield from find_class(qualified_name, class_name)

#
#
# # 从sqlachemy_uri解析出相关信息
# def get_info_from_sqlachemy_uri(uri):
#     parts = uri.split(":")
#     user = parts[1][2:]
#
#     password_list = parts[2].split("@")
#     if password_list.__len__()>2:
#         password = "@".join(password_list[:-1])
#     else:
#         password = parts[2].split("@")[0]
#     host = parts[2].split("@")[-1]
#     port = parts[3].split("/")[0]
#     database = parts[3].split("/")[1]
#     return user, password, host, port, database
#
#
# # 获取请求参数
# def get_parameter():
#     parameter = {}
#     parameter.update(request.args.items())
#     parameter.update(request.form.items())
#     try:
#         request_json = request.get_json()
#         if json:
#             parameter.update(request_json)
#     except:
#         pass
#     for key in parameter.keys():
#         if parameter.get(key) == "":
#             parameter[key] = None
#     return parameter
#
#
# def togeojson(dict):
#     geojson = {}
#     geojson["type"] = "Feature"
#     geojson["geometry"] = json.loads(dict.pop("geojson"))
#     del dict["geojson"]
#     if dict.get("wkb_geometry"):
#         del dict["wkb_geometry"]
#     geojson["properties"] = dict
#     return geojson
#
#
# def to_feature_collection(result):
#     geojsonresult = [togeojson(x) for x in result]
#     data = {}
#     data["type"] = "FeatureCollection"
#     data["features"] = geojsonresult
#     return data
#
#
# def getType(layer):
#     origin = layer.GetGeomType()  # type:str
#     if origin == 1 or origin == 4:
#         return "point"
#     elif origin == 2 or origin == 5:
#         return "linestring"
#     elif origin == 3 or origin == 6:
#         return "polygon"
#     return str(origin)
#
#
# def objects_to_jsonarray(object):
#     result = []
#     for o in object:
#         result.append(object_to_json(o))
#     return result
#
#
# def object_to_json(object):
#     info = {}
#     if object:
#         info = object.__dict__
#         try:
#             del info['_sa_instance_state']
#         except:
#             pass
#         for key in info:
#             if isinstance(info[key], datetime.datetime):
#                 info[key] = info[key].strftime('%Y-%m-%d %H:%M:%S')
#             if isinstance(info[key], datetime.time):
#                 info[key] = info[key].strftime('%H:%M:%S')
#     return info
#
#
# def create_zip(path, filepaths: list):
#     """
#     创建ZIP文件  并写入文件  (支持大文件夹)
#     :param path:   创建的ZIP文件路径
#     :param filepaths:
#     :return:
#     """
#     try:
#         with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as zip:
#             for filepath in filepaths:
#                 if os.path.isdir(filepath):
#                     pre_len = len(os.path.dirname(filepath))
#                     for parent, dirnames, filenames in os.walk(filepath):
#                         for filename in filenames:
#                             pathfile = os.path.join(parent, filename)
#                             if pathfile != path:  # 避免将新创建的zip写入该zip文件
#                                 arcname = pathfile[pre_len:].strip(os.path.sep)  # 相对路径  不进行则会生成目录树
#                                 zip.write(pathfile, arcname)
#                 elif os.path.isfile(filepath):
#                     zip.write(filepath, os.path.basename(filepath))
#                 else:
#                     print("创建zip文件对象失败!原因:未识别到路径")
#                     return "创建zip文件对象失败!原因:未识别到路径"
#             return True
#
#     except Exception as e:
#         print("创建zip文件对象失败!原因:%s" % str(e))
#         return "创建zip文件对象失败!原因:%s" % str(e)
#
# def unzip(store_file):
#     #处理zip
#     store_path = os.path.dirname(store_file)
#     zip_file = zipfile.ZipFile(store_file, 'r')
#
#     #是否要重命名文件夹
#     rename_path = False
#     rename_path_origin=None
#     rename_path_after =None
#     for names in zip_file.namelist():
#         #zipfile解压中文文件会有编码问题,需要解压后重命名
#         try:
#             name_t = names.encode('cp437').decode('gbk')
#         except:
#             name_t = names.encode('utf-8').decode('utf-8')
#
#         zip_file.extract(names,store_path,)
#
#         os.chdir(store_path)  # 切换到目标目录
#
#         if names.__contains__("/"):
#             pat = names.split("/")
#             pat2 = name_t.split("/")[-1]
#             pat[-1] = pat2
#             name_tt = "/".join(pat)
#             os.rename(names, name_tt)
#             if not names[-1].__eq__("/"):
#                 rename_path = True
#                 rename_path_origin = names.split("/")[:-1]
#                 rename_path_after = name_t.split("/")[:-1]
#         else:
#
#             os.rename(names, name_t)  # 重命名文件
#     # 重命名文件夹
#     if rename_path:
#         for i in range(len(rename_path_origin),0,-1):
#             origin = "/".join(rename_path_origin[:i])
#             rename_path_origin[i-1] = rename_path_after[i-1]
#             after = "/".join(rename_path_origin[:i])
#             os.rename(origin, after)
#
#     os.chdir(os.path.dirname(store_path))
#     zip_file.close()
#
#
#
#     file_for_open = None
#     encoding = None
#     for root, dirs, files in os.walk(store_path):
#         for fn in files:
#             if fn.endswith("shp"):
#                 file_for_open = os.path.join(root, fn)
#             if fn.endswith("cpg"):
#                 with open(os.path.join(root, fn)) as fd:
#                     encoding = fd.readline().strip()
#             if fn.endswith("gdb"):
#                 print(fn)
#                 file_for_open =root
#
#     return file_for_open,encoding
#
# def open_pg_data_source(iswrite,uri=configure.SQLALCHEMY_DATABASE_URI):
#     """
#     # 获取PostGIS数据源
#     :return:
#     """
#     db_conn_tuple = get_info_from_sqlachemy_uri(uri)
#     fn = "PG: user=%s password=%s host=%s port=%s dbname=%s " % db_conn_tuple
#     driver = ogr.GetDriverByName("PostgreSQL")
#     if driver is None:
#         raise Exception("打开PostgreSQL驱动失败,可能是当前GDAL未支持PostgreSQL驱动!")
#     ds = driver.Open(fn, iswrite)
#     if ds is None:
#         raise Exception("打开数据源失败!")
#     return ds
#
#
# def is_chinese(string):
#     """
#     检查整个字符串是否包含中文
#     :param string: 需要检查的字符串
#     :return: bool
#     """
#     for ch in string:
#         if u'\u4e00' <= ch <= u'\u9fff':
#             return True
#
#     return False
#
#
# def get_db_session(db_url=configure.SQLALCHEMY_DATABASE_URI,autocommit=False) -> session:
#     engine = create_engine(db_url)
#     system_session = sessionmaker(bind=engine,autocommit=autocommit)()
#     return system_session
#
#
# class Queue:
#     def __init__(self):
#         self.items = []
#     def init(self,items):
#         self.items = items
#     def empty(self):
#         return self.items == []
#     def append(self, item):
#         self.items.insert(0,item)
#     def pop(self):
#         return self.items.pop()
#     def size(self):
#         return len(self.items)
#     def __str__(self):
#         return self.items.__str__()
#
# def structured_print(mes,type="info"):
#     message = "[{}] {} {}".format(type.upper(),datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), mes)
#     print(message)
#
#
#
# if __name__ == '__main__':
#     print(get_info_from_sqlachemy_uri("postgresql://postgres:CDchinadci@2020@10.48.0.50:5432/dcidms"))