Source code for com1DFA.com1DFA

"""
    Main functions for python DFA kernel
"""

import logging
import time
import pathlib
import numpy as np
import pandas as pd
import math
import copy
import pickle
from datetime import datetime
import matplotlib.path as mpltPath
from itertools import product

# Local imports
import avaframe.in2Trans.shpConversion as shpConv
import avaframe.in3Utils.geoTrans as geoTrans
import avaframe.com1DFA.timeDiscretizations as tD
import avaframe.out3Plot.outCom1DFA as outCom1DFA
import avaframe.com1DFA.DFAtools as DFAtls
import avaframe.com1DFA.com1DFATools as com1DFATools
import avaframe.com1DFA.particleTools as particleTools
import avaframe.com1DFA.DFAfunctionsCython as DFAfunC
import avaframe.in2Trans.ascUtils as IOf
import avaframe.in3Utils.fileHandlerUtils as fU
from avaframe.in3Utils import cfgUtils
import avaframe.out3Plot.outDebugPlots as debPlot
import avaframe.in3Utils.initialiseDirs as inDirs
import avaframe.com1DFA.deriveParameterSet as dP
import avaframe.com1DFA.com1DFA as com1DFA
from avaframe.in1Data import getInput as gI
from avaframe.out1Peak import outPlotAllPeak as oP
from avaframe.log2Report import generateReport as gR
from avaframe.com1DFA import particleInitialisation as pI
from avaframe.ana5Utils import distanceTimeAnalysis as dtAna
import avaframe.out3Plot.outDistanceTimeAnalysis as dtAnaPlots

#######################################
# Set flags here
#######################################
# create local logger
log = logging.getLogger(__name__)
cfgAVA = cfgUtils.getGeneralConfig()
debugPlot = cfgAVA['FLAGS'].getboolean('debugPlot')


