Source code for in3Utils.MoTUtils

import os
import platform
import subprocess
import logging
import pathlib
import shutil

import numpy as np

from avaframe.in2Trans import rasterUtils as rU

# Module-level logger used by the helper functions in this file
log = logging.getLogger(__name__)


def rewriteDEMtoZeroValues(demFile):
    """Replace NaN cells of a DEM raster with zero and write the result.

    The raster is read via ``rU.readRaster``, every NaN cell is overwritten
    with 0.0, the header's ``nodata_value`` is set to 0.0 and the data are
    handed to ``rU.writeResultToRaster``.

    Parameters
    ----------
    demFile : pathlib.Path
        Path to the input DEM raster file

    Returns
    -------
    None
        The modified raster is written to ``demFile.parent / demFile.stem``,
        i.e. next to the input file using its stem as the output name.

    Notes
    -----
    ``rU.writeResultToRaster`` is called with ``flip=True``.
    """
    dem = rU.readRaster(demFile)
    raster = dem["rasterData"]
    header = dem["header"]

    # overwrite NaN cells in place and declare 0.0 as the new nodata value
    raster[np.isnan(raster)] = 0.0
    header["nodata_value"] = 0.0

    # output name: same folder and stem as the input, suffix dropped
    outputPath = demFile.parent / demFile.stem
    rU.writeResultToRaster(header, raster, outputPath, flip=True)
def runAndCheckMoT(command):
    """Execute a MoT command and relay its filtered output to the log.

    The command is started as a subprocess with stderr merged into stdout.
    Its output is read line by line; routine status lines are dropped and
    time step lines are condensed into a periodic progress message.

    Parameters
    ----------
    command : str or list
        The command to execute. Can be a string or list of arguments.

    Returns
    -------
    None
        Function runs the command and logs output but does not return a value.

    Notes
    -----
    - The command is run through the shell on Windows (``os.name == "nt"``)
      and directly on every other platform
    - Lines containing ``Step`` are counted; a progress message is logged once
      more than 100 of them have been seen since the last message
    - Lines containing ``find_dt``, ``h1``, ``h2``, ``write_data``,
      ``update_boundaries`` or ``V_tot`` are discarded
    - Output is decoded as UTF-8 with replacement of invalid characters
    - The output pipe is closed and the process is waited for before
      returning; a non-zero exit status is reported as a warning
    """
    # only Windows needs the shell; Linux and macOS both run the command directly
    useShell = os.name == "nt"

    # substrings identifying routine status lines that are not worth logging
    noisyMarkers = ("find_dt", "h1", "h2", "write_data", "update_boundaries", "V_tot")

    printCounter = 0
    counter = 1

    # The context manager closes the stdout pipe and waits for the child on
    # exit, so no file descriptor is leaked even if logging raises.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=useShell,
        encoding="utf-8",
        errors="replace",
        universal_newlines=True,
    ) as process:
        # Iterating the stream blocks until a line or EOF arrives, which avoids
        # spinning on poll() when the child closes stdout before it exits.
        for realtimeOutput in process.stdout:
            line = realtimeOutput.strip()

            # do not pollute output window with time step prints
            # TODO: hacky for now
            if "Step" in line:
                counter += 1
                printCounter += 1
                if printCounter > 100:
                    msg = "Process is running. Reported time steps: " + str(counter)
                    log.info(msg)
                    printCounter = 0
            elif any(marker in line for marker in noisyMarkers):
                continue
            else:
                log.info(line)

        returnCode = process.wait()

    # best effort: report a failed run instead of ignoring the exit status,
    # but do not raise so existing callers keep working
    if returnCode != 0:
        log.warning("MoT process finished with non-zero return code %s", returnCode)
def copyMoTFiles(workDir, outputDir, searchString, replaceString):
    """Copy matching files from workDir to outputDir under a new name.

    Every entry of workDir whose name contains searchString is copied to
    outputDir; in the copied file's name all occurrences of searchString are
    replaced with replaceString.

    Parameters
    ----------
    workDir : pathlib.Path
        Source directory that is searched (non-recursively) for matching files
    outputDir : pathlib.Path
        Destination directory that receives the renamed copies
    searchString : str
        String pattern to search for in the original filenames
    replaceString : str
        String to replace the searchString with in the new filenames

    Returns
    -------
    None
        Files are copied to the destination directory with renamed file names
    """
    # collect all matches first so the copy loop works on a fixed list
    matches = list(workDir.glob("*" + searchString + "*"))

    for sourcePath in matches:
        renamed = str(sourcePath.name).replace(searchString, replaceString)
        shutil.copy2(sourcePath, outputDir / pathlib.Path(renamed))
