Source code for ana4Stats.probAna

"""

This is a simple function for computing a probability map of all peak files of one parameter that
exceed a particular threshold

"""

import numpy as np
import logging
import pathlib

import avaframe.out3Plot.plotUtils as pU
from avaframe.in3Utils import cfgUtils
from avaframe.in3Utils import cfgHandling
from avaframe.in3Utils import fileHandlerUtils as fU
import avaframe.in2Trans.ascUtils as IOf
import avaframe.in1Data.computeFromDistribution as cP
import avaframe.com1DFA.deriveParameterSet as dP
from avaframe.in3Utils import geoTrans as gT


# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)


def createComModConfig(cfgProb, avaDir, modName, cfgFileMod=''):
    """ create configuration file for performing sims with modName com module

        Parameters
        -----------
        cfgProb: configParser object
            configuration settings
        avaDir: pathlib path
            path to avalanche directory
        modName: module
            computational module
        cfgFileMod: str
            path to cfgFile for computational module name - optional

        Returns
        -------
        cfgFiles: dict
            dictionary of paths to newly generated configuration files for com module for all parameters
    """

    # setup where configuration file is saved
    outDir = avaDir / 'Outputs'
    fU.makeADir(outDir)

    # loop over all parameters for performing parameter variation
    variationsDict = makeDictFromVars(cfgProb['PROBRUN'])
    cfgFiles = {}
    for varName in variationsDict:
        # define configuration files
        # get filename of module
        modNameString = str(pathlib.Path(modName.__file__).stem)
        cfgFile = outDir / ('probRun%sCfg%s.ini' % (modNameString, varName))

        # use cfgFile, local com module settings or default settings if local not available
        modCfg = cfgUtils.getModuleConfig(modName, fileOverride=cfgFileMod)
        modCfg = updateCfgRange(modCfg, cfgProb, varName, variationsDict[varName])

        with open(cfgFile, 'w') as configfile:
            modCfg.write(configfile)

        # add path of cfgFile to dictionary
        cfgFiles[varName] = {'cfgFile': cfgFile}

    return cfgFiles
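

# Illustrative usage sketch (not part of the original module): create one com module
# configuration file per varied parameter. The choice of com1DFA as computational module,
# reading the probAna settings via cfgUtils.getModuleConfig and the avalanche directory
# 'data/avaTest' are assumptions made for this example only.
def _exampleCreateComModConfig():
    from avaframe.com1DFA import com1DFA  # assumed computational module
    import avaframe.ana4Stats.probAna as probAna

    cfgProb = cfgUtils.getModuleConfig(probAna)  # PROBRUN section drives the parameter variations
    avaDir = pathlib.Path('data', 'avaTest')  # hypothetical avalanche directory
    cfgFiles = createComModConfig(cfgProb, avaDir, com1DFA, cfgFileMod='')
    log.info('Created configuration files for parameters: %s' % list(cfgFiles.keys()))
    return cfgFiles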


def updateCfgRange(cfg, cfgProb, varName, varDict):
    """ update cfg with a range for parameters in cfgProb

        Parameters
        -----------
        cfg: configparser object
            configuration object to update
        cfgProb: configParser object
            configparser object with info on update
        varName: str
            name of parameter used for variation
        varDict: dict
            dictionary with variationValue and numberOfSteps for varName

        Returns
        --------
        cfg: configParser
            updated configuration object
    """

    # set reference values of parameters - override values in com module configurations
    varParList = cfgProb['PROBRUN']['varParList'].split('|')

    # also for the other parameters that are varied subsequently
    # first check that no parameter variation is provided for these parameters in the com module ini
    # if so - error
    for varPar in varParList:
        if any(chars in cfg['GENERAL'][varPar] for chars in ['|', '$', ':']):
            message = ('Only one reference value is allowed for %s: but %s is given'
                       % (varPar, cfg['GENERAL'][varPar]))
            log.error(message)
            raise AssertionError(message)
        elif varPar in ['entTh', 'relTh', 'secondaryRelTh']:
            thPercentVariation = varPar + 'PercentVariation'
            thRangeVariation = varPar + 'RangeVariation'
            thDistVariation = varPar + 'DistVariation'
            if (cfg['GENERAL'][thPercentVariation] != '' or cfg['GENERAL'][thRangeVariation] != ''
                    or cfg['GENERAL'][thDistVariation] != ''):
                message = ('Only one reference value is allowed for %s: but %s %s or %s %s is given'
                           % (varPar, thPercentVariation, cfg['GENERAL'][thPercentVariation],
                              thRangeVariation, cfg['GENERAL'][thRangeVariation]))
                log.error(message)
                raise AssertionError(message)

    # this is now done for parameter varName from inputs
    # get range, steps and reference value of parameter to perform variations
    valVariation = varDict['variationValue']
    valSteps = varDict['numberOfSteps']
    valVal = cfg['GENERAL'][varName]

    if cfgProb['PROBRUN']['variationType'].lower() == 'normaldistribution':
        # get computeFromDistribution configuration and apply override
        cfgDist = cfgUtils.getModuleConfig(cP, fileOverride='', modInfo=False, toPrint=False,
                                           onlyDefault=cfgProb['computeFromDistribution_override'].getboolean('defaultConfig'))
        cfgDist, cfgProb = cfgHandling.applyCfgOverride(cfgDist, cfgProb, cP, addModValues=False)

    # set variation in configuration
    if varName in ['relTh', 'entTh', 'secondaryRelTh']:
        # if variation using normal distribution
        if cfgProb['PROBRUN']['variationType'].lower() == 'normaldistribution':
            parName = varName + 'DistVariation'
            if valVariation == '':
                valVariation = '-'
            parValue = (cfgProb['PROBRUN']['variationType'] + '$' + valSteps + '$' + valVariation + '$'
                        + cfgDist['GENERAL']['minMaxInterval'] + '$' + cfgDist['GENERAL']['buildType']
                        + '$' + cfgDist['GENERAL']['support'])
        # if variation using percent
        elif cfgProb['PROBRUN']['variationType'].lower() == 'percent':
            parName = varName + 'PercentVariation'
            parValue = valVariation + '$' + valSteps
        # if variation using absolute range
        elif cfgProb['PROBRUN']['variationType'].lower() == 'range':
            parName = varName + 'RangeVariation'
            parValue = valVariation + '$' + valSteps
        else:
            message = ('Variation Type: %s - not a valid option, options are: percent, range, '
                       'normaldistribution' % cfgProb['PROBRUN']['variationType'])
            log.error(message)
            raise AssertionError(message)
        # write parameter variation for varName in config file
        cfg['GENERAL'][parName] = parValue
        cfg['GENERAL']['addStandardConfig'] = cfgProb['PROBRUN']['addStandardConfig']
    else:
        # set variation
        if cfgProb['PROBRUN']['variationType'].lower() == 'normaldistribution':
            cfgDist = {'sampleSize': valSteps,
                       'mean': valVal,
                       'buildType': cfgProb['computeFromDistribution_override']['buildType'],
                       'buildValue': valVariation,
                       'minMaxInterval': cfgDist['GENERAL']['minMaxInterval'],
                       'support': cfgDist['GENERAL']['support']}
            _, valValues, _, _ = cP.extractNormalDist(cfgDist)
            cfg['GENERAL'][varName] = dP.writeToCfgLine(valValues)
        elif cfgProb['PROBRUN']['variationType'].lower() == 'percent':
            cfg['GENERAL'][varName] = '%s$%s$%s' % (valVal, valVariation, valSteps)
            valValues = fU.splitIniValueToArraySteps(cfg['GENERAL'][varName])
        elif cfgProb['PROBRUN']['variationType'].lower() == 'range':
            if '-' in valVariation or '+' in valVariation:
                valStart = str(float(valVal) + float(valVariation))
                valStop = float(valVal)
            else:
                valStart = str(float(valVal) - float(valVariation))
                valStop = str(float(valVal) + float(valVariation))
            cfg['GENERAL'][varName] = '%s:%s:%s' % (valStart, valStop, valSteps)
            valValues = np.linspace(float(valStart), float(valStop), int(valSteps))
        else:
            message = ('Variation Type: %s - not a valid option, options are: percent, range, '
                       'normaldistribution' % cfgProb['PROBRUN']['variationType'])
            log.error(message)
            raise AssertionError(message)

        # if reference value is not in variation - add reference values
        if cfgProb['PROBRUN'].getboolean('addStandardConfig'):
            if np.isclose(valValues, float(valVal), atol=1.e-7, rtol=1.e-8).any():
                log.info('Reference value is in variation for %s' % (varName))
            else:
                log.info('Reference value of %s: %s is added' % (varName, str(valVal)))
                cfg['GENERAL'][varName] = cfg['GENERAL'][varName] + '&' + str(valVal)

    # add a scenario Name to VISUALISATION
    cfg['VISUALISATION']['scenario'] = varName

    return cfg
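

# Illustrative usage sketch (not part of the original module): apply a 'percent' variation to a
# single friction parameter of a minimal, hand-built com module configuration. The parameter name
# 'musamosat' and all values are assumptions made for this example only.
def _exampleUpdateCfgRange():
    import configparser

    # minimal probAna-style settings, PROBRUN section only
    cfgProb = configparser.ConfigParser()
    cfgProb['PROBRUN'] = {'varParList': 'musamosat', 'variationValue': '60', 'numberOfSteps': '5',
                          'variationType': 'percent', 'addStandardConfig': 'True'}

    # minimal com module configuration with one reference value for the varied parameter
    cfg = configparser.ConfigParser()
    cfg['GENERAL'] = {'musamosat': '0.155'}
    cfg['VISUALISATION'] = {'scenario': ''}

    cfg = updateCfgRange(cfg, cfgProb, 'musamosat', {'variationValue': '60', 'numberOfSteps': '5'})
    # cfg['GENERAL']['musamosat'] now holds the variation string 'reference$percent$steps',
    # here '0.155$60$5'
    return cfg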


def probAnalysis(avaDir, cfg, module, parametersDict='', inputDir=''):
    """ Compute probability map of a given set of simulation results exceeding a particular threshold and save to outDir

        Parameters
        ----------
        avaDir: str
            path to avalanche directory
        cfg : dict
            configuration read from ini file of probAna function
        module
            computational module that was used to run the simulations
        parametersDict: dict
            dictionary with simulation parameters to filter simulations
        inputDir : str
            optional - path to directory where data that should be analysed can be found in
            a subfolder called peakFiles and configurationFiles, required if not in module results
    """

    # get filename of module
    modName = pathlib.Path(module.__file__).stem
    avaDir = pathlib.Path(avaDir)

    # set output directory
    outDir = avaDir / 'Outputs' / 'ana4Stats'
    fU.makeADir(outDir)

    # fetch all result files and filter simulations according to parametersDict
    simNameList = cfgHandling.filterSims(avaDir, parametersDict, specDir=inputDir)

    # initialize flag if analysis has been performed or e.g. no matching files found
    analysisPerformed = False
    if simNameList == []:
        # no matching sims found for filtering criteria
        log.warning('No matching simulations found for filtering criteria')
        return analysisPerformed

    # if matching sims found - perform analysis
    if inputDir == '':
        inputDir = avaDir / 'Outputs' / modName / 'peakFiles'
        flagStandard = True
        peakFilesDF = fU.makeSimDF(inputDir, avaDir=avaDir)
    else:
        inputDirPF = inputDir / 'peakFiles'
        peakFilesDF = fU.makeSimDF(inputDirPF, avaDir=avaDir)

    # get header info from peak files - this should be the same for all peakFiles
    header = IOf.readASCheader(peakFilesDF['files'][0])
    cellSize = header['cellsize']
    nRows = header['nrows']
    nCols = header['ncols']
    xllcenter = header['xllcenter']
    yllcenter = header['yllcenter']
    noDataValue = header['noDataValue']

    # Initialise array for computations
    probSum = np.zeros((nRows, nCols))
    count = 0
    contourDict = {}

    # Loop through peakFiles and compute probability
    for m in range(len(peakFilesDF['names'])):

        # only take simulations that match filter criteria from parametersDict
        if peakFilesDF['simName'][m] in simNameList:

            # Load peak field for desired peak field parameter
            if peakFilesDF['resType'][m] == cfg['GENERAL']['peakVar']:

                # Load data
                fileName = peakFilesDF['files'][m]
                dataLim = np.zeros((nRows, nCols))
                fileData = IOf.readRaster(fileName)
                data = np.flipud(fileData['rasterData'])

                # fetch contourline info
                xGrid, yGrid, _, _ = gT.makeCoordGridFromHeader(fileData['header'])
                x, y = pU.fetchContourCoords(xGrid, yGrid, data, float(cfg['GENERAL']['peakLim']))
                contourDict[fileName.stem] = {'x': x, 'y': y}

                log.info('File Name: %s , simulation parameter %s ' % (fileName, cfg['GENERAL']['peakVar']))

                # Check if peak values exceed desired threshold
                dataLim[data > float(cfg['GENERAL']['peakLim'])] = 1.0
                probSum = probSum + dataLim
                count = count + 1

    # Create probability map ranging from 0-1
    probMap = probSum / count
    unit = pU.cfgPlotUtils['unit%s' % cfg['GENERAL']['peakVar']]
    log.info('probability analysis performed for peak parameter: %s and a peak value '
             'threshold of: %s %s' % (cfg['GENERAL']['peakVar'], cfg['GENERAL']['peakLim'], unit))
    log.info('%s peak fields added to analysis' % count)

    # Save to .asc file
    avaName = avaDir.name
    outFileName = '%s_probMap%s.asc' % (avaName, cfg['GENERAL']['peakLim'])
    outFile = outDir / outFileName
    IOf.writeResultToAsc(header, probMap, outFile)
    analysisPerformed = True

    return analysisPerformed, contourDict
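

# Illustrative usage sketch (not part of the original module): compute a probability map from
# com1DFA peak fields exceeding the threshold set in the probAna configuration. The avalanche
# directory and the filter dictionary are assumptions made for this example only.
def _exampleProbAnalysis():
    from avaframe.com1DFA import com1DFA  # assumed computational module
    import avaframe.ana4Stats.probAna as probAna

    cfgProb = cfgUtils.getModuleConfig(probAna)  # GENERAL section provides peakVar and peakLim
    parametersDict = {'scenario': 'relTh'}  # hypothetical filter: keep only sims of one scenario
    result = probAnalysis('data/avaTest', cfgProb, com1DFA, parametersDict=parametersDict)
    # on success result is the tuple (analysisPerformed, contourDict); if no simulation matches
    # the filter criteria only the flag False is returned
    return result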


def makeDictFromVars(cfg):
    """ create a dictionary with info on parameter variation for all parameters in varParList

        Parameters
        -----------
        cfg: configparser object
            configuration settings, here varParList, variationValue, numberOfSteps

        Returns
        --------
        variationsDict: dict
            dictionary with variationValue and numberOfSteps for each parameter in varParList
    """

    varParList = cfg['varParList'].split('|')
    varValues = cfg['variationValue'].split('|')
    varSteps = cfg['numberOfSteps'].split('|')

    # check if a value is provided for each parameter
    if (len(varParList) == len(varValues) == len(varSteps)) is False:
        message = 'For every parameter in varParList a variationValue and a numberOfSteps need to be provided'
        log.error(message)
        raise AssertionError(message)

    variationsDict = {}
    for idx, val in enumerate(varParList):
        variationsDict[val] = {'variationValue': varValues[idx], 'numberOfSteps': varSteps[idx]}

    return variationsDict
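

# Illustrative sketch (not part of the original module): the mapping that makeDictFromVars builds
# from a PROBRUN section. The parameter names and values are assumptions made for this example only.
def _exampleMakeDictFromVars():
    import configparser

    cfg = configparser.ConfigParser()
    cfg['PROBRUN'] = {'varParList': 'musamosat|relTh',
                      'variationValue': '60|50',
                      'numberOfSteps': '2|3'}
    variationsDict = makeDictFromVars(cfg['PROBRUN'])
    # -> {'musamosat': {'variationValue': '60', 'numberOfSteps': '2'},
    #     'relTh': {'variationValue': '50', 'numberOfSteps': '3'}}
    return variationsDict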