实用代码
geojson 转 shp
import geopandas as gpd
import pandas as pd
import os
from shapely.geometry import shape
import fiona
def geojson_to_shp(geojson_file, shp_file):
"""
将GeoJSON文件转换为Shapefile文件
参数:
geojson_file: 输入的GeoJSON文件路径
shp_file: 输出的Shapefile文件路径
"""
try:
# 读取GeoJSON文件
print(f"正在读取GeoJSON文件: {geojson_file}")
gdf = gpd.read_file(geojson_file)
# 确保输出目录存在
output_dir = os.path.dirname(shp_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 保存为Shapefile
print(f"正在转换为Shapefile: {shp_file}")
gdf.to_file(shp_file, encoding='utf-8')
print("转换完成!")
return True
except Exception as e:
print(f"转换过程中发生错误: {e}")
return False
def read_shp_info(shp_file):
"""
读取Shapefile文件并打印各种信息
参数:
shp_file: Shapefile文件路径
"""
try:
# 读取Shapefile
print(f"\n正在读取Shapefile文件: {shp_file}")
gdf = gpd.read_file(shp_file)
# 1. 基本信息
print("\n" + "="*50)
print("Shapefile基本信息")
print("="*50)
print(f"文件路径: {shp_file}")
print(f"要素数量: {len(gdf)}")
print(f"几何类型: {gdf.geometry.type.unique()}")
print(f"坐标参考系(CRS): {gdf.crs}")
# 2. 字段信息
print("\n" + "="*50)
print("字段信息")
print("="*50)
print(f"字段数量: {len(gdf.columns)}")
print("字段详情:")
for i, col in enumerate(gdf.columns, 1):
dtype = gdf[col].dtype
print(f" {i}. {col} ({dtype})")
# 3. 属性表预览
print("\n" + "="*50)
print("属性表前5行")
print("="*50)
print(gdf.head())
# 4. 空间范围
print("\n" + "="*50)
print("空间范围")
print("="*50)
bounds = gdf.total_bounds
print(f"最小X: {bounds[0]:.6f}")
print(f"最小Y: {bounds[1]:.6f}")
print(f"最大X: {bounds[2]:.6f}")
print(f"最大Y: {bounds[3]:.6f}")
# 5. 几何信息统计
print("\n" + "="*50)
print("几何信息统计")
print("="*50)
print(f"总面积: {gdf.area.sum():.6f} 平方单位")
print(f"总长度: {gdf.length.sum():.6f} 单位")
# 6. 各几何类型的数量统计
print("\n" + "="*50)
print("几何类型统计")
print("="*50)
geometry_counts = gdf.geometry.type.value_counts()
for geom_type, count in geometry_counts.items():
print(f"{geom_type}: {count} 个")
# 7. 空间参考系统详细信息
print("\n" + "="*50)
print("坐标参考系详细信息")
print("="*50)
if gdf.crs is not None:
crs_info = gdf.crs.to_dict()
for key, value in crs_info.items():
print(f"{key}: {value}")
else:
print("未定义坐标参考系")
# 8. 内存使用情况
print("\n" + "="*50)
print("内存使用信息")
print("="*50)
memory_usage = gdf.memory_usage(deep=True).sum()
print(f"总内存使用: {memory_usage / 1024:.2f} KB")
return gdf
except Exception as e:
print(f"读取Shapefile时发生错误: {e}")
return None
geojson_file = r"D:\DownLoad\chrome\黔东南苗族侗族自治州_县.geojson" # 替换为您的GeoJSON文件路径
shp_file = "黔东南苗族侗族自治州_县.shp"
# 执行转换
print("\n开始GeoJSON到Shapefile的转换...")
success = geojson_to_shp(geojson_file, shp_file)
if success:
# 读取并显示Shapefile信息
gdf = read_shp_info(shp_file)
# 可选: 显示更多详细信息
if gdf is not None:
print("\n" + "="*50)
print("前3个要素的详细信息")
print("="*50)
for i, (idx, row) in enumerate(gdf.head(3).iterrows()):
print(f"\n要素 {i+1}:")
for col in gdf.columns:
if col != 'geometry':
print(f" {col}: {row[col]}")
print(f" 几何类型: {row.geometry.geom_type}")
print(f" 面积: {row.geometry.area:.2f}")
print(f" 边界: {row.geometry.bounds}")
print("\n程序执行完成!")切分shp,切分县
切分2块
代码
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Polygon, MultiPolygon, LineString
from shapely.ops import split
import os
import math
def split_polygon_into_two(polygon, split_direction='longest'):
"""
将多边形切分为两部分
"""
try:
if polygon.is_empty:
return None, None
# 获取多边形的边界框
minx, miny, maxx, maxy = polygon.bounds
if split_direction == 'longest':
# 按最长边方向切分
width = maxx - minx
height = maxy - miny
if width > height:
split_direction = 'vertical'
else:
split_direction = 'horizontal'
if split_direction == 'vertical':
# 垂直切分(东西方向)
mid_x = (minx + maxx) / 2
split_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
else:
# 水平切分(南北方向)
mid_y = (miny + maxy) / 2
split_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
# 执行切分
split_parts = split(polygon, split_line)
# 过滤有效的多边形
valid_parts = [part for part in split_parts.geoms
if isinstance(part, (Polygon, MultiPolygon)) and not part.is_empty]
if len(valid_parts) >= 2:
return valid_parts[0], valid_parts[1]
else:
# 如果切分不成功,尝试对角线切分
return split_by_diagonal(polygon)
except Exception as e:
print(f"切分多边形时出错: {e}")
return split_by_diagonal(polygon)
def split_by_diagonal(polygon):
"""
通过对角线切分的备选方法
"""
try:
minx, miny, maxx, maxy = polygon.bounds
# 从左上到右下
split_line = LineString([(minx, maxy), (maxx, miny)])
split_parts = split(polygon, split_line)
valid_parts = [part for part in split_parts.geoms
if isinstance(part, (Polygon, MultiPolygon)) and not part.is_empty]
if len(valid_parts) >= 2:
return valid_parts[0], valid_parts[1]
else:
# 从右上到左下
split_line = LineString([(maxx, maxy), (minx, miny)])
split_parts = split(polygon, split_line)
valid_parts = [part for part in split_parts.geoms
if isinstance(part, (Polygon, MultiPolygon)) and not part.is_empty]
if len(valid_parts) >= 2:
return valid_parts[0], valid_parts[1]
else:
print("所有切分方法都失败,返回原始多边形")
return polygon, polygon
except Exception as e:
print(f"对角线切分也失败: {e}")
return polygon, polygon
def auto_detect_town_field(gdf):
"""
自动检测乡镇名字段
"""
# 常见的中文乡镇字段名
town_keywords = ['镇', '乡', '街道', 'town', 'TOWN', 'name', 'NAME', '名称', '地名']
for field in gdf.columns:
field_lower = str(field).lower()
# 检查字段名是否包含乡镇相关关键词
for keyword in town_keywords:
if keyword.lower() in field_lower:
return field
# 检查字段内容是否包含乡镇特征
sample_values = gdf[field].dropna().head(10)
if len(sample_values) > 0:
sample_str = ' '.join(str(x) for x in sample_values)
if any(keyword in sample_str for keyword in ['镇', '乡', '街道']):
return field
# 如果没有找到,返回第一个非几何字段
non_geom_fields = [col for col in gdf.columns if col != gdf.geometry.name]
return non_geom_fields[0] if non_geom_fields else gdf.columns[0]
def auto_detect_county_field(gdf):
"""
自动检测县名字段
"""
# 常见的中文县字段名
county_keywords = ['县', '区', '市', 'county', 'COUNTY', 'city', 'CITY']
for field in gdf.columns:
field_lower = str(field).lower()
for keyword in county_keywords:
if keyword.lower() in field_lower:
return field
# 检查字段内容是否包含县区特征
sample_values = gdf[field].dropna().head(10)
if len(sample_values) > 0:
sample_str = ' '.join(str(x) for x in sample_values)
if any(keyword in sample_str for keyword in ['县', '区', '市']):
return field
# 如果没有找到,尝试用乡镇字段名
town_field = auto_detect_town_field(gdf)
if town_field != gdf.columns[0]:
return gdf.columns[0]
else:
return "未知县"
def clean_filename(name):
"""
清理文件名,移除非法字符
"""
import re
# 移除Windows文件名中的非法字符
illegal_chars = r'[<>:"/\\|?*]'
cleaned = re.sub(illegal_chars, '_', str(name))
# 移除开头和结尾的空格和点号
cleaned = cleaned.strip().strip('.')
# 限制文件名长度
if len(cleaned) > 100:
cleaned = cleaned[:100]
return cleaned
def save_single_shapefile(gdf, output_path, file_name):
"""
保存单个Shapefile文件
"""
try:
# 确保输出目录存在
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 完整的文件路径
full_path = os.path.join(output_path, f"{file_name}.shp")
# 保存为Shapefile
gdf.to_file(full_path, encoding='utf-8')
print(f" 已保存: {file_name}.shp")
return True
except Exception as e:
print(f" 保存文件 {file_name}.shp 时出错: {e}")
return False
def split_towns_to_individual_files(shp_file, output_dir):
"""
自动读取县级shp文件,将每个乡镇切分为两部分,并保存为单独的shp文件
参数:
shp_file: 输入的县级Shapefile文件路径
output_dir: 输出目录
"""
# 确保输出目录存在
if not os.path.exists(output_dir):
os.makedirs(output_dir)
try:
# 读取Shapefile
print(f"正在读取文件: {shp_file}")
gdf = gpd.read_file(shp_file)
# 自动检测字段
town_field = auto_detect_town_field(gdf)
county_field = auto_detect_county_field(gdf)
print(f"检测到县字段: {county_field}")
print(f"检测到乡镇字段: {town_field}")
# 显示字段信息
print(f"\n所有字段: {list(gdf.columns)}")
print(f"几何类型: {gdf.geometry.type.unique()}")
print(f"要素数量: {len(gdf)}")
# 显示乡镇列表
towns = gdf[town_field].unique()
print(f"\n发现 {len(towns)} 个乡镇:")
for i, town in enumerate(towns, 1):
print(f" {i}. {town}")
# 获取县名
if county_field in gdf.columns and len(gdf[county_field].unique()) == 1:
county_name = gdf[county_field].iloc[0]
else:
# 从文件名提取县名或使用默认值
county_name = os.path.basename(shp_file).replace('.shp', '')
print(f"使用文件名作为县名: {county_name}")
# 清理县名用于文件名
county_name_clean = clean_filename(county_name)
# 创建县目录
county_dir = os.path.join(output_dir, county_name_clean)
if not os.path.exists(county_dir):
os.makedirs(county_dir)
print(f"\n开始切分并保存乡镇文件...")
print("="*60)
success_count = 0
fail_count = 0
total_files_created = 0
for idx, row in gdf.iterrows():
town_name = row[town_field]
geometry = row.geometry
print(f"\n处理乡镇: {town_name}")
# 清理乡镇名用于文件名
town_name_clean = clean_filename(town_name)
# 处理MultiPolygon
if isinstance(geometry, MultiPolygon):
polygons = list(geometry.geoms)
else:
polygons = [geometry]
town_parts = []
for poly_idx, polygon in enumerate(polygons):
# 切分多边形
part1, part2 = split_polygon_into_two(polygon)
if part1 is not None and part2 is not None and not part1.equals(part2):
town_parts.extend([part1, part2])
print(f" ✓ 成功切分第{poly_idx+1}个多边形")
else:
# 如果切分失败,使用边界框切分
print(f" ⚠ 第{poly_idx+1}个多边形切分不理想,使用边界框切分")
part1, part2 = split_by_bounds(polygon)
if part1 is not None and part2 is not None and not part1.equals(part2):
town_parts.extend([part1, part2])
else:
print(f" ✗ 第{poly_idx+1}个多边形切分完全失败,保留原状")
town_parts.append(polygon)
# 为每个部分创建单独的shp文件
for i, part in enumerate(town_parts[:2], 1): # 只取前两个部分
# 创建新的GeoDataFrame
new_row = row.copy()
new_row.geometry = part
# 添加切分信息
new_row['SPLIT_ID'] = f"{county_name}-{town_name}-{i}"
new_row['ORIGINAL_TOWN'] = town_name
new_row['SPLIT_PART'] = i
new_row['COUNTY_NAME'] = county_name
single_gdf = gpd.GeoDataFrame([new_row], crs=gdf.crs)
# 生成文件名
file_name = f"{county_name_clean}-{town_name_clean}-{i}"
# 保存为单独的shp文件
if save_single_shapefile(single_gdf, county_dir, file_name):
total_files_created += 1
print(f" ✓ 保存: {file_name}.shp")
else:
print(f" ✗ 保存失败: {file_name}.shp")
if len(town_parts) >= 2:
success_count += 1
else:
fail_count += 1
print(f"\n" + "="*60)
print(f"处理完成!")
print("="*60)
print(f"原始乡镇数量: {len(gdf)}")
print(f"成功切分: {success_count} 个乡镇")
print(f"切分失败: {fail_count} 个乡镇")
print(f"创建的shp文件总数: {total_files_created}")
print(f"输出目录: {county_dir}")
# 显示生成的文件列表
print(f"\n生成的文件列表:")
shp_files = [f for f in os.listdir(county_dir) if f.endswith('.shp')]
for i, file in enumerate(shp_files[:20], 1): # 只显示前20个文件
print(f" {i}. {file}")
if len(shp_files) > 20:
print(f" ... 还有 {len(shp_files) - 20} 个文件")
# 保存处理日志
save_processing_log(county_dir, county_name, gdf, success_count, fail_count, total_files_created)
return total_files_created
except Exception as e:
print(f"处理过程中发生错误: {e}")
import traceback
traceback.print_exc()
return 0
def split_by_bounds(polygon):
"""
使用边界框进行切分
"""
try:
minx, miny, maxx, maxy = polygon.bounds
center_x, center_y = (minx + maxx) / 2, (miny + maxy) / 2
# 创建四个象限的矩形
rect1 = Polygon([(minx, miny), (center_x, miny), (center_x, center_y), (minx, center_y), (minx, miny)])
rect2 = Polygon([(center_x, miny), (maxx, miny), (maxx, center_y), (center_x, center_y), (center_x, miny)])
rect3 = Polygon([(minx, center_y), (center_x, center_y), (center_x, maxy), (minx, maxy), (minx, center_y)])
rect4 = Polygon([(center_x, center_y), (maxx, center_y), (maxx, maxy), (center_x, maxy), (center_x, center_y)])
# 与原始多边形求交
parts = []
for rect in [rect1, rect2, rect3, rect4]:
intersection = polygon.intersection(rect)
if not intersection.is_empty and intersection.area > 0:
if isinstance(intersection, MultiPolygon):
parts.extend(list(intersection.geoms))
else:
parts.append(intersection)
# 按面积排序,取前两个最大的部分
parts.sort(key=lambda x: x.area, reverse=True)
if len(parts) >= 2:
return parts[0], parts[1]
elif len(parts) == 1:
return parts[0], parts[0]
else:
return polygon, polygon
except Exception as e:
print(f"边界框切分失败: {e}")
return polygon, polygon
def save_processing_log(output_dir, county_name, original_gdf, success_count, fail_count, total_files):
"""
保存处理日志
"""
try:
log_file = os.path.join(output_dir, f"处理日志.txt")
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"乡镇切分处理日志\n")
f.write(f"=" * 50 + "\n")
f.write(f"处理时间: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"县名称: {county_name}\n")
f.write(f"原始乡镇数量: {len(original_gdf)}\n")
f.write(f"成功切分: {success_count} 个乡镇\n")
f.write(f"切分失败: {fail_count} 个乡镇\n")
f.write(f"生成的shp文件总数: {total_files}\n")
f.write(f"\n原始乡镇列表:\n")
town_field = auto_detect_town_field(original_gdf)
for town in original_gdf[town_field].unique():
f.write(f" - {town}\n")
print(f"处理日志已保存: {log_file}")
except Exception as e:
print(f"保存处理日志时出错: {e}")
# 使用示例
if __name__ == "__main__":
# 输入文件路径 - 替换为您的县级Shapefile路径
input_shp = "黔东南苗族侗族自治州_县.shp" # 您的县级shp文件
output_directory = input_shp.split('.')[0]
print("开始自动切分乡镇并保存为单独文件...")
print("="*60)
# 执行切分
files_created = split_towns_to_individual_files(
shp_file=input_shp,
output_dir=output_directory
)
if files_created > 0:
print(f"\n" + "="*60)
print("处理完成!")
print("="*60)
print(f"总共生成了 {files_created} 个shp文件")
print(f"文件命名格式: 县名-镇名-编号.shp")
print(f"示例: 某县-某镇-1.shp, 某县-某镇-2.shp")
else:
print("\n处理失败,请检查输入文件和错误信息。")效果
切分多块
代码
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Polygon, MultiPolygon, LineString, Point
from shapely.ops import split, voronoi_diagram
import os
import math
def split_polygon_into_n_parts(polygon, n_parts=2, method='grid'):
"""
将多边形切分为N部分
参数:
polygon: 要切分的多边形
n_parts: 切分数量 (2, 3, 4)
method: 切分方法 ('grid', 'voronoi', 'radial')
"""
try:
if polygon.is_empty:
return [polygon] * n_parts
# 获取多边形的边界框
minx, miny, maxx, maxy = polygon.bounds
if method == 'grid':
return split_by_grid(polygon, n_parts)
elif method == 'voronoi':
return split_by_voronoi(polygon, n_parts)
elif method == 'radial':
return split_by_radial(polygon, n_parts)
else:
return split_by_grid(polygon, n_parts)
except Exception as e:
print(f"切分多边形时出错: {e}")
# 如果切分失败,返回原始多边形的多个副本
return [polygon] * n_parts
def split_by_grid(polygon, n_parts):
"""
使用网格方法切分多边形
"""
minx, miny, maxx, maxy = polygon.bounds
width = maxx - minx
height = maxy - miny
parts = []
if n_parts == 2:
# 2等分
if width > height:
# 垂直切分
mid_x = (minx + maxx) / 2
split_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
else:
# 水平切分
mid_y = (miny + maxy) / 2
split_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
split_parts = split(polygon, split_line)
parts = [part for part in split_parts.geoms if not part.is_empty]
elif n_parts == 3:
# 3等分 - 根据长宽比决定切分方向
if width > height:
# 垂直三等分
x1 = minx + width / 3
x2 = minx + 2 * width / 3
split_line1 = LineString([(x1, miny - 1), (x1, maxy + 1)])
split_line2 = LineString([(x2, miny - 1), (x2, maxy + 1)])
else:
# 水平三等分
y1 = miny + height / 3
y2 = miny + 2 * height / 3
split_line1 = LineString([(minx - 1, y1), (maxx + 1, y1)])
split_line2 = LineString([(minx - 1, y2), (maxx + 1, y2)])
# 第一次切分
temp_parts = split(polygon, split_line1)
parts = []
for part in temp_parts.geoms:
if not part.is_empty:
# 第二次切分
sub_parts = split(part, split_line2)
parts.extend([p for p in sub_parts.geoms if not p.is_empty])
elif n_parts == 4:
# 4等分 - 十字切分
mid_x = (minx + maxx) / 2
mid_y = (miny + maxy) / 2
# 垂直切分线
vert_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
# 水平切分线
horz_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
# 先垂直切分
temp_parts = split(polygon, vert_line)
parts = []
for part in temp_parts.geoms:
if not part.is_empty:
# 再水平切分
sub_parts = split(part, horz_line)
parts.extend([p for p in sub_parts.geoms if not p.is_empty])
# 过滤有效的多边形并确保数量正确
valid_parts = [part for part in parts if isinstance(part, (Polygon, MultiPolygon)) and not part.is_empty]
if len(valid_parts) >= n_parts:
# 按面积排序,取前n_parts个最大的部分
valid_parts.sort(key=lambda x: x.area, reverse=True)
return valid_parts[:n_parts]
else:
# 如果切分出的部分不够,使用边界框方法
return split_by_bounds(polygon, n_parts)
def split_by_voronoi(polygon, n_parts):
"""
使用Voronoi图方法切分多边形
"""
try:
minx, miny, maxx, maxy = polygon.bounds
center_x, center_y = (minx + maxx) / 2, (miny + maxy) / 2
# 生成随机点(在边界框内)
points = []
for _ in range(n_parts):
# 在边界框内随机生成点,但偏向中心区域
x = np.random.uniform(minx + 0.2*(maxx-minx), maxx - 0.2*(maxx-minx))
y = np.random.uniform(miny + 0.2*(maxy-miny), maxy - 0.2*(maxy-miny))
points.append(Point(x, y))
# 添加中心点确保覆盖
points.append(Point(center_x, center_y))
# 创建Voronoi图
voronoi = voronoi_diagram(MultiPoint(points))
parts = []
for region in voronoi.geoms:
if region.intersects(polygon):
intersection = region.intersection(polygon)
if not intersection.is_empty:
parts.append(intersection)
# 按面积排序,取前n_parts个最大的部分
parts.sort(key=lambda x: x.area, reverse=True)
return parts[:n_parts] if len(parts) >= n_parts else split_by_bounds(polygon, n_parts)
except Exception as e:
print(f"Voronoi切分失败: {e}")
return split_by_bounds(polygon, n_parts)
def split_by_radial(polygon, n_parts):
"""
使用径向切分方法
"""
try:
centroid = polygon.centroid
minx, miny, maxx, maxy = polygon.bounds
parts = []
angle_step = 360 / n_parts
for i in range(n_parts):
# 计算径向线的角度
angle1 = math.radians(i * angle_step)
angle2 = math.radians((i + 1) * angle_step)
# 创建扇形(从中心到边界)
distance = math.sqrt((maxx-minx)**2 + (maxy-miny)**2)
x1 = centroid.x + distance * math.cos(angle1)
y1 = centroid.y + distance * math.sin(angle1)
x2 = centroid.x + distance * math.cos(angle2)
y2 = centroid.y + distance * math.sin(angle2)
# 创建三角形扇形
triangle = Polygon([(centroid.x, centroid.y), (x1, y1), (x2, y2), (centroid.x, centroid.y)])
# 与原始多边形求交
intersection = polygon.intersection(triangle)
if not intersection.is_empty:
parts.append(intersection)
return parts if len(parts) >= n_parts else split_by_bounds(polygon, n_parts)
except Exception as e:
print(f"径向切分失败: {e}")
return split_by_bounds(polygon, n_parts)
def split_by_bounds(polygon, n_parts):
"""
使用边界框方法切分多边形
"""
try:
minx, miny, maxx, maxy = polygon.bounds
if n_parts == 2:
# 2等分
mid_x = (minx + maxx) / 2
rect1 = Polygon([(minx, miny), (mid_x, miny), (mid_x, maxy), (minx, maxy), (minx, miny)])
rect2 = Polygon([(mid_x, miny), (maxx, miny), (maxx, maxy), (mid_x, maxy), (mid_x, miny)])
parts = [rect1.intersection(polygon), rect2.intersection(polygon)]
elif n_parts == 3:
# 3等分
x1 = minx + (maxx - minx) / 3
x2 = minx + 2 * (maxx - minx) / 3
rect1 = Polygon([(minx, miny), (x1, miny), (x1, maxy), (minx, maxy), (minx, miny)])
rect2 = Polygon([(x1, miny), (x2, miny), (x2, maxy), (x1, maxy), (x1, miny)])
rect3 = Polygon([(x2, miny), (maxx, miny), (maxx, maxy), (x2, maxy), (x2, miny)])
parts = [rect1.intersection(polygon), rect2.intersection(polygon), rect3.intersection(polygon)]
elif n_parts == 4:
# 4等分
mid_x = (minx + maxx) / 2
mid_y = (miny + maxy) / 2
rect1 = Polygon([(minx, miny), (mid_x, miny), (mid_x, mid_y), (minx, mid_y), (minx, miny)])
rect2 = Polygon([(mid_x, miny), (maxx, miny), (maxx, mid_y), (mid_x, mid_y), (mid_x, miny)])
rect3 = Polygon([(minx, mid_y), (mid_x, mid_y), (mid_x, maxy), (minx, maxy), (minx, mid_y)])
rect4 = Polygon([(mid_x, mid_y), (maxx, mid_y), (maxx, maxy), (mid_x, maxy), (mid_x, mid_y)])
parts = [rect1.intersection(polygon), rect2.intersection(polygon),
rect3.intersection(polygon), rect4.intersection(polygon)]
# 过滤空的部分
valid_parts = [part for part in parts if not part.is_empty]
return valid_parts if len(valid_parts) >= n_parts else [polygon] * n_parts
except Exception as e:
print(f"边界框切分失败: {e}")
return [polygon] * n_parts
def auto_detect_town_field(gdf):
"""自动检测乡镇名字段"""
town_keywords = ['镇', '乡', '街道', 'town', 'TOWN', 'name', 'NAME', '名称', '地名']
for field in gdf.columns:
field_lower = str(field).lower()
for keyword in town_keywords:
if keyword.lower() in field_lower:
return field
non_geom_fields = [col for col in gdf.columns if col != gdf.geometry.name]
return non_geom_fields[0] if non_geom_fields else gdf.columns[0]
def auto_detect_county_field(gdf):
"""自动检测县名字段"""
county_keywords = ['县', '区', '市', 'county', 'COUNTY', 'city', 'CITY']
for field in gdf.columns:
field_lower = str(field).lower()
for keyword in county_keywords:
if keyword.lower() in field_lower:
return field
town_field = auto_detect_town_field(gdf)
return gdf.columns[0] if town_field != gdf.columns[0] else "未知县"
def clean_filename(name):
"""清理文件名"""
import re
illegal_chars = r'[<>:"/\\|?*]'
cleaned = re.sub(illegal_chars, '_', str(name))
cleaned = cleaned.strip().strip('.')
if len(cleaned) > 100:
cleaned = cleaned[:100]
return cleaned
def split_towns_to_individual_files(shp_file, output_dir, n_parts=2, method='grid'):
"""
将每个乡镇切分为N部分,并保存为单独的shp文件
参数:
shp_file: 输入的县级Shapefile文件路径
output_dir: 输出目录
n_parts: 切分数量 (2, 3, 4)
method: 切分方法 ('grid', 'voronoi', 'radial')
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
try:
print(f"正在读取文件: {shp_file}")
gdf = gpd.read_file(shp_file)
town_field = auto_detect_town_field(gdf)
county_field = auto_detect_county_field(gdf)
print(f"检测到县字段: {county_field}")
print(f"检测到乡镇字段: {town_field}")
print(f"切分数量: {n_parts}块")
print(f"切分方法: {method}")
# 获取县名
if county_field in gdf.columns and len(gdf[county_field].unique()) == 1:
county_name = gdf[county_field].iloc[0]
else:
county_name = os.path.basename(shp_file).replace('.shp', '')
county_name_clean = clean_filename(county_name)
county_dir = os.path.join(output_dir, f"{county_name_clean}_{n_parts}等分")
if not os.path.exists(county_dir):
os.makedirs(county_dir)
print(f"\n开始切分并保存乡镇文件...")
print("="*60)
success_count = 0
fail_count = 0
total_files_created = 0
for idx, row in gdf.iterrows():
town_name = row[town_field]
geometry = row.geometry
print(f"\n处理乡镇: {town_name}")
town_name_clean = clean_filename(town_name)
if isinstance(geometry, MultiPolygon):
polygons = list(geometry.geoms)
else:
polygons = [geometry]
all_parts = []
for poly_idx, polygon in enumerate(polygons):
print(f" 切分第{poly_idx+1}个多边形...")
parts = split_polygon_into_n_parts(polygon, n_parts, method)
all_parts.extend(parts)
print(f" ✓ 成功切分为 {len(parts)} 部分")
# 为每个部分创建单独的shp文件
for i, part in enumerate(all_parts[:n_parts], 1): # 只取前n_parts个部分
new_row = row.copy()
new_row.geometry = part
new_row['SPLIT_ID'] = f"{county_name}-{town_name}-{i}"
new_row['ORIGINAL_TOWN'] = town_name
new_row['SPLIT_PART'] = i
new_row['COUNTY_NAME'] = county_name
new_row['TOTAL_PARTS'] = n_parts
single_gdf = gpd.GeoDataFrame([new_row], crs=gdf.crs)
file_name = f"{county_name_clean}-{town_name_clean}-{i}"
output_path = os.path.join(county_dir, f"{file_name}.shp")
try:
single_gdf.to_file(output_path, encoding='utf-8')
total_files_created += 1
print(f" ✓ 保存: {file_name}.shp (面积: {part.area:.2f})")
except Exception as e:
print(f" ✗ 保存失败: {file_name}.shp - {e}")
if len(all_parts) >= n_parts:
success_count += 1
else:
fail_count += 1
print(f"\n" + "="*60)
print(f"处理完成!")
print("="*60)
print(f"原始乡镇数量: {len(gdf)}")
print(f"成功切分: {success_count} 个乡镇")
print(f"切分失败: {fail_count} 个乡镇")
print(f"创建的shp文件总数: {total_files_created}")
print(f"输出目录: {county_dir}")
# 显示生成的文件列表
shp_files = [f for f in os.listdir(county_dir) if f.endswith('.shp')]
print(f"\n生成的文件列表 (前20个):")
for i, file in enumerate(shp_files[:20], 1):
print(f" {i}. {file}")
if len(shp_files) > 20:
print(f" ... 还有 {len(shp_files) - 20} 个文件")
return total_files_created
except Exception as e:
print(f"处理过程中发生错误: {e}")
import traceback
traceback.print_exc()
return 0
# 使用示例
if __name__ == "__main__":
input_shp = "黔东南苗族侗族自治州_县.shp"
output_directory = "乡镇切分结果"
# 可以选择切分数量和切分方法
n_parts = 3 # 可以改为 2, 3, 4
method = 'grid' # 可以改为 'grid', 'voronoi', 'radial'
print(f"开始自动切分乡镇为 {n_parts} 块...")
print("="*60)
files_created = split_towns_to_individual_files(
shp_file=input_shp,
output_dir=output_directory,
n_parts=n_parts,
method=method
)
if files_created > 0:
print(f"\n处理完成!总共生成了 {files_created} 个shp文件")
print(f"文件命名格式: 县名-镇名-编号.shp")
print(f"示例: 某县-某镇-1.shp, 某县-某镇-2.shp, 某县-某镇-3.shp")效果
切分shp,切分镇(多块)
首先需要有省市县镇四级行政区划的数据,我的数据来源:点点GIS公众号,哔哩哔哩—>适合切分多个乡镇
通过网盘分享的文件:2021年7月中国乡镇行政区划shp.rar
链接: https://pan.baidu.com/s/1bnG5ShtNTe0uL3KAVMf6Jg?pwd=nefu 提取码: nefu
--来自百度网盘超级会员v4的分享
或者到五级区划查询与下载挨个搜索,要麻烦一些—>适合切分单个乡镇
代码
版本1,整个省的乡镇边界切割
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Polygon, MultiPolygon, LineString
from shapely.ops import split
import os
import math
import warnings
from tqdm import tqdm
# 忽略所有警告
warnings.filterwarnings('ignore')
def read_shp_with_encoding(shp_file):
"""尝试多种编码方式读取SHP文件"""
encodings = ['gbk', 'gb2312', 'utf-8', 'latin1', 'cp936', 'big5']
for encoding in encodings:
try:
gdf = gpd.read_file(shp_file, encoding=encoding)
return gdf, encoding
except:
continue
try:
gdf = gpd.read_file(shp_file)
return gdf, 'unknown'
except Exception as e:
print(f"❌ 读取文件失败: {e}")
return None, None
def split_polygon_into_n_parts(polygon, n_parts=2):
"""将多边形切分为N部分"""
try:
if polygon.is_empty:
return [polygon] * n_parts
minx, miny, maxx, maxy = polygon.bounds
width = maxx - minx
height = maxy - miny
parts = []
if n_parts == 2:
if width > height:
mid_x = (minx + maxx) / 2
split_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
else:
mid_y = (miny + maxy) / 2
split_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
split_parts = split(polygon, split_line)
parts = [part for part in split_parts.geoms if not part.is_empty]
elif n_parts == 3:
if width > height:
x1 = minx + width / 3
x2 = minx + 2 * width / 3
split_line1 = LineString([(x1, miny - 1), (x1, maxy + 1)])
split_line2 = LineString([(x2, miny - 1), (x2, maxy + 1)])
else:
y1 = miny + height / 3
y2 = miny + 2 * height / 3
split_line1 = LineString([(minx - 1, y1), (maxx + 1, y1)])
split_line2 = LineString([(minx - 1, y2), (maxx + 1, y2)])
temp_parts = split(polygon, split_line1)
parts = []
for part in temp_parts.geoms:
if not part.is_empty:
sub_parts = split(part, split_line2)
parts.extend([p for p in sub_parts.geoms if not p.is_empty])
elif n_parts == 4:
mid_x = (minx + maxx) / 2
mid_y = (miny + maxy) / 2
vert_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
horz_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
temp_parts = split(polygon, vert_line)
parts = []
for part in temp_parts.geoms:
if not part.is_empty:
sub_parts = split(part, horz_line)
parts.extend([p for p in sub_parts.geoms if not p.is_empty])
valid_parts = [part for part in parts if isinstance(part, (Polygon, MultiPolygon)) and not part.is_empty]
if len(valid_parts) >= n_parts:
valid_parts.sort(key=lambda x: x.area, reverse=True)
return valid_parts[:n_parts]
else:
return [polygon] * n_parts
except Exception:
return [polygon] * n_parts
def auto_detect_admin_fields(gdf):
"""自动检测行政区划字段"""
admin_keywords = {
'name': ['name', '名称', '地名']
}
detected_fields = {}
for field in gdf.columns:
if field == 'geometry':
continue
field_lower = field.lower()
for field_type, keywords in admin_keywords.items():
for keyword in keywords:
if keyword.lower() in field_lower:
if field_type not in detected_fields:
detected_fields[field_type] = field
break
if 'name' not in detected_fields:
non_geom_fields = [col for col in gdf.columns if col != 'geometry']
if non_geom_fields:
detected_fields['name'] = non_geom_fields[0]
return detected_fields
def clean_town_name(name):
"""清理乡镇名称"""
import re
if name is None:
return "未知乡镇"
illegal_chars = r'[<>:"/\\|?*]'
cleaned = re.sub(illegal_chars, '', str(name))
cleaned = cleaned.strip().strip('.')
if len(cleaned) > 50:
cleaned = cleaned[:50]
if not cleaned:
cleaned = "未知乡镇"
return cleaned
def get_field_value(row, field_name):
"""安全获取字段值"""
if field_name and field_name in row:
value = row[field_name]
return value if pd.notna(value) and value != "" else None
return None
def split_all_towns_silent(town_shp_file, output_base_dir, n_parts=2):
"""
静默版本:批量切分所有乡镇,忽略警告
"""
if not os.path.exists(output_base_dir):
os.makedirs(output_base_dir)
try:
print("📥 读取乡镇数据...")
town_gdf, encoding = read_shp_with_encoding(town_shp_file)
if town_gdf is None:
print("❌ 无法读取乡镇文件")
return 0
town_fields = auto_detect_admin_fields(town_gdf)
print(f"📊 找到 {len(town_gdf)} 个乡镇")
print(f"🔪 开始切分为 {n_parts} 块...")
total_files_created = 0
success_count = 0
# 使用进度条
with tqdm(total=len(town_gdf), desc="切分进度") as pbar:
for idx, row in town_gdf.iterrows():
town_name = get_field_value(row, town_fields.get('name')) or f"乡镇_{idx+1}"
town_name_clean = clean_town_name(town_name)
geometry = row.geometry
if isinstance(geometry, MultiPolygon):
polygons = list(geometry.geoms)
else:
polygons = [geometry]
# 创建乡镇目录
town_dir = os.path.join(output_base_dir, town_name_clean)
if not os.path.exists(town_dir):
os.makedirs(town_dir)
# 切分每个多边形
all_parts = []
for polygon in polygons:
parts = split_polygon_into_n_parts(polygon, n_parts)
all_parts.extend(parts)
# 保存切分结果
parts_created = 0
for i, part in enumerate(all_parts[:n_parts], 1):
new_row = row.copy()
new_row.geometry = part
# 添加切分信息
new_row['SPLIT_ID'] = f"{town_name_clean}-{i}"
new_row['ORIGINAL_TOWN'] = town_name
new_row['SPLIT_PART'] = i
new_row['TOTAL_PARTS'] = n_parts
single_gdf = gpd.GeoDataFrame([new_row], geometry='geometry', crs=town_gdf.crs)
file_name = f"{town_name_clean}-{i}"
output_path = os.path.join(town_dir, f"{file_name}.shp")
try:
single_gdf.to_file(output_path, encoding='utf-8')
total_files_created += 1
parts_created += 1
except Exception:
pass
if parts_created > 0:
success_count += 1
pbar.update(1)
print(f"\n🎉 切分完成!")
print(f"📊 统计信息:")
print(f" • 总乡镇数量: {len(town_gdf)}")
print(f" • 成功处理: {success_count} 个乡镇")
print(f" • 创建文件: {total_files_created} 个")
return total_files_created
except Exception as e:
print(f"❌ 处理失败: {e}")
return 0
# 使用示例
if __name__ == "__main__":
# 文件路径配置
town_shp_file = "data/贵州省/贵州省_乡镇边界.shp"
output_base_dir = "贵州省乡镇切分结果"
n_parts = 4
print("🗺️ 乡镇边界切分工具 (静默版)")
print("=" * 40)
files_created = split_all_towns_silent(
town_shp_file=town_shp_file,
output_base_dir=output_base_dir,
n_parts=n_parts
)
if files_created > 0:
print(f"✅ 任务完成! 生成 {files_created} 个文件")
else:
print(f"❌ 处理失败")版本2,单个乡镇切割
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Polygon, MultiPolygon, LineString
from shapely.ops import split
import os
import math
import warnings
from tqdm import tqdm
# 忽略所有警告
warnings.filterwarnings('ignore')
def read_shp_with_encoding(shp_file):
"""尝试多种编码方式读取SHP文件"""
encodings = ['gbk', 'gb2312', 'utf-8', 'latin1', 'cp936', 'big5']
for encoding in encodings:
try:
gdf = gpd.read_file(shp_file, encoding=encoding)
return gdf, encoding
except:
continue
try:
gdf = gpd.read_file(shp_file)
return gdf, 'unknown'
except Exception as e:
print(f"❌ 读取文件失败: {e}")
return None, None
def split_polygon_into_n_parts(polygon, n_parts=2):
"""将多边形切分为N部分"""
try:
if polygon.is_empty:
return [polygon] * n_parts
minx, miny, maxx, maxy = polygon.bounds
width = maxx - minx
height = maxy - miny
parts = []
if n_parts == 2:
if width > height:
mid_x = (minx + maxx) / 2
split_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
else:
mid_y = (miny + maxy) / 2
split_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
split_parts = split(polygon, split_line)
parts = [part for part in split_parts.geoms if not part.is_empty]
elif n_parts == 3:
if width > height:
x1 = minx + width / 3
x2 = minx + 2 * width / 3
split_line1 = LineString([(x1, miny - 1), (x1, maxy + 1)])
split_line2 = LineString([(x2, miny - 1), (x2, maxy + 1)])
else:
y1 = miny + height / 3
y2 = miny + 2 * height / 3
split_line1 = LineString([(minx - 1, y1), (maxx + 1, y1)])
split_line2 = LineString([(minx - 1, y2), (maxx + 1, y2)])
temp_parts = split(polygon, split_line1)
parts = []
for part in temp_parts.geoms:
if not part.is_empty:
sub_parts = split(part, split_line2)
parts.extend([p for p in sub_parts.geoms if not p.is_empty])
elif n_parts == 4:
mid_x = (minx + maxx) / 2
mid_y = (miny + maxy) / 2
vert_line = LineString([(mid_x, miny - 1), (mid_x, maxy + 1)])
horz_line = LineString([(minx - 1, mid_y), (maxx + 1, mid_y)])
temp_parts = split(polygon, vert_line)
parts = []
for part in temp_parts.geoms:
if not part.is_empty:
sub_parts = split(part, horz_line)
parts.extend([p for p in sub_parts.geoms if not p.is_empty])
valid_parts = [part for part in parts if isinstance(part, (Polygon, MultiPolygon)) and not part.is_empty]
if len(valid_parts) >= n_parts:
valid_parts.sort(key=lambda x: x.area, reverse=True)
return valid_parts[:n_parts]
else:
return [polygon] * n_parts
except Exception:
return [polygon] * n_parts
def get_town_name_from_file(shp_file, gdf):
"""从SHP文件获取乡镇名称"""
# 尝试从字段中获取
name_fields = ['名称', '地名', 'name', 'NAME', '乡镇名', '镇名', 'town', 'TOWN']
for field in name_fields:
if field in gdf.columns and len(gdf) > 0:
town_name = gdf[field].iloc[0]
if pd.notna(town_name) and town_name != "":
return str(town_name).strip()
# 从文件名提取
base_name = os.path.splitext(os.path.basename(shp_file))[0]
suffixes = ['_边界', '_polygon', '_town', '_乡镇', '_boundary', '_shp']
town_name = base_name
for suffix in suffixes:
if town_name.endswith(suffix):
town_name = town_name[:-len(suffix)]
break
return town_name
def clean_town_name(name):
"""清理乡镇名称"""
import re
if name is None:
return "未知乡镇"
illegal_chars = r'[<>:"/\\|?*]'
cleaned = re.sub(illegal_chars, '', str(name))
cleaned = cleaned.strip().strip('.')
if len(cleaned) > 50:
cleaned = cleaned[:50]
if not cleaned:
cleaned = "未知乡镇"
return cleaned
def split_single_town_shp(shp_file, output_base_dir, n_parts=2):
"""
切分单个乡镇SHP文件
保存为: 输出目录/乡镇名/乡镇名-编号.shp
"""
if not os.path.exists(output_base_dir):
os.makedirs(output_base_dir)
try:
print(f"📥 读取文件: {os.path.basename(shp_file)}")
gdf, encoding = read_shp_with_encoding(shp_file)
if gdf is None:
print("❌ 无法读取SHP文件")
return 0
# 获取乡镇名称
town_name = get_town_name_from_file(shp_file, gdf)
town_name_clean = clean_town_name(town_name)
print(f"🏘️ 乡镇名称: {town_name_clean}")
print(f"📊 要素数量: {len(gdf)}")
# 创建乡镇目录
town_dir = os.path.join(output_base_dir, town_name_clean)
if not os.path.exists(town_dir):
os.makedirs(town_dir)
total_files_created = 0
print(f"🔪 开始切分为 {n_parts} 块...")
# 使用进度条
with tqdm(total=len(gdf), desc="切分进度") as pbar:
for idx, row in gdf.iterrows():
geometry = row.geometry
if isinstance(geometry, MultiPolygon):
polygons = list(geometry.geoms)
else:
polygons = [geometry]
# 切分每个多边形
for poly_idx, polygon in enumerate(polygons):
parts = split_polygon_into_n_parts(polygon, n_parts)
# 保存每个部分
for i, part in enumerate(parts[:n_parts], 1):
new_row = row.copy()
new_row.geometry = part
# 添加切分信息
new_row['SPLIT_ID'] = f"{town_name_clean}-{i}"
new_row['ORIGINAL_TOWN'] = town_name
new_row['SPLIT_PART'] = i
new_row['TOTAL_PARTS'] = n_parts
single_gdf = gpd.GeoDataFrame([new_row], geometry='geometry', crs=gdf.crs)
# 生成文件名
if len(polygons) > 1:
file_name = f"{town_name_clean}-{poly_idx+1}-{i}"
else:
file_name = f"{town_name_clean}-{i}"
output_path = os.path.join(town_dir, f"{file_name}.shp")
try:
single_gdf.to_file(output_path, encoding='utf-8')
total_files_created += 1
except Exception:
pass
pbar.update(1)
print(f"\n🎉 切分完成!")
print(f"📊 创建文件: {total_files_created} 个")
print(f"📁 输出目录: {town_dir}")
# 显示生成的文件
print(f"\n📄 生成的文件:")
shp_files = [f for f in os.listdir(town_dir) if f.endswith('.shp')]
for file in shp_files:
print(f" • {file}")
return total_files_created
except Exception as e:
print(f"❌ 处理失败: {e}")
return 0
# 使用示例
if __name__ == "__main__":
# 单个文件切分示例
town_shp_file = "停洞镇.shp" # 替换为您的乡镇SHP文件
output_base_dir = "单个乡镇切分结果"
n_parts = 4
print("🗺️ 单个乡镇边界切分工具")
print("=" * 40)
print(f"目标结构: 乡镇名/乡镇名-编号.shp")
print("=" * 40)
files_created = split_single_town_shp(
shp_file=town_shp_file,
output_base_dir=output_base_dir,
n_parts=n_parts
)
if files_created > 0:
print(f"\n✅ 任务完成! 生成 {files_created} 个文件")
else:
print(f"\n❌ 处理失败")