table_vacuate_one.py 13.6 KB
# author:        4N
# createtime:    2021/1/27
# email:         nheweijun@sina.com


import datetime
import traceback
from app.models import Table, Database, DES,Task,db,TableVacuate
from app.util.component.ApiTemplate import ApiTemplate
from app.util.component.PGUtil import PGUtil
from app.util.component.EntryDataVacuate import Process
import multiprocessing
import uuid
import configure
from osgeo.ogr import DataSource,Layer,Geometry
from osgeo import ogr
from app.util.component.StructuredPrint import StructurePrint
from app.util.component.VacuateConf import VacuateConf

class Api(ApiTemplate):
    api_name = "单独抽稀"
    def process(self):

        res = {}
        res["data"] = {}
        db_session = None
        try:
            table_guid = self.para.get("guid")
            grids = []
            if not self.para.get("grids"):
                raise Exception("请输入grids参数!")
            else:
                grids = [float(x) for x in self.para.get("grids").split(",")]
                grids.sort()

            table: Table = Table.query.filter_by(guid=table_guid).one_or_none()

            if not table:
                raise Exception("数据不存在!")

            # 判断图层是否存在

            pg_ds :DataSource= PGUtil.open_pg_data_source(0,DES.decode(table.relate_database.sqlalchemy_uri))
            layer = pg_ds.GetLayerByName(table.name)

            # 判断用户权限
            db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(table.relate_database.sqlalchemy_uri))
            if not PGUtil.check_table_privilege(table.name,"SELECT",db_tuple[0],pg_ds):
                raise  Exception("用户{}对表{}没有select权限!".format(db_tuple[0],table.name))

            if not layer:
                raise Exception("图层不存在!")
            if pg_ds:
                pg_ds.Destroy()



            if Task.query.filter_by(table_guid=table_guid,state=0).one_or_none():
                res["result"] = False
                res["msg"] = "数据精化中!"
                return res
            if table.table_type==0:
                res["result"] = False
                res["msg"] = "非空间表!"
                return res

            # if table.is_vacuate==1:
            #     res["state"] = -1
            #     res["message"] = "已精化!"
            #     return res

            # 初始化task
            task_guid = uuid.uuid1().__str__()

            vacuate_process = multiprocessing.Process(target=self.task,args=(table,task_guid,grids))
            vacuate_process.start()


            task = Task(guid=task_guid,
                        name="{}精化,网格大小:{}".format(table.name,self.para.get("grids")),
                        table_guid=table_guid,
                        create_time=datetime.datetime.now(),
                        state=0,
                        task_type=2,
                        creator=self.para.get("creator"),
                        file_name=None,
                        database_guid=table.database_guid,
                        process="精化中",
                        parameter=self.para.get("grids"))

            db.session.add(task)
            db.session.commit()
            res["msg"] = "图层抽稀已提交!"
            res["data"] = task_guid
            res["result"] = True

        except Exception as e:
            raise e
        finally:
            if db_session:
                db_session.close()
        return res

    def task(self,table,task_guid,grids):

        sys_session = None
        pg_session = None
        pg_ds = None
        vacuate_process = None
        origin_vacuate = table.is_vacuate

        try:
            sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
            sys_session.query(Table).filter_by(guid=table.guid).update(
                {"is_vacuate": 2, "update_time": datetime.datetime.now()})
            sys_session.commit()


            database = sys_session.query(Database).filter_by(guid=table.database_guid).one_or_none()
            pg_session = PGUtil.get_db_session(DES.decode(database.sqlalchemy_uri))

            pg_ds :DataSource= PGUtil.open_pg_data_source(1,DES.decode(database.sqlalchemy_uri))




            # 创建抽稀过程
            options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name,pg_session)),
                       "PRECISION=NO"]

            layer = pg_ds.GetLayerByName(table.name)

            vacuate_process:VacuateProcess = VacuateProcess(layer, table.guid, options,database.sqlalchemy_uri,grids)


            for feature in layer:
                geo = feature.GetGeometryRef()
                #插入抽稀图层
                if geo is not None:
                    vacuate_process.vacuate(geo)

            #删除原有数据
            for grid in grids:
                tvs = sys_session.query(TableVacuate).filter_by(pixel_distance=grid,table_guid=table.guid).all()
                for tv in tvs :
                    sys_session.delete(tv)


            #新增
            if configure.VACUATE_DB_URI:
                user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI)
            else:
                user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database.sqlalchemy_uri))
            connectstr = "hostaddr={} port={} dbname='{}' user='{}' password='{}'".format(host, port, datab, user,
                                                                                                  passwd)
            for l in range(vacuate_process.max_level):
                layer_name = vacuate_process.vacuate_layers[l].GetName()
                lev = layer_name.split("_")[-2]

                table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),
                                             table_guid=table.guid,
                                             level=int(lev),
                                             name=layer_name,
                                             pixel_distance=vacuate_process.this_gridsize[l],
                                             connectstr=DES.encode(connectstr))
                sys_session.add(table_vacuate)

            sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now(),
                                                                      "process" : "精化完成"})
            sys_session.query(Table).filter_by(guid=table.guid).update(
                {"is_vacuate": 1, "update_time": datetime.datetime.now()})
            sys_session.commit()

        except Exception as e:
            try:
                sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now(),
                                                                          "process": "精化失败"})
                sys_session.query(Table).filter_by(guid=table.guid).update(
                    {"is_vacuate": origin_vacuate, "update_time": datetime.datetime.now()})

                message = "{} {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), e.__str__())
                task_process_guid = uuid.uuid1().__str__()
                task_process = Process(guid=task_process_guid, message=message, time=datetime.datetime.now(),
                                       task_guid=task_guid)
                sys_session.add(task_process)
                sys_session.commit()
                if vacuate_process:
                    vacuate_process.rollback()

                print(traceback.format_exc())
            except Exception as ee:
                print(traceback.format_exc())
        finally:
            try:
                if vacuate_process:
                    vacuate_process.end()
                if sys_session:
                    sys_session.close()
                if pg_session:
                    pg_session.close()
                if pg_ds:
                    pg_ds.Destroy()
            except:
                print(traceback.format_exc())

    api_doc = {
        "tags": ["管理接口"],
        "parameters": [
            {"name": "guid",
             "in": "formData",
             "type": "string",
             "description": "表guid", "required": "true"},
            {"name": "grids",
             "in": "formData",
             "type": "string",
             "description": "需要抽稀的网格大小,以逗号相隔", "required": "true"}
        ],
        "responses": {
            200: {
                "schema": {
                    "properties": {
                    }
                }
            }
        }
    }


