Source code for in1Data.getInput

"""
Fetch input data for avalanche simulations
"""

import logging

# Load modules
import os
import pathlib

import numpy as np
import pandas as pd
import shapely as shp

import avaframe.com1DFA.DFAtools as DFAtls
import avaframe.com1DFA.deriveParameterSet as dP
import avaframe.in2Trans.rasterUtils as IOf
import avaframe.in2Trans.shpConversion as shpConv
import avaframe.in3Utils.fileHandlerUtils as fU
import avaframe.in3Utils.geoTrans as geoTrans

# Local imports
from avaframe.in3Utils import cfgUtils
from avaframe.out3Plot import in1DataPlots

# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)


[docs]def readDEM(avaDir): """read the DEM (ascii or tif) file from a provided avalanche directory Parameters ---------- avaDir : str path to avalanche directory Returns ------- dem : dict dict with header and raster data """ # get dem file name demSource = getDEMPath(avaDir) log.debug("Read DEM: %s" % demSource) dem = IOf.readRaster(demSource) return dem
[docs]def getDEMPath(avaDir): """get the DEM file path from a provided avalanche directory Parameters ---------- avaDir : str path to avalanche directory Returns ------- demFile : str (first element of list) full path to DEM .asc/.tif file """ # if more than one .asc / .tif file found throw error inputDir = pathlib.Path(avaDir, "Inputs") demFile = list(inputDir.glob("*.tif")) + list(inputDir.glob("*.asc")) if len(demFile) > 1: message = "There should be exactly one topography .asc/.tif file in %s/Inputs/" % avaDir log.error(message) raise AssertionError(message) elif len(demFile) == 0: message = "No topography .asc / .tif file in %s/Inputs/" % avaDir log.error(message) raise FileNotFoundError(message) return demFile[0]
[docs]def getDEMFromConfig(avaDir, fileName=""): """get dem file path in avaDir/Inputs Parameters ----------- avaDir: str or pathlib path to avalancheDir fileName: pathlib path to dem file with filename in avaDir/Inputs Returns -------- demFile: pathlib path to dem file """ inputDir = pathlib.Path(avaDir, "Inputs") demFile = inputDir / fileName if demFile.is_file() is False: message = "Dem file: %s does not exist" % (str(demFile)) log.error(message) raise FileNotFoundError(message) return demFile
[docs]def getInputData(avaDir, cfg): """Fetch input datasets required for simulation, duplicated function because simulation type set differently in com1DFAOrig compared to com1DFA: TODO: remove duplicate once it is not required anymore Parameters ---------- avaDir : str path to avalanche directory cfg : dict configuration read from com1DFA simulation ini file Returns ------- demFile[0] : str (first element of list) list of full path to DEM .asc file relFiles : list list of full path to release area scenario .shp files entFile : str full path to entrainment area .shp file resFile : str full path to resistance area .shp file wallFile: str full path to wall line .shp file entResInfo : flag dict flag if Yes entrainment and/or resistance areas found and used for simulation """ # Set directories for inputs, outputs and current work avaDir = pathlib.Path(avaDir) inputDir = avaDir / "Inputs" # Set flag if there is an entrainment or resistance area entResInfo = {"flagEnt": "No", "flagRes": "No"} # Initialise release areas, default is to look for shapefiles releaseDir = "REL" if cfg["releaseScenario"] != "": relFiles = [] releaseFiles = cfg["releaseScenario"].split("|") for rel in releaseFiles: if ".shp" in rel: relf = inputDir / releaseDir / rel else: relf = inputDir / releaseDir / ("%s.shp" % rel) if not os.path.isfile(relf): message = "No release scenario called: %s" % relf log.error(message) raise FileNotFoundError(message) relFiles.append(relf) log.debug("Release area file is specified to be: %s" % relFiles) else: relDir = inputDir / releaseDir relFiles = sorted(relDir.glob("*.shp")) log.info("Release area files are: %s" % relFiles) # Initialise resistance areas resFile, entResInfo["flagRes"], _ = getAndCheckInputFiles(inputDir, "RES", "Resistance", fileExt="shp") if resFile is None: resFile = "" # Initialise entrainment areas entFile, entResInfo["flagEnt"], _ = getAndCheckInputFiles(inputDir, "ENT", "Entrainment", fileExt="shp") if entFile is None: entFile = "" # Initialise dam line wallFile, entResInfo["flagWall"], _ = getAndCheckInputFiles(inputDir, "DAM", "Dam", fileExt="shp") # Initialise DEM demFile = getDEMPath(avaDir) return demFile, relFiles, entFile, resFile, wallFile, entResInfo
[docs]def getInputDataCom1DFA(avaDir): """Fetch input datasets required for simulation, duplicated function because now fetch all available files simulation type set differently in com1DFA compared to com1DFAOrig: TODO: remove duplicate once it is not required anymore Parameters ---------- avaDir : str or pathlib object path to avalanche directory Returns ------- inputSimFiles: dict dictionary with all the input files - demFile : str (first element of list), list of full path to DEM .asc file - relFiles : list, list of full path to release area scenario .shp files - secondaryReleaseFile : str, full path to secondary release area .shp file - entFile : str, full path to entrainment area .shp file - resFile : str, full path to resistance area .shp file - entResInfo : flag dict - timeDepRelCsv : str, full path to time dependent release values .csv file flag if Yes entrainment and/or resistance areas found and used for simulation flag True if a Secondary Release file found and activated """ # Set directories for inputs, outputs and current work inputDir = pathlib.Path(avaDir, "Inputs") # Set flag if there is an entrainment or resistance area entResInfo = {} releaseDir = inputDir / "REL" relFiles = sorted( list(releaseDir.glob("*.shp")) + list(releaseDir.glob("*.tif")) + list(releaseDir.glob("*.asc")) ) relSuffixList = [relF.suffix for relF in relFiles] if ".shp" in relSuffixList and (".asc" in relSuffixList or ".tif" in relSuffixList): message = "Release area information - use either .shp or .asc/.tif files" log.error(message) raise AssertionError(message) if len(relFiles) == 0: message = "No release area is found - provide a .shp or .asc or .tif file" log.error(message) raise FileNotFoundError(message) else: log.info("Release area files are: %s" % [str(relFilestr) for relFilestr in relFiles]) entResInfo["relThFileType"] = relFiles[0].suffix entResInfo["flagRel"] = "Yes" # Initialise secondary release areas ( secondaryReleaseFile, entResInfo["flagSecondaryRelease"], entResInfo["secondaryRelThFileType"], ) = getAndCheckInputFiles(inputDir, "SECREL", "Secondary release", fileExt=["shp", "asc", "tif"]) if secondaryReleaseFile: log.info("Secondary release file is: %s" % secondaryReleaseFile) # Initialise resistance areas resFile, entResInfo["flagRes"], entResInfo["resFileType"] = getAndCheckInputFiles( inputDir, "RES", "Resistance", fileExt=["shp", "asc", "tif"] ) if resFile: log.info("Resistance file is: %s" % resFile) # Initialise entrainment areas entFile, entResInfo["flagEnt"], entResInfo["entThFileType"] = getAndCheckInputFiles( inputDir, "ENT", "Entrainment", fileExt=["shp", "asc", "tif"] ) if entFile: log.info("Entrainment file is: %s" % entFile) # Initialise dam line damFile, entResInfo["dam"], _ = getAndCheckInputFiles(inputDir, "DAM", "Dam", fileExt="shp") if damFile: log.info("Dam file is: %s" % damFile) # Initialise DEM demFile = getDEMPath(avaDir) # check if mu frictionParameter file is available muFile, entResInfo["mu"], _ = getAndCheckInputFiles( inputDir, "RASTERS", "mu parameter data", fileExt="raster", fileSuffix="_mu" ) # check if xi frictionParameter file is available xiFile, entResInfo["xi"], _ = getAndCheckInputFiles( inputDir, "RASTERS", "xi parameter data", fileExt="raster", fileSuffix="_xi" ) # check if k frictionParameter file is available kFile, entResInfo["k"], _ = getAndCheckInputFiles( inputDir, "RASTERS", "k parameter data", fileExt="raster", fileSuffix="_k" ) # check if tauc frictionParameter file is available tauCFile, entResInfo["tauC"], _ = getAndCheckInputFiles( inputDir, "RASTERS", "tauC parameter data", fileExt="raster", fileSuffix="_tauc" ) # check if bhd (tree diameter) parameter file is available - forest density (nd) needs to be in RES folder bhdFile, entResInfo["bhd"], _ = getAndCheckInputFiles( inputDir, "RASTERS", "bhd parameter data", fileExt="raster", fileSuffix="_bhd" ) entResInfo["relRemeshed"] = "No" entResInfo["secondaryRelRemeshed"] = "No" entResInfo["entRemeshed"] = "No" entResInfo["tauCRemeshed"] = "No" entResInfo["kRemeshed"] = "No" entResInfo["muRemeshed"] = "No" entResInfo["xiRemeshed"] = "No" entResInfo["resRemeshed"] = "No" entResInfo["bhdRemeshed"] = "No" timeDepRelFiles = sorted(list(releaseDir.glob("*.csv"))) if len(timeDepRelFiles) > 0: entResInfo["timeDepRelCsvAvailable"] = "Yes" else: entResInfo["timeDepRelCsvAvailable"] = "No" # return DEM, first item of release, entrainment and resistance areas inputSimFiles = { "demFile": demFile, "relFiles": relFiles, "secondaryRelFile": secondaryReleaseFile, "entFile": entFile, "resFile": resFile, "damFile": damFile, "entResInfo": entResInfo, "muFile": muFile, "xiFile": xiFile, "kFile": kFile, "tauCFile": tauCFile, "bhdFile": bhdFile, "timeDepRelCsv": timeDepRelFiles, } for thFile in ["rel", "secondaryRel", "ent"]: if entResInfo["%sThFileType" % thFile] in [".asc", ".tif"]: if thFile == "rel": inputSimFiles["relThFile"] = relFiles else: inputSimFiles["%sThFile" % thFile] = inputSimFiles["%sFile" % thFile] else: inputSimFiles["%sThFile" % thFile] = None return inputSimFiles
[docs]def getAndCheckInputFiles(inputDir, folder, inputType, fileExt="shp", fileSuffix=""): """Fetch fileExt files and check if they exist and if it is not more than one. If fileExt is set to "raster" both .asc and .tif files will be searched. Raises error if there is more than one fileExt file. If fileExt is empty, only suffix will be checked. If a file is found, it is checked to be of an allowed extension; currently .shp .asc and .tif are supported. Parameters ---------- inputDir : pathlib object or str path to avalanche input directory folder : str subfolder name where the shape file should be located (SECREL, ENT or RES) inputType : str type of input (used for the logging messages). fileExt: str, list file extension e.g. shp, asc, tif - optional; default is shp fileSuffix: str file name part before extension Returns ------- OutputFile: str path to file checked available: str Yes or No depending on if there is a file available (if No, OutputFile is None) """ available = "No" supportedFileFormats = [".shp", ".asc", ".tif"] # Define the directory to search and the extensions if fileExt == "": extensions = [""] elif isinstance(fileExt, list): extensions = fileExt elif fileExt.lower() == "raster": extensions = ["asc", "tif"] else: extensions = [fileExt] inDir = pathlib.Path(inputDir, folder) if fileSuffix == "": OutputFile = list([file for ext in extensions for file in inDir.glob(f"*.{ext}")]) else: OutputFile = list([file for ext in extensions for file in inDir.glob(f"*{fileSuffix}.{ext}")]) # check for number of files if len(OutputFile) < 1: OutputFile = None fileTypeFormat = None elif len(OutputFile) > 1: message = "More than one %s .%s file in %s/%s/ not allowed" % ( inputType, fileExt, inputDir, folder, ) log.error(message) raise AssertionError(message) else: available = "Yes" OutputFile = OutputFile[0] fileTypeFormat = OutputFile.suffix if OutputFile.suffix not in supportedFileFormats: message = ( "Unsupported file format found for OutputFile %s; shp, asc, tif are allowed" % OutputFile ) log.error(message) raise AssertionError(message) return OutputFile, available, fileTypeFormat
[docs]def getThicknessInputSimFiles(inputSimFiles): """add thickness of shapefiles to dictionary Parameters ----------- inputSimFiles: dict dictionary with info on release and entrainment file paths Returns -------- inputSimFiles: dict updated dictionary with thickness info read from shapefile attributes now includes one separate dictionary for each release, entrainment or secondary release scenario with a thickness and id value for each feature (given as list) """ # fetch thickness attribute of entrainment area and secondary release for thType in ["ent", "secondaryRel"]: if inputSimFiles[thType + "File"] is not None: if inputSimFiles["entResInfo"][thType + "Th" + "FileType"] == ".shp": thicknessList, idList, ci95List = shpConv.readThickness(inputSimFiles[thType + "File"]) else: thicknessList = None idList = None ci95List = None inputSimFiles[inputSimFiles[thType + "File"].stem] = { "thickness": thicknessList, "id": idList, "ci95": ci95List, } # initialize release scenario list releaseScenarioList = [] # fetch thickness attribute of release areas and add info to input dict for releaseA in inputSimFiles["relFiles"]: # fetch thickness and id info from input data if inputSimFiles["entResInfo"]["relThFileType"] == ".shp": thicknessList, idList, ci95List = shpConv.readThickness(releaseA) else: thicknessList = None idList = None ci95List = None inputSimFiles[releaseA.stem] = { "thickness": thicknessList, "id": idList, "ci95": ci95List, } # append release scenario name to list releaseScenarioList.append(releaseA.stem) # append release scenario names inputSimFiles["releaseScenarioList"] = releaseScenarioList return inputSimFiles
[docs]def updateThicknessCfg(inputSimFiles, cfgInitial): """add available release scenarios to ini file and set thickness values in ini files Parameters ----------- inputSimFiles: dict dictionary with info on release and entrainment file paths cfgInitial: configParser object with the current (and possibly overridden) configuration Returns -------- inputSimFiles: dict updated dictionary with thickness info read from shapefile attributes now includes one separate dictionary for each release, entrainment or secondary release scenario with a thickness and id value for each feature (given as list) cfgInitial: configparser object updated config object with release scenario, thickness info, etc. """ # check if thickness info is required from entrainment and secondary release according to simType simTypeList = cfgInitial["GENERAL"]["simTypeList"].split("|") thTypeList = [] if any(simType in ["ent", "entres", "available"] for simType in simTypeList): thTypeList.append("entFile") if cfgInitial["GENERAL"].getboolean("secRelArea"): thTypeList.append("secondaryRelFile") if cfgInitial["GENERAL"].getboolean("timeDependentRelease"): thTypeList.append("timeDepRelFile") # initialize release scenario list releaseScenarioIni = cfgInitial["INPUT"]["releaseScenario"] if releaseScenarioIni == "": releaseScenarioList = inputSimFiles["releaseScenarioList"] else: for scenario in cfgInitial["INPUT"]["releaseScenario"].split("|"): if scenario not in inputSimFiles["releaseScenarioList"]: message = "Chosen release scenario: %s not available" % scenario log.error(message) raise FileNotFoundError(message) releaseScenarioList = cfgInitial["INPUT"]["releaseScenario"].split("|") # add input data info to cfg object # fetch thickness attribute of release areas and add info to input dict for releaseA in releaseScenarioList: # update configuration with thickness value to be used for simulations cfgInitial = dP.getThicknessValue(cfgInitial, inputSimFiles, releaseA, "relTh") cfgInitial["INPUT"]["relThFile"] = "" if inputSimFiles["entResInfo"]["relThFileType"] != ".shp": cfgInitial["INPUT"]["relThFile"] = str( pathlib.Path("REL", releaseA + inputSimFiles["entResInfo"]["relThFileType"]) ) # add entrainment and secondary release thickness in input data info and in cfg object if inputSimFiles["entFile"] is not None and "entFile" in thTypeList: cfgInitial = dP.getThicknessValue(cfgInitial, inputSimFiles, inputSimFiles["entFile"].stem, "entTh") cfgInitial["INPUT"]["entThFile"] = "" if inputSimFiles["entResInfo"]["entThFileType"] != ".shp": cfgInitial["INPUT"]["entThFile"] = str(pathlib.Path("ENT", inputSimFiles["entFile"].name)) cfgInitial["INPUT"]["entrainmentScenario"] = inputSimFiles["entFile"].stem if inputSimFiles["secondaryRelFile"] is not None and "secondaryRelFile" in thTypeList: cfgInitial = dP.getThicknessValue( cfgInitial, inputSimFiles, inputSimFiles["secondaryRelFile"].stem, "secondaryRelTh", ) cfgInitial["INPUT"]["secondaryRelThFile"] = "" if inputSimFiles["entResInfo"]["secondaryRelThFileType"] != ".shp": cfgInitial["INPUT"]["secondaryRelThFile"] = str( pathlib.Path("SECREL", inputSimFiles["secondaryRelFile"].name) ) cfgInitial["INPUT"]["secondaryReleaseScenario"] = inputSimFiles["secondaryRelFile"].stem # get time dependent release scenario if inputSimFiles["timeDepRelCsv"] is not None and "timeDepRelFile" in thTypeList: timeDepRelFileIni = cfgInitial["GENERAL"]["timeDependentReleaseScenarios"] availableTimeDepRelScenarios = [] for file in inputSimFiles["timeDepRelCsv"]: availableTimeDepRelScenarios.append(file.stem) if timeDepRelFileIni == "": # if no scenario is specified in the ini file, use all available csv files timeDepRelScenarioList = availableTimeDepRelScenarios else: # use specified scenario timeDepRelScenarioList = [] for timeDepScenario in cfgInitial["GENERAL"]["timeDependentReleaseScenarios"].split("|"): timeDepRelScenarioList.append(pathlib.Path(timeDepScenario).stem) timeDepRelScenariosCfg = cfgUtils.convertToCfgList(timeDepRelScenarioList) if timeDepRelFileIni == "": cfgInitial["GENERAL"]["timeDependentReleaseScenarios"] = timeDepRelScenariosCfg # check if a csv file exists for the specified scenario(s) for timeDepIniFileName in cfgInitial["GENERAL"]["timeDependentReleaseScenarios"].split("|"): timeDepIniFileName = pathlib.Path(timeDepIniFileName).stem if timeDepIniFileName not in availableTimeDepRelScenarios: message = "Chosen time dependent release scenario: %s not available" % timeDepIniFileName log.error(message) raise FileNotFoundError(message) else: cfgInitial["GENERAL"]["timeDependentReleaseScenarios"] = timeDepRelScenariosCfg # create cfg string from release scenario list and add to cfg object releaseScenarioName = cfgUtils.convertToCfgList(releaseScenarioList) if cfgInitial["INPUT"]["releaseScenario"] == "": cfgInitial["INPUT"]["releaseScenario"] = releaseScenarioName log.info("Chosen release scenarios: %s" % cfgInitial["INPUT"]["releaseScenario"]) return cfgInitial
[docs]def initializeRelTh(cfg, dOHeader): """check for relThFile and load to dict (if relTh is read from file) Parameters ----------- cfg: configparser used are avalanchDir and relThFile demHeader: header of dem to check for matching rows and numbers Returns -------- relThFieldData: ndarray or str with release thickness field data if no relThFile is available an empty string is returned relThFile: path updated path to relThFile (needed in case of remeshing) """ avaDir = cfg["GENERAL"]["avalancheDir"] if cfg["INPUT"]["relThFile"] != "": relThFile = pathlib.Path(avaDir, "Inputs", cfg["INPUT"]["relThFile"]) relThField = IOf.readRaster(relThFile) relThFieldData = relThField["rasterData"] if ( dOHeader["ncols"] != relThField["header"]["ncols"] or dOHeader["nrows"] != relThField["header"]["nrows"] ): message = ( "Release thickness field read from %s does not match the number of rows and columns of the dem" % relThFile ) log.error(message) raise AssertionError(message) elif np.isnan(relThFieldData).any() == True: message = ( "Release thickness field contains nans - not allowed no release thickness must be set to 0" ) log.error(message) raise AssertionError(message) else: relThFieldData = "" relThFile = "" return relThFieldData, relThFile
[docs]def initializeDEM(avaDir, demPath=""): """check for dem and load to dict Parameters ----------- avaDir: str or pathlib path path to avalanche directory demPath: str or pathlib Path path to dem relative to Inputs - optional if not provided read DEM from Inputs Returns -------- demOri: dict dem dictionary with header and data """ if demPath == "": dem = readDEM(avaDir) else: # build full path and load data to dict demFile = pathlib.Path(avaDir, "Inputs", demPath) dem = IOf.readRaster(demFile, noDataToNan=True) return dem
[docs]def selectReleaseFile(inputSimFiles, releaseScenario): """select release scenario Parameters ----------- inputSimFiles: dict dictionary with info on input data releaseScenario: str name of release scenario Returns ------- inputSimFiles: dict dictionary with info on input data updated with releaseScenario """ # fetch release file path for scenario relFiles = inputSimFiles["relFiles"] for relF in relFiles: if relF.stem == releaseScenario: releaseScenarioPath = relF inputSimFiles["releaseScenario"] = releaseScenarioPath return inputSimFiles
[docs]def fetchReleaseFile(inputSimFiles, releaseScenario, cfgSim, releaseList): """select release scenario, update configuration to only include thickness info of current scenario and return file path Parameters ----------- inputSimFiles: dict dictionary with info on input data releaseScenario: str name of release scenario cfgSim: conigparser object configuration of simulation releaseList: list list of available release scenarios Returns ------- releaseScenarioPath: pathlib path file path to release scenario shp file cfgSim: configparser object updated cfg object, removed thickness info from not other release scenarios than used one and rename thickness values of chosen scenario to relThThickness, relThId, ... """ # fetch release files paths relFiles = inputSimFiles["relFiles"] foundScenario = False for relF in relFiles: if relF.stem == releaseScenario: releaseScenarioPath = relF foundScenario = True if foundScenario is False: message = "Release area scenario %s not found - check input data" % releaseScenario log.error(message) raise FileNotFoundError(message) # update config entry for release scenario, thickness and id cfgSim["INPUT"]["releaseScenario"] = str(releaseScenario) # check if release thickness is read from shapefile or raster file if releaseScenarioPath.suffix in [".asc", ".tif"]: # raster file - set relThFile path cfgSim["INPUT"]["relThFile"] = str( releaseScenarioPath.parts[-2] + "/" + releaseScenarioPath.parts[-1] ) elif ( cfgSim["GENERAL"]["relThFromFile"] == "True" and cfgSim["GENERAL"]["timeDependentRelease"] == "False" ): # shapefile with thickness attributes - handle thickness/id/ci95 values for scenario in releaseList: if scenario == releaseScenario: cfgSim["INPUT"]["relThId"] = cfgSim["INPUT"][scenario + "_" + "relThId"] cfgSim["INPUT"]["relThThickness"] = cfgSim["INPUT"][scenario + "_" + "relThThickness"] cfgSim["INPUT"]["relThCi95"] = cfgSim["INPUT"][scenario + "_" + "relThCi95"] # remove thickness, id and ci95 values specified by releaseScenario cfgSim["INPUT"].pop(scenario + "_" + "relThId") cfgSim["INPUT"].pop(scenario + "_" + "relThThickness") cfgSim["INPUT"].pop(scenario + "_" + "relThCi95") relThFileList = [relF for relF in inputSimFiles["relFiles"] if relF.stem == releaseScenario] if len(relThFileList) == 0: relThFile = None elif len(relThFileList) == 1: relThFile = relThFileList[0] else: message = ( "multiple files found for release scenario name %s in relThFiles list - check input data" % releaseScenario ) log.error(message) raise AssertionError(message) return releaseScenarioPath, cfgSim, relThFile
[docs]def createReleaseStats(avaDir, cfg): """create a csv file with info on release shp file on: max, mean and min elevation, slope and projected and real area Parameters ------------- Returns -------- fPath: pathlib Path file path """ # fetch input data (dem and release area shp file) inputData = getInputDataCom1DFA(avaDir) # get line from release area polygon relFiles = inputData["relFiles"] # initialize dem and get real area of dem dem = IOf.readRaster(inputData["demFile"], noDataToNan=True) dem["originalHeader"] = dem["header"].copy() methodMeshNormal = cfg.getfloat("GENERAL", "methodMeshNormal") # get normal vector of the grid mesh dem = geoTrans.getNormalMesh(dem, methodMeshNormal) dem = DFAtls.getAreaMesh(dem, methodMeshNormal) # loop over all relFiles and compute information saved to dictionary of dataframes relPath = pathlib.Path(avaDir, "Outputs", "com1DFA", "releaseInfoFiles") fU.makeADir(relPath) relDFDict = {} # loop over all relFiles for relFile in relFiles: # for relF in relFiles: releaseLine = shpConv.readLine(relFile, "release1", dem) releaseLine["file"] = relFile releaseLine["type"] = "Release" releaseLine["thicknessSource"] = ["artificial"] * len(releaseLine["id"]) releaseLine["thickness"] = ["1."] * len(releaseLine["id"]) releaseLine["initializedFrom"] = "shapefile" # convert release line to a raster with 1 set inside of the release line # and compute projected and actual areas areaActualList, areaProjectedList, releaseLine = computeAreasFromRasterAndLine(releaseLine, dem) # compute max, min elevation, mean slope and save all to dict relInfo = computeRelStats(releaseLine, dem) # create dict and dataFrame relD = { "release feature": relInfo["featureNames"], "MaxZ [m]": relInfo["zMax"], "MinZ [m]": relInfo["zMin"], "slope [deg]": relInfo["meanSlope"], "projected area [ha]": [areaP / 10000.0 for areaP in areaProjectedList], "actual area [ha]": [area / 10000.0 for area in areaActualList], } relDF = pd.DataFrame(data=relD, index=np.arange(len(relInfo["featureNames"]))) relInfo = relPath / ("%s.csv" % relFile.stem) relDF.to_csv(relInfo, index=False, sep=";", float_format="%.4f") log.info("Written relInfo file for %s to %s" % (relFile.stem, str(relPath))) relDFDict[relFile.stem] = relDF return relDFDict
[docs]def computeAreasFromRasterAndLine(line, dem): """compute the area covered by 1) a polygon by creating a raster from polygon or 2) where in a raster values > 0 projected area and actual area using a dem info Parameters ----------- line: dict dictionary with info on polygon or area from raster dem: dict dictionary with dem data, header and areaRaster Returns -------- areaActual: float actual area taking slope into account by using dem area areaProjected: float projected area in xy plane """ csz = dem["header"]["cellsize"] # create dict for raster data for each feature areaProjectedList = [] areaActualList = [] if line["initializedFrom"] == "shapefile": line = geoTrans.prepareArea(line, dem, 0.01, combine=False, checkOverlap=False) rasterList = line["rasterData"] else: rasterList = [line["rasterData"]] for index, lineRaster in enumerate(rasterList): lineRasterOnes = np.where(lineRaster > 0, 1.0, 0.0) areaActualList.append(np.nansum(lineRasterOnes * dem["areaRaster"])) areaProjectedList.append(np.sum(csz * csz * lineRasterOnes)) return areaActualList, areaProjectedList, line
[docs]def computeRelStats(line, dem): """compute stats of a polygon and a dem actual area (taking slope into account), projected area, max, mean, min elevation, mean slope Parameters ----------- line: dict dictionary with info on line (x, y, Start, Length, rasterData, ...) dem: dict dictionary with info on dem (header, rasterData, normals, areaRaster) Returns -------- lineDict: dict dictionary with stats info: featureNames, zMax, zMin, Slope, Area, AreaP """ # compute projected areas using shapely projectedAreas = computeAreasFromLines(line) # create dict for raster data for each feature lineDict = {} for index, relRaster in enumerate(line["rasterData"]): zArray = np.where(relRaster > 0, dem["rasterData"], np.nan) # compute slope of release area _, _, NzNormed = DFAtls.normalize(dem["Nx"], dem["Ny"], dem["Nz"]) nzArray = np.where(relRaster > 0, NzNormed, np.nan) slopeArray = np.rad2deg(np.arccos(nzArray)) lineDict.setdefault("meanSlope", []).append(np.nanmean(slopeArray)) # compute elevation stats lineDict.setdefault("zMax", []).append(np.nanmax(zArray)) lineDict.setdefault("zMean", []).append(np.nanmean(zArray)) lineDict.setdefault("zMin", []).append(np.nanmin(zArray)) # create a dataframe lineDict.setdefault("featureNames", []).append(line["Name"][index]) # print info log.debug("++++ Release feature %s ++++++++++" % line["Name"][index]) log.debug("maximum elevation: %.2f" % np.nanmax(zArray)) log.debug("mean elevation: %.2f" % np.nanmean(zArray)) log.debug("minimum elevation: %.2f" % np.nanmin(zArray)) log.debug("mean slope: %.2f" % np.nanmean(slopeArray)) return lineDict
[docs]def computeAreasFromLines(line): """compute the area of a polygon in xy using shapely Parameters ----------- line: dict dictionary with info on line x, y coordinates start, end of each line feature Returns -------- projectedAreas: list list of projected area for each polygon in line dict """ projectedAreas = [] name = line["Name"] start = line["Start"] Length = line["Length"] # fetch individual polygons for i in range(len(name)): end = start[i] + Length[i] x = line["x"][int(start[i]) : int(end)] y = line["y"][int(start[i]) : int(end)] # create shapely polygon for m in range(len(x)): avaPoly = shp.Polygon(list(zip(x, y))) projectedAreas.append(shp.area(avaPoly)) return projectedAreas
[docs]def getInputPaths(avaDir): """Fetch paths to dem and first release area shp file found Parameters ---------- avaDir : str or pathlib object path to avalanche directory Returns ------- demFile : pathlib path full path to DEM .asc file relFiles : list list of full paths to release area scenario .shp files found in avaDir/Inputs/REL relFieldFiles : list list of full paths to release area thickness .asc files found in avaDir/Inputs/RELTH """ # Set directories for inputs, outputs and current work inputDir = pathlib.Path(avaDir, "Inputs") # fetch release area shp files releaseShpDir = inputDir / "REL" relFiles = sorted(list(releaseShpDir.glob("*.shp"))) log.info("Release area files are: %s" % [str(relFilestr) for relFilestr in relFiles]) # fetch release thickness fields releaseFieldDir = inputDir / "RELTH" relFieldFiles = sorted(list(releaseFieldDir.glob("*.asc")) + list(releaseFieldDir.glob("*.tif"))) if len(relFieldFiles) > 0: log.info("Release area files are: %s" % [str(relFFilestr) for relFFilestr in relFieldFiles]) else: relFieldFiles = None # Initialise DEM demFile = getDEMPath(avaDir) return demFile, relFiles, relFieldFiles
[docs]def checkForMultiplePartsShpArea(avaDir, lineDict, modName, type=""): """check if in polygon read from shape file holes are present, if so error and save a plot to Outputs/com1DFA procedure: check if polygon has several parts Parameters ----------- avaDir: str path to avalanche directory lineDict: dict dictionary with info read from shape file used: x, y, Start, Length, nParts, nFeatures modName: str name of computational module where to save error plots to Outputs subfolder type: str type of shp file area (release, secondary release, entrainment, resistance) Returns -------- error if number of parts is bigger 1 save plot showing all parts of each polygon feature in Outputs/modName """ foundMultipleParts = False # loop over all polygons in scenario for lineFeature in range(lineDict["nFeatures"]): if len(lineDict["nParts"][lineFeature]) > 2: # fetch number of parts for each lineFeature nParts = lineDict["nParts"][lineFeature] # create x, y coordinates of each lineFeature xFeat = lineDict["x"][ int(lineDict["Start"][lineFeature]) : int( lineDict["Start"][lineFeature] + lineDict["Length"][lineFeature] ) ] yFeat = lineDict["y"][ int(lineDict["Start"][lineFeature]) : int( lineDict["Start"][lineFeature] + lineDict["Length"][lineFeature] ) ] # check for type of area if type.lower() in ["entrainment", "resistance", "secondary release"]: lineFileName = lineDict["fileName"] else: lineFileName = lineDict["file"] # create plot of parts of feature for analysis title = "Parts of lineFeature %d of %s" % (lineFeature, lineFileName.stem) outDir = pathlib.Path(avaDir, "Outputs", modName) pathDict = { "pathResult": outDir, "title": title, "outFileName": ("%s feature%d_errorPlot" % (type, lineFeature)), } in1DataPlots.plotAreaShpError(xFeat, yFeat, nParts, pathDict) # set flag foundMultipleParts = True # if polygon has multiple parts - error if foundMultipleParts: message = "One or more %s features in %s have holes - check error plots in %s" % ( type, lineFileName.name, outDir, ) log.error(message) raise AssertionError(message)
[docs]def deriveLineRaster( cfg, lineDict, dem, outDir, inputsDir, rasterType, rasterFileType="", saveZeroRaster=False, ): """derive raster and return path to raster file if derived from polygon, create raster with thickness read from lineDict and save to file fileName based on derivedFrom_shapeFileName - same format as DEM Parameters: ------------ cfg: configparser object configuration settings of current simulation lineDict: dict dictionary with info on area: thickness and polygon coordinates if initialized from shapefile if read from raster fetch file Path only dem: dict dictionary with info on dem file including header and rasterData outDir: pathlib path path to directory where to save raster file inputsDir: pathlib path path to avalancheDir/Inputs where original input data and remeshed rasters are stored rasterType: str name of type of data, available options: rel, ent, tauC, secondaryRel rasterFileType: str type of raster file to save: .asc, .tif saveZeroRaster: bool whether to save raster with zero values as dummyRasterType to outDir with same header as dem Returns --------- rasterPath: pathlib Path path to raster file """ thresholdPointInPoly = cfg["GENERAL"].getfloat("thresholdPointInPoly") if rasterType not in [ "rel", "ent", "tauC", "secondaryRel", "releaseLayer2", "bedDepo", ]: message = "%s is not in list of available options: rel, ent, tauC, secondaryRel" % rasterType log.error(message) raise AssertionError(message) rasterNameStr = { "ent": "bedDepth", "rel": "release", "tauC": "bedShear", "secondaryRel": "secondary release", "releaseLayer2": "releaseLayer2", "bedDepo": "bedDepo", } if rasterType in ["rel", "ent", "secondaryRel"]: fileInd = "Th" else: fileInd = "" if rasterType == "rel": fileKey = "file" else: fileKey = "fileName" if saveZeroRaster: zeroRaster = np.full_like(dem["rasterData"], 0) zeroRasterNoSuffix = outDir / ("dummy" + rasterType.upper()[0] + rasterType[1:]) rasterPath = pathlib.Path(str(zeroRasterNoSuffix) + rasterFileType) IOf.writeResultToRaster(dem["header"], zeroRaster, zeroRasterNoSuffix) log.info( "Zero raster file written for %s saved to %s" % (rasterNameStr[rasterType], rasterPath.name) ) elif lineDict["initializedFrom"] == "shapefile": # check if release features overlap between features geoTrans.prepareArea(lineDict, dem, thresholdPointInPoly, combine=True, checkOverlap=True) # polygon from shapefile - set thickness according to shapefile or ini file lineDict = geoTrans.prepareArea( lineDict, dem, np.sqrt(2), thList=lineDict["thickness"], combine=True, checkOverlap=False, ) lineArea = lineDict["rasterData"] lineNameNoSuffix = outDir / ("derivedFrom_" + lineDict[fileKey].stem) rasterPath = pathlib.Path(str(lineNameNoSuffix) + rasterFileType) IOf.writeResultToRaster(dem["header"], lineArea, lineNameNoSuffix, flip=True) log.info("%s derived from shapefile %s " % (rasterNameStr[rasterType], lineDict[fileKey].name)) else: rasterPath = inputsDir / cfg["INPUT"]["%s%sFile" % (rasterType, fileInd)] if not rasterPath.is_file(): message = "%s file not found" % rasterPath log.error(message) raise FileNotFoundError(message) log.info("%s read from %s" % (rasterNameStr[rasterType], str(rasterPath))) return rasterPath, lineDict
[docs]def getTimeDepRelCsv(timeDepRelCsv): """ get time dependent release values from the csv table TODO: now the first column is defined as timestep, the second as thickness, third as velocity see DebrisFrame Issue #18 Parameters ----------- timeDepRelCsv: str path to csv file containing time dependent release values Returns ----------- timeDepRelValues: dict contains time dependent release values: timestep, thickness, velocity timeDepRelDF: dataframe contains time dependent release values: timestep, thickness, velocity """ if timeDepRelCsv is None: message = "No csv file containing time dependent release values is provided" log.error(message) raise FileNotFoundError(message) timeDepRelDF = pd.read_csv(timeDepRelCsv, index_col=False) timeDepRelDF = timeDepRelDF.fillna(0.0) # delete empty spaces and write column names in low case timeDepRelDF.columns = timeDepRelDF.columns.str.strip().str.lower() # sort the columns according to the first column (timestep) timeDepRelDF = timeDepRelDF.sort_values(by="timestep") timeDepRelValues = { "timeStep": timeDepRelDF["timestep"].to_numpy(dtype=np.float64), "thickness": timeDepRelDF["thickness"].to_numpy(dtype=np.float64), "velocity": timeDepRelDF["velocity"].to_numpy(dtype=np.float64), } # check if some criterias are satisfied in the csv file checkTimeDepRelease(timeDepRelValues, timeDepRelCsv) return timeDepRelValues, timeDepRelDF
[docs]def checkTimeDepRelease(timeDepRelValues, timeDepRelCsv): """ check if time dependent release values satisfy the following requirements: - release - timesteps are unique - the release - timesteps are not too close (that the particle density becomes too high) - provided release - thickness is larger than zero - provided velocity is zero or larger. Parameters ----------- timeDepRelCsv: str directory to csv table containing time dependent release values timeDepRelValues: dict contains time dependent release values: timestep, thickness, velocity """ # check if timesteps are unique timeStepUnique = np.unique(timeDepRelValues["timeStep"]) if timeStepUnique.ndim == 0: if timeStepUnique != timeDepRelValues["timeStep"]: message = "The provided time dependent release time steps in %s are not unique" % (timeDepRelCsv) log.error(message) raise ValueError(message) elif len(timeStepUnique) != len(timeDepRelValues["timeStep"]): message = "The provided time dependent release timesteps in %s are not unique" % (timeDepRelCsv) log.error(message) raise ValueError(message) # check if a timestep = 0 is provided if 0 not in timeStepUnique: message = ( "If release is time dependent, a thickness needs to be provided for time step 0 s in %s" % (timeDepRelCsv) ) log.error(message) raise ValueError(message) # check that release thickness > 0 for th in timeDepRelValues["thickness"]: if th <= 0: message = "For every release time step a thickness > 0 needs to be provided in %s" % ( timeDepRelCsv ) log.error(message) raise ValueError(message) for vel in timeDepRelValues["velocity"]: if vel < 0: message = "The initial velocity provided in %s can not be negative." % (timeDepRelCsv) log.error(message) raise ValueError(message)