"""
Tools specific to the com1DFA computational kernel
"""
import configparser
# Load modules
import logging
import math
import pathlib
import numpy as np
from deepdiff import DeepDiff
# local imports
import avaframe.com1DFA.deriveParameterSet as dP
import avaframe.in3Utils.initialiseDirs as inDirs
from avaframe.com1DFA import com1DFA
from avaframe.in1Data import getInput as gI
from avaframe.in3Utils import cfgUtils
from avaframe.in2Trans import rasterUtils as IOf
# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)
[docs]def getPartInitMethod(cfg, csz, relThForPart):
"""Get particle initialization parameters
Get the massPerPart and nPPK corresponding to the desired initialization method
Parameters
----------
cfg: configparser
configuration for DFA simulation
csz: float
cell size
relThForPart: float
relTh value
Returns
-------
massPerPart : float
mass per particle desired
nPPK : int
number of particles per kernel radius desired
"""
rho = cfg.getfloat("rho")
massPerParticleDeterminationMethod = cfg["massPerParticleDeterminationMethod"]
nPPK = 0
# derive mass per particle to define number of particles per cell:
if massPerParticleDeterminationMethod == "MPPDIR":
massPerPart = cfg.getfloat("massPerPart")
log.debug("Number of particles defined by: mass per particle %s" % cfg["massPerPart"])
elif massPerParticleDeterminationMethod == "MPPDH":
deltaTh = cfg.getfloat("deltaTh")
ds = min(csz, cfg.getfloat("sphKernelRadius"))
massPerPart = rho * ds * ds * deltaTh
log.debug("Number of particles defined by: release thickness per particle: %s" % cfg["deltaTh"])
log.debug("mass per particle is %.2f" % massPerPart)
elif massPerParticleDeterminationMethod == "MPPKR":
sphKernelRadius = cfg.getfloat("sphKernelRadius")
cszMin = min(csz, sphKernelRadius)
nPPK0 = cfg.getfloat("nPPK0")
sphKR0 = cfg.getfloat("sphKR0")
aPPK = cfg.getfloat("aPPK")
nPPK = round(nPPK0 * (cszMin / sphKR0) ** aPPK)
massPerPart = rho * math.pi * cszMin * cszMin * relThForPart / nPPK
return massPerPart, nPPK
[docs]def setFrictTypeIndicator(simCfg):
"""Sets the friction type indicator for the simname
Default is L, otherwise M for samosATMedium, S for samosATSmall
Parameters
-----------
simCfg: dict
simulation configuration
Returns
--------
frictTypeIdentifier: str
L if samosAT, S for samosATSmall, M for samosATMedium
"""
frictTypeIdentifier = "L"
if simCfg["GENERAL"]["frictModel"].lower() == "samosatsmall":
frictTypeIdentifier = "S"
if simCfg["GENERAL"]["frictModel"].lower() == "samosatmedium":
frictTypeIdentifier = "M"
return frictTypeIdentifier
[docs]def compareSimCfgToDefaultCfgCom1DFA(simCfg, module=com1DFA):
"""Compares the given simulation configuration (as dict) to the default
com1DFA configuration. Disregards values like avalancheDir that are expected to
change. Returns True if it is the default + an identifier string: D = Default and
C = Changed
Parameters
-----------
simCfg: dict
simulation configuration
module: module
module to be used for task (optional)
Returns
--------
defaultIdentifierString: str
D if default and C if changed
"""
# extract the name of the module
modName = module.__name__.split(".")[-1]
log.info("Comparing simCfg to default cfg")
defaultIdentifierString = "D"
# Get default cfg and convert to dict for comparison
defCfgObject = cfgUtils.getDefaultModuleConfig(com1DFA, toPrint=False)
defCfg = cfgUtils.convertConfigParserToDict(defCfgObject)
# Which changes to ignore (in the case of com1DFA). These are expected
# to change...
excludeItems = [
"root['GENERAL']['avalancheDir']",
"root['GENERAL']['secRelArea']",
"root['GENERAL']['simTypeList']",
"root['INPUT']['releaseScenario']",
]
# If entrainment is requested, and it is set in shapefile, check if it contains the default entrainment thickness
# in ALL features of the shapefile
if simCfg["GENERAL"]["simTypeList"] == "ent" and simCfg["GENERAL"]["entThFromFile"] == "True":
defaultEntTh = defCfg["GENERAL"]["entThIfMissingInShp"]
# this try handles raster instead of shapefile
try:
if not all([x == defaultEntTh for x in simCfg["INPUT"]["entThThickness"].split("|")]):
defaultIdentifierString = "C"
log.info("Non-default entrainment value(s) used: %s" % simCfg["INPUT"]["entThThickness"])
except KeyError:
defaultIdentifierString = "D"
# Entrainment might not be set in shpfile, but still the default from
# ini file is used. This is still default D and not changed C
if simCfg["GENERAL"]["entThFromFile"] == "False":
# check if entTh is the default entTh if no other entTh is set
if simCfg["GENERAL"]["entTh"] == defCfg["GENERAL"]["entThIfMissingInShp"]:
excludeItems.append("root['GENERAL']['entThFromFile']")
excludeItems.append("root['GENERAL']['entTh']")
# sphKernelSize is set during runtime, make sure it is not reported
# as changed if default is set to meshCellSize
if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche"]:
if defCfg["GENERAL"]["sphKernelRadius"] == "meshCellSize":
if simCfg["GENERAL"]["sphKernelRadius"] == simCfg["GENERAL"]["meshCellSize"]:
excludeItems.append("root['GENERAL']['sphKernelRadius']")
# do the diff and analyse
# this is the deepdiff > 8.0 version
# TODO: remove this again in the future when deepdiff > 8.0 is wider established
try:
diff = DeepDiff(defCfg, simCfg, exclude_paths=excludeItems, threshold_to_diff_deeper=0)
# for older deepdiff versions which don't know threshold_to_diff_deeper
except ValueError:
diff = DeepDiff(defCfg, simCfg, exclude_paths=excludeItems)
# Sometimes (after variation split) the type changes, so check if it is the default or something else
# If it is, check the type_changes and convert to values_change if necessary
if "type_changes" in diff:
diff = _treatTypeChangeDiff(diff)
valuesChanged = dict()
# This needs to be checked AFTER check for type_changes
if "values_changed" in diff:
defaultIdentifierString = "C"
for key, val in diff["values_changed"].items():
valuesChanged[_cleanDiffKey(key)] = val
log.info("Comparing to default cfg, values changed:")
log.info(valuesChanged)
else:
valuesChanged = None
if "dictionary_item_added" in diff:
log.debug("Comparing to default cfg, added items:")
log.debug(diff["dictionary_item_added"])
return defaultIdentifierString, valuesChanged
def _treatTypeChangeDiff(diff):
"""Internal function to convert type changes in the result of DeepDiff to actual changes
Parameters
----------
diff:
DeepDiff result to analyse
Returns
-------
diff:
DeepDiff result with changed values
"""
allChangedValues = dict()
for key, val in diff["type_changes"].items():
# The type(x)(y) applies the type of x on y and gives the value of y
if val["old_value"] == type(val["old_value"])(val["new_value"]):
continue
else:
del val["old_type"]
del val["new_type"]
val["new_value"] = type(val["old_value"])(val["new_value"])
allChangedValues[key] = val
if allChangedValues:
diff["values_changed"] = allChangedValues
return diff
def _cleanDiffKey(keyString):
"""small helper function to change format of key for DeepDiff results
Parameters
----------
keyString : str
string to change
Returns
-------
cleanString: str
cleaned string
"""
keyStr = keyString.replace("root", "")
keyStr = keyStr.replace("']['", "->")
keyStr = keyStr.replace("['", "")
keyStr = keyStr.replace("']", "")
return keyStr
[docs]def createSimDictFromCfgs(cfgMain, cfgPath, module=com1DFA):
"""From multiple cfg files create a simDict with one item for each simulation to perform
within these cfg files still parameter variations are allowed
Parameters
------------
cfgMain: configparser object
main configuration of AvaFrame
cfgPath: pathlib Path or str
path to directory with cfg files
module: module
module to be used for task (optional)
Returns
--------
simDict: dict
dictionary with info on simHash, releaseScenario, release area file path,
simType and contains full configuration configparser object for simulation run
"""
# fetch avaDir
avalancheDir = cfgMain["MAIN"]["avalancheDir"]
# fetch input data and create work and output directories
# TODO: so for now remeshed dir is cleaned before a run
inputSimFilesAll, outDir, simDFExisting, simNameExisting = initializeInputs(avalancheDir, True, module)
# save dem file path as it is deleted from input sim files dict once it is set in the config
demFile = inputSimFilesAll["demFile"]
# fetch all cfg files in configuration directory
cfgDir = pathlib.Path(cfgPath)
cfgFilesAll = list(cfgDir.glob("*.ini"))
if len(cfgFilesAll) == 0:
message = "No configuration file found to create simulation runs in: %s" % str(cfgDir)
log.error(message)
raise FileNotFoundError(message)
else:
log.info("Found %d configuration files" % len(cfgFilesAll))
# initialize simDict
simDictAll = {}
# loop over all cfgFiles and create simDict
for index, cfgFile in enumerate(cfgFilesAll):
# read configuration
cfgFromFile = cfgUtils.getModuleConfig(module, avalancheDir, fileOverride=cfgFile, toPrint=False)
# create dictionary with one key for each simulation that shall be performed
# NOTE: sims that are added don't need to be added to the simNameExisting list as
# if new identical sims are added the simDict entry is just updated and not a duplicate one added
simDict = dP.createSimDict(avalancheDir, module, cfgFromFile, inputSimFilesAll, simNameExisting)
simDictAll.update(simDict)
# reset dem file
inputSimFilesAll["demFile"] = demFile
return simDictAll, inputSimFilesAll, simDFExisting, outDir
[docs]def updateResCoeffFields(fields, cfg):
"""update fields of cRes and detK, coefficients of resistance parameter and detrainment parameter
according to the thresholds of FV and FT
if FV OR FT below min thresholds -> apply detrainment in that area
if FV AND FT above min thresholds AND below max thresholds -> apply cResH (additional friction) in that area
if FV OR FT above max thresholds -> no impact of resistance area
Parameters
------------
fields: dict
dictionary with cResRasterOrig, cResRaster and detRasterOrig, detRaster fields
cfg: configparser object
configuration of com1DFA, thresholds
Returns
------------
fields: dict
updated cRes and detK fields, required to write raster with header
"""
# fetch cRes and detK raster and thresholds for FV and FT
cResOrig = fields["cResRasterOrig"].copy()
detOrig = fields["detRasterOrig"].copy()
vMin = cfg.getfloat("forestVMin")
thMin = cfg.getfloat("forestThMin")
vMax = cfg.getfloat("forestVMax")
thMax = cfg.getfloat("forestThMax")
# create new rasters using FV, FT and thresholds to mask
detRasterInt = np.where(((fields["FV"] <= vMin) | (fields["FT"] <= thMin)), detOrig, 0.0)
detRaster = np.where(((fields["FV"] > vMax) | (fields["FT"] > thMax)), 0, detRasterInt)
cResRasterInt = np.where(((fields["FV"] > vMin) & (fields["FT"] > thMin)), cResOrig, 0.0)
cResRaster = np.where(((fields["FV"] > vMax) | (fields["FT"] > thMax)), 0, cResRasterInt)
if len(np.where((detRaster > 0) & (cResRaster > 0))[0]) > 0:
message = "Detrainment and increased friction within same cell!"
log.error(message)
raise AssertionError(message)
# if max thresholds are exceeded: forest destroyed remove forest
lTh = len(np.where((fields["FV"] > vMax) | (fields["FT"] > thMax))[0])
if lTh > 0:
cResOrig = np.where(((fields["FV"] > vMax) | (fields["FT"] > thMax)), 0, cResOrig)
detOrig = np.where(((fields["FV"] > vMax) | (fields["FT"] > thMax)), 0, detOrig)
fields["cResRasterOrig"] = cResOrig
fields["detRasterOrig"] = detOrig
log.debug(
"Resistance area removed %d cells because FV or FT exceeded %.2f ms-1, %.2f m"
% (lTh, vMax, thMax)
)
# update fields dictionary
fields["cResRaster"] = cResRaster
fields["detRaster"] = detRaster
return fields
[docs]def chooseDemPlot(dem, adaptedDemBackground=False):
"""
choose the DEM (rasterData) that is used for report plots
Parameters
-----------
adaptedDemBackground: bool
if True the adapted DEM is used as background in the plots
Returns
-------
demPlot: dict
contains the rasterdata that should be used in report plots as background
"""
demPlot = dem.copy()
# choose if the input DEM or the adapted DEM is used as background
if adaptedDemBackground:
demPlot["rasterData"] = dem["rasterData"]
log.info("The resulting adapted DEM is used in the plots.")
else:
demPlot["rasterData"] = dem["originalRasterData"]
log.info("The original DEM is used in the plots.")
return demPlot
[docs]def updateTimeField(fields, timeStep):
"""update filed indicating first time step mass entered a cell
Parameters
-----------
fields: dict
dictionary with fields
timeStep: float
actual time step
Returns
---------
fields: dict
updated timeInfo field
"""
FT = fields["FT"]
# set time step to previously not affected cells
fields["timeInfo"] = np.where(((fields["timeInfo"] == 0) & (FT != 0)), timeStep, fields["timeInfo"])
return fields