Source code for ana5Utils.DFAPathGeneration

"""
    Tools for generating an avalanche path from a DFA simulation
"""

# Load modules
import math
import heapq
import numpy as np
import logging
import pathlib
import shutil
from scipy.ndimage import distance_transform_edt

# Local imports
from avaframe.in1Data import getInput as gI
import avaframe.in2Trans.rasterUtils as IOf
import avaframe.in2Trans.shpConversion as shpConv
from avaframe.in3Utils import cfgUtils
from avaframe.in3Utils import fileHandlerUtils as fU
from avaframe.in3Utils import cfgHandling
import avaframe.in3Utils.initializeProject as initProj
import avaframe.in3Utils.geoTrans as gT
import avaframe.com1DFA.particleTools as particleTools
import avaframe.com1DFA.DFAtools as DFAtls
from avaframe.com1DFA import com1DFA
import avaframe.out3Plot.outDebugPlots as debPlot
from avaframe.out3Plot import outCom3Plots

# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)
cfgAVA = cfgUtils.getGeneralConfig()
debugPlot = cfgAVA['FLAGS'].getboolean('debugPlot')

[docs]def generatePathAndSplitpoint(avalancheDir, cfgDFAPath, cfgMain, runDFAModule): """ Parameters: avalancheDir (str): path to the avalanche directory cfgDFAPath (configParser object): DFAPath configuration cfgMain (configParser object): main avaframe configuration runDFAModule (bool): whether to run the DFA module (True) or load existing results (False) Returns ------- massAvgPath: pathlib file path to the mass-averaged path result saved as a shapefile splitPoint: pathlib file path to the split point result saved as a shapefile """ if runDFAModule: # call DFA module to perform simulations with overrides from DFAPath config # Clean avalanche directory of old work and output files from module initProj.cleanModuleFiles(avalancheDir, com1DFA, deleteOutput=True) # create and read the default com1DFA config (no local is read) com1DFACfg = cfgUtils.getModuleConfig(com1DFA, avalancheDir, toPrint=False, onlyDefault=cfgDFAPath['com1DFA_com1DFA_override'].getboolean( 'defaultConfig')) # and override with settings from DFAPath config com1DFACfg, cfgDFAPath = cfgHandling.applyCfgOverride(com1DFACfg, cfgDFAPath, com1DFA, addModValues=False) outDir = pathlib.Path(avalancheDir, 'Outputs', 'ana5Utils', 'DFAPath') fU.makeADir(outDir) # write configuration to file for documentation com1DFACfgFile = outDir / 'com1DFAPathGenerationCfg.ini' with open(com1DFACfgFile, 'w') as configfile: com1DFACfg.write(configfile) # call com1DFA and perform simulations dem, plotDict, reportDictList, simDF = com1DFA.com1DFAMain(cfgMain, cfgInfo=com1DFACfg) else: # read existing simulation results # read simulation dem demOri = gI.readDEM(avalancheDir) dem = com1DFA.setDEMoriginToZero(demOri) dem['originalHeader'] = demOri['header'].copy() # load DFA results (use runCom1DFA to generate these results for example) # here is an example with com1DFA but another DFA computational module can be used # as long as it produces some pta, particles or FT, FM and FV results # create dataFrame of results (read DFA data) simDF, _ = cfgUtils.readAllConfigurationInfo(avalancheDir) if simDF is None: message = "Did not find any com1DFA simulations in %s/Outputs/com1DFA/" % avalancheDir log.error(message) raise FileExistsError(message) # the peak flow thickness fields are only needed when the path is extended to the deposit # front; parse the peak files once here instead of re-reading them for every simulation extendToFront = cfgDFAPath['PATH'].getint('extBottomOption', fallback=0) == 1 peakFilesDF = None if extendToFront: peakFilesDir = pathlib.Path(avalancheDir, 'Outputs', 'com1DFA', 'peakFiles') peakFilesDF = fU.makeSimDF(peakFilesDir, avaDir=avalancheDir) for simName, simDFrow in simDF.iterrows(): log.info('Computing avalanche path from simulation: %s', simName) pathFromPart = cfgDFAPath['PATH'].getboolean('pathFromPart') resampleDistance = cfgDFAPath['PATH'].getfloat('nCellsResample') * dem['header']['cellsize'] # peak flow thickness field, only needed when extending the path to the deposit front fieldPFT = None if extendToFront: fieldPFT = readPeakFT(peakFilesDF, simName) # get the mass average path avaProfileMass, particlesIni = generateMassAveragePath(avalancheDir, pathFromPart, simName, dem, addVelocityInfo=cfgDFAPath['PATH'].getboolean('addVelocityInfo')) avaProfileMass, _ = gT.prepareLine(dem, avaProfileMass, distance=resampleDistance, Point=None) avaProfileMass['indStartMassAverage'] = 1 avaProfileMass['indEndMassAverage'] = np.size(avaProfileMass['x']) # make the parabolic fit parabolicFit = getParabolicFit(cfgDFAPath['PATH'], avaProfileMass, dem) # here the avaProfileMass given in input is overwritten and returns only an x, y, z extended profile avaProfileMass = extendDFAPath(cfgDFAPath['PATH'], avaProfileMass, dem, particlesIni, fieldPFT=fieldPFT) # resample path and keep track of start and end of mass averaged part avaProfileMass = resamplePath(cfgDFAPath['PATH'], dem, avaProfileMass) # get split point splitPoint = getSplitPoint(cfgDFAPath['PATH'], avaProfileMass, parabolicFit) # make analysis and generate plots _ = outCom3Plots.generateCom1DFAPathPlot(avalancheDir, cfgDFAPath['PATH'], avaProfileMass, dem, parabolicFit, splitPoint, simName) # now save the path and split point as shapefiles avaPath,splitPoint = saveSplitAndPath(avalancheDir, simDFrow, splitPoint, avaProfileMass, dem) return avaPath, splitPoint
[docs]def generateMassAveragePath(avalancheDir, pathFromPart, simName, dem, addVelocityInfo=False, flagAvaDir=True, comModule='com1DFA'): """ extract path from fields or particles Parameters ----------- avalancheDir: pathlib avalanche directory pathlib path pathFromPart: boolean compute path from particles if True, from fields (FT, FM, FV) if False simName: str simulation name dem: dict com1DFA simulation dictionary addVelocityInfo: boolean True to add (u2, ekin, totEKin) to result Will only work if the particles (ux, uy, uz) exist or if 'FV' exists flagAvaDir: bool if True avalancheDir corresponds to an avalanche directory and data is read from avaDir/Outputs/comModule/particles or avaDir/Outputs/comModule/peakFiles depending on if pathFromPart is True or False comModule: str module that computed the particles or fields Returns -------- avaProfileMass: dict mass averaged profile (x, y, z, 's') if addVelocityInfo is True, kinetic energy and velocity information are added to the avaProfileMass dict (u2, ekin, totEKin) particlesIni: dict x, y coord of the initial particles or flow thickness field """ if pathFromPart: particlesList, timeStepInfo = particleTools.readPartFromPickle(avalancheDir, simName=simName, flagAvaDir=True, comModule='com1DFA') particlesIni = particlesList[0] log.info('Using particles to generate avalanche path profile') # postprocess to extract path and energy line avaProfileMass = getMassAvgPathFromPart(particlesList, addVelocityInfo=addVelocityInfo) else: particlesList = '' # read field fieldName = ['FT', 'FM'] if addVelocityInfo: fieldName.append('FV') fieldsList, fieldHeader, timeList = com1DFA.readFields(avalancheDir, fieldName, simName=simName, flagAvaDir=True, comModule='com1DFA') # get fields header ncols = fieldHeader['ncols'] nrows = fieldHeader['nrows'] csz = fieldHeader['cellsize'] # we want the origin to be in (0, 0) as it is in the avaProfile that comes in X, Y = gT.makeCoordinateGrid(0, 0, csz, ncols, nrows) indNonZero = np.where(fieldsList[0]['FT'] > 0) # convert this data in a particles style (dict with x, y, z info) particlesIni = {'x': X[indNonZero], 'y': Y[indNonZero]} particlesIni, _ = gT.projectOnRaster(dem, particlesIni) log.info('Using fields to generate avalanche path profile') # postprocess to extract path and energy line avaProfileMass = getMassAvgPathFromFields(fieldsList, fieldHeader, dem) return avaProfileMass, particlesIni
[docs]def getMassAvgPathFromPart(particlesList, addVelocityInfo=False): """ compute mass averaged path from particles Also returns the averaged velocity and kinetic energy associated If addVelocityInfo is True, information about velocity and kinetic energy is computed Parameters ----------- particlesList: list list of particles dict addVelocityInfo: boolean True to add (u2, ekin, totEKin) to result Returns -------- avaProfileMass: dict mass averaged profile (x, y, z, 's', 'sCor') if addVelocityInfo is True, kinetic energy and velocity information are added to the avaProfileMass dict (u2, ekin, totEKin) """ propList = ['x', 'y', 'z', 's', 'sCor'] propListPart = ['x', 'y', 'z', 'trajectoryLengthXY', 'trajectoryLengthXYCor'] avaProfileMass = {} # do we have velocity info? if addVelocityInfo: propList.append('u2') propList.append('ekin') propListPart.append('u2') propListPart.append('ekin') avaProfileMass['totEKin'] = np.empty((0, 1)) # initialize other properties for prop in propList: avaProfileMass[prop] = np.empty((0, 1)) avaProfileMass[prop + 'std'] = np.empty((0, 1)) # loop on each particle dictionary (ie each time step saved) for particles in particlesList: if particles['nPart'] > 0: m = particles['m'] if addVelocityInfo: ux = particles['ux'] uy = particles['uy'] uz = particles['uz'] u = DFAtls.norm(ux, uy, uz) u2Array = u*u kineticEneArray = 0.5*m*u2Array particles['u2'] = u2Array particles['ekin'] = kineticEneArray # mass-averaged path avaProfileMass = appendAverageStd(propList, avaProfileMass, particles, m, naming=propListPart) if addVelocityInfo: avaProfileMass['totEKin'] = np.append(avaProfileMass['totEKin'], np.nansum(kineticEneArray)) return avaProfileMass
[docs]def getMassAvgPathFromFields(fieldsList, fieldHeader, dem): """ compute mass averaged path from fields Also returns the averaged velocity and kinetic energy associated The dem and fieldsList (FT, FM and FV) need to have identical dimensions and cell size. If FV is not provided, information about velocity and kinetic energy is not computed Parameters ----------- fieldsList: list time sorted list of fields dict fieldHeader: dict field header dict dem: dict dem dict Returns -------- avaProfileMass: dict mass averaged profile (x, y, z, 's') if 'FV' in fieldsList, kinetic energy and velocity information are added to the avaProfileMass dict (u2, ekin, totEKin) """ # get DEM demRaster = dem['rasterData'] # get fields header ncols = fieldHeader['ncols'] nrows = fieldHeader['nrows'] xllc = fieldHeader['xllcenter'] yllc = fieldHeader['yllcenter'] csz = fieldHeader['cellsize'] X, Y = gT.makeCoordinateGrid(xllc, yllc, csz, ncols, nrows) propList = ['x', 'y', 'z'] avaProfileMass = {} # do we have velocity info? addVelocityInfo = False if 'FV' in fieldsList[0]: propList.append('u2') propList.append('ekin') avaProfileMass['totEKin'] = np.empty((0, 1)) addVelocityInfo = True # initialize other properties for prop in propList: avaProfileMass[prop] = np.empty((0, 1)) avaProfileMass[prop + 'std'] = np.empty((0, 1)) # loop on each field dictionary (ie each time step saved) for field in fieldsList: # find cells with snow nonZeroIndex = np.where(field['FT'] > 0) xArray = X[nonZeroIndex] yArray = Y[nonZeroIndex] zArray, _ = gT.projectOnGrid(xArray, yArray, demRaster, csz=csz, xllc=xllc, yllc=yllc) mArray = field['FM'][nonZeroIndex] particles = {'x': xArray, 'y': yArray, 'z': zArray} if addVelocityInfo: uArray = field['FV'][nonZeroIndex] u2Array = uArray*uArray kineticEneArray = 0.5*mArray*u2Array particles['u2'] = u2Array particles['ekin'] = kineticEneArray # mass-averaged path avaProfileMass = appendAverageStd(propList, avaProfileMass, particles, mArray) if addVelocityInfo: avaProfileMass['totEKin'] = np.append(avaProfileMass['totEKin'], np.nansum(kineticEneArray)) avaProfileMass['x'] = avaProfileMass['x'] - xllc avaProfileMass['y'] = avaProfileMass['y'] - yllc # compute s avaProfileMass = gT.computeS(avaProfileMass) return avaProfileMass
[docs]def readPeakFT(peakFilesDF, simName): """ get the peak flow thickness field (pft) of one simulation from the peak file dataframe Parameters ----------- peakFilesDF: pandas DataFrame peak files of all simulations (from fU.makeSimDF), parsed once for the whole run simName: str simulation name Returns -------- fieldPFT: numpy array peak flow thickness raster (same grid as the simulation dem), None if no pft peak field is available for this simulation """ # the simulation can be identified by its full name or by its hash (the index of the # configuration dataframe iterated in generatePathAndSplitpoint is the hash) isSim = (peakFilesDF['simName'] == simName) | (peakFilesDF['simID'] == simName) index = peakFilesDF.index[isSim & (peakFilesDF['resType'] == 'pft')] if len(index) == 0: log.warning('No pft peak field found for simulation %s' % simName) return None return IOf.readRaster(peakFilesDF.loc[index[0], 'files'])['rasterData']
[docs]def extendDFAPath(cfg, avaProfile, dem, particlesIni, fieldPFT=None): """ extend the DFA path at the top and bottom avaProfile with x, y, z, s information Parameters ----------- cfg: configParser configuration object with: - extTopOption: int, how to extend towards the top? 0 for heighst point method, a for largest runout method - extBottomOption: int, how to extend towards the bottom? 0 for the straight-line extrapolation method, 1 for the least-cost extension to the deposit front (requires fieldPFT) - nCellsResample: int, resampling length is given by nCellsResample*demCellSize - nCellsMinExtend: int, when extending towards the bottom, take points at more than nCellsMinExtend*demCellSize from last point to get the direction - nCellsMaxExtend: int, when extending towards the bottom, take points at less than nCellsMaxExtend*demCellSize from last point to get the direction - factBottomExt: float, extend the profile from factBottomExt*sMax avaProfile: dict profile to be extended dem: dict dem dict particlesIni: dict initial particles dict fieldPFT: numpy array, optional peak flow thickness field on the dem grid, only used if extBottomOption = 1 Returns -------- avaProfile: dict extended profile at top and bottom (x, y, z). """ # resample the profile resampleDistance = cfg.getfloat('nCellsResample') * dem['header']['cellsize'] avaProfile, _ = gT.prepareLine(dem, avaProfile, distance=resampleDistance, Point=None) avaProfile = extendProfileTop(cfg.getint('extTopOption'), particlesIni, avaProfile) if cfg.getint('extBottomOption', fallback=0) == 1: if fieldPFT is None: log.warning('extBottomOption is 1 but no peak flow thickness field was provided, ' 'falling back to the straight-line bottom extension') avaProfile = extendProfileBottom(cfg, dem, avaProfile) else: avaProfile = extendProfileToFront(cfg, dem, avaProfile, fieldPFT) else: avaProfile = extendProfileBottom(cfg, dem, avaProfile) return avaProfile
[docs]def extendProfileTop(extTopOption, particlesIni, profile): """ extend the DFA path at the top (release) Either towards the highest point in particlesIni (extTopOption = 0) or the point leading to the longest runout (extTopOption = 1) Parameters ----------- extTopOption: int decide how to extend towards the top if 0, extrapolate towards the highest point in the release if 1, extrapolate towards the point leading to the lonest runout particlesIni: dict initial particles dict profile: dict profile to extend Returns -------- profile: dict extended profile """ if extTopOption == 0: # get highest particle indTop = np.argmax(particlesIni['z']) xExtTop = particlesIni['x'][indTop] yExtTop = particlesIni['y'][indTop] zExtTop = particlesIni['z'][indTop] dx = xExtTop - profile['x'][0] dy = yExtTop - profile['y'][0] ds = np.sqrt(dx**2 + dy**2) elif extTopOption == 1: # get point with the most important runout gain # get first particle of the path xFirst = profile['x'][0] yFirst = profile['y'][0] zFirst = profile['z'][0] # get last particle of the path sLast = profile['s'][-1] zLast = profile['z'][-1] # compute runout angle for averaged path tanAngle = (zFirst-zLast)/sLast # compute ds dx = particlesIni['x'] - xFirst dy = particlesIni['y'] - yFirst ds = np.sqrt(dx**2 + dy**2) # compute dz dz = particlesIni['z'] - zFirst # remove the elevation needed to match the runout angle dz1 = dz - tanAngle * ds # get the particle with the highest potential indTop = np.argmax(dz1) xExtTop = particlesIni['x'][indTop] yExtTop = particlesIni['y'][indTop] zExtTop = particlesIni['z'][indTop] ds = ds[indTop] # extend profile profile['x'] = np.append(xExtTop, profile['x']) profile['y'] = np.append(yExtTop, profile['y']) profile['z'] = np.append(zExtTop, profile['z']) profile['s'] = np.append(0, profile['s'] + ds) if debugPlot: debPlot.plotPathExtTop(profile, particlesIni, xFirst, yFirst, zFirst, dz1) return profile
[docs]def extendProfileBottom(cfg, dem, profile): """ extend the DFA path at the bottom (runout area) Find the direction in which to extend considering the last point of the profile and a few previous ones but discarding the ones that are too close (nCellsMinExtend* csz < distFromLast <= nCellsMaxExtend * csz). Extend in this diretion for a distance factBottomExt * length of the path. Parameters ----------- cfg: configParser nCellsMinExtend: int, when extending towards the bottom, take points at more than nCellsMinExtend*demCellSize from last point to get the direction nCellsMaxExtend: int, when extending towards the bottom, take points at less than nCellsMaxExtend*demCellSize from last point to get the direction factBottomExt: float, extend the profile from factBottomExt*sMax dem: dict dem dict profile: dict profile to extend Returns -------- profile: dict extended profile """ header = dem['header'] csz = header['cellsize'] zRaster = dem['rasterData'] # get last point xLast = profile['x'][-1] yLast = profile['y'][-1] sLast = profile['s'][-1] # compute distance from last point: r = DFAtls.norm(profile['x']-xLast, profile['y']-yLast, 0) # find the previous points extendMinDistance = cfg.getfloat('nCellsMinExtend') * csz extendMaxDistance = cfg.getfloat('nCellsMaxExtend') * csz pointsOfInterestLast = np.where((r < extendMaxDistance) & (r > extendMinDistance))[0] xInterest = profile['x'][pointsOfInterestLast] yInterest = profile['y'][pointsOfInterestLast] # check if points are found to compute direction of extension if len(xInterest) > 0: # find the direction in which we need to extend the path vDirX = xLast - xInterest vDirY = yLast - yInterest vDirX, vDirY, vDirZ = DFAtls.normalize(np.array([vDirX]), np.array([vDirY]), 0*np.array([vDirY])) vDirX = np.sum(vDirX) vDirY = np.sum(vDirY) vDirZ = np.sum(vDirZ) vDirX, vDirY, vDirZ = DFAtls.normalize(np.array([vDirX]), np.array([vDirY]), np.array([vDirZ])) # extend in this direction factExt = cfg.getfloat('factBottomExt') gamma = factExt * sLast / np.sqrt(vDirX**2 + vDirY**2) xExtBottom = np.array([xLast + gamma * vDirX]) yExtBottom = np.array([yLast + gamma * vDirY]) # project on DEM zExtBottom, _ = gT.projectOnGrid(xExtBottom, yExtBottom, zRaster, csz=csz) # Dicothomie method to find the last point on the extention and on the dem if np.isnan(zExtBottom): factExt = factExt/2 stepSize = factExt isOut = True else: isOut = False stepSize = 0 count = 0 # remember last point found inside factLast = 0 while count < cfg.getint('maxIterationExtBot') and stepSize * sLast > cfg.getint('nBottomExtPrecision')*csz: count = count + 1 gamma = factExt * sLast / np.sqrt(vDirX**2 + vDirY**2) xExtBottom = np.array([xLast + gamma * vDirX]) yExtBottom = np.array([yLast + gamma * vDirY]) # project on DEM zExtBottom, _ = gT.projectOnGrid(xExtBottom, yExtBottom, zRaster, csz=csz) stepSize = stepSize/2 if np.isnan(zExtBottom): factExt = factExt - stepSize isOut = True else: # remember last point found inside factLast = factExt factExt = factExt + stepSize isOut = False if isOut: # the last iteration is not in the domain, fall back to last point in domain factExt = factLast gamma = factExt * sLast / np.sqrt(vDirX**2 + vDirY**2) xExtBottom = np.array([xLast + gamma * vDirX]) yExtBottom = np.array([yLast + gamma * vDirY]) # project on DEM zExtBottom, _ = gT.projectOnGrid(xExtBottom, yExtBottom, zRaster, csz=csz) log.info('found extention after %d iterations, precision is %.2f m' % (count, stepSize * sLast)) # extend profile profile['x'] = np.append(profile['x'], xExtBottom) profile['y'] = np.append(profile['y'], yExtBottom) profile['z'] = np.append(profile['z'], zExtBottom) profile['s'] = np.append(profile['s'], sLast + np.sqrt((xLast-xExtBottom)**2 + (yLast-yExtBottom)**2)) else: log.warning('Path not extended at bottom as no point of interest for computing direction \ of where to extend path is found') if debugPlot: debPlot.plotPathExtBot(profile, xInterest, yInterest, 0*yInterest, xLast, yLast) return profile
[docs]def extendProfileToFront(cfg, dem, profile, fieldPFT): """ extend the DFA path at the bottom to the front of the deposit Locate the front of the deposit in the peak flow thickness field (findFlowFront) and extend the profile to it along a least-cost path over the dem (leastCostPath). In contrast to extendProfileBottom, the extension follows the terrain and the deposit and stops at the front instead of extrapolating a straight line of fixed relative length. If the front cannot be located or reached, the profile falls back to the straight-line extension (extendProfileBottom). Parameters ----------- cfg: configParser configuration object with: - ftThreshold: float, minimum flow thickness (m) for a cell to belong to the flow footprint - lowFrontFraction: float, fraction of the flow elevation range defining the front band - upSlopePenalty: float, cost multiplier on the positive elevation gain of an edge - flowDistPenalty: float, cost multiplier on the distance (m) of a cell from the flow footprint dem: dict dem dict profile: dict profile to extend fieldPFT: numpy array peak flow thickness field (pft) on the same grid as the dem Returns -------- profile: dict extended profile (x, y, z, s) """ header = dem['header'] csz = header['cellsize'] zRaster = dem['rasterData'] if fieldPFT.shape != zRaster.shape: message = 'Peak flow thickness field and dem do not have the same shape' log.error(message) raise AssertionError(message) # get last point xLast = profile['x'][-1] yLast = profile['y'][-1] sLast = profile['s'][-1] # locate the deposit front frontRow, frontCol = findFlowFront(fieldPFT, zRaster, cfg.getfloat('ftThreshold'), cfg.getfloat('lowFrontFraction')) if frontRow is None: log.warning('No flow cell above ftThreshold was found, ' 'falling back to the straight-line bottom extension') return extendProfileBottom(cfg, dem, profile) # cell of the last profile point (profile and field share the dem grid) startRow = min(max(int(round((yLast - header['yllcenter']) / csz)), 0), zRaster.shape[0] - 1) startCol = min(max(int(round((xLast - header['xllcenter']) / csz)), 0), zRaster.shape[1] - 1) cellPath = leastCostPath((startRow, startCol), (frontRow, frontCol), fieldPFT, zRaster, csz, cfg.getfloat('ftThreshold'), cfg.getfloat('upSlopePenalty'), cfg.getfloat('flowDistPenalty')) if len(cellPath) < 2: log.warning('No least-cost path to the deposit front was found, ' 'falling back to the straight-line bottom extension') return extendProfileBottom(cfg, dem, profile) # drop the first cell (the path end itself) and convert to coordinates; the extension # points are cell centers of valid dem cells, so z is read directly from the raster rows, cols = np.array(cellPath[1:]).T xExtBottom = header['xllcenter'] + cols * csz yExtBottom = header['yllcenter'] + rows * csz zExtBottom = zRaster[rows, cols] dx = np.diff(np.append(xLast, xExtBottom)) dy = np.diff(np.append(yLast, yExtBottom)) sExtBottom = sLast + np.cumsum(np.sqrt(dx**2 + dy**2)) log.info('Path extended to the deposit front (%.0f m beyond the mass averaged path end)' % (sExtBottom[-1] - sLast)) # extend profile profile['x'] = np.append(profile['x'], xExtBottom) profile['y'] = np.append(profile['y'], yExtBottom) profile['z'] = np.append(profile['z'], zExtBottom) profile['s'] = np.append(profile['s'], sExtBottom) return profile
[docs]def findFlowFront(fieldPFT, demRaster, ftThreshold, lowFrontFraction): """ locate the front of the deposit in a peak flow thickness field The front is the flow-thickness-weighted centroid of the flow cells lying in the lowest lowFrontFraction of the flow elevation range. The band always contains the lowest flow cell; for a flat deposit it covers the whole footprint, so the front falls back to the centroid of the deposit. If the centroid falls outside the flow footprint (e.g. between two deposit lobes), the front is snapped to the nearest cell of the band. Parameters ----------- fieldPFT: numpy array peak flow thickness field demRaster: numpy array dem raster of the same shape ftThreshold: float minimum flow thickness (m) for a cell to belong to the flow footprint lowFrontFraction: float fraction of the flow elevation range defining the front band Returns -------- frontRow, frontCol: int cell of the front, (None, None) if there is no flow above ftThreshold """ flow = (fieldPFT > ftThreshold) & np.isfinite(demRaster) if not flow.any(): return None, None rows, cols = np.nonzero(flow) elev = demRaster[rows, cols] lowBand = elev <= elev.min() + lowFrontFraction * (elev.max() - elev.min()) weight = fieldPFT[rows, cols] * lowBand # weight.sum() is always positive: the lowest flow cell is in lowBand and, being a flow # cell, carries fieldPFT > ftThreshold >= 0, so no divide-by-zero guard is needed frontRow = int(round(np.sum(rows * weight) / weight.sum())) frontCol = int(round(np.sum(cols * weight) / weight.sum())) if not flow[frontRow, frontCol]: # centroid outside the footprint (e.g. two deposit lobes): snap to the band bandInd = np.flatnonzero(lowBand) iNear = bandInd[np.argmin((rows[bandInd] - frontRow)**2 + (cols[bandInd] - frontCol)**2)] frontRow, frontCol = int(rows[iNear]), int(cols[iNear]) return frontRow, frontCol
[docs]def leastCostPath(startCell, goalCell, fieldPFT, demRaster, csz, ftThreshold, upSlopePenalty, flowDistPenalty): """ Dijkstra least-cost path between two cells of the dem grid The cost of an edge is its horizontal length plus a penalty on the positive elevation gain and a penalty on the distance (m) of the target cell from the flow footprint, so that the path descends along the deposit. Parameters ----------- startCell, goalCell: tuple (row, col) of the start and goal cells fieldPFT: numpy array peak flow thickness field used to build the flow footprint demRaster: numpy array dem raster of the same shape csz: float cell size (m) ftThreshold: float minimum flow thickness (m) for a cell to belong to the flow footprint upSlopePenalty: float cost multiplier on the positive elevation gain of an edge flowDistPenalty: float cost multiplier on the distance (m) of a cell from the flow footprint Returns -------- cellPath: list (row, col) cells from start to goal, empty if the goal is unreachable """ if upSlopePenalty < 0 or flowDistPenalty < 0: message = 'The penalty parameters of the least-cost path must not be negative' log.error(message) raise AssertionError(message) # distance (m) from the flow footprint, used to keep the path on the deposit distToFlow = distance_transform_edt(~(fieldPFT > ftThreshold), sampling=csz) nRows, nCols = demRaster.shape demValid = np.isfinite(demRaster) neighbours = ((-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)) diagDist = csz * math.sqrt(2.) costGrid = np.full((nRows, nCols), np.inf) costGrid[startCell] = 0. previousCell = np.full((nRows, nCols, 2), -1, dtype=np.int64) queue = [(0., int(startCell[0]), int(startCell[1]))] while queue: cost, row, col = heapq.heappop(queue) if (row, col) == (goalCell[0], goalCell[1]): break if cost > costGrid[row, col]: continue for dRow, dCol in neighbours: nRow, nCol = row + dRow, col + dCol if 0 <= nRow < nRows and 0 <= nCol < nCols and demValid[nRow, nCol]: # no diagonal moves across the corner of two nodata cells if dRow and dCol and not (demValid[row, nCol] and demValid[nRow, col]): continue dz = demRaster[nRow, nCol] - demRaster[row, col] edge = ((diagDist if (dRow and dCol) else csz) + max(dz, 0.) * upSlopePenalty + distToFlow[nRow, nCol] * flowDistPenalty) if cost + edge < costGrid[nRow, nCol]: costGrid[nRow, nCol] = cost + edge previousCell[nRow, nCol] = (row, col) heapq.heappush(queue, (cost + edge, nRow, nCol)) if previousCell[goalCell[0], goalCell[1], 0] < 0 and tuple(goalCell) != tuple(startCell): return [] cellPath = [] row, col = int(goalCell[0]), int(goalCell[1]) while row >= 0: cellPath.append((row, col)) row, col = int(previousCell[row, col, 0]), int(previousCell[row, col, 1]) cellPath.reverse() return cellPath
[docs]def getParabolicFit(cfg, avaProfile, dem): """fit a parabola on a set of (s, z) points first and last point match. Last constraint is given by fitOption (see below) Parameters ----------- cfg: configParser configuration object with: fitOption: int how to optimize the fit 0 minimize distance between points 1 match the end slope avaProfile: dict profile to be extended dem: dict dem dict Returns -------- parabolicFit: dict a, b, c coefficients of the parabolic fit (y = a*a*x + b*x + c) """ s = avaProfile['s'] sE = s[-1] z = avaProfile['z'] z0 = z[0] zE = z[-1] # same start and end point, minimize distance between curves if cfg.getfloat('fitOption') == 0: SumNom = np.sum(s*(s-sE)*((zE-z0)/sE*s+z0-z)) SumDenom = s*(s-sE) SumDenom = np.dot(SumDenom, SumDenom) a = - SumNom/SumDenom b = (zE-z0)/sE - a*sE elif cfg.getfloat('fitOption') == 1: angleProf, tmpProf, dsProf = gT.prepareAngleProfile(10, avaProfile, raiseWarning=False) r = avaProfile['s'] - avaProfile['s'][-1] resampleDistance = cfg.getfloat('nCellsSlope') * dem['header']['cellsize'] pointsOfInterestLast = np.where(np.abs(r) < resampleDistance) slope = np.nansum(angleProf[pointsOfInterestLast])/np.size(pointsOfInterestLast) slope = -np.tan(np.radians(slope)) a = (slope*sE + (z0 - zE))/(sE*sE) b = -slope - 2*(z0 - zE)/sE c = z0 parabolicFit = {'a': a, 'b': b, 'c': c} return parabolicFit
[docs]def getSplitPoint(cfg, avaProfile, parabolicFit): """ find the split point corresponding to an avalanche profile, with parabolic fit and the slopeSplitPoint Parameters ----------- cfg: configParser configuration object with - slopeSplitPoint: float, desired slope at split point (in degrees) - dsMin: float, threshold distance [m] for looking for the - split point (at least dsMin meters below split point angle) avaProfile: dict profile to be extended parabolicFit: dict a, b, c coeficients of the parabolic fit (y = a*a*x + b*x + c) Returns -------- splitPoint: dict (x, y, z, zPra, s) at split point location. """ indFirst = avaProfile['indStartMassAverage'] indEnd = avaProfile['indEndMassAverage'] s0 = avaProfile['s'][indFirst] sEnd = avaProfile['s'][indEnd] s = avaProfile['s'] z = avaProfile['z'] sNew = s - s0 zPara = parabolicFit['a']*sNew*sNew+parabolicFit['b']*sNew+parabolicFit['c'] parabolicProfile = {'s': sNew, 'z': zPara} anglePara, tmpPara, dsPara = gT.prepareAngleProfile(cfg.getfloat('slopeSplitPoint'), parabolicProfile, raiseWarning=False) try: indSplitPoint = gT.findAngleProfile(tmpPara, dsPara, cfg.getfloat('dsMin')) splitPoint = {'x': avaProfile['x'][indSplitPoint], 'y': avaProfile['y'][indSplitPoint], 'z': z[indSplitPoint], 'zPara': zPara[indSplitPoint], 's': sNew[indSplitPoint]} except IndexError: noSplitPointFoundMessage = ('Automated split point generation failed as no point where slope is less than %s°' 'was found, setting split point at the top. Correct split point manually.' % cfg.getfloat('slopeSplitPoint')) splitPoint = {'x': avaProfile['x'][0], 'y': avaProfile['y'][0], 'z': z[0], 'zPara': zPara[0], 's': sNew[0], 'isTopSplitPoint': True} log.warning(noSplitPointFoundMessage) if debugPlot: angleProf, tmpProf, dsProf = gT.prepareAngleProfile(cfg.getfloat('slopeSplitPoint'), avaProfile) debPlot.plotFindAngle(avaProfile, angleProf, parabolicProfile, anglePara, s0, sEnd, splitPoint, indSplitPoint) return splitPoint
[docs]def resamplePath(cfg, dem, avaProfile): """ resample the path profile and keep track of indStartMassAverage and indEndMassAverage Parameters ----------- cfg: configParser configuration object with - nCellsResample: int, resampling length is given by nCellsResample*demCellSize dem: dict dem dict avaProfile: dict profile to be resampled Returns -------- avaProfile: dict resampled path profile """ resampleDistance = cfg.getfloat('nCellsResample') * dem['header']['cellsize'] indFirst = avaProfile['indStartMassAverage'] indEnd = avaProfile['indEndMassAverage'] s0 = avaProfile['s'][indFirst] sEnd = avaProfile['s'][indEnd] avaProfile, _ = gT.prepareLine(dem, avaProfile, distance=resampleDistance, Point=None) # make sure we get the good start and end point... prepareLine might make a small error on the s coord indFirst = np.argwhere(avaProfile['s'] >= s0 - resampleDistance/3)[0][0] # look for the first point in the extension and take the one before; if the extension is # shorter than a resample step, the mass averaged part reaches the last point indEndCandidates = np.argwhere(avaProfile['s'] >= sEnd + resampleDistance/3) indEnd = indEndCandidates[0][0]-1 if len(indEndCandidates) > 0 else np.size(avaProfile['s'])-1 avaProfile['indStartMassAverage'] = indFirst avaProfile['indEndMassAverage'] = indEnd return avaProfile
[docs]def saveSplitAndPath(avalancheDir, simDFrow, splitPoint, avaProfileMass, dem): """ Save avaPath and split point to directory Parameters ----------- avalancheDir: pathlib avalanche directory pathlib path simDFrow: pandas series row of the simulation dataframe corresponding to the current simulation analyzed splitPoint: dict point dictionary avaProfileMass: dict line dictionary dem: dict com1DFA simulation dictionary Returns -------- pathAB: pathlib file path to the saved shapefile for the mass-averaged path splitAB: pathlib file path to the saved shapefile for the split point """ # put path back in original location if splitPoint != '': splitPoint['x'] = splitPoint['x'] + dem['originalHeader']['xllcenter'] splitPoint['y'] = splitPoint['y'] + dem['originalHeader']['yllcenter'] avaProfileMass['x'] = avaProfileMass['x'] + dem['originalHeader']['xllcenter'] avaProfileMass['y'] = avaProfileMass['y'] + dem['originalHeader']['yllcenter'] # get projection from release shp layer simName = simDFrow['simName'] relName = cfgUtils.parseSimName(simName)["releaseName"] inProjection = pathlib.Path(avalancheDir, 'Inputs', 'REL', relName + '.prj') # save profile in Inputs pathAB = pathlib.Path(avalancheDir, 'Outputs', 'ana5Utils', 'DFAPath', 'massAvgPath_%s_AB_aimec' % simName) name = 'massAvaPath' shpConv.writeLine2SHPfile(avaProfileMass, name, pathAB) if inProjection.is_file(): shutil.copy(inProjection, pathAB.with_suffix('.prj')) else: message = ('No projection layer for shp file %s' % inProjection) log.warning(message) log.info('Saved path to: %s', pathAB) if splitPoint != '': splitAB = pathlib.Path(avalancheDir, 'Outputs', 'ana5Utils', 'DFAPath', 'splitPointParabolicFit_%s_AB_aimec' % simName) name = 'parabolaSplitPoint' shpConv.writePoint2SHPfile(splitPoint, name, splitAB) if inProjection.is_file(): shutil.copy(inProjection, splitAB.with_suffix('.prj')) log.info('Saved split point to: %s', splitAB) return pathAB,splitAB
[docs]def weightedAvgAndStd(values, weights): """ Return the weighted average and standard deviation. values, weights -- Numpy ndarrays with the same shape. """ average = np.average(values, weights=weights) # Fast and numerically precise: variance = np.average((values-average)**2, weights=weights) return (average, math.sqrt(variance))
[docs]def appendAverageStd(propList, avaProfile, particles, weights, naming=''): """ append averaged to path Parameters ----------- propList: list list of properties to average and append avaProfile: dict path particles: dict particles dict weights: numpy array array of weights (same size as particles) naming: list optional - list of properties at source (if different than final naming in avaProfile) Returns -------- avaProfile: dict averaged profile """ propListNames = naming if naming != '' else propList for prop, propName in zip(propList, propListNames): avg, std = weightedAvgAndStd(particles[propName], weights) avaProfile[prop] = np.append(avaProfile[prop], avg) avaProfile[prop + 'std'] = np.append(avaProfile[prop + 'std'], std) return avaProfile