Source code for com9MoTVoellmy.com9MoTVoellmy

import os
import platform
import logging
import pathlib
import time
import sys

from multiprocessing import Pool

import avaframe.com1DFA.com1DFA as com1DFA
from avaframe.in3Utils import cfgUtils
from avaframe.in2Trans import rasterUtils as rU
from avaframe.in1Data import getInput as gI
import avaframe.in3Utils.fileHandlerUtils as fU
from avaframe.out1Peak import outPlotAllPeak as oP
from avaframe.in3Utils.cfgUtils import cfgToRcf
import avaframe.in3Utils.MoTUtils as mT


# create a local logger
log = logging.getLogger(__name__)


[docs]def com9MoTVoellmyMain(cfgMain, cfgInfo=None): """Run MoT-Voellmy simulations using specified configurations. This function executes MoT-Voellmy simulations by handling preprocessing, parallel execution, and postprocessing steps. Parameters ---------- cfgMain : configparser.ConfigParser Main configuration settings for the simulation cfgInfo : str or pathlib.Path or configparser.ConfigParser, optional Additional configuration information, by default None Can be: - Path to configuration file for overrides - ConfigParser object with initial configuration - None to use local/default configuration Returns ------- None Results are written to the output directory structure Notes ----- The function performs several key steps: - Generates configuration settings for all simulations - Preprocesses input data and creates RCF configuration files - Executes simulations in parallel using multiple processes - Handles DEM preparation by converting NaN values to zeros """ # Get all necessary information from the configuration files currentModule = sys.modules[__name__] # As if you would to import com9MoTVoellmy simDict, _, inputSimFiles, _ = com1DFA.com1DFAPreprocess(cfgMain, cfgInfo, module=currentModule) # convert DEM from nan to 0 values # TODO: suggest MoT-PSA to handle nan values mT.rewriteDEMtoZeroValues(inputSimFiles["demFile"]) log.info("The following simulations will be performed") for key in simDict: log.info("Simulation: %s" % key) # Preprocess the simulations, mainly creating the rcf files rcfFiles = com9MoTVoellmyPreprocess(simDict, inputSimFiles, cfgMain) # And now we run the simulations startTime = time.time() log.info("--- STARTING (potential) PARALLEL PART ----") # Get number of CPU Cores wanted nCPU = cfgUtils.getNumberOfProcesses(cfgMain, len(rcfFiles)) # Create parallel pool and run # with multiprocessing.Pool(processes=nCPU) as pool: with Pool(processes=nCPU) as pool: results = pool.map(com9MoTVoellmyTask, rcfFiles) pool.close() pool.join() timeNeeded = "%.2f" % (time.time() - startTime) log.info("Overall (parallel) com9MoTVoellmy computation took: %s s " % timeNeeded) log.info("--- ENDING (potential) PARALLEL PART ----") # Postprocess the simulations com9MoTVoellmyPostprocess(simDict, cfgMain)
[docs]def com9MoTVoellmyPostprocess(simDict, cfgMain): """Post-process MoT-Voellmy simulation results. This function handles post-processing tasks after MoT-Voellmy simulations complete, including copying result files to output directories and generating visualization plots. Parameters ---------- simDict : dict Dictionary containing simulation configurations, with one entry per simulation cfgMain : configparser.ConfigParser Main configuration settings for the simulation inputSimFiles : dict Dictionary containing paths to input files (DEM, release areas, etc.) Returns ------- None Results are written to the output directory structure Notes ----- The function performs several key tasks: - Creates output directory structure - Copies simulation result files (DataTime.txt, ppr, pfd, pfv files) - Renames files according to conventions - Generates visualization plots of peak fields """ avalancheDir = cfgMain["MAIN"]["avalancheDir"] # Copy max files to output directory outputDirPeakFile = pathlib.Path(avalancheDir) / "Outputs" / "com9MoTVoellmy" / "peakFiles" fU.makeADir(outputDirPeakFile) for key in simDict: workDir = pathlib.Path(avalancheDir) / "Work" / "com9MoTVoellmy" / str(key) # Copy ppr files mT.copyMoTFiles(workDir, outputDirPeakFile, "p_max", "ppr") # Copy pfd files mT.copyMoTFiles(workDir, outputDirPeakFile, "h_max", "pfd") # Copy pfv files mT.copyMoTFiles(workDir, outputDirPeakFile, "s_max", "pfv") # Copy timestep directories to timesteps subfolder mT.copyMoTDirs(workDir, outputDirPeakFile, key, "s") mT.copyMoTDirs(workDir, outputDirPeakFile, key, "h") # create plots and report modName = __name__.split(".")[-1] reportDir = pathlib.Path(avalancheDir, "Outputs", modName, "reports") fU.makeADir(reportDir) # Generate plots for all peakFiles oP.plotAllPeakFields(avalancheDir, cfgMain["FLAGS"], modName)
[docs]def com9MoTVoellmyTask(rcfFile): """Execute a single MoT-PSA simulation using the provided configuration file. Parameters ---------- rcfFile : pathlib.Path Path to the RCF configuration file for the simulation Returns ------- list The command that was executed as a list containing the executable path and configuration file path Notes ----- Changes to the directory containing this module before executing the simulation. Uses runAndCheckMoT to execute and monitor the simulation process. """ os.chdir(os.path.dirname(os.path.abspath(__file__))) if os.name == "nt": exeName = "MoT-Voellmy_win.exe" elif platform.system() == "Darwin": message = "MoT-Voellmy does not support MacOS at the moment" log.error(message) raise OSError(message) else: exeName = "./MoT-Voellmy_linux.exe" command = [exeName, rcfFile] log.info("Run simulation: %s" % rcfFile) mT.runAndCheckMoT(command) return command
[docs]def com9MoTVoellmyPreprocess(simDict, inputSimFiles, cfgMain): """Preprocess data for MoT-PSA simulations. This function prepares the input data and configuration files needed to run MoT-PSA simulations. It processes release areas, creates required directories, and generates configuration files for each simulation. Parameters ---------- simDict : dict Dictionary containing simulation configurations with one entry per simulation inputSimFiles : dict Dictionary containing paths to input files (DEM, release areas, etc.) cfgMain : configparser.ConfigParser Main configuration settings for the simulation Returns ------- list List of pathlib.Path objects pointing to generated RCF configuration files that will be used to run the simulations Notes ----- The function performs several key steps: - Creates working directories for each simulation - Processes release areas and converts them to raster format - Generates configuration files in RCF format - Handles both single and multiple release scenarios """ # Load avalanche directory from general configuration file avalancheDir = cfgMain["MAIN"]["avalancheDir"] # set inputsDir where original input data and remeshed rasters are stored inputsDir = pathlib.Path(avalancheDir) / "Inputs" # create required Work und Outputs directories in avalancheDir workDir = pathlib.Path(avalancheDir) / "Work" / "com9MoTVoellmy" cfgFileDir = pathlib.Path(avalancheDir) / "Outputs" / "com9MoTVoellmy" / "configurationFiles" fU.makeADir(cfgFileDir) rcfFiles = list() # loop over all simulation to be performed for key in simDict: # Generate command and run via subprocess.run # Configuration that needs adjustment # Generate the work and data dirs for the current simHash # save derived fields from polygons, optionally zeroRasters and remeshedRasters to that folder cuWorkDir = workDir / key workInputDir = cuWorkDir / "Input" workOutputDir = cuWorkDir / key fU.makeADir(cuWorkDir) fU.makeADir(workInputDir) # load configuration object for current sim cfg = simDict[key]["cfgSim"] log.info("Prepare simulation configuration for key %s" % key) # select release area input data according to a chosen release scenario inputSimFiles = gI.selectReleaseFile(inputSimFiles, cfg["INPUT"]["releaseScenario"]) # create the required input from input files # if release, entrainment area are provided as shapefile - read shapefile attributes and values for current sim # if provided by raster - load raster data # load DEM and dem file type information demOri, inputSimLines = com1DFA.prepareInputData(inputSimFiles, cfg) demOri["originalHeader"] = demOri["header"] demSuffix = rU.getRasterFileTypeFromHeader(demOri["header"]) # set thickness values for the release area, entrainment areas relName, inputSimLines, badName = com1DFA.prepareReleaseEntrainment( cfg, inputSimFiles["releaseScenario"], inputSimLines ) # RELEASE AREA - fetch path to release raster releaseName, inputSimLines["releaseLine"] = gI.deriveLineRaster( cfg, inputSimLines["releaseLine"], demOri, workInputDir, inputsDir, "rel", rasterFileType=demSuffix, ) # ENTRAINMENT AREA - fetch path to entrainment (bedDepth) raster if "ent" in key: saveZeroRaster = False else: saveZeroRaster = True bedDepthName, inputSimLines["entLine"] = gI.deriveLineRaster( cfg, inputSimLines["entLine"], demOri, workInputDir, inputsDir, "ent", rasterFileType=demSuffix, saveZeroRaster=saveZeroRaster, ) # TODO: is this check if release and entrainment have overlap required? # if "ent" in key: # log.info("Check for overlap?") # # # check if entrainment and release area have overlap # _ = geoTrans.checkOverlap( # inputSimLines["entLine"]["rasterData"], # inputSimLines["releaseLine"]["rasterData"], # "Entrainment", # "Release", # crop=False, # ) # BED SHEAR - fetch path to tauC raster bedShearDict = { "initializedFrom": "raster", "fileName": inputSimLines["tauCFile"], } if inputSimLines["entResInfo"]["tauC"] == "Yes": saveZeroRaster = False else: saveZeroRaster = True bedShearName, bedShearDict = gI.deriveLineRaster( cfg, bedShearDict, demOri, workInputDir, inputsDir, "tauC", rasterFileType=demSuffix, saveZeroRaster=saveZeroRaster, ) # set configuration for MoT-Voellmy cfg["Run information"]["Area of Interest"] = cfgMain["MAIN"]["avalancheDir"] cfg["Run information"]["UTM zone"] = "32N" cfg["Run information"]["EPSG geodetic datum code"] = "31287" cfg["Run information"]["Run name"] = cfgMain["MAIN"]["avalancheDir"] cfg["File names"]["Grid filename"] = str(pathlib.Path(inputsDir / cfg["INPUT"]["DEM"])) cfg["File names"]["Release depth filename"] = str(releaseName) cfg["File names"]["Bed depth filename"] = str(bedDepthName) cfg["File names"]["Bed shear strength filename"] = str(bedShearName) cfg["File names"]["Output filename root"] = str(workOutputDir) # if _mu and _k files in avalancheDir/Inputs/RASTERS found - set paths to mu and k files # if not found then mu and k are set constant to values provided in cfg if cfg["Physical_parameters"]["Parameters"] == "auto": cfg = mT.setVariableFrictionParameters(cfg, inputSimFiles, workInputDir, inputsDir) else: # TODO FSO allow for options constant and variable message = "Currently only available option is auto for %s" % ( '["Physical_parameters"]["Parameters"]' ) log.error(message) raise AssertionError(message) if cfg["ENTRAINMENT"]["Entrainment"] == "auto": cfg = mT.setVariableEntrainmentParameters(cfg, inputSimFiles, workInputDir, inputsDir) else: message = "Currently only available option is auto for %s" % ('["ENTRAINMENT"]["Entrainment"]') log.error(message) raise AssertionError(message) # if a file in Inputs/RES is found (forest density info) and _bhd file in avalancheDir/Inputs/RASTERS found - set paths to nd and bhd files # if not found then forest effects are set to no if cfg["FOREST_EFFECTS"]["Forest effects"] == "auto": cfg = mT.setVariableForestParameters(cfg, inputSimFiles, workInputDir, inputsDir) else: message = "Currently only available option is auto for %s" % ( '["FOREST_EFFECTS"]["Forest effects"]' ) log.error(message) raise AssertionError(message) # elif cfg["FOREST_EFFECTS"]["Forest effects"] == "no": # cfg["File names"]["Forest density filename"] = "-" # cfg["File names"]["Tree diameter filename"] = "-" # else: # # if forest effects set to yes but files not found - error will be raised by setVariableForestParameters # cfg = mT.setVariableForestParameters(cfg, inputSimFiles, workInputDir, inputsDir) rcfFileName = cfgFileDir / (str(key) + ".rcf") currentModule = sys.modules[__name__] cfgUtils.writeCfgFile(avalancheDir, currentModule, cfg, str(key)) cfgToRcf(cfg, rcfFileName) rcfFiles.append(rcfFileName) log.info("rcf and ini file written for key %s-------------------------" % key) return rcfFiles