# Source code for in1Data.getInput
"""
Fetch input data for avalanche simulations
"""
# Load modules
import os
import glob
import pathlib
import logging
# Local imports
from avaframe.in3Utils import cfgUtils
import avaframe.in2Trans.ascUtils as IOf
import avaframe.in2Trans.shpConversion as shpConv
import avaframe.com1DFA.deriveParameterSet as dP
# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)
def readDEM(avaDir):
    """ Load the ascii DEM found in the Inputs folder of an avalanche directory

        Parameters
        ----------
        avaDir : str
            path to avalanche directory

        Returns
        -------
        dem : dict
            dict with header and raster data
    """

    # locate the DEM .asc file belonging to this avalanche directory
    demPath = getDEMPath(avaDir)

    log.debug('Read DEM: %s' % demPath)

    return IOf.readRaster(demPath)
def getDEMPath(avaDir):
    """ Get the DEM file path from a provided avalanche directory

        Exactly one .asc file is expected directly inside avaDir/Inputs.

        Parameters
        ----------
        avaDir : str or pathlib path
            path to avalanche directory

        Returns
        -------
        demFile : pathlib.Path
            full path to DEM .asc file

        Raises
        ------
        AssertionError
            if more than one .asc file is found, or if no .asc file is found but
            other entries matching ``*.*`` are present in Inputs
        FileNotFoundError
            if neither an .asc file nor any other ``*.*`` entry is found in Inputs
    """

    inputDir = pathlib.Path(avaDir, 'Inputs')
    demFiles = sorted(inputDir.glob('*.asc'))

    # if more than one .asc file found throw error
    if len(demFiles) > 1:
        message = 'There should be exactly one topography .asc file in %s/Inputs/' % (avaDir)
        log.error(message)
        raise AssertionError(message)

    # if no .asc file found - throw error
    if not demFiles:
        # sorted so that the file named in the message does not depend on the
        # arbitrary order in which glob yields the directory entries
        filesFound = sorted(inputDir.glob('*.*'))
        if filesFound:
            message = 'DEM file format not correct in %s/Inputs/ - only .asc is allowed but %s is provided' % (
                avaDir, filesFound[0].name)
            log.error(message)
            raise AssertionError(message)

        message = 'No topography .asc file in %s/Inputs/' % (avaDir)
        log.error(message)
        raise FileNotFoundError(message)

    return demFiles[0]
def getInputData(avaDir, cfg):
    """ Fetch input datasets required for simulation, duplicated function because
        simulation type set differently in com1DFAOrig compared to com1DFA:
        TODO: remove duplicate once it is not required anymore

        Parameters
        ----------
        avaDir : str
            path to avalanche directory
        cfg : dict
            configuration read from com1DFA simulation ini file, only the key
            releaseScenario is read here (scenario names separated by |, empty string
            to use all shapefiles found in Inputs/REL)

        Returns
        -------
        demFile : pathlib.Path
            full path to DEM .asc file
        relFiles : list
            list of full path (str) to release area scenario .shp files
        entFile : pathlib.Path or str
            full path to entrainment area .shp file, empty string if there is none
        resFile : pathlib.Path or str
            full path to resistance area .shp file, empty string if there is none
        entResInfo : flag dict
            flag if Yes entrainment and/or resistance areas found and used for simulation

        Raises
        ------
        FileNotFoundError
            if a release scenario named in cfg has no matching .shp file in Inputs/REL
    """

    # Set directories for inputs
    inputDir = os.path.join(avaDir, 'Inputs')
    releaseDir = os.path.join(inputDir, 'REL')

    # Set flag if there is an entrainment or resistance area
    entResInfo = {'flagEnt': 'No', 'flagRes': 'No'}

    # Initialise release areas, default is to look for shapefiles
    if cfg['releaseScenario'] != '':
        relFiles = []
        for rel in cfg['releaseScenario'].split('|'):
            # scenario may be given with or without extension - check the suffix,
            # not just whether .shp appears somewhere in the name
            relName = rel if rel.endswith('.shp') else '%s.shp' % (rel)
            relf = os.path.join(releaseDir, relName)
            if not os.path.isfile(relf):
                message = 'No release scenario called: %s' % (relf)
                log.error(message)
                raise FileNotFoundError(message)
            relFiles.append(relf)
        log.debug('Release area file is specified to be: %s' % relFiles)
    else:
        relFiles = sorted(glob.glob(os.path.join(releaseDir, '*.shp')))
    log.info('Release area files are: %s' % relFiles)

    # Initialise resistance areas
    resFile, entResInfo['flagRes'] = getAndCheckInputFiles(inputDir, 'RES', 'Resistance', fileExt='shp')
    if resFile is None:
        resFile = ''

    # Initialise entrainment areas
    entFile, entResInfo['flagEnt'] = getAndCheckInputFiles(inputDir, 'ENT', 'Entrainment', fileExt='shp')
    if entFile is None:
        entFile = ''

    # Initialise DEM
    demFile = getDEMPath(avaDir)

    return demFile, relFiles, entFile, resFile, entResInfo
