# image_build_pyramid.py 6.3 KB
# coding=utf-8
#author:        4N
#createtime:    2021/7/19
#email:         nheweijun@sina.com


from app.util.component.ApiTemplate import ApiTemplate
import datetime
from app.modules.service.image.util.ThriftConnect import ThriftConnect
from app.models import db
from app.modules.service.models import Image
from app.modules.data.models import Task
import multiprocessing
import uuid
import configure
from app.util.component.PGUtil import PGUtil
from .util.Cache import Cache
import json
from osgeo import gdal
from osgeo.gdal import *
import traceback
import os
from app.util.component.TaskController import TaskController

class Api(ApiTemplate):
    """API endpoint that submits a background job building image pyramids (overviews).

    ``process`` validates the request, spawns a worker process and records a
    ``Task`` row for it. The worker (``build_pyramid_task``) builds overviews on
    every data server that hosts the image and then updates the ``Image`` and
    ``Task`` rows with the outcome.

    Status values written by this class:
        * ``Image.has_pyramid``: -1 while building, 1 on success, 0 on failure.
        * ``Task.state``: 0 when submitted, 2 while running, 1 on success,
          -1 on failure.
    """

    api_name = "创建影像金字塔"

    # Overview levels are added until a level holds at most this many pixels
    # (65536 == 256 * 256).
    _OVERVIEW_PIXEL_LIMIT = 65536

    def process(self):
        """Validate the request and submit the pyramid-building task.

        Reads ``guid`` (the image guid) and ``creator`` from ``self.para``.

        Returns:
            dict: ``{"data": <message>, "result": True}`` once the task has
            been submitted.

        Raises:
            Exception: if the image does not exist, is already building a
                pyramid, or none of its servers is currently available.
        """
        res = {}

        image_guid = self.para.get("guid")
        task_guid = str(uuid.uuid1())

        image = Image.query.filter_by(guid=image_guid).one_or_none()
        if not image:
            raise Exception("数据不存在!")
        if image.has_pyramid == -1:
            raise Exception("数据正在创建金字塔!")

        # Only the third element of the cached tuple is used here.
        # NOTE(review): presumably the set of currently available data
        # servers -- confirm against Cache.cache_data.
        _image_service_info, _zoo, servers = Cache.cache_data(None)

        # Keep only the servers hosting this image that are also in `servers`.
        image_servers = [ser for ser in image.server.split(",") if ser in servers]
        if not image_servers:
            raise Exception("暂时没有可用数据服务器提供该影像相关服务!")

        # The worker must be started before the Task row is created because
        # the row stores the worker's pid, which only exists after start().
        build_process = multiprocessing.Process(
            target=self.build_pyramid_task,
            args=(image_guid, task_guid, image_servers, image.path),
        )
        build_process.start()

        task = Task(guid=task_guid,
                    name="{}创建金字塔".format(image.name),
                    create_time=datetime.datetime.now(),
                    state=0,
                    task_type=5,
                    creator=self.para.get("creator"),
                    process="创建金字塔",
                    task_pid=build_process.pid,
                    parameter=image_guid)

        db.session.add(task)
        db.session.commit()

        res["data"] = "创建金字塔任务已提交!"
        res["result"] = True
        return res

    @classmethod
    def _overview_factors(cls, raster_x_size, raster_y_size):
        """Return the overview decimation factors (2, 4, 8, ...) for a raster.

        Both dimensions are halved repeatedly; one factor is produced per
        halving for as long as the pixel count before that halving exceeds
        ``_OVERVIEW_PIXEL_LIMIT``. An image at or below the limit yields an
        empty list.
        """
        factors = []
        factor = 1
        while raster_x_size * raster_y_size > cls._OVERVIEW_PIXEL_LIMIT:
            raster_x_size /= 2.0
            raster_y_size /= 2.0
            factor *= 2
            factors.append(factor)
        return factors

    def build_pyramid_task(self,image_guid,task_guid,data_servers,path):
        """Worker entry point: build overviews on every data server.

        Args:
            image_guid: guid of the ``Image`` row to update.
            task_guid: guid of the ``Task`` row tracking this job.
            data_servers: names of the servers hosting the image; the literal
                name "本地服务器" means the file is processed in-process with
                GDAL, any other name is contacted through ``ThriftConnect``.
            path: path of the image file, passed unchanged to every server.

        Never raises for ordinary errors: any ``Exception`` is logged and
        recorded as a failed task (``Task.state = -1``,
        ``Image.has_pyramid = 0``).
        """
        sys_session = None
        try:
            # Task control: block until this task is allowed to run.
            TaskController.wait(task_guid)

            # Flag the image as "building" and the task as "running".
            sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
            sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": -1})
            sys_session.query(Task).filter_by(guid=task_guid).update({"state": 2})
            sys_session.commit()
            # The build may take long; do not hold a DB session meanwhile.
            sys_session.close()
            sys_session = None

            # The image copy on every data node needs its own overviews.
            update_size = None
            overview_count = None
            for data_server in data_servers:
                if data_server == "本地服务器":
                    # A failed local build must fail the task instead of being
                    # recorded as a finished pyramid.
                    if not self.buildOverview(path):
                        raise Exception("本地服务器创建金字塔失败!")
                    update_size = os.path.getsize(path)
                    continue

                thrift_connect = ThriftConnect(data_server)
                try:
                    image_info = json.loads(thrift_connect.client.getInfo(path))
                    existing_count = image_info.get("overview_count")
                    # Skip servers whose copy already has overviews; a missing
                    # count is treated as "no overviews yet".
                    if existing_count and existing_count > 0:
                        overview_count = existing_count
                        continue
                    thrift_connect.client.buildOverview(path)
                    update_size = json.loads(thrift_connect.client.getInfo(path)).get("size")
                finally:
                    # Always release the connection, including on the
                    # `continue` above and on errors.
                    thrift_connect.close()

            # Open a fresh session for the final bookkeeping so that a session
            # timeout during the build cannot affect it.
            sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)

            sys_session.query(Task).filter_by(guid=task_guid).update({"state":1,"update_time":datetime.datetime.now()})
            image = sys_session.query(Image).filter_by(guid=image_guid).one_or_none()
            if image is None:
                raise Exception("数据不存在!")

            # No server reported an existing count: derive it from the raster
            # size using the same halving rule as buildOverview.
            if not overview_count:
                overview_count = len(self._overview_factors(image.raster_x_size,
                                                            image.raster_y_size))

            image.has_pyramid = 1
            image.overview_count = overview_count
            if update_size:
                image.size = update_size

        except Exception:
            print(traceback.format_exc())
            try:
                if sys_session is None:
                    # Failure happened before/between sessions: open one just
                    # to record the failure.
                    sys_session = PGUtil.get_db_session(configure.SQLALCHEMY_DATABASE_URI)
                else:
                    # Drop any half-applied changes / aborted transaction
                    # before writing the failure state.
                    sys_session.rollback()
                sys_session.query(Image).filter_by(guid=image_guid).update({"has_pyramid": 0})
                sys_session.query(Task).filter_by(guid=task_guid).update({"state": -1,"update_time":datetime.datetime.now()})
            except Exception:
                # Recording the failure is best effort; log and carry on.
                print(traceback.format_exc())
        finally:
            if sys_session is not None:
                try:
                    sys_session.commit()
                except Exception:
                    print(traceback.format_exc())
                finally:
                    try:
                        sys_session.close()
                    except Exception:
                        pass

    def buildOverview(self,path):
        """Build AVERAGE-resampled overviews for the local image at ``path``.

        The file is opened in update mode. Nothing is rebuilt when band 1
        already has exactly the expected number of overview levels.

        Returns:
            bool: ``True`` on success (or when nothing had to be done),
            ``False`` if the file could not be opened or an error occurred;
            the traceback is printed in that case.
        """
        image = None
        result = True
        try:
            image = gdal.Open(path, gdal.GA_Update)
            if image is None:
                # gdal.Open returns None instead of raising when GDAL
                # exceptions are not enabled.
                raise Exception("gdal.Open failed: {}".format(path))

            overviews = self._overview_factors(image.RasterXSize, image.RasterYSize)
            band = image.GetRasterBand(1)
            if len(overviews) != band.GetOverviewCount():
                image.BuildOverviews("AVERAGE", overviews)
        except Exception:
            print(traceback.format_exc())
            result = False
        finally:
            # Drop the dataset reference so GDAL flushes and closes the file.
            image = None
        return result

    api_doc = {
        "tags": ["影像接口"],
        "parameters": [
            {"name": "guid",
             "in": "formData",
             "type": "string",
             "description": "guid"},
        ],
        "responses": {
            200: {
                "schema": {
                    "properties": {
                    }
                }
            }
        }
    }