"""
Directory and file handling helper functions
"""
# Load modules
import os
import glob
import logging
import pathlib
import numpy as np
import pandas as pd
import shutil
# Local imports
import avaframe.in2Trans.ascUtils as IOf
# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)
def makeADir(dirName):
    """ Make sure a directory exists, create it (including parents) if it does not

    An already existing directory is left untouched, only a debug message is logged.

    Parameters
    ----------
    dirName : str or pathlib path
        path of directory that should be made
    """

    if not os.path.isdir(dirName):
        # exist_ok=True: with parallel computing another process may create
        # the directory between the check above and this call
        os.makedirs(dirName, exist_ok=True)
        log.debug('Directory: %s created' % dirName)
        return

    # nothing is deleted here - new data ends up next to / on top of existing data
    log.debug('Be careful directory %s already existed - data saved on top of it' % (dirName))
def checkPathlib(checkPath):
    """ Return checkPath as a pathlib path

    Objects that already are a pathlib.PurePath (or subclass) are handed back
    unchanged, everything else is passed to pathlib.Path.

    Parameters
    ----------
    checkPath: str or pathlib path
        path to be checked

    Returns
    -------
    checkPath: pathlib path
        pathlib path version of checkPath
    """

    return checkPath if isinstance(checkPath, pathlib.PurePath) else pathlib.Path(checkPath)
def readLogFile(logName, cfg=''):
    """ Read experiment log file and make dictionary that contains general info on all simulations

    The first line of the log file is treated as header and skipped. Every following
    non-empty line is split on whitespace; the second entry is used as simulation name
    and the third entry as value of the varied parameter.

    Parameters
    ----------
    logName : str or pathlib path
        path to log file
    cfg : dict
        optional - configuration read from com1DFA simulation; if provided, cfg['varPar']
        is used as name of the varied parameter, otherwise 'Mu'

    Returns
    -------
    logDict : dict
        dictionary with number of simulation (noSim), name of simulation (simName),
        parameter variation (key is the name of the varied parameter), full name (fullName)
    """

    # context manager: the file is closed even if parsing fails further down
    with open(logName, 'r') as logFile:
        log.debug('Take com1DFA full experiment log')
        # drop header line
        lines = logFile.readlines()[1:]

    # Parameter variation
    varPar = cfg['varPar'] if cfg != '' else 'Mu'

    # Save info to dictionary, add all result parameters that are saved in com1DFA Outputs
    logDict = {'noSim': [], 'simName': [], varPar: [], 'fullName': []}

    for line in lines:
        vals = line.strip().split()
        if not vals:
            # blank lines (e.g. trailing newline at end of file) carry no simulation
            continue
        simName = vals[1]
        parValue = float(vals[2])
        # simulations are numbered consecutively starting at 1
        logDict['noSim'].append(len(logDict['noSim']) + 1)
        logDict['simName'].append(simName)
        logDict[varPar].append(parValue)
        logDict['fullName'].append(simName + '_' + '%.5f' % parValue)

    return logDict
def checkIfFileExists(filePath, fileType=''):
    """ Verify that filePath points to an existing file, otherwise log and raise an error

    Parameters
    -----------
    filePath: str or pathlib path
        path to file
    fileType: str
        string for error message which kind of file is not found

    Raises
    ------
    FileNotFoundError
        if filePath is not an existing file
    """

    candidate = filePath if isinstance(filePath, pathlib.PurePath) else pathlib.Path(filePath)

    if candidate.is_file():
        return

    message = 'No %s file found called: %s' % (fileType, str(candidate))
    log.error(message)
    raise FileNotFoundError(message)
