data_download_task.py 8.9 KB
# coding=utf-8
#author:        4N
#createtime:    2020/11/27
#email:         nheweijun@sina.com


from ..models import *

import traceback

from osgeo.ogr import DataSource,Layer,FeatureDefn,FieldDefn,Feature
from osgeo import gdal,ogr
import os
import uuid
import configure
from app.util.component.ApiTemplate import ApiTemplate
from app.util.component.PGUtil import PGUtil
from app.util.component.ZipUtil import ZipUtil
import multiprocessing
import datetime

class Api(ApiTemplate):

    def process(self):

        res = {}
        #获取参数
        try:

            task_guid = uuid.uuid1().__str__()
            download_process = multiprocessing.Process(target=self.download, args=(task_guid,self.para))

            task = Task(guid=task_guid,
                        name="{}下载".format(self.para.get("table_name")),
                        create_time=datetime.datetime.now(),
                        state=0,
                        task_type=4,
                        creator=self.para.get("creator"),
                        file_name=None,
                        process="数据下载中",
                        database_guid=self.para.get("database_guid"))

            db.session.add(task)
            db.session.commit()
            download_process.start()

            res["data"] = "下载任务已提交!"
            res["state"] = True

        except Exception as e:
            raise e
        return res


    def download(self,task_guid,para):

        sys_session = None
        ds: DataSource = None

        # 设置编码
        encoding = para.get("encoding")
        if encoding:
            gdal.SetConfigOption("SHAPE_ENCODING", encoding)
        else:
            gdal.SetConfigOption("SHAPE_ENCODING", "UTF-8")

        try:

            sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)

            table_names = para.get("table_name").split(",")
            database_guid = para.get("database_guid")
            database = sys_session.query(Database).filter_by(guid=database_guid).one_or_none()
            if not database:
                raise Exception("数据库不存在!")

            ds: DataSource = PGUtil.open_pg_data_source(0, DES.decode(database.sqlalchemy_uri))

            download_type = para.get("download_type")

            data = None
            if download_type.__eq__("shp"):
                data = self.download_shp(table_names, ds)
            if download_type.__eq__("gdb"):
                data = self.download_gdb(sys_session,table_names, ds, database_guid)

            sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),
                                                                      "process" : "下载完成",
                                                                      "parameter":data[0]["download_url"]})
            sys_session.commit()


        except Exception as e:
            try:
                sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),
                                                                          "process": "下载失败"})

                message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
                task_process_guid = uuid.uuid1().__str__()
                task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
                                       task_guid=task_guid)
                sys_session.add(task_process)
                sys_session.commit()
            except Exception as ee:
                print(traceback.format_exc())
            raise e
        finally:
            try:
                if ds:
                    ds.Destroy()
                if sys_session:
                    sys_session.close()
            except:
                print(traceback.format_exc())
    
    
    def download_shp(self,table_names,ds):
        data = []
        for table_name in table_names:
            url = self.download_one(ds, table_name)
            data.append({"name": table_name, "download_url": url})
        return data
    
    def download_one(self,ds,table_name):
    
        layer: Layer = ds.GetLayerByName(table_name)
        driver = ogr.GetDriverByName("ESRI Shapefile")
        uuid_ = uuid.uuid1().__str__()
        parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        dirpath = os.path.join(parent, "file_tmp", uuid_)
        os.makedirs(dirpath)
        data_source: DataSource = driver.CreateDataSource(dirpath + "/{}.shp".format(table_name))
        # data_source.CopyLayer(layer, table_name)

        fid = layer.GetFIDColumn()
        pg_layer: Layer = data_source.CreateLayer(table_name, layer.GetSpatialRef(), layer.GetGeomType())
        schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]

        pg_layer.CreateFields(schema)
        layer.ResetReading()
        for feature in layer:
            pg_layer.CreateFeature(feature)

        data_source.Destroy()


        ZipUtil.create_zip(os.path.join(parent, "file_tmp", table_name+"_"+uuid_) + ".zip", [dirpath])

        return "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(table_name+"_"+uuid_ + ".zip")
    
    
    def download_gdb(self,sys_session,table_names,ds,database_guid):
        ogr.RegisterAll()
        data = []
        gdal.UseExceptions()
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
    
        # 创建一个gdb datasource
        gdb_driver = ogr.GetDriverByName('FileGDB')
        uuid_ = uuid.uuid1().__str__()
        parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        gdb_path = os.path.join(parent, "file_tmp", uuid_+".gdb")

        gdb_ds: DataSource = gdb_driver.CreateDataSource(gdb_path)
    
    
        for table_name in table_names:
    
            layer: Layer = ds.GetLayerByName(table_name)
            table = sys_session.query(Table).filter_by(name=table_name, database_guid=database_guid).one_or_none()
            feature_defn: FeatureDefn = layer.GetLayerDefn()
    
            for i in range(feature_defn.GetFieldCount()):
                field_defn:FieldDefn = feature_defn.GetFieldDefn(i)
                field_alias = sys_session.query(Columns).filter_by(table_guid=table.guid,name=field_defn.GetName()).one_or_none().alias
                field_defn.SetAlternativeName(field_alias)
    
            table_alias= table.alias
    
            # if is_chinese(table_name):
            #     if not table_alias:
            #         table_alias = table_name
            #     table_name = "table{}".format(table_name.__hash__())

            fid = layer.GetFIDColumn()
            pg_layer: Layer = gdb_ds.CreateLayer(table_name, layer.GetSpatialRef(), layer.GetGeomType(),["LAYER_ALIAS={}".format(table_alias)])
            schema = [sche for sche in layer.schema if not sche.name.__eq__(fid)]
            # schema = layer.schema
            pg_layer.CreateFields(schema)


            # gdb 不支持fid=0的要素,所以识别到后要+1
            offset = 0
            f1:Feature =  layer.GetNextFeature()
            if f1:
                if f1.GetFID().__eq__(0):
                    offset = 1
            layer.ResetReading()
            for feature in layer:
                feature.SetFID(feature.GetFID()+offset)
                pg_layer.CreateFeature(feature)


            # gdb_ds.CopyLayer(layer, table_name,["LAYER_ALIAS={}".format(table_alias)])
    
        gdb_ds.Destroy()
        ZipUtil.create_zip(gdb_path + ".zip", [gdb_path])
        data.append({"name": ",".join(table_names), "download_url": "http://" + configure.deploy_ip_host + "/API/IO/Download/{}".format(uuid_+".gdb" + ".zip")})

    
        return data
    
    


    api_doc={
    "tags":["IO接口"],
    "description":"下载数据",
    "parameters":[
        {"name": "table_name",
         "in": "formData",
         "type":"string","description":"支持多图层下载,以逗号相隔","required":"true"},
        {"name": "encoding",
         "in": "formData",
         "type": "string",
         "enum":["GBK","UTF-8"]},
        {"name": "download_type",
        "in": "formData",
        "type": "string",
        "enum": ["shp", "gdb"],"required":"true"
        },
        {"name": "database_guid",
         "in": "formData",
         "type": "string","required":"true"
         },
        {"name": "creator",
         "in": "formData",
         "type": "string"
         }
    ],
    "responses":{
        200:{
            "schema":{
                "properties":{
                    "content":{
                        "type": "string",
                        "description": "The name of the user"
                    }
                }
            }
            }
        }
}