PolygonMerge.py 10.2 KB
# coding=utf-8
#author:        4N
#createtime:    2022/3/30
#email:         nheweijun@sina.com


from osgeo import ogr,gdal
from osgeo.ogr import *

from test.zonghe.ShapeData import ShapeData
import copy

from rtreelib import RTree,Rect



class RTreeData:

    def __init__(self,triangle,state):
        self.state = state
        self.triangle = triangle

    def set_state(self,state):
        self.state = state


def env2rect(env):
    return Rect(env[0],env[2],env[1],env[3])


class PolygonMerge:

    @classmethod
    def get_delauney(cls,polygon):

        polygon_line = cls.get_polygon_lines(polygon)
        polygon_line_points:list = polygon_line.GetPoints()

        if len(polygon_line_points)==4:
            tris = [polygon]
        else:
            d = 0
            for ind in range(len(polygon_line_points) - 1):
                p = polygon_line_points[ind]
                p_n = polygon_line_points[ind + 1]

                d += -0.5 * (p_n[1] + p[1]) * (p_n[0] - p[0])

            if d < 0:
                polygon_line_points.reverse()

            tris = list(cls.get_tri(polygon_line_points))

        return tris

    @classmethod
    def get_tri(cls,polygon_line_points:list):

        if len(polygon_line_points) == 4 or len(polygon_line_points) == 3:
            yield cls.create_polygon(polygon_line_points)
        else:
            for index,point in enumerate(polygon_line_points):
                if index == 0:
                   continue
                else:
                    try:
                        point_front = polygon_line_points[index-1]
                        point_next = polygon_line_points[index+1]
                    except Exception as e:
                        print(e)
                        kk=1
                        d=1

                check = (point[0]-point_front[0]) * (point_next[1]-point[1]) - (point_next[0]-point[0])*(point[1]-point_front[1])

                if check > 0:
                    triangle = cls.create_polygon([point_front,point,point_next,point_front])
                    yield triangle

                    polygon_line_points.pop(index)
                    for tri in cls.get_tri(polygon_line_points):
                        yield tri
                    break

    @classmethod
    def merge(cls,polygons:list,distance_buffer_threshold):

        rtree = RTree()

        polygon:Geometry = polygons[0]
        # multipolygon = ogr.Geometry(ogr.wkbMultiPolygon)
        # for poly in polygons:
        #     multipolygon.AddGeometry(poly)

        for poly in polygons:
            polygon = polygon.Union(poly)



        delauney: Geometry = polygon.DelaunayTriangulation()

        for index in range(delauney.GetGeometryCount()):
            de:Geometry = copy.deepcopy(delauney.GetGeometryRef(index))
            de_extent = de.GetEnvelope()
            rtree.insert(RTreeData(de,1), env2rect(de_extent))

        bians: list = cls.get_bian(polygon)


        for bian in bians:

            bian_ext = bian.GetEnvelope()

            query_result = rtree.query(env2rect(bian_ext))

            intersection_data = []


            for entry in query_result:

                # 已被抛弃的不要
                if entry.data.state == 0:
                    continue
                intersection_geom: Geometry = bian.Intersection(entry.data.triangle)

                if not intersection_geom.IsEmpty():
                    if not intersection_geom.Equals(bian):
                        if intersection_geom.GetGeometryType() in [2, 5, -2147483643, -2147483646, 3002, 3005]:
                            intersection_data.append(entry.data)


            for data in intersection_data:
                data.set_state(0)

            if intersection_data:

                inter_tri_merge:Geometry = intersection_data[0].triangle

                for ind in range(1,len(intersection_data)):
                    inter_tri_merge = inter_tri_merge.Union(intersection_data[ind].triangle)


                inter_tri_merge_line:Geometry = cls.get_polygon_lines(inter_tri_merge)

                inter_tri_merge_line_points:list = inter_tri_merge_line.GetPoints()

                bian_ps = bian.GetPoints()

                if len(inter_tri_merge_line_points[0]) == 3:
                    try:
                        first_index = inter_tri_merge_line_points.index(bian_ps[0])
                        second_index = inter_tri_merge_line_points.index(bian_ps[1])
                    except:
                        rtree.insert(RTreeData(inter_tri_merge, 1), env2rect(inter_tri_merge.GetEnvelope()))
                        continue
                else:
                    try:
                        first_index = inter_tri_merge_line_points.index((bian_ps[0][0],bian_ps[0][1]))
                        second_index = inter_tri_merge_line_points.index((bian_ps[1][0],bian_ps[1][1]))
                    except:
                        rtree.insert(RTreeData(inter_tri_merge, 1), env2rect(inter_tri_merge.GetEnvelope()))
                        continue



                small_index = min(first_index,second_index)
                big_index = max(first_index,second_index)

                small_points:list = inter_tri_merge_line_points[:small_index+1]
                small_points.extend(inter_tri_merge_line_points[big_index:])
                small_polygon:Geometry = cls.create_polygon(small_points)

                delauney = cls.get_delauney(small_polygon)
                for de in delauney:
                    de_extent = de.GetEnvelope()
                    rtree.insert(RTreeData(de, 1), env2rect(de_extent))

                big_points = inter_tri_merge_line_points[small_index:big_index+1]
                big_points.append(inter_tri_merge_line_points[small_index])

                big_polygon:Geometry = cls.create_polygon(big_points)

                delauney = cls.get_delauney(big_polygon)
                for de in delauney:
                    de_extent = de.GetEnvelope()
                    rtree.insert(RTreeData(de, 1), env2rect(de_extent))

        for entry in rtree.get_leaf_entries():
            triangle = entry.data.triangle
            if entry.data.state==0:
                continue
            if polygon.Contains(triangle):
                pass
            else:
                tri_center,tri_center2 = cls.get_center(triangle)
                distance = tri_center.Distance(polygon)
                distance2 = tri_center2.Distance(polygon)

                if distance > distance_buffer_threshold :
                    continue
                # if distance2 > distance_buffer_threshold:
                #     continue

                polygon = polygon.Union(triangle)
                # polygon.AddGeometry(triangle)

        result :Geometry = polygon

        return result


    @classmethod
    def get_center(cls,tri:Geometry):

        tri_line: Geometry = cls.get_polygon_lines(tri)
        tri_line_points = tri_line.GetPoints()

        lines = []

        for index in range(len(tri_line_points)-1):
            first = tri_line_points[index]
            second = tri_line_points[index+1]
            line: Geometry = ogr.Geometry(ogr.wkbLineString)
            line.AddPoint(first[0], first[1])
            line.AddPoint(second[0], second[1])
            lines.append(line)

        lines = sorted(lines,key=lambda l:l.Length())

        return lines[-1].Centroid(),lines[-2].Centroid()

    @classmethod
    def get_polygon_lines(cls,polygon):
        #无孔Polygon
        if polygon.GetGeometryCount() in [0,1]:
            polygon_line : Geometry = ogr.ForceToLineString(polygon)
        # 有孔Polygon
        else:
            polygon_line : Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(0))

        return polygon_line

    @classmethod
    def get_bian(cls,polygon):

        bians = []

        polygon_lines = []

        if polygon.GetGeometryCount() in [0, 1]:
            polygon_line: Geometry = ogr.ForceToLineString(polygon)
            polygon_lines.append(polygon_line)
        # 有孔Polygon
        else:
            for index in range(polygon.GetGeometryCount()):
                polygon_line: Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(index))
                polygon_lines.append(polygon_line)

        for polygon_line in polygon_lines:
            for index in range(polygon_line.GetPointCount()-1):
                p = polygon_line.GetPoint(index)
                p_next = polygon_line.GetPoint(index+1)

                line: Geometry = ogr.Geometry(ogr.wkbLineString)

                if len(p) ==2:
                    line.AddPoint(p[0], p[1],0)
                    line.AddPoint(p_next[0], p_next[1],0)
                else:
                    line.AddPoint(p[0], p[1])
                    line.AddPoint(p_next[0], p_next[1])
                bians.append(line)

        return bians

    @classmethod
    def create_polygon(cls,ps):

        ring = ogr.Geometry(ogr.wkbLinearRing)
        for p in ps:
            ring.AddPoint(p[0], p[1])
        poly = ogr.Geometry(ogr.wkbPolygon)
        poly.AddGeometry(ring)
        return poly


if __name__ == '__main__':
    import time
    t1 = time.time()


    # sd = ShapeData(r"J:\Data\制图综合result\t4.shp")
    # mp = ogr.Geometry(ogr.wkbMultiPolygon)
    # polygons = sd.get_polygons()
    #
    # wkts = PolygonMerge.merge(polygons)
    #
    # result = r"J:\Data\制图综合result\t4_merge.shp"
    #
    # ShapeData.create_shp_fromwkts(result,"zh",wkts)
    #
    # print(time.time()-t1)


    p = PolygonMerge.create_polygon([(113.37029204400005, 22.526576828100076, 0.0), (113.36972403800007, 22.526369320699985, 0.0), (113.36974547200009, 22.526379207800005, 0.0), (113.37025989200004, 22.52656695200011, 0.0), (113.37029204400005, 22.526576828100076, 0.0)])

    pl:Geometry = ogr.ForceToLineString(p)

    ps:list = pl.GetPoints()
    ps.reverse()

    PolygonMerge.get_tri(ps)

    # wkts =[p.ExportToWkt()]
    #
    # result = r"J:\Data\制图综合result\error.shp"
    #
    # ShapeData.create_shp_fromwkts(result,"zh",wkts)
    #
    # print(time.time()-t1)