"""
Tools for generating an avalanche path from a DFA simulation
"""
# Load modules
import math
import numpy as np
import logging
import pathlib
import shutil
# Local imports
import avaframe.in2Trans.shpConversion as shpConv
from avaframe.in3Utils import cfgUtils
import avaframe.in3Utils.geoTrans as gT
import avaframe.com1DFA.particleTools as particleTools
import avaframe.com1DFA.DFAtools as DFAtls
from avaframe.com1DFA import com1DFA
import avaframe.out3Plot.outDebugPlots as debPlot
# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)
cfgAVA = cfgUtils.getGeneralConfig()
debugPlot = cfgAVA['FLAGS'].getboolean('debugPlot')
def generateAveragePath(avalancheDir, pathFromPart, simName, dem, addVelocityInfo=False, flagAvaDir=True,
comModule='com1DFA'):
""" extract path from fileds or particles
Parameters
-----------
avalancheDir: pathlib
avalanche directory pathlib path
pathFromPart: boolean
compute path from particles if True, from fields (FT, FM, FV) if False
simName: str
simulation name
dem: dict
com1DFA simulation dictionary
addVelocityInfo: boolean
True to add (u2, ekin, totEKin) to result
Will only work if the particles (ux, uy, uz) exist or if 'FV' exists
flagAvaDir: bool
if True avalancheDir corresponds to an avalanche directory and data is
read from avaDir/Outputs/comModule/particles or avaDir/Outputs/comModule/peakFiles
depending on whether pathFromPart is True or False
comModule: str
module that computed the particles or fields
Returns
--------
avaProfileMass: dict
mass averaged profile (x, y, z, 's')
if addVelocityInfo is True, kinetic energy and velocity information are added to
the avaProfileMass dict (u2, ekin, totEKin)
particlesIni: dict
x, y coord of the initial particles or flow thickness field
"""
if pathFromPart:
particlesList, timeStepInfo = particleTools.readPartFromPickle(avalancheDir, simName=simName, flagAvaDir=flagAvaDir,
comModule=comModule)
particlesIni = particlesList[0]
log.info('Using particles to generate avalanche path profile')
# postprocess to extract path and energy line
avaProfileMass = getDFAPathFromPart(particlesList, addVelocityInfo=addVelocityInfo)
else:
particlesList = ''
# read field
fieldName = ['FT', 'FM']
if addVelocityInfo:
fieldName.append('FV')
fieldsList, fieldHeader, timeList = com1DFA.readFields(avalancheDir, fieldName, simName=simName,
flagAvaDir=flagAvaDir, comModule=comModule)
# get fields header
ncols = fieldHeader['ncols']
nrows = fieldHeader['nrows']
csz = fieldHeader['cellsize']
# we want the origin to be in (0, 0) as it is in the avaProfile that comes in
X, Y = gT.makeCoordinateGrid(0, 0, csz, ncols, nrows)
indNonZero = np.where(fieldsList[0]['FT'] > 0)
# convert this data in a particles style (dict with x, y, z info)
particlesIni = {'x': X[indNonZero], 'y': Y[indNonZero]}
particlesIni, _ = gT.projectOnRaster(dem, particlesIni)
log.info('Using fields to generate avalanche path profile')
# postprocess to extract path and energy line
avaProfileMass = getDFAPathFromField(fieldsList, fieldHeader, dem)
return avaProfileMass, particlesIni
def getDFAPathFromPart(particlesList, addVelocityInfo=False):
""" compute mass averaged path from particles
Also returns the averaged velocity and kinetic energy associated
If addVelocityInfo is True, information about velocity and kinetic energy is computed
Parameters
-----------
particlesList: list
list of particles dict
addVelocityInfo: boolean
True to add (u2, ekin, totEKin) to result
Returns
--------
avaProfileMass: dict
mass averaged profile (x, y, z, 's', 'sCor')
if addVelocityInfo is True, kinetic energy and velocity information are added to
the avaProfileMass dict (u2, ekin, totEKin)
"""
propList = ['x', 'y', 'z', 's', 'sCor']
propListPart = ['x', 'y', 'z', 'trajectoryLengthXY', 'trajectoryLengthXYCor']
avaProfileMass = {}
# do we have velocity info?
if addVelocityInfo:
propList.append('u2')
propList.append('ekin')
propListPart.append('u2')
propListPart.append('ekin')
avaProfileMass['totEKin'] = np.empty((0, 1))
# initialize other properties
for prop in propList:
avaProfileMass[prop] = np.empty((0, 1))
avaProfileMass[prop + 'std'] = np.empty((0, 1))
# loop on each particle dictionary (ie each time step saved)
for particles in particlesList:
if particles['nPart'] > 0:
m = particles['m']
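# velocity magnitude |u| = sqrt(ux**2 + uy**2 + uz**2) and per-particle kinetic
# energy 0.5*m*|u|**2 are computed below and mass-averaged together with the positions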
if addVelocityInfo:
ux = particles['ux']
uy = particles['uy']
uz = particles['uz']
u = DFAtls.norm(ux, uy, uz)
u2Array = u*u
kineticEneArray = 0.5*m*u2Array
particles['u2'] = u2Array
particles['ekin'] = kineticEneArray
# mass-averaged path
avaProfileMass = appendAverageStd(propList, avaProfileMass, particles, m, naming=propListPart)
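# the profile keys 's' and 'sCor' are filled from the particle properties
# 'trajectoryLengthXY' and 'trajectoryLengthXYCor' through the naming argument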
if addVelocityInfo:
avaProfileMass['totEKin'] = np.append(avaProfileMass['totEKin'], np.nansum(kineticEneArray))
return avaProfileMass
def getDFAPathFromField(fieldsList, fieldHeader, dem):
""" compute mass averaged path from fields
Also returns the averaged velocity and kinetic energy associated
The dem and fieldsList (FT, FV and FM) need to have identical dimentions and cell size.
If FV is not provided, information about velocity and kinetic energy is not computed
Parameters
-----------
fieldsList: list
time sorted list of fields dict
fieldHeader: dict
field header dict
dem: dict
dem dict
Returns
--------
avaProfileMass: dict
mass averaged profile (x, y, z, 's')
if 'FV' in fieldsList, kinetic energy and velocity information are added to
the avaProfileMass dict (u2, ekin, totEKin)
"""
# get DEM
demRaster = dem['rasterData']
# get fields header
ncols = fieldHeader['ncols']
nrows = fieldHeader['nrows']
xllc = fieldHeader['xllcenter']
yllc = fieldHeader['yllcenter']
csz = fieldHeader['cellsize']
X, Y = gT.makeCoordinateGrid(xllc, yllc, csz, ncols, nrows)
propList = ['x', 'y', 'z']
avaProfileMass = {}
# do we have velocity info?
addVelocityInfo = False
if 'FV' in fieldsList[0]:
propList.append('u2')
propList.append('ekin')
avaProfileMass['totEKin'] = np.empty((0, 1))
addVelocityInfo = True
# initialize other properties
for prop in propList:
avaProfileMass[prop] = np.empty((0, 1))
avaProfileMass[prop + 'std'] = np.empty((0, 1))
# loop on each field dictionary (ie each time step saved)
for field in fieldsList:
# find cells with snow
nonZeroIndex = np.where(field['FT'] > 0)
xArray = X[nonZeroIndex]
yArray = Y[nonZeroIndex]
zArray, _ = gT.projectOnGrid(xArray, yArray, demRaster, csz=csz, xllc=xllc, yllc=yllc)
mArray = field['FM'][nonZeroIndex]
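# the flow mass field FM provides the weights used for the mass averaging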
particles = {'x': xArray, 'y': yArray, 'z': zArray}
if addVelocityInfo:
uArray = field['FV'][nonZeroIndex]
u2Array = uArray*uArray
kineticEneArray = 0.5*mArray*u2Array
particles['u2'] = u2Array
particles['ekin'] = kineticEneArray
# mass-averaged path
avaProfileMass = appendAverageStd(propList, avaProfileMass, particles, mArray)
if addVelocityInfo:
avaProfileMass['totEKin'] = np.append(avaProfileMass['totEKin'], np.nansum(kineticEneArray))
avaProfileMass['x'] = avaProfileMass['x'] - xllc
avaProfileMass['y'] = avaProfileMass['y'] - yllc
# compute s
avaProfileMass = gT.computeS(avaProfileMass)
return avaProfileMass
def extendDFAPath(cfg, avaProfile, dem, particlesIni):
""" extend the DFA path at the top and bottom
avaProfile with x, y, z, s information
Parameters
-----------
cfg: configParser
configuration object with:
- extTopOption: int, how to extend towards the top: 0 for the highest point method, 1 for the largest runout method
- nCellsResample: int, resampling length is given by nCellsResample*demCellSize
- nCellsMinExtend: int, when extending towards the bottom, take points at more
than nCellsMinExtend*demCellSize from last point to get the direction
- nCellsMaxExtend: int, when extending towards the bottom, take points
at less than nCellsMaxExtend*demCellSize from last point to get the direction
- factBottomExt: float, extend the profile by a distance of factBottomExt*sMax
avaProfile: dict
profile to be extended
dem: dict
dem dict
particlesIni: dict
initial particles dict
Returns
--------
avaProfile: dict
extended profile at top and bottom (x, y, z).
"""
# resample the profile
resampleDistance = cfg.getfloat('nCellsResample') * dem['header']['cellsize']
avaProfile, _ = gT.prepareLine(dem, avaProfile, distance=resampleDistance, Point=None)
avaProfile = extendProfileTop(cfg.getint('extTopOption'), particlesIni, avaProfile)
avaProfile = extendProfileBottom(cfg, dem, avaProfile)
return avaProfile
def extendProfileTop(extTopOption, particlesIni, profile):
""" extend the DFA path at the top (release)
Either towards the highest point in particlesIni (extTopOption = 0)
or the point leading to the longest runout (extTopOption = 1)
Parameters
-----------
extTopOption: int
decide how to extend towards the top
if 0, extrapolate towards the highest point in the release
if 1, extrapolate towards the point leading to the longest runout
particlesIni: dict
initial particles dict
profile: dict
profile to extend
Returns
--------
profile: dict
extended profile
"""
if extTopOption == 0:
# get highest particle
indTop = np.argmax(particlesIni['z'])
xExtTop = particlesIni['x'][indTop]
yExtTop = particlesIni['y'][indTop]
zExtTop = particlesIni['z'][indTop]
dx = xExtTop - profile['x'][0]
dy = yExtTop - profile['y'][0]
ds = np.sqrt(dx**2 + dy**2)
elif extTopOption == 1:
# get the point with the largest runout gain
# get first particle of the path
xFirst = profile['x'][0]
yFirst = profile['y'][0]
zFirst = profile['z'][0]
# get last particle of the path
sLast = profile['s'][-1]
zLast = profile['z'][-1]
# compute runout angle for averaged path
tanAngle = (zFirst-zLast)/sLast
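# dz1 (computed below) is the elevation of each release particle above the line with this
# runout angle through the first profile point; the particle maximizing dz1 gives the
# largest potential runout gain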
# compute ds
dx = particlesIni['x'] - xFirst
dy = particlesIni['y'] - yFirst
ds = np.sqrt(dx**2 + dy**2)
# compute dz
dz = particlesIni['z'] - zFirst
# remove the elevation needed to match the runout angle
dz1 = dz - tanAngle * ds
# get the particle with the highest potential
indTop = np.argmax(dz1)
xExtTop = particlesIni['x'][indTop]
yExtTop = particlesIni['y'][indTop]
zExtTop = particlesIni['z'][indTop]
ds = ds[indTop]
# extend profile
profile['x'] = np.append(xExtTop, profile['x'])
profile['y'] = np.append(yExtTop, profile['y'])
profile['z'] = np.append(zExtTop, profile['z'])
profile['s'] = np.append(0, profile['s'] + ds)
# xFirst, yFirst, zFirst and dz1 are only defined for extTopOption == 1, so only plot in that case
if debugPlot and extTopOption == 1:
debPlot.plotPathExtTop(profile, particlesIni, xFirst, yFirst, zFirst, dz1)
return profile
def extendProfileBottom(cfg, dem, profile):
""" extend the DFA path at the bottom (runout area)
Find the direction in which to extend considering the last point of the profile
and a few previous ones but discarding the ones that are too close
(nCellsMinExtend * csz < distFromLast < nCellsMaxExtend * csz).
Extend in this direction for a distance of factBottomExt * length of the path.
Parameters
-----------
cfg: configParser
nCellsMinExtend: int, when extending towards the bottom, take points
at more than nCellsMinExtend*demCellSize from last point to get the direction
nCellsMaxExtend: int, when extending towards the bottom, take points at
less than nCellsMaxExtend*demCellSize from last point to get the direction
factBottomExt: float, extend the profile by a distance of factBottomExt*sMax
dem: dict
dem dict
profile: dict
profile to extend
Returns
--------
profile: dict
extended profile
"""
header = dem['header']
csz = header['cellsize']
zRaster = dem['rasterData']
# get last point
xLast = profile['x'][-1]
yLast = profile['y'][-1]
sLast = profile['s'][-1]
# compute distance from last point:
r = DFAtls.norm(profile['x']-xLast, profile['y']-yLast, 0)
# find the previous points
extendMinDistance = cfg.getfloat('nCellsMinExtend') * csz
extendMaxDistance = cfg.getfloat('nCellsMaxExtend') * csz
pointsOfInterestLast = np.where((r < extendMaxDistance) & (r > extendMinDistance))[0]
xInterest = profile['x'][pointsOfInterestLast]
yInterest = profile['y'][pointsOfInterestLast]
# check if points are found to compute direction of extension
if len(xInterest) > 0:
# find the direction in which we need to extend the path
vDirX = xLast - xInterest
vDirY = yLast - yInterest
vDirX, vDirY, vDirZ = DFAtls.normalize(np.array([vDirX]), np.array([vDirY]), 0*np.array([vDirY]))
vDirX = np.sum(vDirX)
vDirY = np.sum(vDirY)
vDirZ = np.sum(vDirZ)
vDirX, vDirY, vDirZ = DFAtls.normalize(np.array([vDirX]), np.array([vDirY]), np.array([vDirZ]))
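# each candidate direction is normalized first so that all points of interest contribute
# equally, then the summed vector is normalized again to obtain a unit mean direction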
# extend in this direction
factExt = cfg.getfloat('factBottomExt')
gamma = factExt * sLast / np.sqrt(vDirX**2 + vDirY**2)
xExtBottom = np.array([xLast + gamma * vDirX])
yExtBottom = np.array([yLast + gamma * vDirY])
# project on DEM
zExtBottom, _ = gT.projectOnGrid(xExtBottom, yExtBottom, zRaster, csz=csz)
# bisection (dichotomy) method to find the last point of the extension that still lies on the DEM
if np.isnan(zExtBottom):
factExt = factExt/2
stepSize = factExt
isOut = True
else:
isOut = False
stepSize = 0
count = 0
# remember last point found inside
factLast = 0
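# if the first extension point falls outside the DEM (NaN elevation), a bisection on the
# extension factor refines it: each iteration halves the step, moving outward
# (factExt + stepSize) while the projected point lies on the DEM and inward
# (factExt - stepSize) when the projection returns NaN; the loop stops after
# maxIterationExtBot iterations or once the remaining step is below nBottomExtPrecision cells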
while count < cfg.getint('maxIterationExtBot') and stepSize * sLast > cfg.getint('nBottomExtPrecision')*csz:
count = count + 1
gamma = factExt * sLast / np.sqrt(vDirX**2 + vDirY**2)
xExtBottom = np.array([xLast + gamma * vDirX])
yExtBottom = np.array([yLast + gamma * vDirY])
# project on DEM
zExtBottom, _ = gT.projectOnGrid(xExtBottom, yExtBottom, zRaster, csz=csz)
stepSize = stepSize/2
if np.isnan(zExtBottom):
factExt = factExt - stepSize
isOut = True
else:
# remember last point found inside
factLast = factExt
factExt = factExt + stepSize
isOut = False
if isOut:
# the last iteration is not in the domain, fall back to last point in domain
factExt = factLast
gamma = factExt * sLast / np.sqrt(vDirX**2 + vDirY**2)
xExtBottom = np.array([xLast + gamma * vDirX])
yExtBottom = np.array([yLast + gamma * vDirY])
# project on DEM
zExtBottom, _ = gT.projectOnGrid(xExtBottom, yExtBottom, zRaster, csz=csz)
log.info('found extension after %d iterations, precision is %.2f m' % (count, stepSize * sLast))
# extend profile
profile['x'] = np.append(profile['x'], xExtBottom)
profile['y'] = np.append(profile['y'], yExtBottom)
profile['z'] = np.append(profile['z'], zExtBottom)
profile['s'] = np.append(profile['s'], sLast + np.sqrt((xLast-xExtBottom)**2 + (yLast-yExtBottom)**2))
else:
log.warning('Path not extended at the bottom: no points of interest were found to compute the extension direction')
if debugPlot:
debPlot.plotPathExtBot(profile, xInterest, yInterest, 0*yInterest, xLast, yLast)
return profile
def getParabolicFit(cfg, avaProfile, dem):
"""fit a parabola on a set of (s, z) points
first and last point match. Last constraint is given by fitOption (see below)
Parameters
-----------
cfg: configParser
configuration object with:
fitOption: int
how to optimize the fit
0 minimize distance between points
1 match the end slope
avaProfile: dict
profile to be extended
dem: dict
dem dict
Returns
--------
parabolicFit: dict
a, b, c coefficients of the parabolic fit (z = a*s**2 + b*s + c)
"""
s = avaProfile['s']
sE = s[-1]
z = avaProfile['z']
z0 = z[0]
zE = z[-1]
# same start and end point, minimize distance between curves
if cfg.getfloat('fitOption') == 0:
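# with both end points fixed, the parabola can be written as
# z(s) = a*s*(s - sE) + (zE - z0)/sE*s + z0
# minimizing sum((z(s_i) - z_i)**2) with respect to a and setting the derivative to zero gives
# a = -sum(s*(s - sE)*((zE - z0)/sE*s + z0 - z)) / sum((s*(s - sE))**2)
# expanding back to z = a*s**2 + b*s + c then yields b = (zE - z0)/sE - a*sE and c = z0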
SumNom = np.sum(s*(s-sE)*((zE-z0)/sE*s+z0-z))
SumDenom = s*(s-sE)
SumDenom = np.dot(SumDenom, SumDenom)
a = - SumNom/SumDenom
b = (zE-z0)/sE - a*sE
elif cfg.getfloat('fitOption') == 1:
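# the end slope is estimated from the profile points within nCellsSlope cells of the end point
# (prepareAngleProfile is called here with an angle value of 10);
# imposing z(0) = z0, z(sE) = zE and z'(sE) = slope on z = a*s**2 + b*s + c gives
# a = (slope*sE + (z0 - zE))/sE**2, b = -slope - 2*(z0 - zE)/sE, c = z0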
angleProf, tmpProf, dsProf = gT.prepareAngleProfile(10, avaProfile, raiseWarning=False)
r = avaProfile['s'] - avaProfile['s'][-1]
resampleDistance = cfg.getfloat('nCellsSlope') * dem['header']['cellsize']
pointsOfInterestLast = np.where(np.abs(r) < resampleDistance)
slope = np.nansum(angleProf[pointsOfInterestLast])/np.size(pointsOfInterestLast)
slope = -np.tan(np.radians(slope))
a = (slope*sE + (z0 - zE))/(sE*sE)
b = -slope - 2*(z0 - zE)/sE
c = z0
parabolicFit = {'a': a, 'b': b, 'c': c}
return parabolicFit
def getSplitPoint(cfg, avaProfile, parabolicFit):
""" find the split point corresponding to an avalanche profile, with parabolic fit and the slopeSplitPoint
Parameters
-----------
cfg: configParser
configuration object with
- slopeSplitPoint: float, desired slope at split point (in degrees)
- dsMin: float, threshold distance [m] for looking for the split point
(at least dsMin meters below split point angle)
avaProfile: dict
profile to be extended
parabolicFit: dict
a, b, c coefficients of the parabolic fit (z = a*s**2 + b*s + c)
Returns
--------
splitPoint: dict
(x, y, z, zPara, s) at the split point location.
"""
indFirst = avaProfile['indStartMassAverage']
indEnd = avaProfile['indEndMassAverage']
s0 = avaProfile['s'][indFirst]
sEnd = avaProfile['s'][indEnd]
s = avaProfile['s']
z = avaProfile['z']
sNew = s - s0
zPara = parabolicFit['a']*sNew*sNew+parabolicFit['b']*sNew+parabolicFit['c']
parabolicProfile = {'s': sNew, 'z': zPara}
anglePara, tmpPara, dsPara = gT.prepareAngleProfile(cfg.getfloat('slopeSplitPoint'), parabolicProfile,
raiseWarning=False)
try:
indSplitPoint = gT.findAngleProfile(tmpPara, dsPara, cfg.getfloat('dsMin'))
splitPoint = {'x': avaProfile['x'][indSplitPoint], 'y': avaProfile['y'][indSplitPoint],
'z': z[indSplitPoint], 'zPara': zPara[indSplitPoint], 's': sNew[indSplitPoint]}
except IndexError:
noSplitPointFoundMessage = ('Automated split point generation failed as no point where the slope is less than %s° '
'was found, provide the split point manually.' % cfg.getfloat('slopeSplitPoint'))
splitPoint = ''
log.warning(noSplitPointFoundMessage)
if debugPlot:
angleProf, tmpProf, dsProf = gT.prepareAngleProfile(cfg.getfloat('slopeSplitPoint'), avaProfile)
debPlot.plotFindAngle(avaProfile, angleProf, parabolicProfile, anglePara, s0, sEnd, splitPoint, indSplitPoint)
return splitPoint
def resamplePath(cfg, dem, avaProfile):
"""
resample the path profile and keep track of indStartMassAverage and indEndMassAverage
Parameters
-----------
cfg: configParser
configuration object with
- nCellsResample: int, resampling length is given by nCellsResample*demCellSize
dem: dict
dem dict
avaProfile: dict
profile to be resampled
Returns
--------
avaProfile: dict
resampled path profile
"""
resampleDistance = cfg.getfloat('nCellsResample') * dem['header']['cellsize']
indFirst = avaProfile['indStartMassAverage']
indEnd = avaProfile['indEndMassAverage']
s0 = avaProfile['s'][indFirst]
sEnd = avaProfile['s'][indEnd]
avaProfile, _ = gT.prepareLine(dem, avaProfile, distance=resampleDistance, Point=None)
# make sure we get the correct start and end point... prepareLine might introduce a small error on the s coordinate
indFirst = np.argwhere(avaProfile['s'] >= s0 - resampleDistance/3)[0][0]
# look for the first point in the extension and take the one before
indEnd = np.argwhere(avaProfile['s'] >= sEnd + resampleDistance/3)[0][0]-1
avaProfile['indStartMassAverage'] = indFirst
avaProfile['indEndMassAverage'] = indEnd
return avaProfile
def saveSplitAndPath(avalancheDir, simDFrow, splitPoint, avaProfileMass, dem):
"""
Save avaPath and split point to directory
Parameters
-----------
avalancheDir: pathlib
avalanche directory pathlib path
simDFrow: pandas series
row of the simulation dataframe corresponding to the current simulation analyzed
splitPoint: dict
point dictionary
avaProfileMass: dict
line dictionary
dem: dict
com1DFA simulation dictionary
"""
# put path back in original location
if splitPoint != '':
splitPoint['x'] = splitPoint['x'] + dem['originalHeader']['xllcenter']
splitPoint['y'] = splitPoint['y'] + dem['originalHeader']['yllcenter']
avaProfileMass['x'] = avaProfileMass['x'] + dem['originalHeader']['xllcenter']
avaProfileMass['y'] = avaProfileMass['y'] + dem['originalHeader']['yllcenter']
# get projection from release shp layer
simName = simDFrow['simName']
relName = simName.split('_')[0]
inProjection = pathlib.Path(avalancheDir, 'Inputs', 'REL', relName + '.prj')
# save profile in Inputs
pathAB = pathlib.Path(avalancheDir, 'Outputs', 'DFAPath', 'massAvgPath_%s_AB_aimec' % simName)
name = 'massAvaPath'
shpConv.writeLine2SHPfile(avaProfileMass, name, pathAB)
if inProjection.is_file():
shutil.copy(inProjection, pathAB.with_suffix('.prj'))
else:
message = ('No projection layer for shp file %s' % inProjection)
log.warning(message)
log.info('Saved path to: %s', pathAB)
if splitPoint != '':
splitAB = pathlib.Path(avalancheDir, 'Outputs', 'DFAPath', 'splitPointParabolicFit_%s_AB_aimec' % simName)
name = 'parabolaSplitPoint'
shpConv.writePoint2SHPfile(splitPoint, name, splitAB)
if inProjection.is_file():
shutil.copy(inProjection, splitAB.with_suffix('.prj'))
log.info('Saved split point to: %s', splitAB)
def weightedAvgAndStd(values, weights):
"""
Return the weighted average and standard deviation.
values, weights -- Numpy ndarrays with the same shape.
"""
average = np.average(values, weights=weights)
# Fast and numerically precise:
variance = np.average((values-average)**2, weights=weights)
return (average, math.sqrt(variance))
def appendAverageStd(propList, avaProfile, particles, weights, naming=''):
""" append averaged to path
Parameters
-----------
propList: list
list of properties to average and append
avaProfile: dict
path
particles: dict
particles dict
weights: numpy array
array of weights (same size as particles)
naming: list
optional - list of properties at source (if different than final naming in avaProfile)
Returns
--------
avaProfile: dict
averaged profile
"""
propListNames = naming if naming != '' else propList
for prop, propName in zip(propList, propListNames):
avg, std = weightedAvgAndStd(particles[propName], weights)
avaProfile[prop] = np.append(avaProfile[prop], avg)
avaProfile[prop + 'std'] = np.append(avaProfile[prop + 'std'], std)
return avaProfile
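# A minimal usage sketch (kept as comments since the surrounding driver script, configuration
# handling and DEM reading depend on the calling module and are assumptions here): with an
# avalanche directory containing com1DFA results, a dem dict and a cfg section providing the
# parameters documented above, the functions are typically chained as
#
#   avaProfileMass, particlesIni = generateAveragePath(avalancheDir, True, simName, dem)
#   avaProfileMass = extendDFAPath(cfg, avaProfileMass, dem, particlesIni)
#   parabolicFit = getParabolicFit(cfg, avaProfileMass, dem)
#
# getSplitPoint, resamplePath and saveSplitAndPath additionally expect the profile to carry
# 'indStartMassAverage' and 'indEndMassAverage' markers set by the caller.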