ShapeData.py
4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# coding=utf-8
#author: 4N
#createtime: 2022/3/15
#email: nheweijun@sina.com
from osgeo import ogr
from osgeo.ogr import *
import math
import os
import copy
class ShapeData:
driver = ogr.GetDriverByName("ESRI Shapefile")
def __init__(self,path):
self.ds: DataSource = self.driver.Open(path, 1)
if not self.ds:
raise Exception("打开数据失败!")
self.layer: Layer = self.ds.GetLayer(0)
def get_polygons(self):
polygons = []
for feature in self.layer:
f:Feature = feature
geom : Geometry = copy.deepcopy(f.GetGeometryRef())
if geom is None:
continue
if geom.GetGeometryType() in [3,-2147483645,3001]:
polygons.append(ogr.ForceToPolygon(geom))
if geom.GetGeometryType() in [6 ,-2147483642 ,3006]:
for i in range(geom.GetGeometryCount()):
polygons.append(ogr.ForceToPolygon(geom.GetGeometryRef(i)))
# p_cs = []
# for p in polygons:
# polygon_line: Geometry = self.get_polygon_lines(p)
# print(polygon_line)
# p_cs.append(self.create_polygon(polygon_line.GetPoints()))
#
# return p_cs
return polygons
def get_polygon_lines(self,polygon):
#无孔Polygon
if polygon.GetGeometryCount() in [0,1]:
polygon_line : Geometry = ogr.ForceToLineString(polygon)
# 有孔Polygon
else:
polygon_line : Geometry = ogr.ForceToLineString(polygon.GetGeometryRef(0))
return polygon_line
def create_polygon(self,ps):
ring = ogr.Geometry(ogr.wkbLinearRing)
for p in ps:
ring.AddPoint(p[0], p[1],0)
poly = ogr.Geometry(ogr.wkbPolygon)
poly.AddGeometry(ring)
return poly
def close(self):
self.ds.Destroy()
@classmethod
def create_by_layer(cls,path,layer:Layer):
data_source: DataSource = cls.driver.CreateDataSource(path)
data_source.CopyLayer(layer,layer.GetName())
data_source.Destroy()
@classmethod
def create_by_scheme(cls,path,name,sr,geo_type,scheme,features):
data_source: DataSource = cls.driver.CreateDataSource(path)
layer :Layer = data_source.CreateLayer(name, sr, geo_type)
if scheme:
layer.CreateFields(scheme)
for feature in features:
layer.CreateFeature(feature)
data_source.Destroy()
@classmethod
def create_point(cls,path,name,point):
data_source: DataSource = cls.driver.CreateDataSource(path)
layer :Layer = data_source.CreateLayer(name, None, ogr.wkbPoint)
feat_new = ogr.Feature(layer.GetLayerDefn())
feat_new.SetGeometry(point)
layer.CreateFeature(feat_new)
data_source.Destroy()
@classmethod
def create_shp_fromwkts(cls,path,name,wkts):
geo_type = None
geoms = []
for wkt in wkts:
geom : Geometry = ogr.CreateGeometryFromWkt(wkt)
if geo_type is None:
geo_type = geom.GetGeometryType()
geoms.append(geom)
if os.path.exists(path):
pre_name = ".".join(path.split(".")[0:-1])
for bac in ["dbf","prj","cpg","shp","shx","sbn","sbx"]:
try:
os.remove(pre_name+"."+bac)
except Exception as e:
pass
data_source: DataSource = cls.driver.CreateDataSource(path)
layer :Layer = data_source.CreateLayer(name, None, geo_type)
for geom in geoms:
feat_new = ogr.Feature(layer.GetLayerDefn())
feat_new.SetGeometry(geom)
layer.CreateFeature(feat_new)
data_source.Destroy()
if __name__ == '__main__':
sd = ShapeData(r"J:\Data\制图综合result\test.shp")
# polygons = sd.get_polygons()
# wkts = []
# for p in polygons:
# # print(p.ExportToWkt())
# wkts.append(p.ExportToWkt())
# result = r"J:\Data\制图综合result\zhongshan_ronghe.shp"
# ShapeData.create_shp_fromwkts(result,"zh",wkts)
layer = sd.layer
fn = "PG: user=postgres password=chinadci host=172.26.60.101 port=5432 dbname=ceshi "
driver = ogr.GetDriverByName("PostgreSQL")
if driver is None:
raise Exception("打开PostgreSQL驱动失败,可能是当前GDAL未支持PostgreSQL驱动!")
ds:DataSource = driver.Open(fn, 1)
pg_layer = ds.CreateLayer("test", layer.GetSpatialRef(), layer.GetGeomType(), ["overwrite=yes"])
schema = layer.schema
pg_layer.CreateFields(schema)
for feature in layer:
out_feature: Feature = copy.copy(feature)
out_feature.SetFID(out_feature.GetFID() + 1)
pg_layer.CreateFeature(out_feature)
ds.Destroy()