def copyMoTDirs(workDir, outputDir, simKey, dirName):
    """Copy the files of one work sub-directory into the timesteps output tree.

    Parameters
    ----------
    workDir : pathlib.Path
        Source work directory containing the simulation results
    outputDir : pathlib.Path
        Destination directory below which the timesteps tree is created
    simKey : str
        Simulation key used as sub-directory name below ``timesteps``
    dirName : str
        Name of the directory inside workDir to copy (e.g., 's' or 'h')

    Returns
    -------
    None
        Files are copied to ``outputDir/timesteps/{simKey}/{dirName}``

    Notes
    -----
    ``outputDir/timesteps/{simKey}`` is always created, even if
    ``workDir/dirName`` does not exist. Only regular files directly inside
    the source directory are copied; subdirectories are skipped.
    """
    simTimestepsDir = outputDir / "timesteps" / str(simKey)
    simTimestepsDir.mkdir(parents=True, exist_ok=True)

    sourceDirPath = workDir / dirName
    if not sourceDirPath.exists():
        # nothing to copy for this directory name
        return

    targetDirPath = simTimestepsDir / dirName
    targetDirPath.mkdir(parents=True, exist_ok=True)

    regularFiles = (entry for entry in sourceDirPath.glob("*") if entry.is_file())
    for sourceFile in regularFiles:
        shutil.copy2(sourceFile, targetDirPath / sourceFile.name)
def setVariableFrictionParameters(cfg, inputSimFiles, workInputDir, inputsDir):
    """Set the friction entries of cfg to mu/k raster files or keep constants.

    If both a mu and a k file are flagged as found, the corresponding entries
    in the ``Physical_parameters`` section are set to the file paths and the
    ``Parameters`` option is set to ``variable``. A file whose configured name
    contains ``_remeshed`` and that is flagged as remeshed is first copied to
    workInputDir under a name ending in ``_mu`` / ``_k``, and the copy is
    used. Otherwise a warning is logged, the values already present in cfg
    are kept and ``Parameters`` is set to ``constant``.

    Parameters
    -----------
    cfg: configparser object
        configuration info for simulation
    inputSimFiles: dict
        dictionary with info on all input data found; here mu, k file and if remeshed
    workInputDir: pathlib path
        pathlib path to work Inputs folder for current simulation
    inputsDir: pathlib path
        path to avalancheDir/Inputs where original input data and remeshed rasters are stored

    Returns
    --------
    cfg: configparser object
        updated configuration info for simulation with file paths to friction parameters
    """
    fricParameters = {"mu": "Dry-friction coefficient (-)", "k": "Turbulent drag coefficient (-)"}
    entResInfo = inputSimFiles["entResInfo"]

    # guard: without both rasters fall back to the constant values from cfg
    if entResInfo["mu"] != "Yes" or entResInfo["k"] != "Yes":
        # TODO FSO implement if setting is variable or constant that if variable but file not found then error
        message = "Mu and k file not found in Inputs/RASTERS - check if file ending is correct (_mu, _k) - setting constant values of configuration file"
        log.warning(message)

        muName = fricParameters["mu"]
        kName = fricParameters["k"]
        message2 = "Setting %s to constant value of %s, and %s to %s" % (
            muName,
            cfg["Physical_parameters"][muName],
            kName,
            cfg["Physical_parameters"][kName],
        )
        log.warning(message2)

        cfg["Physical_parameters"]["Parameters"] = "constant"
        return cfg

    for fric, paramName in fricParameters.items():
        configuredName = cfg["INPUT"]["%sFile" % fric]
        fricFile = inputsDir / configuredName

        # a remeshed raster is only used if the name says so AND it is flagged as remeshed
        useRemeshed = "_remeshed" in configuredName and entResInfo["%sRemeshed" % fric] == "Yes"
        if not useRemeshed:
            cfg["Physical_parameters"][paramName] = str(fricFile)
            continue

        # copy the remeshed raster into the work folder with the _mu / _k ending
        fricFilePathNew = workInputDir / "".join([fricFile.stem, "_", fric, fricFile.suffix])
        shutil.copy2(fricFile, fricFilePathNew)
        cfg["Physical_parameters"][paramName] = str(fricFilePathNew)
        log.info(
            "Remeshed %s file copied to %s and set for %s" % (fric, str(fricFilePathNew), paramName)
        )

    cfg["Physical_parameters"]["Parameters"] = "variable"
    return cfg
