import os
import platform
import subprocess
import logging
import pathlib
import shutil
import numpy as np
from avaframe.in2Trans import rasterUtils as rU
log = logging.getLogger(__name__)
[docs]def rewriteDEMtoZeroValues(demFile):
"""Set all NaN values in a DEM raster to zero and update the nodata value.
This function reads a DEM raster file, replaces all NaN values with 0.0,
updates the nodata value in the header to 0.0, and writes the modified
raster back to a new file.
Parameters
----------
demFile : pathlib.Path
Path to the input DEM raster file
Returns
-------
None
Writes a new raster file with zero values instead of NaN values.
The output file is saved in the same directory as the input file,
using the same stem name.
Notes
-----
The function uses the rasterUtils module for reading and writing raster data.
The output raster is flipped during writing.
"""
demData = rU.readRaster(demFile)
demData["rasterData"][np.isnan(demData["rasterData"])] = 0.0
demData["header"]["nodata_value"] = 0.0
newFileName = demFile.parent / demFile.stem
rU.writeResultToRaster(demData["header"], demData["rasterData"], newFileName, flip=True)
[docs]def runAndCheckMoT(command):
"""Execute MoT command and monitor its output.
This function runs a MoT command as a subprocess and monitors its output,
filtering and logging specific messages while tracking time steps.
Parameters
----------
command : str or list
The command to execute. Can be a string or list of arguments.
Returns
-------
None
Function runs the command and logs output but does not return a value.
Notes
-----
- Uses different shell settings based on operating system
- Filters output to reduce noise from common status messages
- Logs time step progress every 100 steps
- Handles UTF-8 encoding with replacement of invalid characters
"""
if os.name == "nt":
useShell = True
elif platform.system() == "Darwin":
useShell = False
else:
useShell = False
# This starts the subprocess
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=useShell,
encoding="utf-8",
errors="replace",
universal_newlines=True,
)
printCounter = 0
counter = 1
while True:
realtimeOutput = process.stdout.readline()
if realtimeOutput == "" and process.poll() is not None:
break
if realtimeOutput:
line = realtimeOutput.strip()
# do not pollute output window with time step prints
# TODO: hacky for now
if "Step" in line:
counter = counter + 1
printCounter = printCounter + 1
if printCounter > 100:
# print('\r' + line, flush=True, end='')
msg = "Process is running. Reported time steps: " + str(counter)
log.info(msg)
printCounter = 0
elif "find_dt" in line:
continue
elif "h1" in line:
continue
elif "h2" in line:
continue
elif "write_data" in line:
continue
elif "update_boundaries" in line:
continue
elif "V_tot" in line:
continue
else:
log.info(line)
[docs]def copyMoTFiles(workDir, outputDir, searchString, replaceString):
"""
Copy and rename MoT result files from work directory to output directory.
Parameters
----------
workDir : pathlib.Path
Source directory containing the original p_max files
outputDir : pathlib.Path
Destination directory where renamed ppr files will be copied to
searchString : str
String pattern to search for in the original filenames
replaceString : str
String to replace the searchString with in the new filenames
Returns
-------
None
Files are copied to the destination directory with renamed extensions
"""
varFiles = list(workDir.glob("*" + searchString + "*"))
targetFiles = [pathlib.Path(str(f.name).replace(searchString, replaceString)) for f in varFiles]
targetFiles = [outputDir / f for f in targetFiles]
for source, target in zip(varFiles, targetFiles):
shutil.copy2(source, target)
[docs]def copyMoTDirs(workDir, outputDir, simKey, dirName):
"""
Copy timestep directory from work directory to output directory.
Parameters
----------
workDir : pathlib.Path
Source work directory containing the simulation results
outputDir : pathlib.Path
Destination directory where timestep directories will be copied to
simKey : str
Simulation key used to construct source and target paths
dirName : str
Directory name to copy (e.g., 's' or 'h')
Returns
-------
None
Directory is copied to the destination directory structure
Notes
-----
Creates a timesteps/{simKey}/{dirName} subdirectory structure in the output directory.
Only copies files, not subdirectories within the specified directory.
"""
outputDirTimesteps = outputDir / "timesteps" / str(simKey)
outputDirTimesteps.mkdir(parents=True, exist_ok=True)
sourceDirPath = workDir / dirName
if sourceDirPath.exists():
targetDirPath = outputDirTimesteps / dirName
targetDirPath.mkdir(parents=True, exist_ok=True)
for sourceFile in sourceDirPath.glob("*"):
if sourceFile.is_file():
shutil.copy2(sourceFile, targetDirPath / sourceFile.name)
[docs]def setVariableFrictionParameters(cfg, inputSimFiles, workInputDir, inputsDir):
"""set file paths in cfg object for friction parameters (required if option variable is set)
if _mu, _k files found in Inputs/RASTERS have to be remeshed, copy remeshed files
to workInputDir with new file name ending _mu, _k
Parameters
-----------
cfg: configparser object
configuration info for simulation
inputSimFiles: dict
dictionary with info on all input data found; here mu, k file and if remeshed
workInputDir: pathlib path
pathlib path to work Inputs folder for current simulation
inputsDir: pathlib path
path to avalancheDir/Inputs where original input data and remeshed rasters are stored
Returns
--------
cfg: configparser object
updated configuration info for simulation with file paths to friction parameters
"""
fricParameters = {"mu": "Dry-friction coefficient (-)", "k": "Turbulent drag coefficient (-)"}
if inputSimFiles["entResInfo"]["mu"] == "Yes" and inputSimFiles["entResInfo"]["k"] == "Yes":
for fric in ["mu", "k"]:
fricFile = inputsDir / cfg["INPUT"]["%sFile" % fric]
# check first if remeshed files should be used
if (
"_remeshed" in cfg["INPUT"]["%sFile" % fric]
and inputSimFiles["entResInfo"]["%sRemeshed" % fric] == "Yes"
):
fricFilePathNew = workInputDir / (fricFile.stem + "_%s" % fric + fricFile.suffix)
shutil.copy2(fricFile, fricFilePathNew)
cfg["Physical_parameters"][fricParameters[fric]] = str(fricFilePathNew)
log.info(
"Remeshed %s file copied to %s and set for %s"
% (fric, str(fricFilePathNew), fricParameters[fric])
)
else:
cfg["Physical_parameters"][fricParameters[fric]] = str(fricFile)
cfg["Physical_parameters"]["Parameters"] = "variable"
else:
# TODO FSO implement if setting is variable or constant that if variable but file not found then error
message = "Mu and k file not found in Inputs/RASTERS - check if file ending is correct (_mu, _k) - setting constant values of configuration file"
log.warning(message)
message2 = "Setting %s to constant value of %s, and %s to %s" % (
fricParameters["mu"],
cfg["Physical_parameters"][fricParameters["mu"]],
fricParameters["k"],
cfg["Physical_parameters"][fricParameters["k"]],
)
log.warning(message2)
cfg["Physical_parameters"]["Parameters"] = "constant"
# log.error(message)
# raise FileNotFoundError(message)
return cfg
[docs]def setVariableEntrainmentParameters(cfg, inputSimFiles, workInputDir, inputsDir):
"""set file path in cfg object for entrainment parameters (required if option variable is set)
if _b0 , _tauc files found in Inputs/RASTERS and Inputs/ENT
Parameters
-----------
cfg: configparser object
configuration info for simulation
inputSimFiles: dict
dictionary with info on all input data found; here b0, tauc file and if remeshed
workInputDir: pathlib path
pathlib path to work Inputs folder for current simulation
inputsDir: pathlib path
path to avalancheDir/Inputs where original input data and remeshed rasters are stored
Returns
--------
cfg: configparser object
updated configuration info for simulation with file paths to friction parameters
"""
if inputSimFiles["entResInfo"]["flagEnt"] == "Yes" and inputSimFiles["entResInfo"]["tauC"] == "Yes":
cfg["ENTRAINMENT"]["Entrainment"] = "TJEM"
cfg["ENTRAINMENT"]["Bed strength profile"] = "constant"
else:
cfg["ENTRAINMENT"]["Entrainment"] = "none"
return cfg
[docs]def setVariableForestParameters(cfg, inputSimFiles, workInputDir, inputsDir):
"""set file paths in cfg object for forest parameters.
if _nd, _bhd files found in Inputs/RASTERS have to be remeshed, copy remeshed files
to workInputDir with new file name ending _nd, _bhd
Parameters
-----------
cfg: configparser object
configuration info for simulation
inputSimFiles: dict
dictionary with info on all input data found; here nd, bhd file and if remeshed
workInputDir: pathlib path
pathlib path to work Inputs folder for current simulation
inputsDir: pathlib path
path to avalancheDir/Inputs where original input data and remeshed rasters are stored
Returns
--------
cfg: configparser object
updated configuration info for simulation with file paths to forest parameters
"""
if inputSimFiles["entResInfo"]["flagRes"] == "Yes" and inputSimFiles["entResInfo"]["bhd"] == "Yes":
treeDiamFile = inputsDir / cfg["INPUT"]["bhdFile"]
cfg["FOREST_EFFECTS"]["Forest effects"] = "yes"
# TODO Make this remeshed compatible
cfg["File names"]["Forest density filename"] = str(inputSimFiles["resFile"])
cfg["File names"]["Tree diameter filename"] = str(treeDiamFile)
else:
cfg["FOREST_EFFECTS"]["Forest effects"] = "no"
cfg["File names"]["Forest density filename"] = "-"
cfg["File names"]["Tree diameter filename"] = "-"
# forestParameters = {"nd": "Forest density filename", "bhd": "Tree diameter filename"}
# if inputSimFiles["entResInfo"]["nd"] == "Yes" and inputSimFiles["entResInfo"]["bhd"] == "Yes":
#
# for forestParam in ["nd", "bhd"]:
# forestFile = inputsDir / cfg["INPUT"]["%sFile" % forestParam]
#
# # check first if remeshed files should be used
# if (
# "_remeshed" in cfg["INPUT"]["%sFile" % forestParam]
# and inputSimFiles["entResInfo"]["%sRemeshed" % forestParam] == "Yes"
# ):
# forestFilePathNew = workInputDir / (
# forestFile.stem + "_%s" % forestParam + forestFile.suffix
# )
# shutil.copy2(forestFile, forestFilePathNew)
# cfg["File names"][forestParameters[forestParam]] = str(forestFilePathNew)
# log.info(
# "Remeshed %s file copied to %s and set for %s"
# % (forestParam, str(forestFilePathNew), forestParameters[forestParam])
# )
# else:
# cfg["File names"][forestParameters[forestParam]] = str(forestFile)
#
# cfg["FOREST_EFFECTS"]["Forest effects"] = "yes"
#
# else:
# # TODO FSO implement if setting is variable or constant that if variable but file not found then error
# message = "nd and bhd file not found in Inputs/RASTERS - check if file ending is correct (_nd, _bhd) - setting forest effects to no"
# log.warning(message)
#
# cfg["FOREST_EFFECTS"]["Forest effects"] = "no"
# cfg["File names"]["Forest density filename"] = "-"
# cfg["File names"]["Tree diameter filename"] = "-"
return cfg