def getInputDataCom1DFA(avaDir, cfg):
    """ Fetch input datasets required for simulation, duplicated function because
        simulation type set differently in com1DFA compared to com1DFAOrig:
        TODO: remove duplicate once it is not required anymore

        Parameters
        ----------
        avaDir : str or pathlib object
            path to avalanche directory
        cfg : dict
            configuration read from com1DFA simulation ini file, here
            cfg['INPUT']['releaseScenario'] and the flag relThFromFile of the
            GENERAL section are used

        Returns
        -------
        inputSimFiles: dict
            dictionary with all the input files:

            demFile : pathlib.Path
                full path to DEM .asc file
            relFiles : list
                list of full path to release area scenario .shp files
            secondaryReleaseFile : pathlib.Path or None
                full path to secondary release area .shp file
            entFile : pathlib.Path or None
                full path to entrainment area .shp file
            resFile : pathlib.Path or None
                full path to resistance area .shp file
            relThFile : pathlib.Path, None or str
                full path to release thickness .asc file, empty string if relThFromFile
                is not set
            entResInfo : flag dict
                flagEnt, flagRes and flagSecondaryRelease are Yes or No depending on
                whether the respective file was found; releaseThicknessFile is only
                added if relThFromFile is set

        Raises
        ------
        FileNotFoundError
            if a release scenario named in cfg has no matching .shp file in Inputs/REL
    """

    # Set directories for inputs
    inputDir = pathlib.Path(avaDir, 'Inputs')
    releaseDir = inputDir / 'REL'

    # Set flag if there is an entrainment or resistance area
    entResInfo = {'flagEnt': 'No', 'flagRes': 'No'}

    # Initialise release areas, default is to look for shapefiles
    if cfg['INPUT']['releaseScenario'] != '':
        relFiles = []
        for rel in cfg['INPUT']['releaseScenario'].split('|'):
            # scenario may be given with or without extension - check the suffix,
            # not just whether .shp appears somewhere in the name
            relName = rel if rel.endswith('.shp') else '%s.shp' % (rel)
            relf = releaseDir / relName
            if not relf.is_file():
                message = 'No release scenario called: %s' % (relf)
                log.error(message)
                raise FileNotFoundError(message)
            relFiles.append(relf)
        log.debug('Release area file is specified to be: %s' % relFiles)
    else:
        relFiles = sorted(releaseDir.glob('*.shp'))
    log.info('Release area files are: %s' % [str(relFilestr) for relFilestr in relFiles])

    # check for release thickness file if relThFromFile
    if cfg['GENERAL'].getboolean('relThFromFile'):
        relThFile, entResInfo['releaseThicknessFile'] = getAndCheckInputFiles(inputDir, 'RELTH',
                                                                              'release thickness data',
                                                                              fileExt='asc')
    else:
        relThFile = ''

    # Initialise secondary release areas
    secondaryReleaseFile, entResInfo['flagSecondaryRelease'] = getAndCheckInputFiles(inputDir, 'SECREL',
                                                                                     'Secondary release', 'shp')

    # Initialise resistance areas
    resFile, entResInfo['flagRes'] = getAndCheckInputFiles(inputDir, 'RES', 'Resistance', fileExt='shp')

    # Initialise entrainment areas
    entFile, entResInfo['flagEnt'] = getAndCheckInputFiles(inputDir, 'ENT', 'Entrainment', fileExt='shp')

    # Initialise DEM
    demFile = getDEMPath(avaDir)

    # collect DEM, release, secondary release, entrainment, resistance and release thickness files
    inputSimFiles = {'demFile': demFile, 'relFiles': relFiles, 'secondaryReleaseFile': secondaryReleaseFile,
                     'entFile': entFile, 'resFile': resFile, 'entResInfo': entResInfo, 'relThFile': relThFile}

    return inputSimFiles
