# Source code for in1Data.getInput

"""
    Fetch input data for avalanche simulations
"""

# Load modules
import os
import glob
import logging

# Local imports
import avaframe.in2Trans.ascUtils as IOf
from avaframe.in3Utils import cfgUtils


# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)


def readDEM(avaDir):
    """ Load the ascii DEM raster found in a provided avalanche directory

        Parameters
        ----------
        avaDir : str
            path to avalanche directory

        Returns
        -------
        dem : dict
            dict with header and raster data
    """

    # locate the DEM file of this avalanche directory
    demPath = getDEMPath(avaDir)
    log.debug('Read DEM: %s' % demPath)

    # hand the file over to the raster reader and pass its result on
    return IOf.readRaster(demPath)
def getDEMPath(avaDir):
    """ Locate the DEM file of a provided avalanche directory

        Exactly one .asc file is expected in avaDir/Inputs.

        Parameters
        ----------
        avaDir : str
            path to avalanche directory

        Returns
        -------
        demFile : str
            full path to DEM .asc file

        Raises
        ------
        AssertionError
            if the number of .asc files found in avaDir/Inputs is not exactly one
    """

    # collect every .asc file located directly in the Inputs folder
    ascPattern = os.path.join(avaDir, 'Inputs', '*.asc')
    ascFiles = glob.glob(ascPattern)

    # zero or several candidates: impossible to decide which one is the DEM
    if len(ascFiles) != 1:
        message = 'There should be exactly one topography .asc file in %s/Inputs/' % (avaDir)
        log.error(message)
        raise AssertionError(message)

    return ascFiles[0]
def getInputData(avaDir, cfg, flagDev=False):
    """ Fetch input datasets required for simulation

        Duplicated function because simulation type is set differently in
        com1DFAOrig compared to com1DFA.
        TODO: remove duplicate once it is not required anymore

        Parameters
        ----------
        avaDir : str
            path to avalanche directory
        cfg : dict
            configuration read from com1DFA simulation ini file; the entry
            'releaseScenario' is read (release scenario names separated by '|',
            empty string to use all shapefiles found in Inputs/REL)
        flagDev : bool
            optional - if True: use for devREL folder to get release area scenarios

        Returns
        -------
        demFile : str
            full path to DEM .asc file
        relFiles : list
            list of full path to release area scenario .shp files
        entFile : str
            full path to entrainment area .shp file, empty string if none found
        resFile : str
            full path to resistance area .shp file, empty string if none found
        entResInfo : dict
            flags 'flagEnt' and 'flagRes', each 'Yes' or 'No' depending on whether
            an entrainment and/or resistance area file was found

        Raises
        ------
        FileNotFoundError
            if a release scenario named in cfg['releaseScenario'] does not exist
        AssertionError
            raised by getAndCheckInputFiles / getDEMPath if more than one
            entrainment or resistance shapefile, or not exactly one DEM, is found
    """

    # Set directory for inputs
    inputDir = os.path.join(avaDir, 'Inputs')

    # Set flag if there is an entrainment or resistance area
    entResInfo = {'flagEnt': 'No', 'flagRes': 'No'}

    # Initialise release areas, default is to look for shapefiles
    if flagDev is True:
        # glob returns files in arbitrary order - sort so that the order of the
        # scenarios is reproducible, as for the default REL folder below
        relFiles = sorted(glob.glob(os.path.join(inputDir, 'devREL', '*.shp')))
    elif cfg['releaseScenario'] != '':
        relFiles = []
        for rel in cfg['releaseScenario'].split('|'):
            # scenario can be given with or without the .shp extension
            relName = rel if '.shp' in rel else '%s.shp' % (rel)
            relf = os.path.join(inputDir, 'REL', relName)
            if not os.path.isfile(relf):
                message = 'No release scenario called: %s' % (relf)
                log.error(message)
                raise FileNotFoundError(message)
            relFiles.append(relf)
        log.debug('Release area file is specified to be: %s' % relFiles)
    else:
        relFiles = sorted(glob.glob(os.path.join(inputDir, 'REL', '*.shp')))
    log.info('Release area files are: %s' % relFiles)

    # Initialise resistance areas; callers of this function expect '' if missing
    resFile, entResInfo['flagRes'] = getAndCheckInputFiles(inputDir, 'RES', 'Resistance')
    if resFile is None:
        resFile = ''

    # Initialise entrainment areas; callers of this function expect '' if missing
    entFile, entResInfo['flagEnt'] = getAndCheckInputFiles(inputDir, 'ENT', 'Entrainment')
    if entFile is None:
        entFile = ''

    # Initialise DEM
    demFile = getDEMPath(avaDir)

    return demFile, relFiles, entFile, resFile, entResInfo