def checkCommonSims(logName, localLogName):
    """ Check which simulations are common between local and full ExpLog

    Parameters
    ----------
    logName : str or pathlib path
        path to the full experiment log
    localLogName : str or pathlib path
        path to the local experiment log; if this file does not exist,
        the full log is used in its place

    Returns
    -------
    indSims : list
        indices into the simName list of the full log of all simulations
        that are also listed in the local log
    """

    if not os.path.isfile(localLogName):
        localLogName = logName

    # Read log files and extract info
    logDict = readLogFile(logName)
    logDictLocal = readLogFile(localLogName)

    # Identify common simulations
    setSim = set(logDictLocal['simName'])
    indSims = [i for i, item in enumerate(logDict['simName']) if item in setSim]

    # indSims is a list - has to be formatted with %s, not %d
    log.info('Common simulations are: %s', indSims)

    return indSims
def getFilterDict(cfg, section):
    """ Create parametersDict from ini file, for filtering simulations

    Each entry of the section is translated as follows:

    - empty value: parameter is not used for filtering
    - value containing ':' or '|': converted with splitIniValueToArraySteps
    - value containing '>' or '<': kept as string in a one-element list
    - plain number (digits with at most one decimal point): converted to float
      in a one-element list
    - any other string: kept as string in a one-element list

    Parameters
    -----------
    cfg: configParser object
        configuration with information on filtering criteria
    section: str
        section of cfg where filtering criteria can be found

    Returns
    ---------
    parametersDict : dict
        dictionary with parameter and parameter values for filtering simulation results;
        empty if the section is not found
    """

    parametersDict = {}

    if not cfg.has_section(section):
        log.warning('No section %s in configuration file found - cannot create dict for filtering' % section)
        return parametersDict

    for key, value in cfg.items(section):
        if value == '':
            parametersDict.pop(key, None)
        elif ':' in value or '|' in value:
            locValue = splitIniValueToArraySteps(value)
            parametersDict[key] = locValue
            log.info('Filter simulations that match %s: %s' % (key, locValue))
        elif '>' in value:
            parametersDict[key] = [value]
            log.info('Filter simulations where %s is greater than %s' % (key, value))
        elif '<' in value:
            parametersDict[key] = [value]
            log.info('Filter simulations where %s is smaller than %s' % (key, value))
        elif isinstance(value, str):
            # only digits with at most ONE decimal point are numbers; something like
            # '1.2.3' must stay a string, float() would raise a ValueError on it
            isNumber = value.count('.') <= 1 and value.replace('.', '').isdigit()
            parametersDict[key] = [float(value)] if isNumber else [value]
            log.info('Filter simulations that match %s: %s' % (key, value))

    return parametersDict
def splitIniValueToArraySteps(cfgValues, returnList=False):
    """ Convert a parameter string from an ini file into a numpy array or list

    Supported formats:

    - start:end:numberOfSteps - evenly spaced values from start to end
    - refVal$percent$steps - steps values within +- percent of refVal; if percent carries
      an explicit sign ('+' or '-') values run from refVal to refVal + percent of refVal
    - for both formats above one additional value can be appended using &
    - val1|val2|... - individual values
    - empty string - empty list

    Parameters
    ----------
    cfgValues : str
        values of parameter to be read from ini file
    returnList: bool
        if True force to return values as list (only applied to | separated values)

    Returns
    --------
    items : 1D numpy array or list
        values as 1D numpy array or list (in the case of strings)
    """

    if cfgValues == '':
        return []

    if ':' in cfgValues or '$' in cfgValues:
        # everything in front of the first & describes the range,
        # the part right after it is the optional additional value
        ampParts = cfgValues.split('&')
        rangeSpec = ampParts[0]

        if ':' in cfgValues:
            fields = rangeSpec.split(':')
            items = np.linspace(float(fields[0]), float(fields[1]), int(fields[2]))
        else:
            fields = rangeSpec.split('$')
            percent = float(fields[1])
            refVal = float(fields[0])
            spread = (percent / 100.) * refVal
            # check if pos or neg or full var
            oneSided = '-' in fields[1] or '+' in fields[1]
            lowerEnd = refVal if oneSided else refVal - spread
            items = np.linspace(lowerEnd, refVal + spread, int(fields[2]))

        if len(ampParts) > 1:
            items = np.append(items, float(ampParts[1]))
        return items

    tokens = cfgValues.split('|')
    if returnList:
        return tokens

    # a token holding a letter counts as string - unless it is a number written like 1.e3
    anyString = any(tok.upper().isupper() and '.e' not in tok and '.E' not in tok for tok in tokens)
    if anyString:
        return tokens

    anyFloat = any('.' in tok for tok in tokens)
    return np.array(tokens, dtype=float if anyFloat else int)
