table_vacuate_one.py 14.9 KB
# author:        4N
# createtime:    2021/1/27
# email:         nheweijun@sina.com


import datetime
import traceback
from ..models import Table, Database, DES,Task,db,TableVacuate
from app.util.component.ApiTemplate import ApiTemplate
from app.util.component.PGUtil import PGUtil
from app.util.component.StructurePrint import StructurePrint
import multiprocessing
import uuid
import configure
from osgeo.ogr import DataSource,Layer,Geometry
from osgeo import ogr
from app.util.component.UserCheck import UserCheck
from app.util.component.VacuateConf import VacuateConf
from app.util.component.TaskController import TaskController
from app.util.component.TaskWriter import TaskWriter
import copy

class Api(ApiTemplate):
    api_name = "单独抽稀"
    def process(self):

        res = {}
        res["data"] = {}
        db_session = None
        try:
            table_guid = self.para.get("guid")

            if not self.para.get("grids"):
                raise Exception("请输入grids参数!")
            else:
                grids = [float(x) for x in self.para.get("grids").split(",")]
                grids.sort()

            table: Table = Table.query.filter_by(guid=table_guid).one_or_none()

            if not table:
                raise Exception("数据不存在!")

            #验证权限
            UserCheck.verify(table.relate_database.creator)

            # 判断图层是否存在

            pg_ds :DataSource= PGUtil.open_pg_data_source(0,DES.decode(table.relate_database.sqlalchemy_uri))
            layer = pg_ds.GetLayerByName(table.name)

            # 判断用户权限
            db_tuple = PGUtil.get_info_from_sqlachemy_uri(DES.decode(table.relate_database.sqlalchemy_uri))
            if not PGUtil.check_table_privilege(table.name,"SELECT",db_tuple[0],pg_ds):
                raise  Exception("用户{}对表{}没有select权限!".format(db_tuple[0],table.name))

            if not layer:
                raise Exception("图层不存在!")
            if pg_ds:
                pg_ds.Destroy()


            if Task.query.filter_by(table_guid=table_guid,state=0).one_or_none():
                res["result"] = False
                res["msg"] = "矢量金字塔构建中!"
                return res

            if Task.query.filter_by(table_guid=table_guid,state=0).one_or_none() or table.is_vacuate==2:
                res["result"] = False
                res["msg"] = "矢量金字塔构建中!"
                return res


            # 初始化task
            task_guid = uuid.uuid1().__str__()

            vacuate_process = multiprocessing.Process(target=self.task,args=(table,task_guid,grids))
            vacuate_process.start()


            task = Task(guid=task_guid,
                        name="{}构建矢量金字塔,网格大小:{}".format(table.name,self.para.get("grids")),
                        table_guid=table_guid,
                        create_time=datetime.datetime.now(),
                        state=0,
                        task_type=2,
                        creator=self.para.get("creator"),
                        file_name=None,
                        database_guid=table.database_guid,
                        process="构建中",
                        parameter=self.para.get("grids"),
                        task_pid=vacuate_process.pid)

            db.session.add(task)
            db.session.commit()
            res["msg"] = "矢量金字塔构建已提交!"
            res["data"] = task_guid
            res["result"] = True

        except Exception as e:
            raise e
        finally:
            if db_session:
                db_session.close()
        return res

    def task(self,table,task_guid,grids):

        task_writer = None
        pg_session = None
        pg_ds = None
        vacuate_process = None
        origin_vacuate = table.is_vacuate

        try:

            #任务控制,等待执行
            TaskController.wait(task_guid)
            task_writer =  TaskWriter(task_guid)

            task_writer.update_table(table.guid,{"is_vacuate": 2, "update_time": datetime.datetime.now()})
            task_writer.update_task({"state":2,"process":"构建中"})
            task_writer.update_process("开始构建...")

            database = task_writer.session.query(Database).filter_by(guid=table.database_guid).one_or_none()
            database_sqlalchemy_uri = str(database.sqlalchemy_uri)
            pg_session = PGUtil.get_db_session(DES.decode(database_sqlalchemy_uri))
            pg_ds :DataSource= PGUtil.open_pg_data_source(1,DES.decode(database_sqlalchemy_uri))

            #删除原有数据
            for grid in grids:
                tvs = task_writer.session.query(TableVacuate).filter_by(pixel_distance=grid,table_guid=table.guid).all()
                for tv in tvs :
                    task_writer.session.delete(tv)
            task_writer.session.commit()


            # 创建抽稀过程
            options = ["OVERWRITE=yes", "GEOMETRY_NAME={}".format(PGUtil.get_geo_column(table.name,pg_session)),
                       "PRECISION=NO"]

            layer = pg_ds.GetLayerByName(table.name)

            vacuate_process:VacuateProcess = VacuateProcess(layer, table.guid, options,database_sqlalchemy_uri,grids)

            count = 0
            for feature in layer:
                geo = feature.GetGeometryRef()
                #插入抽稀图层
                if geo is not None:
                    vacuate_process.vacuate(geo,feature)
                count += 1
                if count%10000==0:
                    StructurePrint().print("{}图层已抽稀{}个对象".format(table.name, count))

            # vacuate_process.set_vacuate_count()


            #新增
            if configure.VACUATE_DB_URI:
                user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(configure.VACUATE_DB_URI)
            else:
                user, passwd, host, port, datab = PGUtil.get_info_from_sqlachemy_uri(DES.decode(database_sqlalchemy_uri))
            connectstr = "hostaddr={} port={} dbname='{}' user='{}' password='{}'".format(host, port, datab, user,
                                                                                                  passwd)
            for l in range(vacuate_process.max_level):
                layer_name = vacuate_process.vacuate_layers[l].GetName()
                lev = layer_name.split("_")[-2]

                table_vacuate = TableVacuate(guid=uuid.uuid1().__str__(),
                                             table_guid=table.guid,
                                             level=int(lev),
                                             name=layer_name,
                                             pixel_distance=vacuate_process.this_gridsize[l],
                                             connectstr=DES.encode(connectstr))
                task_writer.session.add(table_vacuate)
            task_writer.update_task({"state":1,"update_time":datetime.datetime.now(),"process" : "构建完成"})
            task_writer.update_table(table.guid,{"is_vacuate": 1, "update_time": datetime.datetime.now()})
            task_writer.update_process("构建完成!")

        except Exception as e:
            try:
                task_writer.update_task({"state": -1,"update_time":datetime.datetime.now(),"process": "构建失败"})
                task_writer.update_table(table.guid, {"is_vacuate": origin_vacuate, "update_time": datetime.datetime.now()})
                task_writer.update_process( e.__str__())
                task_writer.update_process("任务中止!")
                if vacuate_process:
                    vacuate_process.rollback()
                print(traceback.format_exc())
            except Exception as ee:
                print(traceback.format_exc())
        finally:
            try:
                task_writer.close()
                if vacuate_process:
                    vacuate_process.end()
                if pg_session:
                    pg_session.close()
                if pg_ds:
                    pg_ds.Destroy()
            except:
                print(traceback.format_exc())

    api_doc = {
        "tags": ["管理接口"],
        "parameters": [
            {"name": "guid",
             "in": "formData",
             "type": "string",
             "description": "表guid", "required": "true"},
            {"name": "grids",
             "in": "formData",
             "type": "string",
             "description": "需要抽稀的网格大小,以逗号相隔", "required": "true"},
            {"name": "creator",
             "in": "formData",
             "type": "string",
             "description": "创建者"}

        ],
        "responses": {
            200: {
                "schema": {
                    "properties": {
                    }
                }
            }
        }
    }


