Source code for ana4Stats.probAna

"""

This is a simple function for computing a mask of all peak files of one parameter that exceed a particular threshold

"""

import os
import numpy as np
import logging
from matplotlib import pyplot as plt
import pathlib

import avaframe.out3Plot.plotUtils as pU
from avaframe.in3Utils import cfgUtils
from avaframe.in3Utils import fileHandlerUtils as fU
import avaframe.in2Trans.ascUtils as IOf

# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)


[docs]def probAnalysis(avaDir, cfg, module, parametersDict='', inputDir=''): """ Compute propability map of a given set of simulation result exceeding a particular threshold and save to outDir Parameters ---------- avaDir: str path to avalanche directory cfg : dict configuration read from ini file of probAna function module computational module that was used to run the simulations parametersDict: dict dictionary with simulation parameters to filter simulations inputDir : str optional - path to directory where data that should be analysed can be found, required if not in module results """ # get filename of module modName = pathlib.Path(module.__file__).stem # set output directory outDir = pathlib.Path(avaDir, 'Outputs', 'ana4Stats') fU.makeADir(outDir) # fetch all result files and filter simulations according to parametersDict simNameList = cfgUtils.filterSims(avaDir, parametersDict, specDir=inputDir) if inputDir == '': inputDir = pathlib.Path(avaDir, 'Outputs', modName, 'peakFiles') flagStandard = True peakFilesDF = fU.makeSimDF(inputDir, avaDir=avaDir) else: peakFilesDF = fU.makeSimDF(inputDir, avaDir=avaDir) # get header info from peak files - this should be the same for all peakFiles header = IOf.readASCheader(peakFilesDF['files'][0]) cellSize = header.cellsize nRows = header.nrows nCols = header.ncols xllcenter = header.xllcenter yllcenter = header.yllcenter noDataValue = header.noDataValue # Initialise array for computations probSum = np.zeros((nRows, nCols)) count = 0 # Loop through peakFiles and compute probability for m in range(len(peakFilesDF['names'])): # only take simulations that match filter criteria from parametersDict if peakFilesDF['simName'][m] in simNameList: # Load peak field for desired peak field parameter if peakFilesDF['resType'][m] == cfg['GENERAL']['peakVar']: # Load data fileName = peakFilesDF['files'][m] data = np.loadtxt(fileName, skiprows=6) dataLim = np.zeros((nRows, nCols)) log.info('File Name: %s , simulation parameter %s ' % (fileName, cfg['GENERAL']['peakVar'])) # Check if peak values exceed desired threshold dataLim[data > float(cfg['GENERAL']['peakLim'])] = 1.0 probSum = probSum + dataLim count = count + 1 # Create probability map ranging from 0-1 probMap = probSum / count unit = pU.cfgPlotUtils['unit%s' % cfg['GENERAL']['peakVar']] log.info('probability analysis performed for peak parameter: %s and a peak value threshold of: %s %s' % (cfg['GENERAL']['peakVar'], cfg['GENERAL']['peakLim'], unit)) log.info('%s peak fields added to analysis' % count) # # Save to .asc file avaName = os.path.basename(avaDir) outFileName = '%s_probMap%s.asc' % (avaName, cfg['GENERAL']['peakLim']) outFile = outDir / outFileName IOf.writeResultToAsc(header, probMap, outFile)