def splitTimeValueToArrayInterval(cfgValues, endTime):
    """ Convert save time step info from ini file into a numpy array of time values

    Supported formats:

    - start:interval - values from start up to (not including) endTime every interval;
      if less than one interval fits between start and endTime: [start, endTime]
    - val1|val2|... - individual values, returned sorted
    - empty string - a single value of 2*endTime

    A leading value of 0 is removed from the result and if nothing is left,
    a single value of 2*endTime is returned.

    Parameters
    ----------
    cfgValues: str
        time steps info
    endTime: float
        end time

    Returns
    --------
    items : 1D numpy array
        time step values as 1D numpy array
    """

    if ':' in cfgValues:
        fields = cfgValues.split(':')
        start = float(fields[0])
        interval = float(fields[1])
        if float(endTime - start) < interval:
            items = np.array([start, endTime])
        else:
            items = np.arange(start, endTime, interval)
    elif cfgValues == '':
        items = np.array([2*endTime])
    else:
        items = np.sort(np.array(cfgValues.split('|'), dtype=float))

    # make sure that 0 is not in the array (initial time step is any ways saved)
    if items[0] == 0:
        items = np.delete(items, 0)

    # make sure the array is not empty
    # ToDo : make it work without this arbitrary 2*timeEnd
    if items.size == 0:
        items = np.array([2*endTime])

    return items
def exportcom1DFAOrigOutput(avaDir, cfg='', addTSteps=False):
    """ Export the simulation results from com1DFA output to desired location

    Files are copied from avaDir/Work/com1DFAOrig to avaDir/Outputs/com1DFAOrig:
    rasters go to peakFiles (pfd -> pft, ppr -> ppr, pv -> pfv), html reports go to
    reports and the experiment log ExpLog.txt is copied to the output directory itself.

    Parameters
    ----------
    avaDir: str
        path to avalanche directory
    cfg : dict
        configuration read from ini file that has been used for the com1DFAOrig simulation;
        if provided cfg['varPar'] names the varied parameter, otherwise 'Mu' is used
    addTSteps : bool
        if True: first and last time step of flow thickness are exported
        to peakFiles/timeSteps
    """

    # Initialise directories
    workDir = pathlib.Path(avaDir, 'Work', 'com1DFAOrig')
    outDir = pathlib.Path(avaDir, 'Outputs', 'com1DFAOrig')
    outDirPF = outDir / 'peakFiles'
    outDirRep = outDir / 'reports'
    for newDir in (outDir, outDirPF, outDirRep):
        makeADir(newDir)

    # Read log file information
    logName = workDir / 'ExpLog.txt'
    if cfg == '':
        logDict = readLogFile(logName)
        varPar = 'Mu'
    else:
        logDict = readLogFile(logName, cfg)
        varPar = cfg['varPar']

    # Get number of values
    simCount = len(logDict['noSim'])

    # Path to com1DFA results - prefix of the result folder of each simulation
    resPath = workDir / ('FullOutput_%s_' % varPar)

    exportTSteps = addTSteps is True
    if exportTSteps:
        makeADir(outDirPF / 'timeSteps')

    # Export peak files and reports
    for k in range(simCount):
        simName = logDict['simName'][k]
        parValue = logDict[varPar][k]
        # result folder of this simulation: prefix followed by the parameter value
        simResDir = '%s%.05f' % (resPath, parValue)
        rasterDir = pathlib.Path(simResDir, simName, 'raster')

        shutil.copy(rasterDir / ('%s_pfd.asc' % simName),
                    outDirPF / ('%s_%.05f_pft.asc' % (simName, parValue)))

        if exportTSteps:
            shutil.copy(rasterDir / ('%s_fd.asc' % simName),
                        outDirPF / 'timeSteps' / ('%s_%.05f_tLast_ft.asc' % (simName, parValue)))

        shutil.copy(rasterDir / ('%s_ppr.asc' % simName),
                    outDirPF / ('%s_%.05f_ppr.asc' % (simName, parValue)))

        shutil.copy(rasterDir / ('%s_pv.asc' % simName),
                    outDirPF / ('%s_%.05f_pfv.asc' % (simName, parValue)))

        shutil.copy(pathlib.Path(simResDir, '%s.html' % simName),
                    outDirRep / ('%s_%.05f.html' % (simName, parValue)))

    if exportTSteps:
        # Export first time step of all simulations
        for k in range(simCount):
            simName = logDict['simName'][k]
            parValue = logDict[varPar][k]
            shutil.copy(pathlib.Path('%s%.05f' % (resPath, parValue), '%s_tFirst_fd.txt' % simName),
                        outDirPF / 'timeSteps' / ('%s_%.05f_tFirst_fd.asc' % (simName, parValue)))

    # Export ExpLog to Outputs/com1DFA
    shutil.copy2(logName, outDir)