def getAndCheckInputFiles(inputDir, folder, inputType, fileExt='shp'):
    """ Fetch fileExt files and check if they exist and if it is not more than one

        Raises error if there is more than one fileExt file.

        Parameters
        ----------
        inputDir : pathlib object or str
            path to avalanche input directory
        folder : str
            subfolder name where the file should be located (e.g. SECREL, ENT, RES or RELTH)
        inputType : str
            type of input (used for the logging messages). Secondary release or Entrainment or Resistance
        fileExt: str
            file extension e.g. shp, asc - optional default is shp; a leading dot is ignored

        Returns
        -------
        OutputFile: pathlib.Path or None
            path to file checked, None if no file was found
        available: str
            Yes or No depending of if there is a file available (if No, OutputFile is None)

        Raises
        ------
        AssertionError
            if more than one file with extension fileExt is found in inputDir/folder
    """

    # tolerate an extension passed as '.shp' - otherwise the pattern becomes '*..shp'
    # and silently matches nothing
    fileExt = fileExt.lstrip('.')

    searchDir = pathlib.Path(inputDir, folder)
    filesFound = list(searchDir.glob('*.%s' % fileExt))

    if len(filesFound) > 1:
        message = 'More than one %s .%s file in %s/%s/ not allowed' % (inputType, fileExt, inputDir, folder)
        log.error(message)
        raise AssertionError(message)

    if not filesFound:
        return None, 'No'

    return filesFound[0], 'Yes'