class VacuateProcess:
    '''
    抽稀类
    '''

    max_level=0
    fill_dict={}
    vacuate_layers={}
    vacuate_layers_gridsize={}
    pg_ds_dict = {}
    #被抽吸图层的范围
    extent=[]
    is_spatial=False
    #该抽稀过程的抽稀网格
    this_gridsize=[]


    def __init__(self,layer:Layer,table_guid, options,sqlalchemy_uri,grids):

        #是空间图层才初始化
        if layer.GetExtent()[0] > 0 or layer.GetExtent()[0] < 0:

            self.is_spatial=True

            # 判断需要抽稀多少级

            extent = layer.GetExtent()
            self.extent=extent

            layer.SetSpatialFilter(None)
            layer.ResetReading()

            self.this_gridsize = grids
            self.max_level = len(grids)


            # 创建抽稀ds
            for l in range(self.max_level):

                if configure.VACUATE_DB_URI:
                    pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, configure.VACUATE_DB_URI)
                else:
                    pg_ds_l: DataSource = PGUtil.open_pg_data_source(1, DES.decode(sqlalchemy_uri))


                pg_ds_l.StartTransaction()
                self.pg_ds_dict[l] = pg_ds_l

            # 生成抽稀图层
            options = options[1:]
            options.append("OVERWRITE=yes")
            options.append("LAUNDER=no")

            schema = layer.schema
            # 增加统计字段
            # schema.append(ogr.FieldDefn("_dcigrid_count_", ogr.OFTInteger))
            # schema.append(ogr.FieldDefn("_dcigrid_name_", ogr.OFTString))

            for l in range(self.max_level):
                this_grid_len = self.this_gridsize[l]

                self.vacuate_layers_gridsize[l] = this_grid_len

                pg = self.pg_ds_dict[l]

                if this_grid_len<1:
                    grid_name = str(this_grid_len).split(".")[-1]
                    if this_grid_len.__eq__(0.00008):
                        grid_name = "00008"
                else:
                    grid_name = str(int(this_grid_len))

                # 抽稀图层是点面混合的
                # 抽稀表有固定的命名规则
                # 抽稀表一定要覆盖

                this_grid_szie = VacuateConf.project_gridsize if VacuateConf.project_gridsize.__contains__(this_grid_len) else  VacuateConf.lonlat_gridsize

                lev = this_grid_szie.index(this_grid_len)
                print("{}:{}".format(grid_name, lev))


                v_ln = "z{}_vacuate_{}_{}".format(table_guid,lev, grid_name)
                vl = pg.CreateLayer(v_ln, layer.GetSpatialRef(),ogr.wkbUnknown, options)

                # 抽稀表需要属性
                vl.CreateFields(schema)
                self.vacuate_layers[l] = vl

        else:
            pass


    def vacuate(self,g,feature):

        if self.is_spatial:
            feat = copy.copy(feature)
            # 插入到所有抽稀图层中
            for level in range(self.max_level):

                center: Geometry = g.Centroid()

                extent = g.GetEnvelope()
                long_extent= extent[1]-extent[0]
                lat_extent = extent[3]-extent[2]

                this_grid_len =self.vacuate_layers_gridsize[level]


                row = int((center.GetY() - self.extent[2]) / this_grid_len)
                col = int((center.GetX() - self.extent[0]) / this_grid_len)
                key = "{}.{}.{}".format(level, row, col)

                if not self.fill_dict.get(key):
                    self.fill_dict[key] = 0
                if self.fill_dict[key] == 0:

                    vacuate_layer: Layer = self.vacuate_layers.get(level)

                    # feat = ogr.Feature(vacuate_layer.GetLayerDefn())
                    # 如果图形比网格小,直接存储其中心点
                    if this_grid_len>long_extent and this_grid_len>lat_extent:
                        feat.SetGeometry(center)
                    else:
                        feat.SetGeometry(g)

                    # 复制旧feature属性
                    # field_dict = feature.items()
                    # for field_name in field_dict:
                    #     feat.SetField(field_name, field_dict[field_name])
                    # feat.SetField("_dcigrid_name_",".".join(key.split(".")[1:]))

                    vacuate_layer.CreateFeature(feat)
                    self.fill_dict[key] += 1

                #超大的还有机会
                elif long_extent > 10 * this_grid_len or lat_extent > 10 * this_grid_len:
                    vacuate_layer: Layer = self.vacuate_layers.get(level)

                    # feat = ogr.Feature(vacuate_layer.GetLayerDefn())
                    feat.SetGeometry(g)

                    # 复制旧feature属性
                    # field_dict = feature.items()
                    # for field_name in field_dict:
                    #     feat.SetField(field_name, field_dict[field_name])
                    # feat.SetField("_dcigrid_name_",".".join(key.split(".")[1:]))

                    vacuate_layer.CreateFeature(feat)
                    self.fill_dict[key] += 1
                else:
                    self.fill_dict[key] += 1

    def set_vacuate_count(self):
        if self.is_spatial:
            # 插入到所有抽稀图层中
            for level in range(self.max_level):
                vacuate_layer: Layer = self.vacuate_layers.get(level)
                for feat in vacuate_layer:
                    key = "{}.{}".format(level,feat.GetField("_dcigrid_name_"))
                    feat.SetField("_dcigrid_count_",self.fill_dict.get(key))
                    vacuate_layer.SetFeature(feat)

    def end(self):
        for pg in self.pg_ds_dict.values():
            pg.Destroy()

    def rollback(self):
        for pg in self.pg_ds_dict.values():
            pg.RollbackTransaction()