def fetchFlowFields(flowFieldsDir, suffix=''):
    """ fetch paths to all desired flow fields within folder

    Parameters
    ------------
    flowFieldsDir: str or pathlib path
        path to flow field ascii files
    suffix: str
        suffix in flow field name to be searched for; if empty all .asc files are fetched

    Returns
    --------
    flowFields: list
        list of pathlib paths to flow fields (files matching *suffix*.asc)
    """

    # convert to pathlib path if it is not one yet - a plain str has no glob method
    if not isinstance(flowFieldsDir, pathlib.PurePath):
        flowFieldsDir = pathlib.Path(flowFieldsDir)

    if suffix == '':
        searchString = '*.asc'
    else:
        searchString = '*%s*.asc' % suffix
    flowFields = list(flowFieldsDir.glob(searchString))

    return flowFields
def fileNotFoundMessage(messageName):
    """ Log messageName as error and raise it as FileNotFoundError

    Parameters
    -----------
    messageName: str
        error message

    Raises
    ------
    FileNotFoundError
        always, with messageName as message
    """

    notFound = FileNotFoundError(messageName)
    log.error(messageName)
    raise notFound
# ToDo Maybe try to use makeSimFromResDF instead of makeSimDF
def makeSimDF(inputDir, avaDir='', simID='simID'):
    """ Create a dataFrame that contains all info on simulations

    One row is created for every .asc file found in inputDir (sorted by name); the info
    is extracted from the file name, which is split at '_' (and at '_AF_' if present).
    This can then be used to filter simulations for example.

    Parameters
    ----------
    inputDir : str or pathlib path
        path to directory of simulation results
    avaDir : str
        optional - path to avalanche directory
    simID : str
        optional - simulation identification, depending on the computational module:
        com1DFA: simHash
        com1DFAOrig: Mu or parameter that has been used in parameter variation

    Returns
    -------
    dataDF : dataFrame
        dataframe with full file path, file name, release area scenario, simulation type (null, entres, etc.),
        model type (dfa, ref, etc.), simID, result type (ppr, pft, etc.), simulation name,
        cell size and optional name of avalanche, optional time step
    """

    # Load input datasets from input directory, sorted by name
    if not isinstance(inputDir, pathlib.Path):
        inputDir = pathlib.Path(inputDir)
    ascFiles = sorted(inputDir.glob('*.asc'))

    # Make dictionary of input data info
    data = {'files': [], 'names': [], 'resType': [], 'simType': [], 'isDefault': [],
            'frictCalib': [], 'simName': [],
            'modelType': [], 'releaseArea': [], 'cellSize': [], simID: [], 'timeStep': []}

    # Set name of avalanche if avaDir is given
    addAvaName = avaDir != ''
    if addAvaName:
        avaDir = pathlib.Path(avaDir)
        avaName = avaDir.name
        data['avaName'] = []

    for ascFile in ascFiles:
        name = ascFile.stem
        data['files'].append(ascFile)
        data['names'].append(name)

        # first part of the name is the release area, the remainder holds the sim info
        if '_AF_' in name:
            pieces = name.split('_AF_')
            relNameSim = pieces[0]
            fNamePart = relNameSim + '_AF'
            infoParts = pieces[1].split('_')
        else:
            pieces = name.split('_')
            relNameSim = pieces[0]
            fNamePart = relNameSim
            infoParts = pieces[1:]

        data['releaseArea'].append(relNameSim)
        data[simID].append(infoParts[0])

        if '_C_' in name or '_D_' in name:
            data['isDefault'].append(infoParts[1])
            # now check for friction calibration info
            if any(tag in name for tag in ('_S_', '_M_', '_L_')):
                data['frictCalib'].append(infoParts[2])
                shift = 1  # friction calibration takes one additional field
            else:
                data['frictCalib'].append(None)
                shift = 0
            typeInd = 2 + shift
        else:
            # If it still is an 'old' simname
            # This can be removed at one point
            data['isDefault'].append(None)
            data['frictCalib'].append(None)
            typeInd = 1

        # typeInd: position of simType; modelType and resType follow directly,
        # an optional time step comes last
        data['simType'].append(infoParts[typeInd])
        data['modelType'].append(infoParts[typeInd + 1])
        data['resType'].append(infoParts[typeInd + 2])
        data['simName'].append(fNamePart + '_' + ('_'.join(infoParts[0:(typeInd + 2)])))

        header = IOf.readASCheader(ascFile)
        data['cellSize'].append(header['cellsize'])

        if len(infoParts) == (typeInd + 4):
            data['timeStep'].append(infoParts[typeInd + 3])
        else:
            data['timeStep'].append('')

        # Set name of avalanche if avaDir is given
        if addAvaName:
            data['avaName'].append(avaName)

    dataDF = pd.DataFrame.from_dict(data)

    return dataDF