def getInputDataCom1DFA(avaDir, cfg):
    """ Fetch input datasets required for simulation

        Duplicated function because simulation type is set differently in
        com1DFA compared to com1DFAOrig.
        TODO: remove duplicate once it is not required anymore

        Parameters
        ----------
        avaDir : str
            path to avalanche directory
        cfg : dict
            configuration read from com1DFA simulation ini file; the entries
            'flagDev' (compared to the string 'True') and 'releaseScenario'
            (release scenario names separated by '|', empty string to use all
            shapefiles found in Inputs/REL) are read

        Returns
        -------
        inputSimFiles : dict
            dictionary with all the input files:

            demFile : str
                full path to DEM .asc file
            relFiles : list
                list of full path to release area scenario .shp files
            secondaryReleaseFile : str
                full path to secondary release area .shp file, None if none found
            entFile : str
                full path to entrainment area .shp file, None if none found
            resFile : str
                full path to resistance area .shp file, None if none found
            entResInfo : dict
                flags 'flagEnt', 'flagRes' and 'flagSecondaryRelease', each
                'Yes' or 'No' depending on whether the corresponding file was found

        Raises
        ------
        FileNotFoundError
            if a release scenario named in cfg['releaseScenario'] does not exist
        AssertionError
            raised by getAndCheckInputFiles / getDEMPath if more than one
            shapefile is found in SECREL, RES or ENT, or not exactly one DEM
    """

    # Set directory for inputs
    inputDir = os.path.join(avaDir, 'Inputs')

    # Set flag if there is an entrainment or resistance area
    entResInfo = {'flagEnt': 'No', 'flagRes': 'No'}

    # Initialise release areas, default is to look for shapefiles
    if cfg['flagDev'] == 'True':
        # glob returns files in arbitrary order - sort so that the order of the
        # scenarios is reproducible, as for the default REL folder below
        relFiles = sorted(glob.glob(os.path.join(inputDir, 'devREL', '*.shp')))
    elif cfg['releaseScenario'] != '':
        relFiles = []
        for rel in cfg['releaseScenario'].split('|'):
            # scenario can be given with or without the .shp extension
            relName = rel if '.shp' in rel else '%s.shp' % (rel)
            relf = os.path.join(inputDir, 'REL', relName)
            if not os.path.isfile(relf):
                message = 'No release scenario called: %s' % (relf)
                log.error(message)
                raise FileNotFoundError(message)
            relFiles.append(relf)
        log.debug('Release area file is specified to be: %s' % relFiles)
    else:
        relFiles = sorted(glob.glob(os.path.join(inputDir, 'REL', '*.shp')))
    log.info('Release area files are: %s' % relFiles)

    # Initialise secondary release areas
    secondaryReleaseFile, entResInfo['flagSecondaryRelease'] = getAndCheckInputFiles(
        inputDir, 'SECREL', 'Secondary release')

    # Initialise resistance areas
    resFile, entResInfo['flagRes'] = getAndCheckInputFiles(inputDir, 'RES', 'Resistance')

    # Initialise entrainment areas
    entFile, entResInfo['flagEnt'] = getAndCheckInputFiles(inputDir, 'ENT', 'Entrainment')

    # Initialise DEM
    demFile = getDEMPath(avaDir)

    # gather DEM, release, secondary release, entrainment and resistance inputs
    inputSimFiles = {'demFile': demFile,
                     'relFiles': relFiles,
                     'secondaryReleaseFile': secondaryReleaseFile,
                     'entFile': entFile,
                     'resFile': resFile,
                     'entResInfo': entResInfo}

    return inputSimFiles
def getAndCheckInputFiles(inputDir, folder, inputType):
    """ Look for the shape file of one input type and check that it is unique

        Raises error if there is more than one shape file.

        Parameters
        ----------
        inputDir : str
            path to avalanche input directory
        folder : str
            subfolder name where the shape file should be located (SECREL, ENT or RES)
        inputType : str
            type of input (used for the logging messages). Secondary release or
            Entrainment or Resistance

        Returns
        -------
        OutputFile : str
            path to the shape file found, None if there is no shape file
        available : str
            Yes or No depending on whether a shape file is available

        Raises
        ------
        AssertionError
            if more than one .shp file is found in inputDir/folder
    """

    # gather all shape files of the requested subfolder
    shpFiles = glob.glob(inputDir+os.sep + folder + os.sep+'*.shp')
    nFiles = len(shpFiles)

    # several candidates: ambiguous input, refuse to pick one
    if nFiles > 1:
        message = 'There shouldn\'t be more than one %s .shp file in %s/%s/' % (inputType, inputDir, folder)
        log.error(message)
        raise AssertionError(message)

    # nothing found: report as not available
    if nFiles < 1:
        return None, 'No'

    # exactly one shape file
    return shpFiles[0], 'Yes'