"""Module for splitting and organizing regional avalanche input data."""
import logging
import shapefile # pyshp
from shapely.geometry import box
import pathlib
import time
from avaframe.in2Trans import rasterUtils
from avaframe.in3Utils import fileHandlerUtils as fU
from avaframe.in1Data import getInput
from avaframe.in3Utils.initializeProject import initializeFolderStruct
from avaframe.in2Trans import shpConversion as shpConv
from avaframe.out3Plot.outCom7Regional import createReportPlot
# create local logger
log = logging.getLogger(__name__)
[docs]def splitInputsMain(avalancheDir, outputDir, cfg, cfgMain):
"""Process and organize avalanche input data into individual avalanche directories based
on release area's "group" and "scenario" attributes provided in the release area file. If no
"group" attribute is provided, one avalanche directory per feature will be created (scenario is
ignored in this case).
Parameters
----------
avalancheDir : pathlib.Path object
Path to input directory containing release areas (REL) and DEM files
outputDir : pathlib.Path object
Path to output directory where organized folders will be created
cfg : dict
Configuration settings containing:
- GENERAL.bufferSize : float, buffer size for DEM clipping
cfgMain : dict
Configuration settings containing:
- FLAGS.createReport : bool, whether to write report
- FLAGS.savePlot : bool, whether to save plots
Returns
-------
none
Notes
-----
Expected input directory structure:
avalancheDir/
└── Inputs/
├── REL/
│ └── *.shp # all release areas
├── ENT/ # all entrainment areas (optional)
│ └── *.shp
├── RES/ # all resistance areas (optional)
│ └── *.shp
└── *.asc or *.tif # digital elevation model (DEM)
"""
# Fetch the necessary input
inputSimFilesAll = getInput.getInputDataCom1DFA(avalancheDir)
# extract release shapefile, make sure only one exists
if len(inputSimFilesAll["relFiles"]) == 1:
inputShp = inputSimFilesAll["relFiles"][0]
else:
log.error(f"Expected only one release area file, found {len(inputSimFilesAll['relFiles'])}.")
return
# Get the input DEM
inputDEM = getInput.getDEMPath(avalancheDir)
# Create the output directory
fU.makeADir(outputDir)
# Step 1: Create the directory list
log.info("Initializing folder structure for each group...")
dirListGrouped = createDirList(inputShp)
log.info("Finished creating folder list")
# Step 2: Set up avalanche directories
log.info("Initializing folder structure for each entry...")
for entry in dirListGrouped:
dirName = entry["dirName"]
initializeFolderStruct(str(outputDir / dirName), removeExisting=True)
log.debug(f"Created folder structure for '{dirName}'.")
log.info("Finished folder initialization")
# Step 3: Split and move release areas to each directory
log.info("Splitting and moving release areas...")
splitAndMoveReleaseAreas(dirListGrouped, inputShp, outputDir)
log.info("Finished splitting and moving release areas")
# Step 4: Clip and move DEM
log.info("Clipping and moving DEM...")
groupExtents = clipDEMByReleaseGroup(dirListGrouped, inputDEM, outputDir, cfg)
log.info("Finished clipping and moving of DEM")
# Step 5: Clip and move optional input (currently only ENT and RES)
log.info("Clipping and moving optional input...")
groupFeatures = clipAndMoveOptionalInput(inputSimFilesAll, outputDir, groupExtents)
log.info("Finished clipping and moving optional input")
# Step 6: Divide release areas into scenarios
log.info("Separating release areas by scenarios...")
splitByScenarios(dirListGrouped, outputDir)
log.info("Finished separating by scenarios")
# Step 7: Write reports
if cfgMain["FLAGS"].getboolean("createReport"):
log.info("Writing reports...")
writeScenarioReport(dirListGrouped, outputDir)
if cfgMain["FLAGS"].getboolean("savePlot"):
createReportPlots(dirListGrouped, inputDEM, outputDir, groupExtents, groupFeatures)
log.info("Finished writing reports")
[docs]def createDirList(inputShp):
"""Create a list of entries from each feature in the input shapefile, grouped by the 'group' attribute.
Parameters
----------
inputShp: pathlib.Path object
path to input shapefile
Returns
-------
dirListGrouped: list
list of dictionaries containing dirName (group name), properties list, and geometries list,
where features are grouped by their 'group' attribute
"""
fields, fieldNames, properties, geometries, srs = shpConv.readShapefile(inputShp)
# Create dictionary to store groups
groups = {}
unnamedCount = 1
for props, geom in zip(properties, geometries):
propsLower = {key.lower(): value for key, value in props.items()} # Handle case sensitivity
# Get group name from 'group' attribute, fallback to unnamed if not present
groupName = propsLower.get("group", "").strip() or f"{str(unnamedCount).zfill(5)}"
if not propsLower.get("group", "").strip():
unnamedCount += 1
log.info(f"No 'group' field or empty group found in {inputShp}, using '{groupName}'")
# Initialize group if not exists
if groupName not in groups:
groups[groupName] = {
"dirName": groupName,
"properties": [],
"geometries": [],
}
# Add feature to group
groups[groupName]["properties"].append(props)
groups[groupName]["geometries"].append(geom)
# Convert dictionary to list and sort by dirName
dirListGrouped = list(groups.values())
dirListGrouped.sort(key=lambda x: x["dirName"].lower())
# Log total number of features
totalFeatures = sum(len(group["geometries"]) for group in dirListGrouped)
log.info(f"Found '{totalFeatures}' features that were organized into '{len(dirListGrouped)}' groups")
return dirListGrouped
[docs]def splitAndMoveReleaseAreas(dirList, inputShp, outputDir):
"""Split release areas into individual shapefiles and write them to their respective folders.
Parameters
----------
dirList: list
list of dictionaries containing dirName, properties list, and geometries list
inputShp: pathlib.Path object
path to input shapefile
outputDir: pathlib.Path object
path to output directory where folders will be created
Returns
-------
none
"""
# Read the input shapefile
fields, fieldNames, properties, geometries, srs = shpConv.readShapefile(inputShp)
featuresByName = {}
for entry in dirList:
name = entry["dirName"] # Get release area name
# Group entries with the same name
if name not in featuresByName:
featuresByName[name] = []
# add corresponding properties and geometries
for i, properties in enumerate(entry["properties"]):
featuresByName[name].append((properties, entry["geometries"][i]))
# Write shapefiles to their respective folders
for name, features in featuresByName.items():
shpOutPath = outputDir / name / "Inputs" / "REL" / name
shpConv.writeShapefile(shpOutPath, fields, fieldNames, features, srs)
log.debug(f"Saved release area to '{shpOutPath}'.")
[docs]def checkFeatureIsolation(geometries, properties, bufferSize, groupName):
"""Check if any feature in the group is isolated from all others.
A feature is considered isolated if its buffered bounding box does not overlap
with any other feature's buffered bounding box in the group.
Parameters
----------
geometries: list
List of geometry objects to check
properties: list
List of dictionaries containing properties for each geometry
bufferSize: float
Buffer size to use when creating bounding boxes
groupName: str
Name of the group, used for error messages
Raises
------
ValueError
If any feature is isolated from all others in the group
"""
# Skip check if only one feature
if len(geometries) <= 1:
log.debug(f"Group '{groupName}' has only one feature, proceeding without isolation check.")
return
# Create buffered bounding boxes for each feature
boundingBoxes = []
for geom in geometries:
center = geom.centroid
# Calculate bounding box for this feature
currXMin = center.x - bufferSize
currYMin = center.y - bufferSize
currXMax = center.x + bufferSize
currYMax = center.y + bufferSize
# Update group extent
boundingBoxes.append(box(currXMin, currYMin, currXMax, currYMax))
# Check each feature's bounding box against all others
for i, bbox in enumerate(boundingBoxes):
hasOverlap = False
for j, otherBbox in enumerate(boundingBoxes):
if i != j and bbox.intersects(otherBbox):
hasOverlap = True
break
if not hasOverlap:
# Find feature name regardless of case (NAME, name, Name etc.)
featureProps = {key.lower(): value for key, value in properties[i].items()}
featureName = featureProps.get("name", f"unnamed feature {i+1}").strip()
message = f"Feature '{featureName}' in group '{groupName}' is isolated from all other features - consider assigning it to a different group"
log.error(message)
raise ValueError(message)
[docs]def clipDEMByReleaseGroup(dirList, inputDEM, outputDir, cfg):
"""Clip the DEM to include all features in each release group. Returns an error if any feature in a group is isolated.
Parameters
----------
dirList : list
List of dictionaries containing dirName, and geometries list
inputDEM : pathlib.Path
Path to input DEM file
outputDir : pathlib.Path
Path to output directory where clipped DEMs will be saved
cfg : configparser object
Configuration settings containing:
- GENERAL.bufferSize : float
Size of buffer to add around release areas Configuration settings
Returns
-------
groupExtents : dict
Dictionary with dirName as key and (xMin, xMax, yMin, yMax) as value.
The extents are reduced by one pixel on each side to ensure DEM extents
are larger than clip extents of other input.
"""
# Read input DEM
demData = rasterUtils.readRaster(inputDEM)
header = demData["header"]
raster = demData["rasterData"]
cellSize = header["cellsize"]
xOrigin = header["xllcenter"]
yOrigin = header["yllcenter"]
nRows = header["nrows"]
nCols = header["ncols"]
# Process each group
groupExtents = {}
for entry in dirList:
dirName = entry["dirName"]
geometries = entry["geometries"]
properties = entry["properties"]
if not geometries:
message = f"No geometries found for {dirName}"
log.error(message)
raise ValueError(message)
# Check if any features in the group are isolated
bufferSize = cfg["GENERAL"].getfloat("bufferSize")
checkFeatureIsolation(geometries, properties, bufferSize, dirName)
# Get extent of all geometries in group
bounds = [geom.bounds for geom in geometries]
xMins, yMins, xMaxs, yMaxs = zip(*bounds)
# Calculate extent with buffer
bufferSize = cfg["GENERAL"].getfloat("bufferSize")
xMin = min(xMins) - bufferSize
xMax = max(xMaxs) + bufferSize
yMin = min(yMins) - bufferSize
yMax = max(yMaxs) + bufferSize
groupExtents[dirName] = (xMin, xMax, yMin, yMax) # Store extent for this group
# Convert extent to grid indices (using top-left origin)
colStart = max(0, int((xMin - xOrigin) / cellSize))
colEnd = min(nCols, int((xMax - xOrigin) / cellSize) + 1)
# Convert y-coordinates to row indices (flipped for bottom-left origin)
rowStart = max(0, int((yOrigin + nRows * cellSize - yMax) / cellSize))
rowEnd = min(nRows, int((yOrigin + nRows * cellSize - yMin) / cellSize) + 1)
# Ensure valid row indices
if rowEnd <= rowStart:
log.warning(f"Invalid row indices calculated for {dirName}: start={rowStart}, end={rowEnd}")
continue
# Flip row indices for bottom-left origin
rowStart, rowEnd = nRows - rowEnd, nRows - rowStart
# Clip the DEM data
clippedData = raster[rowStart:rowEnd, colStart:colEnd]
# Create header for clipped DEM
clippedHeader = header.copy()
clippedHeader["ncols"] = colEnd - colStart
clippedHeader["nrows"] = rowEnd - rowStart
clippedHeader["xllcenter"] = xOrigin + colStart * cellSize
clippedHeader["yllcenter"] = yOrigin + rowStart * cellSize
# Update transformation matrix for clipped DEM
clippedHeader["transform"] = rasterUtils.transformFromASCHeader(clippedHeader)
# Write clipped DEM
outputDEM = outputDir / dirName / "Inputs" / f"{dirName}_DEM"
rasterUtils.writeResultToRaster(clippedHeader, clippedData, outputDEM, flip=True)
log.debug(f"Clipped DEM saved to: {outputDEM}")
# Store final DEM extents (reduced by one pixel on each side to ensure DEM > clip extents of other input)
xMinDEM = clippedHeader["xllcenter"] + (cellSize * 0.5)
yMinDEM = clippedHeader["yllcenter"] + (cellSize * 0.5)
xMaxDEM = clippedHeader["xllcenter"] + (clippedHeader["ncols"] * cellSize) - (cellSize * 0.5)
yMaxDEM = clippedHeader["yllcenter"] + (clippedHeader["nrows"] * cellSize) - (cellSize * 0.5)
groupExtents[dirName] = (xMinDEM, xMaxDEM, yMinDEM, yMaxDEM)
return groupExtents
[docs]def getScenarioGroups(inputShp, fieldNames):
"""Group shapefile records by their scenario attribute.
Parameters
----------
inputShp : pathlib.Path
Path to input shapefile
fieldNames : list
List of field names in the shapefile
Returns
-------
scenarios: dict
Dictionary mapping scenario names to lists of shape records
"""
scenarios = {}
for shapeRecord in shapefile.Reader(str(inputShp)).iterShapeRecords():
properties = {k.lower(): v for k, v in zip(fieldNames, shapeRecord.record)}
scenarioValues = properties.get("scenario", "").split(",")
for scenario in scenarioValues:
# Check if scenario value is empty and set flag
if scenario.strip() == "":
scenario = "NULL"
# If scenario is not in scenarios dict, add it
if scenario not in scenarios:
scenarios[scenario] = []
scenarios[scenario].append(shapeRecord)
return scenarios
[docs]def writeScenarioShapefile(outputShp, records, fields, fieldNames, srs):
"""Write a shapefile for a specific scenario.
Parameters
----------
outputShp : pathlib.Path
Path where to write the shapefile
records : list
List of shape records for this scenario
fields : list
List of field definitions
fieldNames : list
List of field names
srs : str
Spatial reference system string
"""
# Filter out the scenario attribute
shapeFeatures = [(dict(zip(fieldNames, record.record)), record.shape) for record in records]
filteredFields = [field for field in fields if field[0].lower() != "scenario"]
filteredFieldNames = [name for name in fieldNames if name.lower() != "scenario"]
# Write the shapefile
shpConv.writeShapefile(outputShp, filteredFields, filteredFieldNames, shapeFeatures, srs)
[docs]def splitByScenarios(dirList, outputDir):
"""Split release areas into separate shapefiles based on their scenario attribute.
Parameters
----------
dirList: list
list of dictionaries containing dirName and list of geometries
outputDir: pathlib.Path object
path to output directory
Returns
-------
none
Notes
-----
- If a feature has no scenario attribute or it's empty, it will be marked as 'NULL' and grouped together with other 'NULL' features
- Intermediate shapefiles are deleted or renamed after scenario splitting
"""
totalInputFiles = 0
totalScenarioFiles = 0
# Loop through each folder
for folder in dirList:
inputShp = pathlib.Path(outputDir) / folder["dirName"] / "Inputs" / "REL" / folder["dirName"]
fields, fieldNames, properties, geometries, srs = shpConv.readShapefile(inputShp)
totalInputFiles += 1
# Get the scenario attribute values
if "scenario" in map(str.lower, fieldNames):
# Group records by scenario
scenarios = getScenarioGroups(inputShp, fieldNames)
# Write a shapefile for each scenario
for scenario, records in scenarios.items():
if all(scenario == "NULL" for scenario in scenarios):
outputShp = (
pathlib.Path(outputDir)
/ folder["dirName"]
/ "Inputs"
/ "REL"
/ f"{folder['dirName']}_REL"
)
elif scenario == "NULL":
outputShp = (
pathlib.Path(outputDir)
/ folder["dirName"]
/ "Inputs"
/ "REL"
/ f"{folder['dirName']}_NULL"
)
else:
outputShp = (
pathlib.Path(outputDir)
/ folder["dirName"]
/ "Inputs"
/ "REL"
/ f"{folder['dirName']}_{scenario}"
)
writeScenarioShapefile(outputShp, records, fields, fieldNames, srs)
totalScenarioFiles += 1
# Delete the intermediate shapefile
for ext in [".shp", ".shx", ".dbf", ".prj"]:
if (inputShp.with_suffix(ext)).exists():
(inputShp.with_suffix(ext)).unlink()
else:
# If no scenario attribute exists, rename the file (necessary for further processing)
outputShp = (
pathlib.Path(outputDir) / folder["dirName"] / "Inputs" / "REL" / f"{folder['dirName']}_REL"
)
shapeFeatures = [
(dict(zip(fieldNames, record.record)), record.shape)
for record in shapefile.Reader(str(inputShp)).iterShapeRecords()
]
shpConv.writeShapefile(outputShp, fields, fieldNames, shapeFeatures, srs)
for ext in [".shp", ".shx", ".dbf", ".prj"]:
if (inputShp.with_suffix(ext)).exists():
(inputShp.with_suffix(ext)).unlink()
if totalScenarioFiles == 0:
log.info("No 'scenario' attribute or only 'NULL' found in release area shapefiles, continuing")
else:
log.info(f"Split '{totalInputFiles}' release area shapefiles into '{totalScenarioFiles}' scenarios")
[docs]def writeScenarioReport(dirListGrouped, outputDir):
"""Create a report in txt format listing all scenarios and their associated features.
Parameters
----------
dirListGrouped : list
list of dictionaries containing dirName and list of geometries
outputDir : pathlib.Path
Path to output directory where the report will be saved
Returns
-------
none
"""
reportPath = outputDir / "splitInputs_scenarioReport.txt"
with open(reportPath, "w") as f:
f.write("SCENARIO REPORT\n")
f.write("==============\n")
f.write(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
# Process each group and their scenarios
for group in sorted(dirListGrouped, key=lambda x: x["dirName"].lower()):
dirName = group["dirName"]
f.write(f"Group: {dirName}\n")
f.write("-" * (len(dirName) + 7) + "\n\n")
relPath = pathlib.Path(outputDir) / dirName / "Inputs" / "REL"
scenarioFiles = sorted(relPath.glob(f"{dirName}_*.shp"), key=lambda x: x.stem.split("_")[-1])
if not scenarioFiles:
f.write("No scenarios found\n\n")
continue
# Write release areas for each scenario
for scenFile in scenarioFiles:
fields, fieldNames, properties, geometries, _ = shpConv.readShapefile(scenFile)
scenName = scenFile.stem.split("_")[-1]
f.write(f"Scenario: {scenName}\n")
f.write(f"No. of release areas: {len(geometries)}\n")
if "name" in map(str.lower, fieldNames): # Handle case sensitivity
nameIdx = [i for i, name in enumerate(fieldNames) if name.lower() == "name"][0]
with shapefile.Reader(str(scenFile)) as shp:
records = sorted(shp.records(), key=lambda x: x[nameIdx].lower())
for record in records:
f.write(f"- {record[nameIdx]}\n")
else:
for i in range(len(geometries)):
f.write(f"- Release Area {i+1}\n")
f.write("\n")
# Write total entrainment and resistance areas for the group
entPath = pathlib.Path(outputDir) / dirName / "Inputs" / "ENT"
resPath = pathlib.Path(outputDir) / dirName / "Inputs" / "RES"
entFiles = list(entPath.glob(f"{dirName}_*.shp"))
if entFiles:
totalEnt = sum(len(shpConv.readShapefile(ef)[3]) for ef in entFiles)
if totalEnt > 0:
f.write(f"No. of entrainment areas: {totalEnt}\n")
resFiles = list(resPath.glob(f"{dirName}_*.shp"))
if resFiles:
totalRes = sum(len(shpConv.readShapefile(rf)[3]) for rf in resFiles)
if totalRes > 0:
f.write(f"No. of resistance areas: {totalRes}\n")
f.write("\n")
log.info(f"Scenario report written to: {reportPath}")
[docs]def createReportPlots(dirListGrouped, inputDEM, outputDir, groupExtents, groupFeatures):
"""Write visual reports summarizing the split inputs operation.
Creates two visual reports in PNG format:
1. Basic report showing DEM extent and release areas
2. Optional features report showing DEM extent with entrainment and resistance areas
Parameters
----------
dirListGrouped : list
List of dictionaries containing dirName and list of geometries
inputDEM : pathlib.Path
Path to input DEM file
outputDir : pathlib.Path
Path to output directory where reports will be saved
groupExtents : dict
Dictionary with dirName as key and (xMin, xMax, yMin, yMax) as value,
containing the DEM clipping extents for each group
groupFeatures : dict
Dictionary containing clipped features for each group and type
Returns
-------
none
"""
# Create basic features report
basicPath = createReportPlot(dirListGrouped, inputDEM, outputDir, groupExtents, groupFeatures, "basic")
log.info(f"Visual report (basic) written to: {basicPath}")
# Create optional features report
optionalPath = createReportPlot(
dirListGrouped, inputDEM, outputDir, groupExtents, groupFeatures, "optional"
)
log.info(f"Visual report (optional) written to: {optionalPath}")