def getThickness(inputSimFiles, avaDir, modName, cfgFile, cfg):
    """ add thickness of shapefiles to dictionary, create one ini file per releaseScenario and
        set thickness values in ini files

        Parameters
        -----------
        inputSimFiles: dict
            dictionary with info on release and entrainment file paths
        avaDir: str or pathlib path
            path to avalanche directory
        modName : computational module
            computational module
        cfgFile: str
            path to cfgFile for reading overall config of comModule - if empty read local or default ini file
        cfg: configparser object
            configuration, here simTypeList and secRelArea of section GENERAL are used to decide
            whether entrainment and secondary release thickness are added to the ini files

        Returns
        --------
        inputSimFiles: dict
            updated dictionary with thickness info read from shapefile attributes
            now includes one separate dictionary for each release, entrainment or secondary release
            scenario with a thickness and id value for each feature (given as list)
        cfgFilesRels: list
            list of updated ini files - one for each release Scenario
    """

    # create pathlib Path
    avaDir = pathlib.Path(avaDir)

    # get name of module as string
    modNameString = pathlib.Path(modName.__file__).stem

    # initialise list for cfgFiles
    cfgFilesRels = []

    # check if thickness info is required from entrainment and secondary release according to simType
    simTypeList = cfg['GENERAL']['simTypeList'].split('|')
    thTypeList = []
    if any(simType in ['ent', 'entres', 'available'] for simType in simTypeList):
        thTypeList.append('entFile')
    if cfg['GENERAL'].getboolean('secRelArea'):
        thTypeList.append('secondaryReleaseFile')

    # fetch thickness attribute of entrainment area and secondary release
    # this is done whenever the file is available - thTypeList only controls below whether
    # the values are written to the ini files
    for thType in ['entFile', 'secondaryReleaseFile']:
        if inputSimFiles[thType] is not None:
            thicknessList, idList = shpConv.readThickness(inputSimFiles[thType])
            inputSimFiles[inputSimFiles[thType].stem] = {'thickness': thicknessList, 'id': idList}

    # fetch thickness attribute of release areas and create cfg file for each release scenario
    for releaseA in inputSimFiles['relFiles']:

        # fetch thickness and id info from input data
        thicknessList, idList = shpConv.readThickness(releaseA)
        inputSimFiles[releaseA.stem] = {'thickness': thicknessList, 'id': idList}

        # load configuration - a fresh one for every release scenario
        cfgInitial = cfgUtils.getModuleConfig(modName, fileOverride=cfgFile, toPrint=False)

        # add input data info
        cfgInitial['INPUT'] = {'DEM': inputSimFiles['demFile'].stem, 'releaseScenario': releaseA.stem}

        # update configuration with thickness value to be used for simulations
        cfgInitial = dP.getThicknessValue(cfgInitial, inputSimFiles, releaseA.stem, 'relTh')
        if cfgInitial['GENERAL'].getboolean('relThFromFile'):
            # NOTE(review): relThFile needs to be a path object here - presumably guaranteed
            # by the caller whenever relThFromFile is set, confirm
            cfgInitial['INPUT']['relThFile'] = str(pathlib.Path('RELTH', inputSimFiles['relThFile'].name))

        # add entrainment and secondary release thickness in input data info
        if inputSimFiles['entFile'] is not None and 'entFile' in thTypeList:
            cfgInitial = dP.getThicknessValue(cfgInitial, inputSimFiles, inputSimFiles['entFile'].stem, 'entTh')
            cfgInitial['INPUT']['entrainmentScenario'] = inputSimFiles['entFile'].stem
        if inputSimFiles['secondaryReleaseFile'] is not None and 'secondaryReleaseFile' in thTypeList:
            cfgInitial = dP.getThicknessValue(cfgInitial, inputSimFiles,
                                              inputSimFiles['secondaryReleaseFile'].stem, 'secondaryRelTh')
            cfgInitial['INPUT']['secondaryReleaseScenario'] = inputSimFiles['secondaryReleaseFile'].stem

        # create new ini file for each release scenario with updated info on thickness values
        # (and parameter variation) - the output directory is not created here
        cfgFileRelease = avaDir / 'Outputs' / modNameString / ('%s_com1DFACfg.ini' % releaseA.stem)
        with open(cfgFileRelease, 'w') as configfile:
            cfgInitial.write(configfile)
        cfgFilesRels.append(cfgFileRelease)

    return inputSimFiles, cfgFilesRels
def initializeDEM(avaDir, demPath=''):
    """ Load the DEM of an avalanche directory to a dict

        Parameters
        -----------
        avaDir: str or pathlib path
            path to avalanche directory
        demPath: str or pathlib Path
            path to dem relative to Inputs - optional if not provided read DEM from Inputs

        Returns
        --------
        dem: dict
            dem dictionary with header and data
    """

    if demPath != '':
        # explicit DEM given: build full path below Inputs and load data to dict
        customDemFile = pathlib.Path(avaDir, 'Inputs', demPath)
        return IOf.readRaster(customDemFile, noDataToNan=True)

    # no path provided: fall back to the DEM found in Inputs
    return readDEM(avaDir)
def selectReleaseScenario(inputSimFiles, cfg):
    """ select release scenario and remove other release files in inputSimFiles dictionary

        Parameters
        -----------
        inputSimFiles: dict
            dictionary with info on input data, the entries of relFiles can be
            pathlib paths or str
        cfg: configparser object
            configuration, here Flag releaseScenario is used (file name without extension)

        Returns
        -------
        inputSimFiles: dict
            updated dictionary (modified in place) with only one releaseScenario in relFiles
            and the selected file added as releaseScenario

        Raises
        ------
        FileNotFoundError
            if none of the release files matches the release scenario set in cfg
    """

    scenarioName = cfg['releaseScenario']

    # wrap in pathlib.Path so that release files given as plain strings are handled too
    matches = [relF for relF in inputSimFiles['relFiles'] if pathlib.Path(relF).stem == scenarioName]

    if not matches:
        message = 'Release area scenario %s not found - check input data' % (scenarioName)
        log.error(message)
        raise FileNotFoundError(message)

    # if several files share the same name the last one is used
    releaseScenario = matches[-1]
    inputSimFiles['relFiles'] = [releaseScenario]

    # add release area scenario
    inputSimFiles['releaseScenario'] = releaseScenario

    return inputSimFiles