polygon_merge.py 9.9 KB
# coding=utf-8
#author:        4N
#createtime:    2022/3/8
#email:         nheweijun@sina.com

import math

from osgeo import ogr,gdal
from osgeo.ogr import *

from test.zonghe.ShapeData import ShapeData
import copy

class Zonghe:

    data:ShapeData
    area_threshold = 100.0
    buffer_threshold = 100.0
    distance_buffer_threshold = 2.114691025217302e-04

    def __init__(self,data):
        self.data = data

    def get_rectangle(self,multipolygon:Geometry):
        return self.envelop_2_polygon(multipolygon.GetEnvelope())

    def envelop_2_polygon(self,env):
        ring = ogr.Geometry(ogr.wkbLinearRing)
        ring.AddPoint(env[0], env[2])
        ring.AddPoint(env[0], env[3])
        ring.AddPoint(env[1], env[3])
        ring.AddPoint(env[1], env[2])
        ring.AddPoint(env[0], env[2])
        # Create polygon
        poly = ogr.Geometry(ogr.wkbPolygon)
        poly.AddGeometry(ring)
        return poly

    def merge(self,polygons):

        multipolygon:Geometry = ogr.Geometry(ogr.wkbMultiPolygon)

        for polygon in polygons:
            multipolygon.AddGeometry(polygon)

        tris = set()
        ts: Geometry = multipolygon.DelaunayTriangulation()


        for t in ts:

            tris.add(t)


        bians = self.get_bian(multipolygon)

        for bian in bians:
            inter_tri = []

            for tri in tris:

                #是三角形一边则跳过
                if  tri.Touches(bian):
                    continue

                intersection_geom:Geometry = bian.Intersection(tri)

                if not intersection_geom.IsEmpty():
                    if not intersection_geom.Equals(bian):
                        if intersection_geom.GetGeometryType() in [2, 5, -2147483643, -2147483646, 3002, 3005]:
                            inter_tri.append(tri)

            for int in inter_tri:
                tris.remove(int)



            if inter_tri:

                inter_tri_merge:Geometry = inter_tri[0]

                for ind in range(1,len(inter_tri)):
                    # print(inter_tri[ind])
                    inter_tri_merge = inter_tri_merge.Union(inter_tri[ind])



                inter_tri_merge_line:Geometry = self.get_polygon_lines(inter_tri_merge)

                inter_tri_merge_line_points:list = inter_tri_merge_line.GetPoints()

                bian_ps = bian.GetPoints()

                if len(inter_tri_merge_line_points[0]) == 3:
                    try:
                        first_index = inter_tri_merge_line_points.index(bian_ps[0])
                        second_index = inter_tri_merge_line_points.index(bian_ps[1])
                    except:
                        #
                        # for int in inter_tri:
                        #     tris.remove(int)
                        print(11)
                        print(bian)
                        print(inter_tri_merge)


                        continue
                else:
                    try:
                        first_index = inter_tri_merge_line_points.index((bian_ps[0][0],bian_ps[0][1]))
                        second_index = inter_tri_merge_line_points.index((bian_ps[1][0],bian_ps[1][1]))
                    except:
                        print(11)
                        print(bian)
                        print(inter_tri_merge)

                        # for int in inter_tri:
                        #     tris.remove(int)
                        continue

                small_index = min(first_index,second_index)
                big_index = max(first_index,second_index)

                small_points:list = inter_tri_merge_line_points[:small_index+1]
                small_points.extend(inter_tri_merge_line_points[big_index:])
                small_polygon:Geometry = self.create_polygon(small_points)

                delauney: Geometry = small_polygon.DelaunayTriangulation()

                for de in delauney:
                    dde = copy.deepcopy(de)
                    tris.add(dde)


                big_points = inter_tri_merge_line_points[small_index:big_index+1]
                big_points.append(inter_tri_merge_line_points[small_index])

                big_polygon:Geometry = self.create_polygon(big_points)

                delauney: Geometry = big_polygon.DelaunayTriangulation()

                for de in delauney:
                    dde = copy.deepcopy(de)
                    tris.add(dde)


        centers = []

        after_tir = []

        for tri in tris:

            # tri_ext = tri.GetEnvelope()
            # tri_center_x = (tri_ext[0] + tri_ext[1]) / 2
            # tri_center_y = (tri_ext[2] + tri_ext[3]) / 2
            #
            # tri_center:Geometry = ogr.Geometry(ogr.wkbPoint)
            # tri_center.AddPoint(tri_center_x,tri_center_y)

            tri_center = self.get_center(tri)

            centers.append(tri_center)
            dd = tri_center.Distance(multipolygon)

            if dd > 1.114691025217302e-04:
                continue
            intersection_geom: Geometry = multipolygon.Intersection(tri)

            if intersection_geom:

                if intersection_geom.GetGeometryType() in [3, -2147483645, 3003]:
                    pass
                else:
                    after_tir.append(tri)
                    multipolygon.AddGeometry(tri)

        result :Geometry = multipolygon.UnionCascaded()

        #去岛
        # if result.GetGeometryCount()>1:
        #     for indx in range(1,result.GetGeometryCount()):
        #         hole:Geometry = result.GetGeometryRef(indx)
        #         if hole.GetArea() < (1.114691025217302e-04* 1.114691025217302e-04):
        #             result.RemoveGeometry(hole)

        return [t.ExportToWkt() for t in tris]
        return [result.ExportToWkt()]


    def get_center(self,tri:Geometry):
        tri_line: Geometry = ogr.ForceToLineString(tri)
        tri_line_points = tri_line.GetPoints()

        lines = []

        for index in range(len(tri_line_points)-1):
            first = tri_line_points[index]
            second = tri_line_points[index+1]
            line: Geometry = ogr.Geometry(ogr.wkbLineString)
            line.AddPoint(first[0], first[1])
            line.AddPoint(second[0], second[1])
            lines.append(line)

        lines = sorted(lines,key=lambda l:l.Length())

        return lines[-1].Centroid()

    def extend_tri(self,triangle:Geometry):

        center:Geometry = triangle.Centroid()
        tri_line: Geometry = ogr.ForceToLineString(triangle)

        result_geo = ogr.Geometry(ogr.wkbPolygon)
        result_geo_ring:Geometry = ogr.Geometry(ogr.wkbLinearRing)
        for point in tri_line.GetPoints():
            radpoint = self.get_radpoint(center.GetX(),center.GetY(),point[0],point[1],0.00000001)

            result_geo_ring.AddPoint(radpoint.GetX(),radpoint.GetY())
        result_geo.AddGeometry(result_geo_ring)

        return result_geo

    def get_radpoint(self,x0,y0,x1,y1,len_size):
        a2 = math.pow((x1-x0),2)
        b2 = math.pow((y1-y0),2)
        len_size = len_size+ math.sqrt(a2+b2)

        tanx = abs(y1-y0) / abs(x1-x0)
        cosx = 1 / math.sqrt(1 + tanx *tanx)
        dx = cosx* len_size
        dy = tanx * dx

        if x1 == x0:
            x2 = x0
            if y1 > y0 :
                y2 = y0 + abs(dy)
            else:
                y2 = y0 - abs(dy)
        else:
            if x1 < x0:
                x2 = x0 - dx
            else:
                x2 = x0 + dx

            if y1 < y0:
                y2 = y0 - dy
            else:
                y2 = y0 + dy

        other_point = ogr.Geometry(ogr.wkbPoint)
        other_point.AddPoint(x2, y2)

        return  other_point

    def create_polygon(self,ps):

        ring = ogr.Geometry(ogr.wkbLinearRing)
        for p in ps:
            ring.AddPoint(p[0], p[1])
        poly = ogr.Geometry(ogr.wkbPolygon)
        poly.AddGeometry(ring)
        return poly



    def get_polygon_lines(self,polygon):
        #无孔Polygon
        if polygon.GetGeometryCount() in [0,1]:
            polygon_line : Geometry = ogr.ForceToLineString(polygon)
        # 有孔Polygon
        else:
            polygon_line : Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(0))

        return polygon_line

    def get_bian(self,polygon):

        bians = []

        polygon_lines = []

        if polygon.GetGeometryCount() in [0, 1]:
            polygon_line: Geometry = ogr.ForceToLineString(polygon)
            polygon_lines.append(polygon_line)
        # 有孔Polygon
        else:
            for index in range(polygon.GetGeometryCount()):
                polygon_line: Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(index))
                polygon_lines.append(polygon_line)

        for polygon_line in polygon_lines:
            for index in range(polygon_line.GetPointCount()-1):
                p = polygon_line.GetPoint(index)
                p_next = polygon_line.GetPoint(index+1)

                line: Geometry = ogr.Geometry(ogr.wkbLineString)

                if len(p) ==2:
                    line.AddPoint(p[0], p[1],0)
                    line.AddPoint(p_next[0], p_next[1],0)
                else:
                    line.AddPoint(p[0], p[1])
                    line.AddPoint(p_next[0], p_next[1])
                bians.append(line)

        return bians

    def process(self):
        polygons = self.data.get_polygons()

        return self.merge(polygons)


if __name__ == '__main__':

    import time
    t1 = time.time()

    gdal.UseExceptions()

    sd = ShapeData(r"J:\Data\制图综合result\hole.shp")

    zh = Zonghe(sd)
    wkts = zh.process()

    result = r"J:\Data\制图综合result\hole_mege.shp"

    ShapeData.create_shp_fromwkts(result,"zh",wkts)

    print(time.time()-t1)