Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import geopandas as gpd | |
| import leafmap.foliumap as leafmap | |
| from PIL import Image | |
| import rasterio | |
| from rasterio.windows import Window | |
| from stqdm import stqdm | |
| import io | |
| import zipfile | |
| import os | |
| import albumentations as albu | |
| import segmentation_models_pytorch as smp | |
| from albumentations.pytorch.transforms import ToTensorV2 | |
| from shapely.geometry import shape | |
| from shapely.ops import unary_union | |
| from rasterio.features import shapes | |
| import torch | |
| import numpy as np | |
| DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| ENCODER = 'se_resnext50_32x4d' | |
| ENCODER_WEIGHTS = 'imagenet' | |
| # Load and prepare the model | |
| def load_model(): | |
| model = torch.load('deeplabv3 v15.pth', map_location=DEVICE) | |
| model.eval().float() | |
| return model | |
| best_model = load_model() | |
| def to_tensor(x, **kwargs): | |
| return x.astype('float32') | |
| # Preprocessing | |
| preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS) | |
| def get_preprocessing(tile_size): | |
| _transform = [ | |
| albu.PadIfNeeded(min_height=tile_size, min_width=tile_size, always_apply=True), | |
| albu.Lambda(image=preprocessing_fn), | |
| albu.Lambda(image=to_tensor, mask=to_tensor), | |
| ToTensorV2(), | |
| ] | |
| return albu.Compose(_transform) | |
| def extract_tiles(map_file, model, tile_size=512, overlap=0, batch_size=4, threshold=0.6): | |
| preprocess = get_preprocessing(tile_size) | |
| tiles = [] | |
| with rasterio.open(map_file) as src: | |
| height = src.height | |
| width = src.width | |
| effective_tile_size = tile_size - overlap | |
| for y in stqdm(range(0, height, effective_tile_size)): | |
| for x in range(0, width, effective_tile_size): | |
| batch_images = [] | |
| batch_metas = [] | |
| for i in range(batch_size): | |
| curr_y = y + (i * effective_tile_size) | |
| if curr_y >= height: | |
| break | |
| window = Window(x, curr_y, tile_size, tile_size) | |
| out_image = src.read(window=window) | |
| if out_image.shape[0] == 1: | |
| out_image = np.repeat(out_image, 3, axis=0) | |
| elif out_image.shape[0] != 3: | |
| raise ValueError("The number of channels in the image is not supported") | |
| out_image = np.transpose(out_image, (1, 2, 0)) | |
| tile_image = Image.fromarray(out_image.astype(np.uint8)) | |
| out_meta = src.meta.copy() | |
| out_meta.update({ | |
| "driver": "GTiff", | |
| "height": tile_size, | |
| "width": tile_size, | |
| "transform": rasterio.windows.transform(window, src.transform) | |
| }) | |
| tile_image = np.array(tile_image) | |
| preprocessed_tile = preprocess(image=tile_image)['image'] | |
| batch_images.append(preprocessed_tile) | |
| batch_metas.append(out_meta) | |
| if not batch_images: | |
| break | |
| batch_tensor = torch.cat([img.unsqueeze(0).to(DEVICE) for img in batch_images], dim=0) | |
| with torch.no_grad(): | |
| batch_masks = model(batch_tensor) | |
| batch_masks = torch.sigmoid(batch_masks) | |
| batch_masks = (batch_masks > threshold).float() | |
| for j, mask_tensor in enumerate(batch_masks): | |
| mask_resized = torch.nn.functional.interpolate(mask_tensor.unsqueeze(0), | |
| size=(tile_size, tile_size), mode='bilinear', | |
| align_corners=False).squeeze(0) | |
| mask_array = mask_resized.squeeze().cpu().numpy() | |
| if mask_array.any() == 1: | |
| tiles.append([mask_array, batch_metas[j]]) | |
| return tiles | |
| def create_vector_mask(tiles, output_path): | |
| all_polygons = [] | |
| for mask_array, meta in tiles: | |
| # Ensure mask is binary | |
| mask_array = (mask_array > 0).astype(np.uint8) | |
| # Get shapes from the mask | |
| mask_shapes = list(shapes(mask_array, mask=mask_array, transform=meta['transform'])) | |
| # Convert shapes to Shapely polygons | |
| polygons = [shape(geom) for geom, value in mask_shapes if value == 1] | |
| all_polygons.extend(polygons) | |
| # Perform union of all polygons | |
| union_polygon = unary_union(all_polygons) | |
| # Create a GeoDataFrame | |
| gdf = gpd.GeoDataFrame({'geometry': [union_polygon]}, crs=meta['crs']) | |
| # Save to file | |
| gdf.to_file(output_path) | |
| # Calculate area in square meters | |
| area_m2 = gdf.to_crs(epsg=3857).area.sum() | |
| return gdf, area_m2 | |
| def display_map(shapefile_path, tif_path): | |
| st.title("Map with Shape and TIFF Overlay") | |
| # Load the shapefile | |
| mask = gpd.read_file(shapefile_path) | |
| # Check and reproject the mask to EPSG:3857 if needed | |
| if mask.crs is None or mask.crs.to_string() != 'EPSG:3857': | |
| mask = mask.to_crs('EPSG:3857') | |
| # Get the bounds of the shapefile to center the map | |
| bounds = mask.total_bounds # [minx, miny, maxx, maxy] | |
| center = [(bounds[1] + bounds[3]) / 2, (bounds[0] + bounds[2]) / 2] | |
| # Create a leafmap centered on the shapefile bounds | |
| m = leafmap.Map( | |
| center=[center[1], center[0]], # leafmap uses [latitude, longitude] | |
| zoom=10, | |
| crs='EPSG3857' | |
| ) | |
| # Add the mask layer to the map | |
| m.add_gdf(mask, layer_name="Shapefile Mask") | |
| # Add the TIFF image to the map as RGB | |
| m.add_raster(tif_path, layer_name="Satellite Image", rgb=True, opacity=0.9) | |
| # Display the map in Streamlit | |
| m.to_streamlit() | |
| def main(): | |
| st.title("PV Segmentor") | |
| uploaded_file = st.file_uploader("Choose a TIF file", type="tif") | |
| if uploaded_file is not None: | |
| st.write("File uploaded successfully!") | |
| resolution = st.radio( | |
| "Selext Processing resolution:", | |
| (512, 1024), | |
| index=0 | |
| ) | |
| overlap = st.slider( | |
| 'Select the value of overlap', | |
| min_value=50, | |
| max_value=150, | |
| value=100, | |
| step=25 | |
| ) | |
| threshold = st.slider( | |
| 'Select the value of the threshold', | |
| min_value=0.1, | |
| max_value=0.9, | |
| value=0.6, | |
| step=0.01 | |
| ) | |
| st.write('You selected:',resolution) | |
| st.write('Selected overlap value:', overlap) | |
| st.write('Selected threshold value:', threshold) | |
| if st.button("Process File"): | |
| st.write("Processing...") | |
| with open("temp.tif", "wb") as f: | |
| f.write(uploaded_file.getbuffer()) | |
| best_model.float() | |
| tiles = extract_tiles("temp.tif", best_model, tile_size=resolution, overlap=overlap, batch_size=4, threshold=threshold) | |
| st.write("Processing complete!") | |
| output_path = "output_mask.shp" | |
| result_gdf, area_m2 = create_vector_mask(tiles, output_path) | |
| st.write("Vector mask created successfully!") | |
| st.write(f"Total area occupied by PV panels: {area_m2:.4f} m^2") | |
| # Offer the shapefile for download | |
| shp_files = [f for f in os.listdir() if | |
| f.startswith("output_mask") and f.endswith((".shp", ".shx", ".dbf", ".prj"))] | |
| with io.BytesIO() as zip_buffer: | |
| with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file: | |
| for file in shp_files: | |
| zip_file.write(file) | |
| zip_buffer.seek(0) | |
| st.download_button( | |
| label="Download shapefile", | |
| data=zip_buffer, | |
| file_name="output_mask.zip", | |
| mime="application/zip" | |
| ) | |
| # Display the map with the predicted shapefile | |
| display_map("output_mask.shp", "temp.tif") | |
| # Clean up temporary files | |
| #os.remove("temp.tif") | |
| #for file in shp_files: | |
| # os.remove(file) | |
| if __name__ == "__main__": | |
| main() |