Source code for ana3AIMEC.ana3AIMEC

"""
    AIMEC post processing workflow
"""

import logging
import numpy as np
import pathlib

# Local imports
import avaframe.ana3AIMEC.aimecTools as aT
import avaframe.in2Trans.ascUtils as IOf
import avaframe.out3Plot.outAIMEC as outAimec

# create local logger
log = logging.getLogger(__name__)


[docs]def mainAIMEC(pathDict, inputsDF, cfg): """ Main logic for AIMEC postprocessing Reads the required files location for ana3AIMEC postprocessing given an avalanche directory Make the domain transformation corresponding to the input avalanche path Transform 2D fields (pressure, flow thickness ...) Analyze transformed rasters and mass Produce plots and report Parameters ---------- pathDict : dict dictionary with paths to dem and lines for Aimec analysis inputsDF : dataFrame dataframe with simulations to analyze and associated path to raster data cfg : configparser configparser with ana3AIMEC settings defined in ana3AIMECCfg.ini Returns ------- rasterTransfo: dict domain transformation information resAnalysisDF: dataFrame results of ana3AIMEC analysis plotDict: dict dictionary for report """ # Extract input config parameters cfgSetup = cfg['AIMECSETUP'] cfgFlags = cfg['FLAGS'] interpMethod = cfgSetup['interpMethod'] log.info('Prepare data for post-processing') # Read input dem demSource = pathDict['demSource'] dem = IOf.readRaster(demSource) # get hash of the reference refSimRowHash = pathDict['refSimRowHash'] # read reference file and raster and config refResultSource = inputsDF.loc[refSimRowHash, cfgSetup['runoutResType']] refRaster = IOf.readRaster(refResultSource) refRasterData = refRaster['rasterData'] refHeader = refRaster['header'] log.debug('Data-file %s analysed and domain transformation done' % demSource) # Make domain transformation log.debug("Creating new deskewed raster and preparing new raster assignment function") log.debug('Analyze and make domain transformation on Data-file %s' % demSource) rasterTransfo = aT.makeDomainTransfo(pathDict, dem, refHeader['cellsize'], cfgSetup) # #################################################### # visualisation # TODO: needs to be moved somewhere else slRaster = aT.transform(refRaster, refResultSource, rasterTransfo, interpMethod) newRasters = {} log.debug("Assigning dem data to deskewed raster") newRasters['newRasterDEM'] = aT.transform(dem, demSource, rasterTransfo, interpMethod) inputData = {} inputData['slRaster'] = slRaster inputData['xyRaster'] = refRasterData inputData['xyHeader'] = refHeader outAimec.visuTransfo(rasterTransfo, inputData, cfgSetup, pathDict) # postprocess reference... contourDict = {} timeMass = None resAnalysisDF, newRasters, timeMass, contourDict = postProcessAIMEC(cfg, rasterTransfo, pathDict, inputsDF, newRasters, timeMass, refSimRowHash, contourDict) # postprocess other simulations for simRowHash, inputsDFrow in inputsDF.iterrows(): if simRowHash != refSimRowHash: resAnalysisDF, newRasters, timeMass, contourDict = postProcessAIMEC(cfg, rasterTransfo, pathDict, resAnalysisDF, newRasters, timeMass, simRowHash, contourDict) pathDict['simRowHash'] = simRowHash # ----------------------------------------------------------- # result visualisation + report # ToDo: should we move this somewere else, this is now just plotting, it should be accessible from outside # ----------------------------------------------------------- plotDict = {} log.info('Visualisation of AIMEC results') # plot the contour lines of all sims for the thresholdValue of runoutResType outAimec.plotContoursTransformed(contourDict, pathDict, rasterTransfo, cfgSetup) if sorted(pathDict['resTypeList']) == sorted(['ppr', 'pft', 'pfv']): outAimec.visuSimple(cfgSetup, rasterTransfo, resAnalysisDF, newRasters, pathDict) if len(resAnalysisDF.index) == 2: if sorted(pathDict['resTypeList']) == sorted(['ppr', 'pft', 'pfv']): plotName = outAimec.visuRunoutComp(rasterTransfo, resAnalysisDF, cfgSetup, pathDict) else: plotName = outAimec.visuRunoutStat(rasterTransfo, inputsDF, resAnalysisDF, newRasters, cfgSetup, pathDict) areaDict = {('area comparison plot ' + resAnalysisDF.loc[k, 'simName']): v for k, v in resAnalysisDF['areasPlot'].to_dict().items()} plotDict['areasPlot'] = {'Aimec area analysis': areaDict} if cfgFlags.getboolean('flagMass'): massDict = {('mass comparison plot ' + resAnalysisDF.loc[k, 'simName']): v for k, v in resAnalysisDF['massPlotName'].to_dict().items()} plotDict['massAnalysisPlot'] = {'Aimec mass analysis': massDict} outAimec.resultVisu(cfgSetup, inputsDF, pathDict, cfgFlags, rasterTransfo, resAnalysisDF) plotDict['slCompPlot'] = {'Aimec comparison of mean and max values along path': {'AIMEC ' + cfgSetup['runoutResType'] + ' analysis': plotName}} log.info('Writing results to file') outAimec.resultWrite(pathDict, cfg, rasterTransfo, resAnalysisDF) return rasterTransfo, resAnalysisDF, plotDict
[docs]def postProcessAIMEC(cfg, rasterTransfo, pathDict, resAnalysisDF, newRasters, timeMass, simRowHash, contourDict): """ Apply domain transformation and analyse result data (for example pressure, thickness, velocity...) Apply the domain tranformation to peak results Analyse them. Calculate runout, Max Peak Values, Average Peak Values... Get mass and entrainement Parameters ---------- cfg: configParser objec parameters for AIMEC analysis rasterTransfo: dict transformation information pathDict : dict dictionary with paths to dem and lines for Aimec analysis resAnalysisDF : dataFrame dataframe with simulations to analyze and associated path to raster data newRasters: dict dictionary containing pressure, velocity and flow thickness rasters after transformation for the reference and the current simulation timeMass: 1D numpy array time array for mass analysis (if flagMass=True, otherwise None) simRowHash: str dataframe hash of the current simulation to analyze contourDict: dict dictionary with one key per sim and its x, y coordinates for contour line of runoutresType for thresholdValue Returns ------- resAnalysisDF: dataFrame input DF with results from Aimec Analysis updated with results from curent simulation: -maxACrossMax: float max max A (A depends on what is in resTypes) -ACrossMax: 1D numpy array max A in each cross section (A depends on what is in resTypes) -ACrossMean: 1D numpy array mean A in each cross section (A depends on what is in resTypes) -xRunout: float x coord of the runout point calculated from the MAX peak result in each cross section (runoutResType provided in the ini file) -yRunout: float y coord of the runout point calculated from the MAX peak result in each cross section (runoutResType provided in the ini file) -sRunout: float projected runout distance calculated from the MAX peak result in each cross section (runoutResType provided in the ini file) -xMeanRunout: float x coord of the runout point calculated from the MEAN peak result in each cross section (runoutResType provided in the ini file) -yMeanRunout: float y coord of the runout point calculated from the MEAN peak result in each cross section (runoutResType provided in the ini file) -sMeanRunout: float projected runout distance calculated from the MEAN peak result in each cross section (runoutResType provided in the ini file) -elevRel: float elevation of the release area (based on first point with peak field > thresholdValue) -deltaH: float elevation fall difference between elevRel and altitude of run-out point -TP: float ref = True sim2 = True -FN: float ref = False sim2 = True -FP: float ref = True sim2 = False -TN: float ref = False sim2 = False if mass analysis is performed -relMass: float release mass -entMass: float entrained mass -finalMass: float final mass -relativMassDiff: float the final mass diff with ref (in %) -growthIndex: float growth index -growthGrad: float growth gradient contourDict: dict dictionary with one key per sim and its x, y coordinates for contour line of runoutresType for thresholdValue - updated with info for current simulation """ cfgSetup = cfg['AIMECSETUP'] cfgFlags = cfg['FLAGS'] interpMethod = cfgSetup['interpMethod'] flagMass = cfgFlags.getboolean('flagMass') refSimRowHash = pathDict['refSimRowHash'] resTypeList = pathDict['resTypeList'] # apply domain transformation log.info('Analyzing data in path coordinate system') for resType in resTypeList: log.debug("Assigning %s data to deskewed raster" % resType) inputFiles = resAnalysisDF.loc[simRowHash, resType] if isinstance(inputFiles, pathlib.PurePath): rasterData = IOf.readRaster(inputFiles) newRaster = aT.transform(rasterData, inputFiles, rasterTransfo, interpMethod) newRasters['newRaster' + resType.upper()] = newRaster if simRowHash == refSimRowHash: newRasters['newRefRaster' + resType.upper()] = newRaster resAnalysisDF[resType + 'CrossMax'] = np.nan resAnalysisDF[resType + 'CrossMax'] = resAnalysisDF[resType + 'CrossMax'].astype(object) resAnalysisDF[resType + 'CrossMean'] = np.nan resAnalysisDF[resType + 'CrossMean'] = resAnalysisDF[resType + 'CrossMax'].astype(object) # add max, min and std values of result fields resAnalysisDF.at[simRowHash, resType + 'FieldMax'] = np.nanmax(rasterData['rasterData']) resAnalysisDF.at[simRowHash, resType + 'FieldMean'] = np.nanmean(rasterData['rasterData']) resAnalysisDF.at[simRowHash, resType + 'FieldStd'] = np.nanstd(rasterData['rasterData']) # analyze all fields resAnalysisDF = aT.analyzeField(simRowHash, rasterTransfo, newRaster, resType, resAnalysisDF) # compute runout based on runoutResType resAnalysisDF = aT.computeRunOut(cfgSetup, rasterTransfo, resAnalysisDF, newRasters, simRowHash) if flagMass: # perform mass analysis fnameMass = resAnalysisDF.loc[simRowHash, 'massBal'] resAnalysisDF, timeMass = aT.analyzeMass(fnameMass, simRowHash, refSimRowHash, resAnalysisDF, time=timeMass) if simRowHash != refSimRowHash: massPlotName = outAimec.visuMass(resAnalysisDF, pathDict, simRowHash, refSimRowHash, timeMass) resAnalysisDF.loc[simRowHash, 'massPlotName'] = massPlotName else: timeMass = None resAnalysisDF, compPlotPath, contourDict = aT.analyzeArea(rasterTransfo, resAnalysisDF, simRowHash, newRasters, cfgSetup, pathDict, contourDict) resAnalysisDF.loc[simRowHash, 'areasPlot'] = compPlotPath return resAnalysisDF, newRasters, timeMass, contourDict
[docs]def aimecRes2ReportDict(resAnalysisDF, reportD, benchD, pathDict): """ Gather aimec results and append them to report the dictionary Parameters ---------- resAnalysisDF : dataFrame dataframe with aimec analysis results reportD: dict report dictionary to be updated benchD: dict benchmark dictionary to be updated pathDict : dict dictionary with refSimRowHash Returns -------- reportD: dict report dictionary updated with the 'Aimec analysis' section benchD: dict benchmark dictionary updated with the 'Aimec analysis' section """ refSimRowHash = pathDict['refSimRowHash'] resAnalysisDFRef = resAnalysisDF.loc[refSimRowHash].squeeze().to_dict() resAnalysisDFComp = resAnalysisDF.loc[resAnalysisDF.index != refSimRowHash].squeeze().to_dict() reportD['Aimec analysis'] = {'type': 'list', 'runout [m]': resAnalysisDFComp['sRunout'], 'max peak pressure [kPa]': resAnalysisDFComp['maxpprCrossMax'], 'max peak flow thickness [m]': resAnalysisDFComp['maxpftCrossMax'], 'max peak flow velocity [ms-1]': resAnalysisDFComp['maxpfvCrossMax']} benchD['Aimec analysis'] = {'type': 'list', 'runout [m]': resAnalysisDFRef['sRunout'], 'max peak pressure [kPa]': resAnalysisDFRef['maxpprCrossMax'], 'max peak flow thickness [m]': resAnalysisDFRef['maxpftCrossMax'], 'max peak flow velocity [ms-1]': resAnalysisDFRef['maxpfvCrossMax']} # add mass info if "relMass" in resAnalysisDF.columns: reportD['Aimec analysis'].update({'Initial mass [kg]': resAnalysisDFComp['relMass']}) reportD['Aimec analysis'].update({'Final mass [kg]': resAnalysisDFComp['finalMass']}) reportD['Aimec analysis'].update({'Entrained mass [kg]': resAnalysisDFComp['entMass']}) benchD['Aimec analysis'].update({'Initial mass [kg]': resAnalysisDFRef['relMass']}) benchD['Aimec analysis'].update({'Final mass [kg]': resAnalysisDFRef['finalMass']}) benchD['Aimec analysis'].update({'Entrained mass [kg]': resAnalysisDFRef['entMass']}) return reportD, benchD
[docs]def aimecTransform(rasterTransfo, particle, dem): """ Adding s projected and l projected to the particle dictionary Parameters ---------- rasterTransfo: dict domain transformation information Returns ------- particle: dict particle dictionary with s grid and l grid added """ lList = [] sList = [] xllcenter = dem['header']['xllcenter'] yllcenter = dem['header']['yllcenter'] for x, y in zip(particle['x'], particle['y']): # calculating the distance between the particle position and the grid points distance = np.sqrt((x+xllcenter-rasterTransfo['gridx'])**2 + (y+yllcenter-rasterTransfo['gridy'])**2) # Finding the coordinates of the grid point which minimizes the difference (sIndex, lIndex) = np.unravel_index(np.argmin(distance, axis=None), distance.shape) lList.append(rasterTransfo['l'][lIndex]) sList.append(rasterTransfo['s'][sIndex]) particle['lAimec'] = lList particle['sAimec'] = sList return(particle)