"""
Functions that are used specifically for modeling debris flows (within DebrisFrame).
"""
# Load modules
import logging
import numpy as np
import copy
import avaframe.com1DFA.DFAfunctionsCython as DFAfunC
import avaframe.com1DFA.particleTools as particleTools
import avaframe.in3Utils.geoTrans as geoTrans
import avaframe.com1DFA.com1DFA as com1DFA
import avaframe.in2Trans.shpConversion as shpConv
from avaframe.in1Data import getInput as gI
import avaframe.in3Utils.cfgUtils as cfgUtils
# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)
[docs]def initializeTimeDepRelease(cfg, inputSimLines, particles, fields, dem, zPartArray0, t, atol=1e-05):
"""
Update particles with "new" particles initialised by a time dependent release.
Parameters
---------
cfg: configparser
configuration settings
inputSimLines : dict
dictionary with input data dictionaries (releaseLine,...)
particles : dict
particles dictionary at timestep t that are in the flow already
fields: dict
fields dictionary at timestep t
dem: dict
dictionary with info on DEM data
zPartArray0: dict
dictionary containing z - value of particles at timestep 0
t: float
timestep of iteration
atol: float
look for matching time steps with atol tolerance - default is atol=1.e-5
Returns
---------
particles: dict
particles dictionary at t including the new released particles
fields: dict
updated fields dictionary at t including the new released particles
zPartArray0: dict
dictionary containing z - value of particles at timestep 0
"""
timeDepRelValues = inputSimLines["releaseLine"]["timeDepRelValues"]
if np.isclose(t, timeDepRelValues["timeStep"], atol=atol, rtol=0).any():
iTup = np.where(np.isclose(t, timeDepRelValues["timeStep"], atol=atol, rtol=0))
# iTup is a tuple containing an array with one value in the first position, so we can extract the index:
i = iTup[0].item()
log.info(
"add release at timestep: %.2f s with thickness %s m and velocity %s m/s"
% (t, timeDepRelValues["thickness"][i], timeDepRelValues["velocity"][i])
)
# similar workflow to secondary release!
particles, zPartArray0 = addReleaseParticles(
cfg,
particles,
inputSimLines,
timeDepRelValues["thickness"][i],
timeDepRelValues["velocity"][i],
dem,
zPartArray0,
)
particles = DFAfunC.getNeighborsC(particles, dem)
# update fields (compute grid values)
if fields["computeTA"]:
particles = DFAfunC.computeTrajectoryAngleC(particles, zPartArray0)
particles, fields = DFAfunC.updateFieldsC(cfg["GENERAL"], particles, dem, fields)
return particles, fields, zPartArray0
[docs]def addReleaseParticles(cfg, particles, inputSimLines, thickness, velocityMag, dem, zPartArray0):
"""
add new particles initialized by a time dependent release to particles that are in the flow already
Parameters
---------
cfg: configparser object
configuration settings
particles : dict
particles dictionary at t that are in the flow already
inputSimLines : dict
dictionary with input data dictionaries (releaseLine,...)
thickness: float
thickness of current release
velocityMag: float
velocity of current release
dem: dict
dictionary with info on DEM data
zPartArray0: numpy array
z - value of particles at timestep 0
Returns
---------
particles: dict
particles dictionary at t including the current released particles
zPartArray0: dict
dictionary containing z - value of particles at timestep 0
"""
relLine = inputSimLines["releaseLine"]
relLine["header"] = dem["originalHeader"].copy()
relLine = geoTrans.prepareArea(
relLine,
dem,
np.sqrt(2),
thList=[thickness] * len(relLine["Name"]),
combine=True,
checkOverlap=False,
)
# check if already existing particles are within the release polygon
# it's possible that there are still a few particles in the polygon with low velocities
# TODO: could think of a threshold of number of particles that are still allowed in the polygons?
mask = geoTrans.getParticlesInPolygon(particles, relLine, cfg["GENERAL"].getfloat("thresholdPointInRel"))
if np.sum(mask) > 0:
# if there is at least one particle within the polygon (including the buffer):
message = (
"Already existing particles are within the release polygon, which can cause numerical instabilities (at timestep: %02f s)"
% (particles["t"] + particles["dt"])
)
# timestep in particles is not updated yet
log.error(message)
raise ValueError(message)
particlesRelease = com1DFA.initializeParticles(
cfg["GENERAL"],
relLine,
dem,
)
particlesRelease = DFAfunC.updateInitialVelocity(cfg["GENERAL"], particlesRelease, dem, velocityMag)
particles = particleTools.mergeParticleDict(particles, particlesRelease)
# save initial z position for travel angle computation
zPartArray0 = np.append(zPartArray0, copy.deepcopy(particlesRelease["z"]))
return particles, zPartArray0
[docs]def prepareTimeDepRelLine(releaseLine, cfg):
"""
read time dependent release values and return them as a dictionary containing:
- timestep
- thickness
- velocity
additionally, the values are saved into the configurationFiles folder
Parameters
----------
releaseLine: dict
contains information of release line
cfg: configparser object
configuration for simType
demOri : dict
dictionary with dem info (header original origin), raster data correct mesh cell size
Returns
-------
releaseLine: dict
added time dependent release values: timeStep, thickness, velocity
"""
try:
releaseLine["values"], timeDepRelValuesDF = gI.getTimeDepRelCsv(cfg["INPUT"]["timeDepRelCsv"])
releaseLine["thicknessSource"] = ["csv file"]
except:
message = "Provide a valid csv file containing time dependent release values"
log.error(message)
raise FileNotFoundError(message)
# write the time dependent values into configurationFiles folder
cfgUtils.writeReleaseCsvFile(cfg, timeDepRelValuesDF)
return releaseLine
[docs]def checkTravelledDistance(cfgGen, timeDepRelValues, timeDepRelCsv):
"""
not used at the moment (related to timeStepDistance in the configuration file)!
check if time steps of time dependent release are not too close so that the particle density cannot become too high
check that particles moved out of release area before new particles are initialized
time between release time steps
first timestep is skipped since this is always ok.
Parameters
-----------
cfgGen: configparser
configuration settings, section "GENERAL"
timeDepRelValues: dict
contains time dependent release values: timestep, thickness, velocity
timeDepRelCsv: str
directory to csv table containing time dependent release values
"""
timeStepUnique = np.unique(timeDepRelValues["timeStep"])
if timeStepUnique.ndim > 0:
relDT = np.append(timeDepRelValues["timeStep"], 0) - np.append(0, timeDepRelValues["timeStep"])
vel = np.where(np.array(timeDepRelValues["velocity"]) > 0, np.array(timeDepRelValues["velocity"]), 1)
distance = vel[:-1] * relDT[1:-1]
if np.any(distance < cfgGen.getfloat("timeStepDistance")):
message = "Please select timesteps with greater spacing in %s." % (timeDepRelCsv)
# TODO: error or warning?
log.error(message)
raise ValueError(message)