class VacuateProcess:
    '''
    抽稀类
    '''

    max_level=0
    fill_dict={}
    vacuate_layers={}
    vacuate_layers_gridsize={}
    pg_ds_dict = {}
    #被抽吸图层的范围
    extent=[]
    is_spatial=False
    #该抽稀过程的抽稀网格
    this_gridsize=[]


    def __init__(self,layer:Layer,table_guid, options,sqlalchemy_uri,grids):

        #是空间图层才初始化
        if layer.GetExtent()[0] > 0 or layer.GetExtent()[0] < 0:

            self.is_spatial=True

            # 判断需要抽稀多少级

            extent = layer.GetExtent()
            self.extent=extent

            layer.SetSpatialFilter(None)
            layer.ResetReading()

            self.this_gridsize = grids
            self.max_level = len(grids)


            # 创建抽稀ds
            for l in range(self.max_level):

                if configure.VACUATE_DB_URI:
                    pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, configure.VACUATE_DB_URI)
                else:
                    pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, DES.decode(sqlalchemy_uri))


                pg_ds_l.StartTransaction()
                self.pg_ds_dict[l] = pg_ds_l

            # 生成抽稀图层
            options = options[1:]
            options.append("OVERWRITE=yes")
            options.append("LAUNDER=no")
            for l in range(self.max_level):
                this_grid_len = self.this_gridsize[l]

                self.vacuate_layers_gridsize[l] = this_grid_len

                pg = self.pg_ds_dict[l]

                if this_grid_len<1:
                    grid_name = str(this_grid_len).split(".")[-1]
                    if this_grid_len.__eq__(0.00008):
                        grid_name = "00008"
                else:
                    grid_name = str(int(this_grid_len))

                # 抽稀图层是点面混合的
                # 抽稀表有固定的命名规则
                # 抽稀表一定要覆盖

                this_grid_szie = VacuateConf.project_gridsize if VacuateConf.project_gridsize.__contains__(this_grid_len) else  VacuateConf.lonlat_gridsize

                lev = this_grid_szie.index(this_grid_len)
                print("{}:{}".format(grid_name, lev))


                v_ln = "z{}_vacuate_{}_{}".format(table_guid,lev, grid_name)
                vl = pg.CreateLayer(v_ln, layer.GetSpatialRef(),ogr.wkbUnknown, options)
                # 抽稀表需要属性
                vl.CreateFields(layer.schema)
                self.vacuate_layers[l] = vl

        else:
            pass


    def vacuate(self,g):

        if self.is_spatial:

            # 插入到所有抽稀图层中
            for level in range(self.max_level):

                center: Geometry = g.Centroid()

                extent = g.GetEnvelope()
                long_extent= extent[1]-extent[0]
                lat_extent = extent[3]-extent[2]

                this_grid_len =self.vacuate_layers_gridsize[level]
                #超大的直接加入
                # if long_extent > 10*this_grid_len or lat_extent >10*this_grid_len:
                #     vacuate_layer: Layer = self.vacuate_layers.get(level)
                #     feat = ogr.Feature(vacuate_layer.GetLayerDefn())
                #     feat.SetGeometry(g)
                #     vacuate_layer.CreateFeature(feat)
                # else:

                row = int((center.GetY() - self.extent[2]) / this_grid_len)
                col = int((center.GetX() - self.extent[0]) / this_grid_len)
                key = "{}.{}.{}".format(level, row, col)

                if not self.fill_dict.get(key):
                    self.fill_dict[key] = 0
                if self.fill_dict[key] == 0:

                    vacuate_layer: Layer = self.vacuate_layers.get(level)
                    feat = ogr.Feature(vacuate_layer.GetLayerDefn())
                    # 如果图形比网格小,直接存储其中心点
                    if this_grid_len>long_extent and this_grid_len>lat_extent:
                        feat.SetGeometry(center)
                    else:
                        feat.SetGeometry(g)
                    vacuate_layer.CreateFeature(feat)
                    self.fill_dict[key] += 1
                #超大的还有机会
                elif (long_extent > 10*this_grid_len or lat_extent >10*this_grid_len) and self.fill_dict[key]<5:
                    vacuate_layer: Layer = self.vacuate_layers.get(level)
                    feat = ogr.Feature(vacuate_layer.GetLayerDefn())
                    feat.SetGeometry(g)
                    vacuate_layer.CreateFeature(feat)
                    self.fill_dict[key] += 1

    def end(self):
        for pg in self.pg_ds_dict.values():
            pg.Destroy()

    def rollback(self):
        for pg in self.pg_ds_dict.values():
            pg.RollbackTransaction()