def setVariableEntrainmentParameters(cfg, inputSimFiles, workInputDir, inputsDir):
    """Switch entrainment on or off in cfg depending on the inputs found.

    If both the entrainment flag and the tauC flag in
    ``inputSimFiles["entResInfo"]`` are ``"Yes"``, the ``ENTRAINMENT`` section
    is set to the ``TJEM`` model with a ``constant`` bed strength profile;
    otherwise entrainment is set to ``none``.

    Parameters
    -----------
    cfg: configparser object
        configuration info for simulation
    inputSimFiles: dict
        dictionary with info on all input data found; here the flagEnt and tauC flags
    workInputDir: pathlib path
        pathlib path to work Inputs folder for current simulation (not used here)
    inputsDir: pathlib path
        path to avalancheDir/Inputs where original input data and remeshed rasters
        are stored (not used here)

    Returns
    --------
    cfg: configparser object
        updated configuration info for simulation with entrainment settings
    """
    entResInfo = inputSimFiles["entResInfo"]

    # guard: entrainment needs both the entrainment input and the tauC input
    if entResInfo["flagEnt"] != "Yes" or entResInfo["tauC"] != "Yes":
        cfg["ENTRAINMENT"]["Entrainment"] = "none"
        return cfg

    cfg["ENTRAINMENT"]["Entrainment"] = "TJEM"
    cfg["ENTRAINMENT"]["Bed strength profile"] = "constant"
    return cfg
def setVariableForestParameters(cfg, inputSimFiles, workInputDir, inputsDir):
    """Switch forest effects on or off in cfg and set the forest file names.

    If both the resistance flag and the bhd flag in
    ``inputSimFiles["entResInfo"]`` are ``"Yes"``, forest effects are enabled,
    the forest density file is taken from ``inputSimFiles["resFile"]`` and the
    tree diameter file from ``inputsDir / cfg["INPUT"]["bhdFile"]``. Otherwise
    forest effects are disabled and both file names are set to ``"-"``.

    Parameters
    -----------
    cfg: configparser object
        configuration info for simulation
    inputSimFiles: dict
        dictionary with info on all input data found; here the flagRes and bhd flags
        and the resFile entry
    workInputDir: pathlib path
        pathlib path to work Inputs folder for current simulation (not used here)
    inputsDir: pathlib path
        path to avalancheDir/Inputs where original input data and remeshed rasters are stored

    Returns
    --------
    cfg: configparser object
        updated configuration info for simulation with file paths to forest parameters
    """
    entResInfo = inputSimFiles["entResInfo"]

    # guard: forest effects need both the resistance input and the bhd input
    if entResInfo["flagRes"] != "Yes" or entResInfo["bhd"] != "Yes":
        cfg["FOREST_EFFECTS"]["Forest effects"] = "no"
        for option in ("Forest density filename", "Tree diameter filename"):
            cfg["File names"][option] = "-"
        return cfg

    treeDiamFile = inputsDir / cfg["INPUT"]["bhdFile"]
    cfg["FOREST_EFFECTS"]["Forest effects"] = "yes"

    # TODO Make this remeshed compatible: a previously drafted variant read
    # _nd/_bhd rasters from cfg["INPUT"], copied remeshed files to workInputDir
    # (as done for the friction rasters) and warned if the files were missing.
    cfg["File names"]["Forest density filename"] = str(inputSimFiles["resFile"])
    cfg["File names"]["Tree diameter filename"] = str(treeDiamFile)
    return cfg