def makeSimFromResDF(avaDir, comModule, inputDir='', simName=''):
    """ Create a dataFrame that contains all info on simulations in output/comModule/peakFiles

    One line for each simulation - so all peakfiles that belong to one simulation are listed in one line
    that corresponds to that simulation. The info is extracted from the file names, which are
    split at '_' (and at '_AF_' if present); the last part of the name is the result type.

    Parameters
    ----------
    avaDir : str
        path to avalanche directory
    comModule : str
        module used to create the results
    inputDir : str
        optional - path to directory of simulation results, if not provided
        avaDir/Outputs/comModule/peakFiles is used
    simName : str
        optional - key phrase to be found in the simulation result name

    Returns
    -------
    dataDF : dataFrame
        dataframe with for each simulation, the full file path, file name, release area scenario,
        simulation type (null, entres, etc.), model type (dfa, ref, etc.), simID,
        path to result files (ppr, pft, etc.), simulation name,
        cell size and optional name of avalanche, optional time step;
        the index is a hash of each line, the result type columns are sorted by name
    resTypeListAll: list
        list of res types available for all simulations

    Raises
    ------
    FileNotFoundError
        if the input directory does not exist
    AssertionError
        if the number of info parts of a file name is not 4, 5 or 6
    """

    # get path to folder containing the raster files
    if inputDir == '':
        inputDir = pathlib.Path(avaDir, 'Outputs', comModule, 'peakFiles')
    if not isinstance(inputDir, pathlib.Path):
        inputDir = pathlib.Path(inputDir)
    if not inputDir.is_dir():
        message = 'Input directory %s does not exist - check anaMod' % inputDir
        log.error(message)
        raise FileNotFoundError(message)

    # Load input datasets from input directory
    searchPattern = '*' + simName + '*.asc' if simName != '' else '*.asc'
    datafiles = list(inputDir.glob(searchPattern))

    # build the result data frame
    # sorted: iteration order of a set of strings can differ between interpreter runs,
    # sorting keeps the column order (and what is derived from it) reproducible
    resTypeListFromFiles = sorted({dataFile.stem.split('_')[-1] for dataFile in datafiles})
    columnsList = ['simName', 'releaseArea', 'simHash', 'simModified', 'simType', 'modelType',
                   'cellSize'] + resTypeListFromFiles
    dataDF = pd.DataFrame(columns=columnsList)
    resTypeListOne = []

    for dataFile in datafiles:
        name = dataFile.stem
        if '_AF_' in name:
            nameParts = name.split('_AF_')
            fNamePart = nameParts[0] + '_AF'
            relNameSim = nameParts[0]
            infoParts = nameParts[1].split('_')
        else:
            nameParts = name.split('_')
            fNamePart = nameParts[0]
            relNameSim = nameParts[0]
            infoParts = nameParts[1:]
        resType = infoParts[-1]
        # name of the simulation this file belongs to: file name without result type
        # (own variable - the simName argument is the search phrase and is not overwritten)
        fileSimName = fNamePart + '_' + ('_'.join(infoParts[0:-1]))

        # add line in the DF if the simulation does not exist yet
        if fileSimName not in dataDF['simName'].values:
            newLine = pd.DataFrame([[fileSimName]], columns=['simName'], index=[fileSimName])
            dataDF = pd.concat([dataDF, newLine], ignore_index=False)
            dataDF.loc[fileSimName, 'releaseArea'] = relNameSim
            dataDF.loc[fileSimName, 'simHash'] = infoParts[0]
            # TODO: remove once all simNames are updated to include C or D as simModified
            if len(infoParts) == 6:  # this is the _C_M_ etc variant
                dataDF.loc[fileSimName, 'simModified'] = infoParts[1]
                dataDF.loc[fileSimName, 'simType'] = infoParts[3]
                dataDF.loc[fileSimName, 'modelType'] = infoParts[4]
            elif len(infoParts) == 5:
                dataDF.loc[fileSimName, 'simModified'] = infoParts[1]
                dataDF.loc[fileSimName, 'simType'] = infoParts[2]
                dataDF.loc[fileSimName, 'modelType'] = infoParts[3]
            elif len(infoParts) == 4:
                dataDF.loc[fileSimName, 'simModified'] = 'not specified'
                dataDF.loc[fileSimName, 'simType'] = infoParts[1]
                dataDF.loc[fileSimName, 'modelType'] = infoParts[2]
            else:
                message = 'simName format not recognized for simName: %s' % fileSimName
                log.error(message)
                raise AssertionError(message)

            # add info about the cell size
            header = IOf.readASCheader(dataFile)
            dataDF.loc[fileSimName, 'cellSize'] = header['cellsize']

        # add full path to resType
        dataDF.loc[fileSimName, resType] = pathlib.Path(dataFile)
        # list all res types found
        if resType not in resTypeListOne:
            resTypeListOne.append(resType)

    # add a hash for each line of the DF and use as index - required for identification
    rowHash = pd.util.hash_pandas_object(dataDF)
    # reset the index using the dataframe hash
    dataDF = dataDF.set_index(rowHash)

    # now find res types available for all simulations
    resTypeListAll = [resType for resType in resTypeListOne
                      if not dataDF[resType].isnull().values.any()]

    return dataDF, resTypeListAll