PolygonDivision.py 10.5 KB
# coding=utf-8
#author:        4N
#createtime:    2022/3/30
#email:         nheweijun@sina.com

import math

from osgeo import ogr,gdal
from osgeo.ogr import *

from test.zonghe.ShapeData import ShapeData
import copy

from scipy import spatial
from rtreelib import RTree,Rect



class RTreeData:

    def __init__(self,triangle,state):
        self.state = state
        self.triangle = triangle

    def set_state(self,state):
        self.state = state


def env2rect(env):
    return Rect(env[0],env[2],env[1],env[3])


class PolygonDivision:

    @classmethod
    def get_delauney(cls,polygon):

        polygon_line = cls.get_polygon_lines(polygon)
        polygon_line_points:list = polygon_line.GetPoints()

        if len(polygon_line_points)==4:
            tris = [polygon]
        else:
            d = 0
            for ind in range(len(polygon_line_points) - 1):
                p = polygon_line_points[ind]
                p_n = polygon_line_points[ind + 1]

                d += -0.5 * (p_n[1] + p[1]) * (p_n[0] - p[0])

            if d < 0:
                polygon_line_points.reverse()

            tris = list(cls.get_tri(polygon_line_points))

        return tris

    @classmethod
    def get_tri(cls,polygon_line_points:list):

        if len(polygon_line_points) == 4:
            yield cls.create_polygon(polygon_line_points)
        else:
            for index,point in enumerate(polygon_line_points):
                if index == 0:
                   continue
                # elif index == len(polygon_line_points)-1 :
                #     pass
                else:
                    point_front = polygon_line_points[index-1]
                    point_next = polygon_line_points[index+1]

                check = (point[0]-point_front[0]) * (point_next[1]-point[1]) - (point_next[0]-point[0])*(point[1]-point_front[1])

                if check > 0:
                    triangle = cls.create_polygon([point_front,point,point_next,point_front])
                    yield triangle
                    polygon_line_points.pop(index)
                    for tri in cls.get_tri(polygon_line_points):
                        yield tri
                    break

    @classmethod
    def triangle_division(cls,polygon:Geometry):

        rtree = RTree()
        polygon = polygon.UnionCascaded()
        delauney: Geometry = polygon.DelaunayTriangulation()

        for index in range(delauney.GetGeometryCount()):
            de:Geometry = copy.deepcopy(delauney.GetGeometryRef(index))
            de_extent = de.GetEnvelope()
            rtree.insert(RTreeData(de,1), env2rect(de_extent))


        # return [entry.data.triangle.ExportToWkt() for entry in rtree.get_leaf_entries()]
        bians: list = cls.get_bian(polygon)


        for bian in bians:

            bian_ext = bian.GetEnvelope()

            query_result = rtree.query(env2rect(bian_ext))

            intersection_data = []


            for entry in query_result:

                # 已被抛弃的不要
                if entry.data.state == 0:
                    continue
                intersection_geom: Geometry = bian.Intersection(entry.data.triangle)

                if not intersection_geom.IsEmpty():
                    if not intersection_geom.Equals(bian):
                        if intersection_geom.GetGeometryType() in [2, 5, -2147483643, -2147483646, 3002, 3005]:
                            intersection_data.append(entry.data)


            for data in intersection_data:
                data.set_state(0)

            if intersection_data:

                inter_tri_merge:Geometry = intersection_data[0].triangle

                for ind in range(1,len(intersection_data)):
                    inter_tri_merge = inter_tri_merge.Union(intersection_data[ind].triangle)



                inter_tri_merge_line:Geometry = cls.get_polygon_lines(inter_tri_merge)

                inter_tri_merge_line_points:list = inter_tri_merge_line.GetPoints()

                bian_ps = bian.GetPoints()

                if len(inter_tri_merge_line_points[0]) == 3:
                    try:
                        first_index = inter_tri_merge_line_points.index(bian_ps[0])
                        second_index = inter_tri_merge_line_points.index(bian_ps[1])
                    except:
                        print(11)
                        print(bian)
                        print(inter_tri_merge)

                        rtree.insert(RTreeData(inter_tri_merge, 1), env2rect(inter_tri_merge.GetEnvelope()))

                        continue
                else:
                    try:
                        first_index = inter_tri_merge_line_points.index((bian_ps[0][0],bian_ps[0][1]))
                        second_index = inter_tri_merge_line_points.index((bian_ps[1][0],bian_ps[1][1]))
                    except:
                        print(11)
                        print(bian)
                        print(inter_tri_merge)

                        rtree.insert(RTreeData(inter_tri_merge, 1), env2rect(inter_tri_merge.GetEnvelope()))
                        continue



                small_index = min(first_index,second_index)
                big_index = max(first_index,second_index)

                small_points:list = inter_tri_merge_line_points[:small_index+1]
                small_points.extend(inter_tri_merge_line_points[big_index:])
                small_polygon:Geometry = cls.create_polygon(small_points)



                # delauney: Geometry = small_polygon.DelaunayTriangulation()
                #
                # for index in range(delauney.GetGeometryCount()):
                #     de: Geometry = copy.deepcopy(delauney.GetGeometryRef(index))
                #
                #     if small_polygon.Contains(de):
                #         de_extent = de.GetEnvelope()
                #         rtree.insert(RTreeData(de, 1), env2rect(de_extent))

                delauney = cls.get_delauney(small_polygon)
                for de in delauney:
                    de_extent = de.GetEnvelope()
                    rtree.insert(RTreeData(de, 1), env2rect(de_extent))


                big_points = inter_tri_merge_line_points[small_index:big_index+1]
                big_points.append(inter_tri_merge_line_points[small_index])

                big_polygon:Geometry = cls.create_polygon(big_points)

                # delauney: Geometry = big_polygon.DelaunayTriangulation()
                #
                # for index in range(delauney.GetGeometryCount()):
                #     de: Geometry = copy.deepcopy(delauney.GetGeometryRef(index))
                #
                #     if big_polygon.Contains(de):
                #
                #         de_extent = de.GetEnvelope()
                #         rtree.insert(RTreeData(de, 1), env2rect(de_extent))

                delauney = cls.get_delauney(big_polygon)
                for de in delauney:
                    de_extent = de.GetEnvelope()
                    rtree.insert(RTreeData(de, 1), env2rect(de_extent))

        tris = []

        for entry in rtree.get_leaf_entries():
            if entry.data.state==0:
                continue
            if polygon.Contains(entry.data.triangle):
                pass
            else:
                tris.append(entry.data.triangle)

        for tri in tris:

            tri_center,long_line = cls.get_center(tri)
            dd = tri_center.Distance(polygon)
            if dd > 1.114691025217302e-04:
                continue

            polygon.AddGeometry(tri)
        result :Geometry = polygon.UnionCascaded()
        return [result.ExportToWkt()]


    @classmethod
    def get_center(cls,tri:Geometry):
        tri_line: Geometry = ogr.ForceToLineString(tri)
        tri_line_points = tri_line.GetPoints()

        lines = []

        for index in range(len(tri_line_points)-1):
            first = tri_line_points[index]
            second = tri_line_points[index+1]
            line: Geometry = ogr.Geometry(ogr.wkbLineString)
            line.AddPoint(first[0], first[1])
            line.AddPoint(second[0], second[1])
            lines.append(line)

        lines = sorted(lines,key=lambda l:l.Length())

        return lines[-1].Centroid(),lines[-1]

    @classmethod
    def get_polygon_lines(cls,polygon):
        #无孔Polygon
        if polygon.GetGeometryCount() in [0,1]:
            polygon_line : Geometry = ogr.ForceToLineString(polygon)
        # 有孔Polygon
        else:
            polygon_line : Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(0))

        return polygon_line

    @classmethod
    def get_bian(cls,polygon):

        bians = []

        polygon_lines = []

        if polygon.GetGeometryCount() in [0, 1]:
            polygon_line: Geometry = ogr.ForceToLineString(polygon)
            polygon_lines.append(polygon_line)
        # 有孔Polygon
        else:
            for index in range(polygon.GetGeometryCount()):
                polygon_line: Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(index))
                polygon_lines.append(polygon_line)

        for polygon_line in polygon_lines:
            for index in range(polygon_line.GetPointCount()-1):
                p = polygon_line.GetPoint(index)
                p_next = polygon_line.GetPoint(index+1)

                line: Geometry = ogr.Geometry(ogr.wkbLineString)

                if len(p) ==2:
                    line.AddPoint(p[0], p[1],0)
                    line.AddPoint(p_next[0], p_next[1],0)
                else:
                    line.AddPoint(p[0], p[1])
                    line.AddPoint(p_next[0], p_next[1])
                bians.append(line)

        return bians

    @classmethod
    def create_polygon(cls,ps):

        ring = ogr.Geometry(ogr.wkbLinearRing)
        for p in ps:
            ring.AddPoint(p[0], p[1])
        poly = ogr.Geometry(ogr.wkbPolygon)
        poly.AddGeometry(ring)
        return poly


if __name__ == '__main__':
    import time
    t1 = time.time()
    gdal.UseExceptions()

    sd = ShapeData(r"J:\Data\制图综合result\t4.shp")
    mp = ogr.Geometry(ogr.wkbMultiPolygon)
    polygons = sd.get_polygons()
    for p in polygons:
        mp.AddGeometry(p)

    wkts = PolygonDivision.triangle_division(mp)

    result = r"J:\Data\制图综合result\t4_merge.shp"

    ShapeData.create_shp_fromwkts(result,"zh",wkts)

    print(time.time()-t1)