[docs]def setRelThIni(avaDir, modName, cfgInitial, cfgFile=''): """ Add thickness values in configuration file according to thickness flags, ini settings or shapefile attributes and create one cfgFile for each releaseScenario Parameters ----------- avaDir: str or pathlib path path to avalanche directory modName: module computational module cfgInitial: configparser object full configuration settings of com Module cfgFile: str or pathlib path path to cfgFile to read overall configuration - optional if not provided the local or default config is used Returns -------- inputSimFilesAll: dict dictionary with infos about input data file paths and flags for entrainment, resistance cfgFilesRels: list list of paths to one cfgFile for each releaseScenario with updated thickness values """ # check if thickness settings in ini file are valid for thType in ['entTh', 'relTh', 'secondaryRelTh']: thicknessSettingsCorrect = dP.checkThicknessSettings(cfgInitial, thType) # fetch input data - dem, release-, entrainment- and resistance areas (and secondary release areas) inputSimFilesAll = gI.getInputDataCom1DFA(avaDir, cfgInitial) # get thickness of release and entrainment areas (and secondary release areas) -if thFromShp = True inputSimFilesAll, cfgFilesRels = gI.getThickness(inputSimFilesAll, avaDir, modName, cfgFile, cfgInitial) return inputSimFilesAll, cfgFilesRels
[docs]def com1DFAMain(avalancheDir, cfgMain, cfgFile=''): """ preprocess information from ini and run all desired simulations, create outputs and reports Parameters ------------ avalancheDir: str or pathlib Path path to avalanche data cfgMain: configparser object main configuration of AvaFrame cfgFile: str or pathlib Path path to configuration file if overwrite is desired - optional Returns -------- dem: dict dictionary with dem header and raster data (that has been used for computations) plotDict: dict information on result plot paths reportDictList: list list of report dictionaries for all performed simulations simDF: pandas dataFrame configuration dataFrame of the simulations computed (if no simulation computed, configuration dataFrame of the already existing ones) """ modName = 'com1DFA' # read initial configuration cfgStart = cfgUtils.getModuleConfig(com1DFA, fileOverride=cfgFile, toPrint=False) # Create output and work directories workDir, outDir = inDirs.initialiseRunDirs(avalancheDir, modName, cfgStart['GENERAL'].getboolean('cleanDEMremeshed')) # create one cfg files for each releaseScenarios and fetch input data inputSimFilesAll, cfgFilesRels = setRelThIni(avalancheDir, com1DFA, cfgStart, cfgFile=cfgFile) # initialise reportDictList and flag indicating whether simulations have been performed reportDictList = [] simsPerformed = False # loop over cfg files of all release scenarios for cfgFileRel in cfgFilesRels: log.debug('Full cfg file %s' % cfgFileRel) # copy inputSimFilesAll - as this becomes changed according to current release scenario inputSimFiles = inputSimFilesAll.copy() # reset variationDict variationDict = '' # get information on simulations that shall be performed according to parameter variation modCfg, variationDict = dP.getParameterVariationInfo(avalancheDir, com1DFA, cfgFileRel) # select release input data according to chosen release scenario inputSimFiles = gI.selectReleaseScenario(inputSimFiles, modCfg['INPUT']) # TODO: once it is confirmed that inputSimFiles is not changed within sim # keep for now for testing inputSimFilesTest = inputSimFiles.copy() # first remove demFile entry as this is removed once the simulation DEMs are set inputSimFilesTest.pop('demFile') # create a list of simulations and generate an individual configuration object for each simulation # if need to reproduce exactly the hash - need to be strings with exactely the same number of digits!! # first get info on already existing simulations in Outputs simDFOld, simNameOld = cfgUtils.readAllConfigurationInfo(avalancheDir, specDir='') # prepare simulations to run (only the new ones) simDict = prepareVarSimDict(modCfg, inputSimFiles, variationDict, simNameOld=simNameOld) # is there any simulation to run? if bool(simDict): # reset simDF and timing simDF = '' tCPUDF = '' # loop over all simulations for cuSim in simDict: # load configuration dictionary for cuSim cfg = simDict[cuSim]['cfgSim'] # save configuration settings for each simulation simHash = simDict[cuSim]['simHash'] cfgUtils.writeCfgFile(avalancheDir, com1DFA, cfg, fileName=cuSim) # append configuration to dataframe simDF = cfgUtils.appendCgf2DF(simHash, cuSim, cfg, simDF) # log simulation name log.info('Run simulation: %s' % cuSim) # ++++++++++PERFORM com1DFA SIMULAITON++++++++++++++++ dem, reportDict, cfgFinal, tCPU, inputSimFilesNEW, particlesList, fieldsList, tSave = com1DFA.com1DFACore(cfg, avalancheDir, cuSim, inputSimFiles, outDir, simHash=simHash) simDF.at[simHash, 'nPart'] = str(int(particlesList[0]['nPart'])) # TODO check if inputSimFiles not changed within sim if inputSimFilesNEW != inputSimFilesTest: log.error('InputFilesDict has changed') tCPUDF = cfgUtils.appendTcpu2DF(simHash, tCPU, tCPUDF) # +++++++++EXPORT RESULTS AND PLOTS++++++++++++++++++++++++ # add report dict to list for report generation reportDictList.append(reportDict) # create hash to check if configuration didn't change simHashFinal = cfgUtils.cfgHash(cfgFinal) if simHashFinal != simHash: cfgUtils.writeCfgFile(avalancheDir, com1DFA, cfg, fileName='%s_butModified' % simHash) message = 'Simulation configuration has been changed since start' log.error(message) raise AssertionError(message) # prepare for writing configuration info simDF = cfgUtils.convertDF2numerics(simDF) # add cpu time info to the dataframe simDF = simDF.join(tCPUDF) # append new simulations configuration to old ones (if they exist), return total dataFrame and write it to csv simDFNew = pd.concat([simDF, simDFOld], axis=0) cfgUtils.writeAllConfigurationInfo(avalancheDir, simDFNew, specDir='') # write full configuration (.ini file) to file date = datetime.today() fileName = 'sourceConfiguration_' + cfg['INPUT']['releaseScenario'] + '_' + '{:%d_%m_%Y_%H_%M_%S}'.format(date) cfgUtils.writeCfgFile(avalancheDir, com1DFA, modCfg, fileName=fileName) simsPerformed = True else: log.warning('There is no simulation to be performed for releaseScenario') if simsPerformed: # Set directory for report reportDir = pathlib.Path(avalancheDir, 'Outputs', modName, 'reports') # Generate plots for all peakFiles plotDict = oP.plotAllPeakFields(avalancheDir, cfgMain['FLAGS'], modName, demData=dem) # write report gR.writeReport(reportDir, reportDictList, cfgMain['FLAGS'], plotDict) return dem, plotDict, reportDictList, simDFNew else: return 0, {}, [], ''
[docs]def com1DFACore(cfg, avaDir, cuSimName, inputSimFiles, outDir, simHash=''): """ Run main com1DFA model This will compute a dense flow avalanche Parameters ---------- cfg : dict configuration read from ini file cuSimName: str name of simulation inputSimFiles: dict dictionary with input files avaDir : str or pathlib object path to avalanche directory outDir: str or pathlib object path to Outputs simHash: str unique sim ID Returns ------- reportDictList : list list of dictionaries that contain information on simulations that can be used for report generation """ # Setup configuration cfgGen = cfg['GENERAL'] # create required input from files demOri, inputSimLines = prepareInputData(inputSimFiles, cfg) if cfgGen.getboolean('iniStep'): # append buffered release Area inputSimLines = pI.createReleaseBuffer(cfg, inputSimLines) # find out which simulations to perform relName, inputSimLines, badName = prepareReleaseEntrainment(cfg, inputSimFiles['releaseScenario'], inputSimLines) log.info('Perform %s simulation' % cuSimName) # +++++++++PERFORM SIMULAITON++++++++++++++++++++++ # for timing the sims startTime = time.time() particles, fields, dem, reportAreaInfo = initializeSimulation(cfg, demOri, inputSimLines, cuSimName) # ------------------------ # Start time step computation Tsave, particlesList, fieldsList, infoDict = DFAIterate(cfg, particles, fields, dem, simHash=simHash) # write mass balance to File writeMBFile(infoDict, avaDir, cuSimName) tCPUDFA = '%.2f' % (time.time() - startTime) log.info(('cpu time DFA = %s s' % (tCPUDFA))) cfgTrackPart = cfg['TRACKPARTICLES'] # track particles if cfgTrackPart.getboolean('trackParticles'): particlesList, trackedPartProp, track = trackParticles(cfgTrackPart, demOri, particlesList) if track: outDirData = outDir / 'particles' fU.makeADir(outDirData) outCom1DFA.plotTrackParticle(outDirData, particlesList, trackedPartProp, cfg, demOri) # export particles dictionaries of saving time steps # (if particles is not in resType, only first and last time step are saved) outDirData = outDir / 'particles' fU.makeADir(outDirData) savePartToPickle(particlesList, outDirData, cuSimName) # export particles properties for visulation if cfg['VISUALISATION'].getboolean('writePartToCSV'): particleTools.savePartToCsv(cfg['VISUALISATION']['particleProperties'], particlesList, outDir) # Result parameters to be exported exportFields(cfg, Tsave, fieldsList, demOri, outDir, cuSimName) # write report dictionary reportDict = createReportDict(avaDir, cuSimName, relName, inputSimLines, cfgGen, reportAreaInfo) # add time and mass info to report reportDict = reportAddTimeMassInfo(reportDict, tCPUDFA, infoDict) return dem, reportDict, cfg, infoDict['tCPU'], inputSimFiles, particlesList, fieldsList, Tsave
[docs]def prepareReleaseEntrainment(cfg, rel, inputSimLines): """ get Simulation to run for a given release Parameters ---------- cfg : dict configuration parameters - keys: relTh, secRelArea, secondaryRelTh rel : str path to release file inputSimLines: dict dictionary with dictionaries with input data infos (releaseLine, entLine, ...) Returns ------- relName : str release name relDict : list release dictionary badName : boolean changed release name """ # load info entResInfo = inputSimLines['entResInfo'] # Set release areas and release thickness relName = rel.stem simName = relName badName = False if '_' in relName: badName = True log.warning('Release area scenario file name includes an underscore \ the suffix _AF will be added for the simulation name') # set release thickness if cfg['GENERAL'].getboolean('relThFromFile') is False: releaseLine = setThickness(cfg, inputSimLines['releaseLine'], 'relTh') inputSimLines['releaseLine'] = releaseLine log.debug('Release area scenario: %s - perform simulations' % (relName)) if cfg['GENERAL'].getboolean('iniStep'): # set release thickness for buffer releaseLineBuffer = setThickness(cfg, inputSimLines['releaseLineBuffer'], 'relTh') inputSimLines['releaseLineBuffer'] = releaseLineBuffer if cfg.getboolean('GENERAL', 'secRelArea'): secondaryReleaseLine = setThickness(cfg, inputSimLines['secondaryReleaseLine'], 'secondaryRelTh') else: inputSimLines['entResInfo']['flagSecondaryRelease'] = 'No' secondaryReleaseLine = None inputSimLines['secondaryReleaseLine'] = secondaryReleaseLine if cfg['GENERAL']['simTypeActual'] in ['ent', 'entres']: # set entrainment thickness entLine = setThickness(cfg, inputSimLines['entLine'], 'entTh') inputSimLines['entLine'] = entLine return relName, inputSimLines, badName
[docs]def setThickness(cfg, lineTh, typeTh): """ set thickness in line dictionary of release, entrainment, second. release Parameters ----------- cfg: config parser configuration settings lineTh: dict dictionary with info on line (e.g. release area line) typeTh: str type of thickness to be set (e.g. relTh for release thickness) Returns -------- lineTh: dict updated dictionary with new key: thickness and thicknessSource """ # create thickness flag name thFlag = typeTh + 'FromShp' thVariation = typeTh + 'PercentVariation' # set thickness source info if cfg['GENERAL'].getboolean(thFlag): lineTh['thicknessSource'] = ['shp file'] * len(lineTh['thickness']) else: lineTh['thicknessSource'] = ['ini file'] * len(lineTh['thickness']) # set thickness value info read from ini file that has been updated from shp or ini previously for count, id in enumerate(lineTh['id']): sectionTh = 'GENERAL' if cfg['GENERAL'].getboolean(thFlag): thName = typeTh + id lineTh['thickness'][count] = cfg['GENERAL'].getfloat(thName) else: thName = typeTh lineTh['thickness'][count] = cfg['GENERAL'].getfloat(thName) return lineTh
[docs]def prepareInputData(inputSimFiles, cfg): """ Fetch input data Parameters ---------- inputSimFiles : dict relFile : str path to release area file demFile : str path to dem file in Inputs/ secondaryReleaseFile : str path to secondaryRelease file entFiles : str path to entrainment file resFile : str path to resistance file entResInfo : flag dict flag if Yes entrainment and/or resistance areas found and used for simulation flag True if a Secondary Release file found and activated cfg: configparser object configuration for simType and secondary rel Returns ------- demOri : dict dictionary with dem info (header original origin), raster data correct mesh cell size this dem has been remeshed/read from remeshed if chosen cell size is not equal to cell size of DEM in Inputs/ inputSimLines : dict releaseLine : dict release line dictionary secondaryReleaseLine : dict secondaryRelease line dictionary entLine : dict entrainment line dictionary resLine : dict resistance line dictionary entrainmentArea : str entrainment file name resistanceArea : str resistance file name entResInfo : flag dict flag if Yes entrainment and/or resistance areas found and used for simulation flag True if a Secondary Release file found and activated """ # load data entResInfo = inputSimFiles['entResInfo'].copy() relFile = inputSimFiles['releaseScenario'] relThFile = inputSimFiles['relThFile'] # get dem dictionary - already read DEM with correct mesh cell size demOri = gI.initializeDEM(cfg['GENERAL']['avalancheDir'], cfg['INPUT']['DEM']) # read data from relThFile if relThFile != '': relThField = IOf.readRaster(relThFile) relThFieldData = relThField['rasterData'] if demOri['header']['ncols'] != relThField['header']['ncols'] or demOri['header']['ncols'] != relThField['header']['ncols']: message = ('Release thickness field read from %s does not match the number of rows and columns of the dem' % inputSimFiles['relThFile']) log.error(message) raise AssertionError(message) else: relThFieldData = '' # get line from release area polygon releaseLine = shpConv.readLine(relFile, 'release1', demOri) releaseLine['file'] = relFile releaseLine['type'] = 'Release' # get line from secondary release area polygon if cfg['GENERAL'].getboolean('secRelArea'): if entResInfo['flagSecondaryRelease'] == 'Yes': secondaryReleaseFile = inputSimFiles['secondaryReleaseFile'] secondaryReleaseLine = shpConv.readLine(secondaryReleaseFile, '', demOri) secondaryReleaseLine['fileName'] = [secondaryReleaseFile] secondaryReleaseLine['type'] = 'Secondary release' else: message = 'No secondary release file found' log.error(message) raise FileNotFoundError(message) else: secondaryReleaseLine = None # set False entResInfo['flagSecondaryRelease'] = 'No' # get line from entrainement area polygon if cfg['GENERAL']['simTypeActual'] in ['ent', 'entres']: entFile = inputSimFiles['entFile'] entLine = shpConv.readLine(entFile, '', demOri) entrainmentArea = entFile.name entLine['fileName'] = entFile entLine['type'] = 'Entrainment' else: entLine = None entrainmentArea = '' # get line from resistance area polygon if cfg['GENERAL']['simTypeActual'] in ['entres', 'res']: resFile = inputSimFiles['resFile'] resLine = shpConv.readLine(resFile, '', demOri) resistanceArea = resFile.name resLine['fileName'] = resFile resLine['type'] = 'Resistance' else: resLine = None resistanceArea = '' inputSimLines = {'releaseLine': releaseLine, 'secondaryReleaseLine': secondaryReleaseLine, 'entLine': entLine, 'resLine': resLine, 'entrainmentArea': entrainmentArea, 'resistanceArea': resistanceArea, 'entResInfo': entResInfo, 'relThField': relThFieldData} return demOri, inputSimLines
[docs]def createReportDict(avaDir, logName, relName, inputSimLines, cfgGen, reportAreaInfo): """ create simulaton report dictionary Parameters ---------- logName : str simulation scenario name relName : str release name relDict : dict release dictionary cfgGen : configparser general configuration file entrainmentArea : str entrainment file name resistanceArea : str resistance file name Returns ------- reportST : dict simulation scenario dictionary """ # load parameters dateTimeInfo = datetime.now().strftime("%d/%m/%Y %H:%M:%S") entInfo = reportAreaInfo['entrainment'] resInfo = reportAreaInfo['resistance'] entrainmentArea = inputSimLines['entrainmentArea'] resistanceArea = inputSimLines['resistanceArea'] relDict = inputSimLines['releaseLine'] # Create dictionary reportST = {} reportST = {} reportST = {'headerLine': {'type': 'title', 'title': 'com1DFA Simulation'}, 'avaName': {'type': 'avaName', 'name': str(avaDir)}, 'simName': {'type': 'simName', 'name': logName}, 'time': {'type': 'time', 'time': dateTimeInfo}, 'Simulation Parameters': { 'type': 'list', 'Program version': 'development', 'Parameter set': '', 'Release Area Scenario': relName, 'Entrainment': entInfo, 'Resistance': resInfo, 'Parameter variation on': '', 'Parameter value': '', 'Mu': cfgGen['mu'], 'Density [kgm-3]': cfgGen['rho'], 'Friction model': cfgGen['frictModel']}, 'Release Area': {'type': 'columns', 'Release area scenario': relName, 'Release Area': relDict['Name'], 'Release thickness [m]': relDict['thickness']}} if entInfo == 'Yes': entDict = inputSimLines['entLine'] reportST.update({'Entrainment area': {'type': 'columns', 'Entrainment area scenario': entrainmentArea, 'Entrainment thickness [m]': entDict['thickness'], 'Entrainment density [kgm-3]': cfgGen['rhoEnt']}}) if resInfo == 'Yes': reportST.update({'Resistance area': {'type': 'columns', 'Resistance area scenario': resistanceArea}}) reportST['Release Area'].update(reportAreaInfo['Release area info']) return reportST
[docs]def reportAddTimeMassInfo(reportDict, tCPUDFA, infoDict): """ Add time and mass info to report """ # add mass info reportDict['Simulation Parameters'].update({'Initial mass [kg]': ('%.2f' % infoDict['initial mass'])}) reportDict['Simulation Parameters'].update({'Final mass [kg]': ('%.2f' % infoDict['final mass'])}) reportDict['Simulation Parameters'].update({'Entrained mass [kg]': ('%.2f' % infoDict['entrained mass'])}) reportDict['Simulation Parameters'].update({'Entrained volume [m3]': ('%.2f' % infoDict['entrained volume'])}) # add stop info reportDict['Simulation Parameters'].update(infoDict['stopInfo']) # add computation time to report dict reportDict['Simulation Parameters'].update({'Computation time [s]': tCPUDFA}) return reportDict
[docs]def initializeMesh(cfg, demOri, num): """ Create rectangular mesh Reads the DEM information, computes the normal vector field and boundries to the DEM. Also generates the grid for the neighbour search Parameters ---------- demOri : dict dictionary with initial dem information num : int chose between 4, 6 or 8 (using then 4, 6 or 8 triangles) or 1 to use the simple cross product method Returns ------- dem : dict dictionary relocated in (0,0) and completed with normal field and boundaries as well as neighbour search grid information """ # set origin to 0, 0 for computations, store original origin dem = setDEMoriginToZero(demOri) dem['originalHeader'] = demOri['header'].copy() # read dem header headerDEM = dem['header'] nColsDEM = headerDEM['ncols'] nRowsDEM = headerDEM['nrows'] cszDEM = headerDEM['cellsize'] # get normal vector of the grid mesh Nx, Ny, Nz = DFAtls.getNormalMesh(dem, num) # if no normal available, put 0 for Nx and Ny and 1 for Nz dem['Nx'] = np.where(np.isnan(Nx), 0., Nx) dem['Ny'] = np.where(np.isnan(Ny), 0., Ny) # build no data mask (used to find out of dem particles) outOfDEM = np.where(np.isnan(dem['rasterData']), 1, 0).astype(bool).flatten() dem['Nz'] = Nz dem['outOfDEM'] = outOfDEM # Prepare SPH grid headerNeighbourGrid = {} cszNeighbourGrid = cfg.getfloat('sphKernelRadius') headerNeighbourGrid['cellsize'] = cszNeighbourGrid headerNeighbourGrid['ncols'] = np.ceil(nColsDEM * cszDEM / cszNeighbourGrid) headerNeighbourGrid['nrows'] = np.ceil(nRowsDEM * cszDEM / cszNeighbourGrid) headerNeighbourGrid['xllcenter'] = 0 headerNeighbourGrid['yllcenter'] = 0 dem['headerNeighbourGrid'] = headerNeighbourGrid # get real Area areaRaster = DFAtls.getAreaMesh(Nx, Ny, Nz, cszDEM, num) dem['areaRaster'] = areaRaster projArea = nColsDEM * nRowsDEM * cszDEM * cszDEM actualArea = np.nansum(areaRaster) log.debug('Largest cell area: %.2f m²' % (np.nanmax(areaRaster))) log.debug('Projected Area : %.2f' % projArea) log.debug('Total Area : %.2f' % actualArea) return dem
[docs]def setDEMoriginToZero(demOri): """ set origin of DEM to 0,0 """ dem = copy.deepcopy(demOri) dem['header']['xllcenter'] = 0 dem['header']['yllcenter'] = 0 return dem
[docs]def initializeSimulation(cfg, demOri, inputSimLines, logName): """ create simulaton report dictionary Parameters ---------- cfg : str simulation scenario name demOri : dict dictionary with original dem inputSimLines : dict releaseLine : dict release line dictionary releaseLineBuffer: dict release line buffer dictionary - optional if iniStep True secondaryReleaseLine : dict secondary release line dictionary entLine : dict entrainment line dictionary resLine : dict resistance line dictionary logName : str simulation scenario name Returns ------- particles : dict particles dictionary at initial time step list of secondary release particles to be used fields : dict fields dictionary at initial time step dem : dict dictionary with new dem (lower left center at origin) """ cfgGen = cfg['GENERAL'] methodMeshNormal = cfg.getfloat('GENERAL', 'methodMeshNormal') thresholdPointInPoly = cfgGen.getfloat('thresholdPointInPoly') relThField = inputSimLines['relThField'] # ----------------------- # Initialize mesh log.debug('Initializing Mesh') dem = initializeMesh(cfgGen, demOri, methodMeshNormal) # ------------------------ log.debug('Initializing main release area') # process release info to get it as a raster if cfg['GENERAL'].getboolean('iniStep'): releaseLine = inputSimLines['releaseLineBuffer'] releaseLineReal = inputSimLines['releaseLine'] # check if release features overlap between features prepareArea(releaseLineReal, dem, thresholdPointInPoly, combine=True, checkOverlap=True) buffer1 = (cfg['GENERAL'].getfloat('sphKernelRadius') * cfg['GENERAL'].getfloat('additionallyFixedFactor') * cfg['GENERAL'].getfloat('bufferZoneFactor')) if len(relThField) == 0: # if no release thickness field or function - set release according to shapefile or ini file # this is a list of release rasters that we want to combine releaseLineReal = prepareArea(releaseLineReal, dem, buffer1, thList=releaseLineReal['thickness'], combine=True, checkOverlap=False) else: # if relTh provided - set release thickness with field or function releaseLineReal = prepareArea(releaseLineReal, dem, buffer1, combine=True, checkOverlap=False) else: releaseLine = inputSimLines['releaseLine'] # check if release features overlap between features prepareArea(releaseLine, dem, thresholdPointInPoly, combine=True, checkOverlap=True) if len(relThField) == 0: # if no release thickness field or function - set release according to shapefile or ini file # this is a list of release rasters that we want to combine releaseLine = prepareArea(releaseLine, dem, np.sqrt(2), thList=releaseLine['thickness'], combine=True, checkOverlap=False) else: # if relTh provided - set release thickness with field or function releaseLine = prepareArea(releaseLine, dem, np.sqrt(2), combine=True, checkOverlap=False) # compute release area header = dem['header'] csz = header['cellsize'] relRaster = releaseLine['rasterData'] relRasterOnes = np.where(relRaster > 0, 1., 0.) relAreaActual = np.nansum(relRasterOnes*dem['areaRaster']) relAreaProjected = np.sum(csz*csz*relRasterOnes) reportAreaInfo = {'Release area info': {'Projected Area [m2]': '%.2f' % (relAreaProjected), 'Actual Area [m2]': '%.2f' % (relAreaActual)}} # ------------------------ # initialize simulation # create primary release area particles and fields releaseLine['header'] = dem['originalHeader'] inputSimLines['releaseLine']['header'] = dem['originalHeader'] particles = initializeParticles(cfgGen, releaseLine, dem, inputSimLines=inputSimLines, logName=logName, relThField=relThField) particles, fields = initializeFields(cfgGen, dem, particles) # perform initialisation step for redistributing particles if cfg['GENERAL'].getboolean('iniStep'): startTimeIni = time.time() particles, fields = pI.getIniPosition(cfg, particles, dem, fields, inputSimLines, relThField) tIni = time.time() - startTimeIni log.info('Ini step for initialising particles finalized, total mass: %.2f, number of particles: %d' % (np.sum(particles['m']), particles['nPart'])) log.debug('Time needed for ini step: %.2f s' % (tIni)) # ------------------------ # process secondary release info to get it as a list of rasters if inputSimLines['entResInfo']['flagSecondaryRelease'] == 'Yes': log.info('Initializing secondary release area') secondaryReleaseInfo = inputSimLines['secondaryReleaseLine'] secondaryReleaseInfo['header'] = dem['originalHeader'] # fetch secondary release areas secondaryReleaseInfo = prepareArea(secondaryReleaseInfo, dem, np.sqrt(2), thList=secondaryReleaseInfo['thickness'], combine=False) # remove overlap with main release areas noOverlaprasterList = [] for secRelRatser, secRelName in zip(secondaryReleaseInfo['rasterData'], secondaryReleaseInfo['Name']): noOverlaprasterList.append(geoTrans.checkOverlap(secRelRatser, relRaster, 'Secondary release ' + secRelName, 'Release', crop=True)) secondaryReleaseInfo['flagSecondaryRelease'] = 'Yes' secondaryReleaseInfo['rasterList'] = noOverlaprasterList else: secondaryReleaseInfo = {} secondaryReleaseInfo['flagSecondaryRelease'] = 'No' particles['secondaryReleaseInfo'] = secondaryReleaseInfo # initialize entrainment and resistance # get info of simType and whether or not to initialize resistance and entrainment simTypeActual = cfgGen['simTypeActual'] entrMassRaster, reportAreaInfo = initializeMassEnt(dem, simTypeActual, inputSimLines['entLine'], reportAreaInfo, thresholdPointInPoly, cfgGen.getfloat('rhoEnt')) # check if entrainment and release overlap entrMassRaster = geoTrans.checkOverlap(entrMassRaster, relRaster, 'Entrainment', 'Release', crop=True) # check for overlap with the secondary release area if secondaryReleaseInfo['flagSecondaryRelease'] == 'Yes': for secRelRaster in secondaryReleaseInfo['rasterList']: entrMassRaster = geoTrans.checkOverlap(entrMassRaster, secRelRaster, 'Entrainment', 'Secondary release ', crop=True) # surfacic entrainment mass available (unit kg/m²) fields['entrMassRaster'] = entrMassRaster entreainableMass = np.nansum(fields['entrMassRaster']*dem['areaRaster']) log.info('Mass available for entrainment: %.2f kg' % (entreainableMass)) log.debug('Initializing resistance area') cResRaster, reportAreaInfo = initializeResistance(cfgGen, dem, simTypeActual, inputSimLines['resLine'], reportAreaInfo, thresholdPointInPoly) fields['cResRaster'] = cResRaster return particles, fields, dem, reportAreaInfo
[docs]def initializeParticles(cfg, releaseLine, dem, inputSimLines='', logName='', relThField=''): """ Initialize DFA simulation Create particles and fields dictionary according to config parameters release raster and dem Parameters ---------- cfg: configparser configuration for DFA simulation releaseLine: dict dictionary with info on release dem : dict dictionary with dem information inputSimLines: dictionary info on input files; real releaseline info required for iniStep relThField: 2D numpy array if the release depth is not uniform, give here the releaseRaster Returns ------- particles : dict particles dictionary at initial time step fields : dict fields dictionary at initial time step """ # get simulation parameters rho = cfg.getfloat('rho') gravAcc = cfg.getfloat('gravAcc') avaDir = cfg['avalancheDir'] massPerParticleDeterminationMethod = cfg['massPerParticleDeterminationMethod'] interpOption = cfg.getfloat('interpOption') # read dem header header = dem['header'] ncols = header['ncols'] nrows = header['nrows'] csz = header['cellsize'] # if the release is not constant but given by a varying function, we need both the mask giving the cells # to be initialized and the raster giving the flow depth value relRasterMask = releaseLine['rasterData'] if len(relThField) == 0: relRaster = releaseLine['rasterData'] else: log.info('Release thickness read from relThFile') relRaster = relThField areaRaster = dem['areaRaster'] # get the initialization method used relThForPart = getRelThFromPart(cfg, releaseLine, relThField) massPerPart, nPPK = com1DFATools.getPartInitMethod(cfg, csz, relThForPart) # initialize arrays partPerCell = np.zeros(np.shape(relRaster), dtype=np.int64) # find all non empty cells (meaning release area) indRelY, indRelX = np.nonzero(relRasterMask) if inputSimLines != '': indRelYReal, indRelXReal = np.nonzero(inputSimLines['releaseLine']['rasterData']) else: indRelYReal, indRelXReal = np.nonzero(relRaster) iReal = list(zip(indRelYReal, indRelXReal)) # make option available to read initial particle distribution from file if cfg.getboolean('initialiseParticlesFromFile'): if cfg.getboolean('iniStep'): message = 'If initialiseParticlesFromFile is used, iniStep cannot be performed - chose only one option' log.error(message) raise AssertionError(message) particles, hPartArray = particleTools.initialiseParticlesFromFile(cfg, avaDir, releaseLine['file'].stem) else: # initialize random generator rng = np.random.default_rng(cfg.getint('seed')) nPart = 0 xPartArray = np.empty(0) yPartArray = np.empty(0) mPartArray = np.empty(0) aPartArray = np.empty(0) hPartArray = np.empty(0) idFixed = np.empty(0) if len(relThField) != 0 and cfg.getboolean('iniStep'): # set release thickness to a constant value for initialisation relRaster = np.where(relRaster > 0., cfg.getfloat('relTh'), 0.) log.warning('relThField!= 0, but relRaster set to relTh value (from ini) here so that uniform number of particles created') # loop on non empty cells for indRelx, indRely in zip(indRelX, indRelY): # compute number of particles for this cell hCell = relRaster[indRely, indRelx] aCell = areaRaster[indRely, indRelx] xPart, yPart, mPart, n, aPart = particleTools.placeParticles(hCell, aCell, indRelx, indRely, csz, massPerPart, nPPK, rng, cfg) nPart = nPart + n partPerCell[indRely, indRelx] = n # initialize particles position, mass, height... xPartArray = np.append(xPartArray, xPart) yPartArray = np.append(yPartArray, yPart) mPartArray = np.append(mPartArray, mPart * np.ones(n)) aPartArray = np.append(aPartArray, aPart * np.ones(n)) if (indRely, indRelx) in iReal: idFixed = np.append(idFixed, np.zeros(n)) else: idFixed = np.append(idFixed, np.ones(n)) hPartArray = DFAfunC.projOnRaster(xPartArray, yPartArray, relRaster, csz, ncols, nrows, interpOption) hPartArray = np.asarray(hPartArray) # for the MPPKR option use hPart and aPart to define the mass of the particle (this means, within a cell # partticles have the same area but may have different flow depth which means a different mass) if massPerParticleDeterminationMethod == 'MPPKR': mPartArray = rho * aPartArray * hPartArray # create dictionnary to store particles properties particles = {} particles['nPart'] = nPart particles['x'] = xPartArray particles['y'] = yPartArray # adding z component particles, _ = geoTrans.projectOnRaster(dem, particles, interp='bilinear') particles['m'] = mPartArray particles['idFixed'] = idFixed particles['massPerPart'] = massPerPart particles['mTot'] = np.sum(particles['m']) particles['h'] = hPartArray particles['ux'] = np.zeros(np.shape(hPartArray)) particles['uy'] = np.zeros(np.shape(hPartArray)) particles['uz'] = np.zeros(np.shape(hPartArray)) particles['s'] = np.zeros(np.shape(hPartArray)) particles['sCor'] = np.zeros(np.shape(hPartArray)) particles['l'] = np.zeros(np.shape(hPartArray)) particles['travelAngle'] = np.zeros(np.shape(hPartArray)) particles['stoppCriteria'] = False mPartArray = particles['m'] kineticEne = np.sum(0.5 * mPartArray * DFAtls.norm2(particles['ux'], particles['uy'], particles['uz'])) particles['kineticEne'] = kineticEne particles['potentialEne'] = np.sum(gravAcc * mPartArray * particles['z']) particles['peakKinEne'] = kineticEne particles['peakForceSPH'] = 0.0 particles['forceSPHIni'] = 0.0 particles['peakMassFlowing'] = 0 particles['simName'] = logName particles['xllcenter'] = dem['originalHeader']['xllcenter'] particles['yllcenter'] = dem['originalHeader']['yllcenter'] # remove particles that might lay outside of the release polygon if not cfg.getboolean('iniStep') and not cfg.getboolean('initialiseParticlesFromFile'): particles = checkParticlesInRelease(particles, releaseLine, cfg.getfloat('thresholdPointInPoly')) # add a particles ID: # integer ranging from 0 to nPart in the initialisation. # Everytime that a new particle is created, it gets a new ID that is > nID # where nID is the number of already used IDs # (enable tracking of particles even if particles are added or removed) # unique identifier for each particle particles['ID'] = np.arange(particles['nPart']) # keep track of the identifier (usefull to add identifier to newparticles) particles['nID'] = particles['nPart'] # keep track of parents (usefull for new particles created after splitting) particles['parentID'] = np.arange(particles['nPart']) # initialize time t = 0 particles['t'] = t relCells = np.size(indRelY) partPerCell = particles['nPart']/relCells if massPerParticleDeterminationMethod != 'MPPKR': # we need to set the nPPK aTot = np.sum(particles['m'] / (rho * particles['h'])) # average number of particles per kernel radius nPPK = particles['nPart'] * math.pi * csz**2 / aTot particles['nPPK'] = nPPK log.info('Initialized particles. MTot = %.2f kg, %s particles in %.2f cells.' % (particles['mTot'], particles['nPart'], relCells)) log.info('Mass per particle = %.2f kg and particles per cell = %.2f.' % (particles['mTot']/particles['nPart'], partPerCell)) if debugPlot: debPlot.plotPartIni(particles, dem) return particles
[docs]def getRelThFromPart(cfg, releaseLine, relThField): """ get release thickness for initialising particles - use max value Parameters ----------- cfg: configparser object configuration settings releaseLine: dict info on releaseline (thickness) relThField: numpy array or str release thickness field if used, else empty string Returns -------- relThForPart: float max value of release thickness """ if len(relThField) != 0: relThForPart = np.amax(relThField) elif cfg.getboolean('relThFromShp'): relThForPart = np.amax(np.asarray(releaseLine['thickness'], dtype=float)) else: relThForPart = cfg.getfloat('relTh') return relThForPart
[docs]def initializeFields(cfg, dem, particles): """Initialize fields and update particles flow depth Parameters ---------- cfg: configparser configuration for DFA simulation dem : dict dictionary with dem information particles : dict particles dictionary at initial time step Returns ------- particles : dict particles dictionary at initial time step updated with the flow depth fields : dict fields dictionary at initial time step """ # read dem header header = dem['header'] ncols = header['ncols'] nrows = header['nrows'] PFV = np.zeros((nrows, ncols)) PP = np.zeros((nrows, ncols)) FD = np.zeros((nrows, ncols)) TA = np.zeros((nrows, ncols)) fields = {} fields['pfv'] = PFV fields['ppr'] = PP fields['pfd'] = FD fields['pta'] = TA fields['FV'] = PFV fields['P'] = PP fields['FD'] = FD fields['Vx'] = PFV fields['Vy'] = PFV fields['Vz'] = PFV fields['TA'] = TA particles = DFAfunC.getNeighborsC(particles, dem) particles, fields = DFAfunC.updateFieldsC(cfg, particles, dem, fields) return particles, fields
[docs]def initializeMassEnt(dem, simTypeActual, entLine, reportAreaInfo, thresholdPointInPoly, rhoEnt): """ Initialize mass for entrainment Parameters ---------- dem: dict dem dictionary simTypeActual: str simulation type entLine: dict entrainment line dictionary reportAreaInfo: dict simulation area information dictionary thresholdPointInPoly: float threshold val that decides if a point is in the polygon, on the line or very close but outside rhoEnt: float density of entrainment snow Returns ------- entrMassRaster : 2D numpy array raster of available mass for entrainment reportAreaInfo: dict simulation area information dictionary completed with entrainment area info """ # read dem header header = dem['originalHeader'] ncols = header['ncols'] nrows = header['nrows'] if 'ent' in simTypeActual: entrainmentArea = entLine['fileName'] log.info('Initializing entrainment area: %s' % (entrainmentArea)) log.info('Entrainment area features: %s' % (entLine['Name'])) entLine = prepareArea(entLine, dem, thresholdPointInPoly, thList=entLine['thickness']) entrMassRaster = entLine['rasterData'] reportAreaInfo['entrainment'] = 'Yes' else: entrMassRaster = np.zeros((nrows, ncols)) reportAreaInfo['entrainment'] = 'No' entrMassRaster = entrMassRaster * rhoEnt return entrMassRaster, reportAreaInfo
[docs]def initializeResistance(cfg, dem, simTypeActual, resLine, reportAreaInfo, thresholdPointInPoly): """ Initialize resistance matrix Parameters ---------- dem: dict dem dictionary simTypeActual: str simulation type resLine: dict resistance line dictionary reportAreaInfo: dict simulation area information dictionary thresholdPointInPoly: float threshold val that decides if a point is in the polygon, on the line or very close but outside Returns ------- cResRaster : 2D numpy array raster of resistance coefficients reportAreaInfo: dict simulation area information dictionary completed with entrainment area info """ d = cfg.getfloat('dRes') cw = cfg.getfloat('cw') sres = cfg.getfloat('sres') # read dem header header = dem['originalHeader'] ncols = header['ncols'] nrows = header['nrows'] if simTypeActual in ['entres', 'res']: resistanceArea = resLine['fileName'] log.info('Initializing resistance area: %s' % (resistanceArea)) log.info('Resistance area features: %s' % (resLine['Name'])) resLine = prepareArea(resLine, dem, thresholdPointInPoly) mask = resLine['rasterData'] cResRaster = 0.5 * d * cw / (sres*sres) * mask reportAreaInfo['resistance'] = 'Yes' else: cResRaster = np.zeros((nrows, ncols)) reportAreaInfo['resistance'] = 'No' return cResRaster, reportAreaInfo
[docs]def DFAIterate(cfg, particles, fields, dem, simHash=''): """ Perform time loop for DFA simulation Save results at desired intervals Parameters ---------- cfg: configparser configuration for DFA simulation particles : dict particles dictionary at initial time step secondaryReleaseParticles : list list of secondary release area particles dictionaries at initial time step fields : dict fields dictionary at initial time step dem : dict dictionary with dem information Returns ------- particlesList : list list of particles dictionary fieldsList : list list of fields dictionary (for each time step saved) tCPU : dict computation time dictionary infoDict : dict Dictionary of all simulations carried out """ cfgGen = cfg['GENERAL'] # Initialise cpu timing tCPU = {'timeLoop': 0, 'timeForce': 0., 'timeForceSPH': 0., 'timePos': 0., 'timeNeigh': 0., 'timeField': 0.} # Load configuration settings tEnd = cfgGen.getfloat('tEnd') dtSave = fU.splitTimeValueToArrayInterval(cfgGen['tSteps'], tEnd) sphOption = cfgGen.getint('sphOption') log.debug('using sphOption %s:' % sphOption) # desired output fields resTypes = fU.splitIniValueToArraySteps(cfgGen['resType']) # add particles to the results type if trackParticles option is activated if cfg.getboolean('TRACKPARTICLES', 'trackParticles'): resTypes = list(set(resTypes + ['particles'])) # make sure to save all desiered resuts for first and last time step for # the report resTypesReport = fU.splitIniValueToArraySteps(cfg['REPORT']['plotFields']) # always add particles to first and last time step resTypesLast = list(set(resTypes + resTypesReport + ['particles'])) # derive friction type # turn friction model into integer frictModelsList = ['samosat', 'coulomb', 'voellmy'] frictModel = cfgGen['frictModel'].lower() frictType = frictModelsList.index(frictModel) + 1 log.debug('Friction Model used: %s, %s' % (frictModelsList[frictType-1], frictType)) # Initialise Lists to save fields and add initial time step particlesList = [] fieldsList = [] timeM = [] massEntrained = [] massTotal = [] # TODO: add here different time stepping options log.debug('Use standard time stepping') # Initialize time and counters nSave = 1 tCPU['nSave'] = nSave nIter = 1 nIter0 = 1 particles['iterate'] = True t = particles['t'] log.debug('Saving results for time step t = %f s', t) fieldsList, particlesList = appendFieldsParticles(fieldsList, particlesList, particles, fields, resTypesLast) zPartArray0 = copy.deepcopy(particles['z']) ######## create range time diagram ##################### # check if range-time diagram should be performed, if yes - initialize if cfg['VISUALISATION'].getboolean('createRangeTimeDiagram'): demRT = dtAna.setDemOrigin(dem) mtiInfo, dtRangeTime, cfgRangeTime = dtAna.initializeRangeTime(dtAna, cfg, demRT, simHash) # fetch initial time step too mtiInfo, dtRangeTime = dtAna.fetchRangeTimeInfo(cfgRangeTime, cfg, dtRangeTime, t, demRT['header'], fields, mtiInfo) ######################################################### # add initial time step to Tsave array Tsave = [0] # derive time step for first iteration if cfgGen.getboolean('cflTimeStepping') and nIter > cfgGen.getfloat('cflIterConstant'): # overwrite the dt value in the cfg dt = tD.getcflTimeStep(particles, dem, cfgGen) elif cfgGen.getboolean('sphKernelRadiusTimeStepping'): dt = tD.getSphKernelRadiusTimeStep(dem, cfgGen) else: # get time step dt = cfgGen.getfloat('dt') particles['dt'] = dt t = t + dt # Start time step computation while t <= tEnd*(1.+1.e-13) and particles['iterate']: startTime = time.time() log.debug('Computing time step t = %f s, dt = %f s' % (t, dt)) # Perform computations particles, fields, zPartArray0, tCPU = computeEulerTimeStep(cfgGen, particles, fields, zPartArray0, dem, tCPU, frictType) tCPU['nSave'] = nSave particles['t'] = t # write mass balance info massEntrained.append(particles['massEntrained']) massTotal.append(particles['mTot']) timeM.append(t) # print progress to terminal print("time step t = %f s\r" % t, end="") ######## create range time diagram ##################### # determine avalanche front and flow characteristics in respective coodrinate system if cfg['VISUALISATION'].getboolean('createRangeTimeDiagram') and t >= dtRangeTime[0]: mtiInfo, dtRangeTime = dtAna.fetchRangeTimeInfo(cfgRangeTime, cfg, dtRangeTime, t, demRT['header'], fields, mtiInfo) ######################################################### # make sure the array is not empty if t >= dtSave[0]: Tsave.append(t) log.debug('Saving results for time step t = %f s', t) log.debug('MTot = %f kg, %s particles' % (particles['mTot'], particles['nPart'])) log.debug(('cpu time Force = %s s' % (tCPU['timeForce'] / nIter))) log.debug(('cpu time ForceSPH = %s s' % (tCPU['timeForceSPH'] / nIter))) log.debug(('cpu time Position = %s s' % (tCPU['timePos'] / nIter))) log.debug(('cpu time Neighbour = %s s' % (tCPU['timeNeigh'] / nIter))) log.debug(('cpu time Fields = %s s' % (tCPU['timeField'] / nIter))) fieldsList, particlesList = appendFieldsParticles(fieldsList, particlesList, particles, fields, resTypes) # remove saving time steps that have already been saved dtSave = updateSavingTimeStep(dtSave, cfg['GENERAL'], t) # derive time step if cfgGen.getboolean('cflTimeStepping') and nIter > cfgGen.getfloat('cflIterConstant'): # overwrite the dt value in the cfg dt = tD.getcflTimeStep(particles, dem, cfgGen) elif cfgGen.getboolean('sphKernelRadiusTimeStepping'): dt = tD.getSphKernelRadiusTimeStep(dem, cfgGen) else: # get time step dt = cfgGen.getfloat('dt') particles['dt'] = dt t = t + dt nIter = nIter + 1 nIter0 = nIter0 + 1 tCPUtimeLoop = time.time() - startTime tCPU['timeLoop'] = tCPU['timeLoop'] + tCPUtimeLoop tCPU['nIter'] = nIter log.info('Ending computation at time t = %f s', t-dt) log.debug('Saving results for time step t = %f s', t-dt) log.info('MTot = %f kg, %s particles' % (particles['mTot'], particles['nPart'])) log.info('Computational performances:') log.info(('cpu time Force = %s s' % (tCPU['timeForce'] / nIter))) log.info(('cpu time ForceSPH = %s s' % (tCPU['timeForceSPH'] / nIter))) log.info(('cpu time Position = %s s' % (tCPU['timePos'] / nIter))) log.info(('cpu time Neighbour = %s s' % (tCPU['timeNeigh'] / nIter))) log.info(('cpu time Fields = %s s' % (tCPU['timeField'] / nIter))) log.info(('cpu time timeLoop = %s s' % (tCPU['timeLoop'] / nIter))) log.info(('cpu time total other = %s s' % ((tCPU['timeForce'] + tCPU['timeForceSPH'] + tCPU['timePos'] + tCPU['timeNeigh'] + tCPU['timeField']) / nIter))) Tsave.append(t-dt) fieldsList, particlesList = appendFieldsParticles(fieldsList, particlesList, particles, fields, resTypesLast) # create infoDict for report and mass log file infoDict = {'massEntrained': massEntrained, 'timeStep': timeM, 'massTotal': massTotal, 'tCPU': tCPU, 'final mass': massTotal[-1], 'initial mass': massTotal[0], 'entrained mass': np.sum(massEntrained), 'entrained volume': (np.sum(massEntrained)/cfgGen.getfloat('rhoEnt'))} # determine if stop criterion is reached or end time stopCritNotReached = particles['iterate'] avaTime = particles['t'] stopCritPer = cfgGen.getfloat('stopCrit') * 100. # update info dict with stopping info for report if stopCritNotReached: infoDict.update({'stopInfo': {'Stop criterion': 'end Time reached: %.2f' % avaTime, 'Avalanche run time [s]': '%.2f' % avaTime}}) else: infoDict.update({'stopInfo': {'Stop criterion': '< %.2f percent of PKE' % stopCritPer, 'Avalanche run time [s]': '%.2f' % avaTime}}) ######## create range time diagram ##################### # export data for range-time diagram if cfg['VISUALISATION'].getboolean('createRangeTimeDiagram'): lastTimeStep = t - dt # first append final time step mtiInfo, dtRangeTime = dtAna.fetchRangeTimeInfo(cfgRangeTime, cfg, dtRangeTime, lastTimeStep, demRT['header'], fields, mtiInfo) dtAna.exportData(mtiInfo, cfgRangeTime, 'com1DFA') ######################################################### return Tsave, particlesList, fieldsList, infoDict
[docs]def updateSavingTimeStep(dtSave, cfg, t): """ update saving time step list Parameters ----------- dtSave: list list of time steps that shall be saved cfg: configparser object configuration settings, end time step t: float actual time step Returns -------- dtSave: list updated list of saving time steps """ if dtSave.size == 1: dtSave = np.asarray([2*cfg.getfloat('tEnd')]) else: indSave = np.where(dtSave > t) dtSave = dtSave[indSave] return dtSave
[docs]def appendFieldsParticles(fieldsList, particlesList, particles, fields, resTypes): """ append fields and optionally particle dictionaries to list for export Parameters ------------ particles: dict dictionary with particle properties fields: dict dictionary with all result type fields resTypes: list list with all result types that shall be exported Returns ------- Fields: list updated list with desired result type fields dictionary Particles: list updated list with particles dicionaries """ fieldAppend = {} for resType in resTypes: if resType == 'particles': particlesList.append(copy.deepcopy(particles)) elif resType != '': fieldAppend[resType] = copy.deepcopy(fields[resType]) fieldsList.append(fieldAppend) return fieldsList, particlesList
[docs]def writeMBFile(infoDict, avaDir, logName): """ write mass balance info to file Parameters ----------- infoDict: dict info on mass avaDir: str or pathlib path path to avalanche directory logName: str simulation name """ t = infoDict['timeStep'] massEntrained = infoDict['massEntrained'] massTotal = infoDict['massTotal'] # write mass balance info to log file massDir = pathlib.Path(avaDir, 'Outputs', 'com1DFA') fU.makeADir(massDir) with open(massDir / ('mass_%s.txt' % logName), 'w') as mFile: mFile.write('time, current, entrained\n') for m in range(len(t)): mFile.write('%.02f, %.06f, %.06f\n' % (t[m], massTotal[m], massEntrained[m]))
[docs]def computeEulerTimeStep(cfg, particles, fields, zPartArray0, dem, tCPU, frictType): """ compute next time step using an euler forward scheme Parameters ---------- cfg: configparser configuration for DFA simulation particles : dict particles dictionary at t fields : dict fields dictionary at t zPartArray0 : dict z coordinate of particles at t=0s dem : dict dictionary with dem information tCPU : dict computation time dictionary frictType: int indicator for chosen type of friction model Returns ------- particles : dict particles dictionary at t + dt fields : dict fields dictionary at t + dt tCPU : dict computation time dictionary """ # get forces startTime = time.time() # loop version of the compute force log.debug('Compute Force C') particles, force, fields = DFAfunC.computeForceC(cfg, particles, fields, dem, frictType) tCPUForce = time.time() - startTime tCPU['timeForce'] = tCPU['timeForce'] + tCPUForce # compute lateral force (SPH component of the calculation) startTime = time.time() if cfg.getint('sphOption') == 0: force['forceSPHX'] = np.zeros(np.shape(force['forceX'])) force['forceSPHY'] = np.zeros(np.shape(force['forceY'])) force['forceSPHZ'] = np.zeros(np.shape(force['forceZ'])) else: log.debug('Compute Force SPH C') particles, force = DFAfunC.computeForceSPHC(cfg, particles, force, dem, cfg.getint('sphOption'), gradient=0) tCPUForceSPH = time.time() - startTime tCPU['timeForceSPH'] = tCPU['timeForceSPH'] + tCPUForceSPH # update velocity and particle position startTime = time.time() # particles = updatePosition(cfg, particles, dem, force) log.debug('Update position C') particles = DFAfunC.updatePositionC(cfg, particles, dem, force, typeStop=0) tCPUPos = time.time() - startTime tCPU['timePos'] = tCPU['timePos'] + tCPUPos # Split particles if cfg.getint('splitOption') == 0: # split particles with too much mass # this only splits particles that grew because of entrainment particles = particleTools.splitPartMass(particles, cfg) elif cfg.getint('splitOption') == 1: # split merge operation # first update fields (compute grid values) because we need the h of the particles to get the aPart # ToDo: we could skip the update field and directly do the split merge. This means we would use the old h startTime = time.time() log.debug('update Fields C') particles, fields = DFAfunC.updateFieldsC(cfg, particles, dem, fields) tcpuField = time.time() - startTime tCPU['timeField'] = tCPU['timeField'] + tcpuField # Then split merge particles particles = particleTools.splitPartArea(particles, cfg, dem) particles = particleTools.mergePartArea(particles, cfg, dem) # release secondary release area? if particles['secondaryReleaseInfo']['flagSecondaryRelease'] == 'Yes': particles, zPartArray0 = releaseSecRelArea(cfg, particles, fields, dem, zPartArray0) # get particles location (neighbours for sph) startTime = time.time() log.debug('get Neighbours C') particles = DFAfunC.getNeighborsC(particles, dem) tCPUNeigh = time.time() - startTime tCPU['timeNeigh'] = tCPU['timeNeigh'] + tCPUNeigh # update fields (compute grid values) startTime = time.time() log.debug('update Fields C') particles = computeTravelAngle(cfg, dem, particles, zPartArray0) particles, fields = DFAfunC.updateFieldsC(cfg, particles, dem, fields) tCPUField = time.time() - startTime tCPU['timeField'] = tCPU['timeField'] + tCPUField return particles, fields, zPartArray0, tCPU
[docs]def computeTravelAngle(cfgGen, dem, particles, zPartArray0): """Compute the travel angle associated to the particles Parameters ---------- cfgGen: configparser configuration for DFA simulation dem : dict dictionary with dem information particles : dict particles dictionary at t zPartArray0 : dict z coordinate of particles at t=0s Returns ------- particles : dict particles dictionary updated with the travel angle """ # first compute travel angle for each particle # get parent Id in order to get the first z position parentID = particles['parentID'] nPart = particles['nPart'] # get z0 z0 = zPartArray0[parentID.astype(int)] # compute tan of the travel angle nonZeroSInd = np.nonzero(particles['s']) tanGamma = np.zeros((nPart)) tanGamma[nonZeroSInd] = (z0 - particles['z'])[nonZeroSInd] / (particles['s'][nonZeroSInd]) # get the travel angle gamma = np.degrees(np.arctan(tanGamma)) particles['travelAngle'] = gamma return particles
[docs]def prepareArea(line, dem, radius, thList='', combine=True, checkOverlap=True): """ convert shape file polygon to raster Parameters ---------- line: dict line dictionary dem : dict dictionary with dem information radius : float include all cells which center is in the polygon or close enough thList: list thickness values for all features in the line dictionary combine : Boolean if True sum up the rasters in the area list to return only 1 raster if False return the list of distinct area rasters this option works only if thList is not empty checkOverlap : Boolean if True check if features are overlaping and return an error if it is the case if False check if features are overlaping and average the value for overlaping areas Returns ------- updates the line dictionary with the rasterData: Either Raster : 2D numpy array raster of the area (returned if relRHlist is empty OR if combine is set to True) or RasterList : list list of 2D numpy array rasters (returned if relRHlist is not empty AND if combine is set to True) """ NameRel = line['Name'] StartRel = line['Start'] LengthRel = line['Length'] RasterList = [] for i in range(len(NameRel)): name = NameRel[i] start = StartRel[i] end = start + LengthRel[i] avapath = {} avapath['x'] = line['x'][int(start):int(end)] avapath['y'] = line['y'][int(start):int(end)] avapath['Name'] = name # if relTh is given - set relTh if thList != '': log.info('%s feature %s, thickness: %.2f - read from %s' % (line['type'], name, thList[i], line['thicknessSource'][i])) Raster = polygon2Raster(dem['originalHeader'], avapath, radius, th=thList[i]) else: Raster = polygon2Raster(dem['originalHeader'], avapath, radius) RasterList.append(Raster) # if RasterList not empty check for overlap between features Raster = np.zeros(np.shape(dem['rasterData'])) for rast in RasterList: ind1 = Raster > 0 ind2 = rast > 0 indMatch = np.logical_and(ind1, ind2) if indMatch.any(): # if there is an overlap, raise error if checkOverlap: message = 'Features are overlaping - this is not allowed' log.error(message) raise AssertionError(message) else: # if there is an overlap, take average of values for the overlapping cells Raster = np.where(((Raster > 0) & (rast > 0)), (Raster + rast)/2, Raster + rast) else: Raster = Raster + rast if debugPlot: debPlot.plotAreaDebug(dem, avapath, Raster) if combine: line['rasterData'] = Raster return line else: line['rasterData'] = RasterList return line
[docs]def polygon2Raster(demHeader, Line, radius, th=''): """ convert line to raster Parameters ---------- demHeader: dict dem header dictionary Line : dict line dictionary radius : float include all cells which center is in the polygon or close enough th: float thickness value ot the line feature Returns ------- Mask : 2D numpy array updated raster """ # adim and center dem and polygon ncols = demHeader['ncols'] nrows = demHeader['nrows'] xllc = demHeader['xllcenter'] yllc = demHeader['yllcenter'] csz = demHeader['cellsize'] xCoord0 = (Line['x'] - xllc) / csz yCoord0 = (Line['y'] - yllc) / csz if (xCoord0[0] == xCoord0[-1]) and (yCoord0[0] == yCoord0[-1]): xCoord = np.delete(xCoord0, -1) yCoord = np.delete(yCoord0, -1) else: xCoord = copy.deepcopy(xCoord0) yCoord = copy.deepcopy(yCoord0) xCoord0 = np.append(xCoord0, xCoord0[0]) yCoord0 = np.append(yCoord0, yCoord0[0]) # get the raster corresponding to the polygon polygon = np.stack((xCoord, yCoord), axis=-1) path = mpltPath.Path(polygon) # add a tolerance to include cells for which the center is on the lines # for this we need to know if the path is clockwise or counter clockwise # to decide if the radius should be positif or negatif in contains_points is_ccw = geoTrans.isCounterClockWise(path) r = (radius*is_ccw - radius*(1-is_ccw)) x = np.linspace(0, ncols-1, ncols) y = np.linspace(0, nrows-1, nrows) X, Y = np.meshgrid(x, y) X = X.flatten() Y = Y.flatten() points = np.stack((X, Y), axis=-1) mask = path.contains_points(points, radius=r) Mask = mask.reshape((nrows, ncols)).astype(int) # thickness field is provided, then return array with ones if th != '': log.debug('REL set from dict, %.2f' % th) Mask = np.where(Mask > 0, th, 0.) else: Mask = np.where(Mask > 0, 1., 0.) if debugPlot: debPlot.plotRemovePart(xCoord0, yCoord0, demHeader, X, Y, Mask, mask) return Mask
[docs]def checkParticlesInRelease(particles, line, radius): """ remove particles laying outside the polygon Parameters ---------- particles : dict particles dictionary line: dict line dictionary radius: float threshold val that decides if a point is in the polygon, on the line or very close but outside Returns ------- particles : dict particles dictionary where particles outside of the polygon have been removed """ NameRel = line['Name'] StartRel = line['Start'] LengthRel = line['Length'] Mask = np.full(np.size(particles['x']), False) for i in range(len(NameRel)): name = NameRel[i] start = StartRel[i] end = start + LengthRel[i] avapath = {} avapath['x'] = line['x'][int(start):int(end)] avapath['y'] = line['y'][int(start):int(end)] avapath['Name'] = name mask = pointInPolygon(line['header'], particles, avapath, radius) Mask = np.logical_or(Mask, mask) # also remove particles with negative mass mask = np.where(particles['m'] <= 0, False, True) Mask = np.logical_and(Mask, mask) nRemove = len(Mask)-np.sum(Mask) if nRemove > 0: particles = particleTools.removePart(particles, Mask, nRemove, 'because they are not within the release polygon') log.debug('removed %s particles because they are not within the release polygon' % (nRemove)) return particles
[docs]def pointInPolygon(demHeader, points, Line, radius): """ find particles within a polygon Parameters ---------- demHeader: dict dem header dictionary points: dict points to check Line : dict line dictionary radius: float threshold val that decides if a point is in the polygon, on the line or very close but outside Returns ------- Mask : 1D numpy array Mask of particles to keep """ xllc = demHeader['xllcenter'] yllc = demHeader['yllcenter'] xCoord0 = (Line['x'] - xllc) yCoord0 = (Line['y'] - yllc) if (xCoord0[0] == xCoord0[-1]) and (yCoord0[0] == yCoord0[-1]): xCoord = np.delete(xCoord0, -1) yCoord = np.delete(yCoord0, -1) else: xCoord = copy.deepcopy(xCoord0) yCoord = copy.deepcopy(yCoord0) xCoord0 = np.append(xCoord0, xCoord0[0]) yCoord0 = np.append(yCoord0, yCoord0[0]) # get the raster corresponding to the polygon polygon = np.stack((xCoord, yCoord), axis=-1) path = mpltPath.Path(polygon) # add a tolerance to include cells for which the center is on the lines # for this we need to know if the path is clockwise or counter clockwise # to decide if the radius should be positif or negatif in contains_points is_ccw = geoTrans.isCounterClockWise(path) r = (radius*is_ccw - radius*(1-is_ccw)) points2Check = np.stack((points['x'], points['y']), axis=-1) mask = path.contains_points(points2Check, radius=r) mask = np.where(mask > 0, True, False) if debugPlot: debPlot.plotPartAfterRemove(points, xCoord0, yCoord0, mask) return mask
[docs]def releaseSecRelArea(cfg, particles, fields, dem, zPartArray0): """ Release secondary release area if trigered Initialize particles of the trigured secondary release area and add them to the simulation (particles dictionary) """ secondaryReleaseInfo = particles['secondaryReleaseInfo'] flowDepthField = fields['FD'] secRelRasterList = secondaryReleaseInfo['rasterData'] secRelRasterNameList = secondaryReleaseInfo['Name'] count = 0 indexRel = [] for secRelRaster, secRelRasterName in zip(secRelRasterList, secRelRasterNameList): # do the two arrays intersect (meaning a flowing particle entered the # secondary release area) mask = (secRelRaster > 0) & (flowDepthField > 0) if mask.any(): # create secondary release area particles log.info('Initializing secondary release area feature %s' % secRelRasterName) secRelInfo = shpConv.extractFeature(secondaryReleaseInfo, count) secRelInfo['rasterData'] = secRelRaster secRelParticles = initializeParticles(cfg, secRelInfo, dem) # release secondary release area by just appending the particles log.info('Releasing secondary release area %s at t = %.2f s' % (secRelRasterName, particles['t'])) particles = particleTools.mergeParticleDict(particles, secRelParticles) # save index of secRel feature indexRel.append(secRelRasterName) # save initial z position for travel angle computation zPartArray0 = np.append(zPartArray0, copy.deepcopy(secRelParticles['z'])) count = count + 1 secondaryReleaseInfo['rasterData'] = secRelRasterList particles['secondaryReleaseInfo'] = secondaryReleaseInfo for item in indexRel: iR = secRelRasterNameList.index(item) # remove it from the secondary release area list secRelRasterList.pop(iR) secondaryReleaseInfo = shpConv.removeFeature(secondaryReleaseInfo, iR) secRelRasterNameList.pop(iR) # update secondaryReleaseInfo secondaryReleaseInfo['rasterData'] = secRelRasterList particles['secondaryReleaseInfo'] = secondaryReleaseInfo return particles, zPartArray0
[docs]def savePartToPickle(dictList, outDir, logName): """ Save each dictionary from a list to a pickle in outDir; works also for one dictionary instead of list Parameters --------- dictList: list or dict list of dictionaries or single dictionary outDir: str path to output directory logName : str simulation Id """ if isinstance(dictList, list): for dict in dictList: pickle.dump(dict, open(outDir / ("particles_%s_%09.4f.p" % (logName, dict['t'])), "wb")) else: pickle.dump(dictList, open(outDir / ("particles_%s_%09.4f.p" % (logName, dictList['t'])), "wb"))
[docs]def trackParticles(cfgTrackPart, dem, particlesList): """ track particles from initial area Find all particles in an initial area. Find the same particles in the other time steps (+ the children if they were splitted). Extract time series of given properties of the tracked particles Parameters ----------- cfgTrackPart: configParser centerTrackPartPoint : str centerTrackPartPoint of the location of the particles to track (x|y coordinates) radius : str radius of the circle around point particleProperties: str list of particles properties to extract ('x', 'y', 'ux', 'm'...) dem: dict dem dictionary particlesList: list list of particles dictionary Returns ------- particlesList : list Particles list of dict updated with the 'trackedParticles' array (in the array, ones for particles that are tracked, zeros otherwise) trackedPartProp: dict dictionary with time series of the wanted properties for tracked particles track: boolean False if no particles are tracked """ # read particle properties to be extracted particleProperties = cfgTrackPart['particleProperties'] if particleProperties == '': particleProperties = ['x', 'y', 'z', 'ux', 'uy', 'uz', 'm', 'h'] else: particleProperties = set(['x', 'y', 'z', 'ux', 'uy', 'uz', 'm', 'h'] + particleProperties.split('|')) # read location of particle to be tracked radius = cfgTrackPart.getfloat('radius') centerList = cfgTrackPart['centerTrackPartPoint'] centerList = centerList.split('|') centerTrackPartPoint = {'x': np.array([float(centerList[0])]), 'y': np.array([float(centerList[1])])} centerTrackPartPoint, _ = geoTrans.projectOnRaster( dem, centerTrackPartPoint, interp='bilinear') centerTrackPartPoint['x'] = (centerTrackPartPoint['x'] - dem['originalHeader']['xllcenter']) centerTrackPartPoint['y'] = (centerTrackPartPoint['y'] - dem['originalHeader']['yllcenter']) # start by finding the particles to be tracked particles2Track, track = particleTools.findParticles2Track(particlesList[0], centerTrackPartPoint, radius) if track: # find those same particles and their children in the particlesList particlesList, nPartTracked = particleTools.getTrackedParticles(particlesList, particles2Track) # extract the wanted properties for the tracked particles trackedPartProp = particleTools.getTrackedParticlesProperties(particlesList, nPartTracked, particleProperties) else: trackedPartProp = None return particlesList, trackedPartProp, track
[docs]def readFields(inDir, resType, simName='', flagAvaDir=True, comModule='com1DFA', timeStep='', atol=1.e-6): """ Read ascii files within a directory and return List of dictionaries Parameters ----------- inDir: str path to input directory resType: list list of desired result types simName : str simulation name flagAvaDir: bool if True inDir corresponds to an avalanche directory and pickles are read from avaDir/Outputs/com1DFA/particles comModule: str module that computed the particles timeStep: float or list of floats desired time step if difference to time step of field file is smaller than atol field is found - optional atol: float look for matching time steps with atol tolerance - default is atol=1.e-6 Returns ------- fieldsList : list list of fields dictionaries fieldHeader: dict raster header corresponding to first element in fieldsList timeList: list tme corresponding to elements in fieldsList """ if flagAvaDir: inDir = pathlib.Path(inDir, 'Outputs', comModule, 'peakFiles', 'timeSteps') # initialise list of fields dictionaries fieldsList = [] timeList = [] first = True for r in resType: # search for all files within directory if simName: name = '*' + simName + '*' + r + '*.asc' else: name = '*' + r + '*.asc' FieldsNameList = list(inDir.glob(name)) timeListTemp = [float(element.stem.split('_t')[-1]) for element in FieldsNameList] FieldsNameList = [x for _, x in sorted(zip(timeListTemp, FieldsNameList))] count = 0 for fieldsName in FieldsNameList: t = float(fieldsName.stem.split('_t')[-1]) if timeStep == '' or np.isclose(timeStep, t, atol=atol).any(): # initialize field Dict if first: fieldsList.append({}) timeList.append(t) field = IOf.readRaster(fieldsName) fieldsList[count][r] = field['rasterData'] count = count + 1 first = False if count == 0: log.warning('No matching fields found in %s' % inDir) fieldHeader = None fieldsList = [] else: fieldHeader = field['header'] return fieldsList, fieldHeader, timeList
[docs]def exportFields(cfg, Tsave, fieldsList, demOri, outDir, logName): """ export result fields to Outputs directory according to result parameters and time step that can be specified in the configuration file Parameters ----------- cfg: dict configurations Tsave: list list of time step that corresponds to each dict in Fields Fields: list list of Fields for each dtSave outDir: str outputs Directory Returns -------- exported peak fields are saved in Outputs/com1DFA/peakFiles """ resTypesGen = fU.splitIniValueToArraySteps(cfg['GENERAL']['resType']) resTypesReport = fU.splitIniValueToArraySteps(cfg['REPORT']['plotFields']) if 'particles' in resTypesGen: resTypesGen.remove('particles') if 'particles' in resTypesReport: resTypesReport.remove('particles') numberTimes = len(Tsave)-1 countTime = 0 for timeStep in Tsave: if (countTime == numberTimes) or (countTime == 0): # for last time step we need to add the report fields resTypes = list(set(resTypesGen + resTypesReport)) else: resTypes = resTypesGen for resType in resTypes: resField = fieldsList[countTime][resType] if resType == 'ppr': resField = resField * 0.001 dataName = (logName + '_' + resType + '_' + 't%.2f' % (Tsave[countTime]) + '.asc') # create directory outDirPeak = outDir / 'peakFiles' / 'timeSteps' fU.makeADir(outDirPeak) outFile = outDirPeak / dataName IOf.writeResultToAsc( demOri['header'], resField, outFile, flip=True) if countTime == numberTimes: log.debug('Results parameter: %s has been exported to Outputs/peakFiles for time step: %.2f - FINAL time step ' % (resType, Tsave[countTime])) dataName = logName + '_' + resType + '.asc' # create directory outDirPeakAll = outDir / 'peakFiles' fU.makeADir(outDirPeakAll) outFile = outDirPeakAll / dataName IOf.writeResultToAsc(demOri['header'], resField, outFile, flip=True) else: log.debug('Results parameter: %s has been exported to Outputs/peakFiles for time step: %.2f ' % (resType, Tsave[countTime])) countTime = countTime + 1
[docs]def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameOld=''): """ Prepare a dictionary with simulations that shall be run with varying parameters following the variation dict Parameters ----------- standardCfg : configParser object default configuration or local configuration inputSimFiles: dict info dict on available input data variationDict: dict dictionary with parameter to be varied as key and list of it's values simNameOld: list list of simulation names that already exist (optional). If provided, only carry on simulation that do not exist Returns ------- simDict: dict dicionary with info on simHash, releaseScenario, release area file path, simType and contains full configuration configparser object for simulation run """ # get list of simulation types that are desired if 'simTypeList' in variationDict: simTypeList = variationDict['simTypeList'] del variationDict['simTypeList'] else: simTypeList = standardCfg['GENERAL']['simTypeList'].split('|') # get a list of simulation types that are desired AND available simTypeList = getSimTypeList(simTypeList, inputSimFiles) # set simTypeList (that has been checked if available) as parameter in variationDict variationDict['simTypeList'] = simTypeList # create a dataFrame with all possible combinations of the variationDict values variationDF = pd.DataFrame(product(*variationDict.values()), columns=variationDict.keys()) # generate a dictionary of full simulation info for all simulations to be performed # simulation info must contain: simName, releaseScenario, relFile, configuration as dictionary simDict = {} # create release scenario name for simulation name rel = inputSimFiles['relFiles'][0] relName = rel.stem if '_' in relName: relNameSim = relName + '_AF' else: relNameSim = relName # loop over all simulations that shall be performed according to variationDF # one row per simulation for row in variationDF.itertuples(): # convert full configuration to dict cfgSim = cfgUtils.convertConfigParserToDict(standardCfg) # update info for parameters that are given in variationDF for parameter in variationDict: # add simType cfgSim['GENERAL']['simTypeActual'] = row._asdict()['simTypeList'] # update parameter value - now only single value for each parameter keyList = ['relThPercentVariation', 'entThPercentVariation', 'secondaryRelThPercentVariation', 'relThRangeVariation', 'entThRangeVariation', 'secondaryRelThRangeVariation'] if parameter in keyList: # set thickness value according to percent variation info cfgSim = dP.setThicknessValueFromVariation(parameter, cfgSim, cfgSim['GENERAL']['simTypeActual'], row) else: cfgSim['GENERAL'][parameter] = row._asdict()[parameter] # update INPUT section - delete non relevant parameters if cfgSim['GENERAL']['simTypeActual'] not in ['ent', 'entres']: cfgSim['INPUT'].pop('entrainmentScenario', None) cfgSim['INPUT'].pop('entThId', None) cfgSim['INPUT'].pop('entThThickness', None) if cfgSim['GENERAL']['secRelArea'] == 'False': cfgSim['INPUT'].pop('secondaryReleaseScenario', None) cfgSim['INPUT'].pop('secondaryRelThId', None) cfgSim['INPUT'].pop('secondaryRelThThickness', None) # check if DEM in Inputs has desired mesh size pathToDem = dP.checkDEM(cfgSim, inputSimFiles['demFile']) cfgSim['INPUT']['DEM'] = pathToDem # add thickness values if read from shp and not varied cfgSim = dP.appendShpThickness(cfgSim) # convert back to configParser object cfgSimObject = cfgUtils.convertDictToConfigParser(cfgSim) # create unique hash for simulation configuration simHash = cfgUtils.cfgHash(cfgSimObject) simName = (relNameSim + '_' + row._asdict()['simTypeList'] + '_' + cfgSim['GENERAL']['modelType'] + '_' + simHash) # check if simulation exists. If yes do not append it if simName not in simNameOld: simDict[simName] = {'simHash': simHash, 'releaseScenario': relName, 'simType': row._asdict()['simTypeList'], 'relFile': rel, 'cfgSim': cfgSimObject} else: log.warning('Simulation %s already exists, not repeating it' % simName) log.info('The following simulations will be performed') for key in simDict: log.info('Simulation: %s' % key) inputSimFiles.pop('demFile') return simDict
[docs]def getSimTypeList(simTypeList, inputSimFiles): """ Define available simulation types of requested types Parameters ----------- standardCfg : configParser object default configuration or local configuration inputSimFiles: dict info dict on available input data Returns -------- simTypeList: list list of requested simTypes where also the required input data is available """ # read entrainment resistance info entResInfo = inputSimFiles['entResInfo'] # define simulation type if 'available' in simTypeList: if entResInfo['flagEnt'] == 'Yes' and entResInfo['flagRes'] == 'Yes': simTypeList.append('entres') elif entResInfo['flagEnt'] == 'Yes' and entResInfo['flagRes'] == 'No': simTypeList.append('ent') elif entResInfo['flagEnt'] == 'No' and entResInfo['flagRes'] == 'Yes': simTypeList.append('res') # always add null simulation simTypeList.append('null') simTypeList.remove('available') # remove duplicate entries simTypeList = set(simTypeList) simTypeList = sorted(list(simTypeList), reverse=False) if 'ent' in simTypeList or 'entres' in simTypeList: if entResInfo['flagEnt'] == 'No': message = 'No entrainment file found' log.error(message) raise FileNotFoundError(message) if 'res' in simTypeList or 'entres' in simTypeList: if entResInfo['flagRes'] == 'No': message = 'No resistance file found' log.error(message) raise FileNotFoundError(message) return simTypeList