Source code for ana1Tests.damBreak

"""
Simple python script to reproduce analytic solution for a Riemann problem,
following the derivations in Faccanoni and Mangeney (2012), Test 2, Case 1.2.
but scaled up in size.

Here the instantanous release of fluid from rest is described using incompressible,
thickness-avaeraged mass and momentum conservation equations and a Coulomb-tpye friction law.
"""

import numpy as np
import logging

# local imports
import avaframe.com1DFA.com1DFA as com1DFA
from avaframe.com1DFA import DFAtools
import avaframe.ana1Tests.analysisTools as anaTools
import avaframe.out3Plot.outAna1Plots as outAna1Plots

# create local logger
# change log level in calling module to DEBUG to see log messages
log = logging.getLogger(__name__)


[docs]def damBreakSol(avaDir, cfg, cfgC, outDirTest): """ Compute analytical Flow thickness and velocity for dam break test case for a granular flow over a dry rough sloping bed with the Savage Hutter model Parameters ----------- avaDir: str path to avalanche directory cfg: configParser object main avaframe configuration - here showPlot flag used cfgC: configParser object configuration setting for avalanche simulation including DAMBREAK section outDirTest: pathlib path path to output directory Returns ------- solDam: dict analytic solution dictionary: tAna: 1D numpy array time array h0: float release thickness hAna: 2D numpy array Flow thickness (rows for x and columns for time) uAna: 2D numpy array flow velocity (rows for x and columns for time) xAna: 2D numpy array extent of domain in the horizontal plane coordinate system (rows for x and columns for time) xMidAna: 1D numpy array middle of the material in x dir in the horizontal plane coordinate system (used to compute the error) """ # Set Parameters # Coordinate system chosen in the direction of the inclined plane g = cfgC['GENERAL'].getfloat('gravAcc') # acceleration due to gravity [ms-2] phiDeg = cfgC['DAMBREAK'].getfloat('phi') # slope angle [°] deltaDeg = cfgC['DAMBREAK'].getfloat('delta') # friction angle [°] phi = np.radians(phiDeg) # slope angle [rad] delta = np.radians(deltaDeg) # bed friction angle [rad] gz = g * np.cos(phi) # projection of g perpendicular to the inclined plane m0 = gz * (np.tan(phi) - np.tan(delta)) # constant x-acceleration resulting from gravity and friction force hL = cfgC['GENERAL'].getfloat('relTh') # initial height [m] in Riemann problem in state 1 (x<0), hR (x>0)=0 cL = np.sqrt(gz * hL) x0R = cfgC['DAMBREAK'].getfloat('xBack') / np.cos(phi) tSave = cfgC['DAMBREAK'].getfloat('tSave') # Define time [0-1] seconds and space [-2,2] meters domains multiplied times 100 tAna = np.arange(0, cfgC['DAMBREAK'].getfloat('tEnd'), cfgC['DAMBREAK'].getfloat('dt')) x = np.arange(cfgC['DAMBREAK'].getfloat('xStart') / np.cos(phi) , cfgC['DAMBREAK'].getfloat('xEnd') / np.cos(phi), cfgC['DAMBREAK'].getfloat('dx')) nt = len(tAna) nx = len(x) # Initialise Flow thickness solution and velocity h = np.zeros((nx, nt)) u = np.zeros((nx, nt)) xMiddle = np.zeros(nt) # Compute exact solution for case: 'dry bed' - including three different states for m in range(nt): cond1R = x0R + ((m0*tAna[m]) / 2.0 - 2*cL) * tAna[m] cond2R = x0R + ((m0*tAna[m]) / 2.0 + cL) * tAna[m] cond1 = ((m0*tAna[m]) / 2.0 - cL) * tAna[m] cond2 = (2.0 *cL + ((m0*tAna[m]) / 2.0)) * tAna[m] # elif x[k] > cond2: u[:, m] = 0 h[:, m] = 0 # elif cond1 < x[k] <= cond2: if tAna[m] > 0: u[:, m] = np.where(cond2 >= x, (2./3.) * (cL + (x / tAna[m]) + m0 * tAna[m]), u[:, m]) h[:, m] = np.where(cond2 >= x, ((2.* cL - (x / tAna[m]) + ((m0 * tAna[m]) / 2.))**2) / (9. * gz), h[:, m]) # if x[k] <= cond1: u[:, m] = np.where(cond1 >= x, m0 * tAna[m], u[:, m]) h[:, m] = np.where(cond1 >= x, hL, h[:, m]) # uncomment this if you also want the rear part of the dam break # to get the analytical solution on the back part (not working very well, theory is probably wrong) # # elif cond1R < x[k] <= cond2R: # if tAna[m] > 0: # u[:, m] = np.where(cond2R >= x, (2./3.) * (-cL + ((x-x0R) / tAna[m]) + m0 * tAna[m]), u[:, m]) # h[:, m] = np.where(cond2R >= x, ((2.* cL + ((x-x0R) / tAna[m]) - # ((m0 * tAna[m]) / 2.))**2) / (9. * gz), h[:, m]) # # if x[k] <= cond1R: # u[:, m] = np.where(cond1R >= x, 0, u[:, m]) # h[:, m] = np.where(cond1R >= x, 0, h[:, m]) xMiddle[m] = (2*cond2R + cond1)/3 #-----------------------------Plot results -------------- # Reproduce figure 6, case 1.2.1 - Test 2 outAna1Plots.plotDamAnaResults(tAna, x, xMiddle, h, u, tSave, cfg, outDirTest) x = x * np.cos(phi) # projected on the horizontal plane xMiddle = xMiddle * np.cos(phi) # projected on the horizontal plane solDam = {'tAna': tAna, 'h0': hL, 'xAna': x, 'hAna': h, 'uAna': u, 'xMidAna': xMiddle} return solDam
[docs]def postProcessDamBreak(avalancheDir, cfgMain, cfgDam, simDF, solDam, outDirTest): """ loop on all DFA simulations and compare then to the anlytic solution Parameters ----------- avalancheDir: str or pathlib path avalanche directory cfgMain: confiparser avaframeCfg configuration cfgDam: configParser object configuration setting for avalanche simulation including DAMBREAK section simDF: pandas dataFrame configuration DF solDam: dict analytic solution dictionary outDirTest: pathlib path path to output directory Returns -------- simDF: pandas dataFrame configuration DF appended with the analysis results """ # loop on all the simulations and make the comparison to reference for simHash, simDFrow in simDF.iterrows(): simName = simDFrow['simName'] # fetch the simulation results fieldsList, fieldHeader, timeList = com1DFA.readFields(avalancheDir, ['FT', 'FV', 'Vx', 'Vy', 'Vz'], simName=simName, flagAvaDir=True, comModule='com1DFA') # analyze and compare results if cfgDam['DAMBREAK']['tSave'] == '': indTime = -1 tSave = timeList[-1] else: tSave = cfgDam['DAMBREAK'].getfloat('tSave') indTime = min(np.searchsorted(timeList, tSave), min(len(timeList)-1, len(fieldsList)-1)) if cfgDam['DAMBREAK'].getboolean('onlyLast'): fieldsList = [fieldsList[0], fieldsList[indTime]] timeList = [0, timeList[indTime]] indTime = 1 # computeAndPlotGradient(avalancheDir, particlesList, timeList, solDam, cfgDam, outDirTest, simHash, simDFrow) hEL2Array, hELMaxArray, vhEL2Array, vhELMaxArray, t = analyzeResults(avalancheDir, fieldsList, timeList, solDam, fieldHeader, cfgDam, outDirTest, simHash, simDFrow) # add result of error analysis # save results in the simDF simDF.loc[simHash, 'timeError'] = t simDF.loc[simHash, 'hErrorL2'] = hEL2Array[indTime] simDF.loc[simHash, 'vhErrorL2'] = vhEL2Array[indTime] simDF.loc[simHash, 'hErrorLMax'] = hELMaxArray[indTime] simDF.loc[simHash, 'vhErrorLMax'] = vhELMaxArray[indTime] # +++++++++POSTPROCESS++++++++++++++++++++++++ # ------------------------------- name = 'results' + str(round(t)) + '.p' simDF.to_pickle(outDirTest / name) return simDF
[docs]def analyzeResults(avalancheDir, fieldsList, timeList, solDam, fieldHeader, cfg, outDirTest, simHash, simDFrow): """Compare analytical and com1DFA results, compute error Parameters ----------- avalancheDir: pathlib path path to avalanche directory fieldsList: list list of fields dictionaries timeList: list list of time steps solDam: dict analytic solution dictionary fieldHeader: dict header dictionary with info about the extend and cell size cfg: configParser object configuration setting for avalanche simulation including DAMBREAK section outDirTest: pathlib path path output directory (where to save the figures) simHash: str com1DFA simulation id simDFrow: pandas object com1DFA simulation rox coresponding to simHash Returns -------- hErrorL2Array: numpy array L2 error on Flow thickness for saved time steps hErrorLMaxArray: numpy array LMax error on Flow thickness for saved time steps vErrorL2Array: numpy array L2 error on flow velocity for saved time steps vErrorLMaxArray: numpy array LMax error on flow velocity for saved time steps tSave: float time corresponding the errors """ cfgDam = cfg['DAMBREAK'] phi = cfgDam.getfloat('phi') phiRad = np.radians(phi) cellSize = fieldHeader['cellsize'] xMiddle = solDam['xMidAna'] tAna = solDam['tAna'] xAna = solDam['xAna'] hAna = solDam['hAna'] uAna = solDam['uAna'] # get plots limits limits = outAna1Plots.getPlotLimits(cfgDam, fieldsList, fieldHeader) hErrorL2Array = np.zeros((len(fieldsList))) vhErrorL2Array = np.zeros((len(fieldsList))) hErrorLMaxArray = np.zeros((len(fieldsList))) vhErrorLMaxArray = np.zeros((len(fieldsList))) count = 0 # run the comparison routine for each saved time step for t, field in zip(timeList, fieldsList): # get similartiy solution h, u at required time step indTime = np.searchsorted(tAna, t) xM = xMiddle[indTime] # get extent where the sol should be compared xDamPlus, nColMid, nColMax, nRowMin, nRowMax = getDamExtend(fieldHeader, xM, cfgDam) # get analytical solution (in same format as the simulation results) hDamPlus = np.interp(xDamPlus, xAna, hAna[:, indTime]) uDamPlus = np.interp(xDamPlus, xAna, uAna[:, indTime]) # get numirical sol on good interval hNumPlus = field['FT'][nRowMin:nRowMax, nColMid:nColMax] dataNumV = DFAtools.scalProd(field['Vx'], field['Vy'], field['Vz'], np.cos(phiRad), 0, -np.sin(phiRad)) uNumPlus = dataNumV[nRowMin:nRowMax, nColMid:nColMax] # make analytical results 2D nrows, ncols = np.shape(hNumPlus) O = np.ones((nrows, ncols)) hDamPlus = O*hDamPlus uDamPlus = O*uDamPlus hEL2Plus, hL2RelPlus, hLmaxPlus, hLmaxRelPlus = anaTools.normL2Scal(hDamPlus, hNumPlus, cellSize, np.cos(phiRad)) vhL2Plus, vhL2RelPlus, vhLmaxPlus, vhLmaxRelPlus = anaTools.normL2Scal(hDamPlus*uDamPlus, hNumPlus*uNumPlus, cellSize, np.cos(phiRad)) if cfgDam.getboolean('relativError'): hErrorL2Array[count] = hL2RelPlus hErrorLMaxArray[count] = hLmaxRelPlus vhErrorL2Array[count] = vhL2RelPlus vhErrorLMaxArray[count] = vhLmaxRelPlus else: hErrorL2Array[count] = hEL2Plus hErrorLMaxArray[count] = hLmaxPlus vhErrorL2Array[count] = vhL2Plus vhErrorLMaxArray[count] = vhLmaxPlus title = outAna1Plots.getTitleError(cfgDam.getboolean('relativError')) log.debug("L2 %s error on the Flow thickness at t=%.2f s is : %.4f" % (title, t, hEL2Plus)) log.debug("L2 %s error on the momentum at t=%.2f s is : %.4f" % (title, t, vhL2Plus)) # Make all individual time step comparison plot if cfgDam.getboolean('plotSequence'): outAna1Plots.plotComparisonDam(cfgDam, simHash, fieldsList[0], field, fieldHeader, solDam, t, limits, outDirTest) count = count + 1 tSave = cfgDam.getfloat('tSave') indT = min(np.searchsorted(timeList, tSave), min(len(timeList)-1, len(fieldsList)-1)) tSave = timeList[indT] if cfgDam.getboolean('plotErrorTime') and len(timeList)>1: # Create result plots outAna1Plots.plotErrorTime(timeList, hErrorL2Array, hErrorLMaxArray, vhErrorL2Array, vhErrorLMaxArray, cfgDam.getboolean('relativError'), tSave, simHash, outDirTest) outAna1Plots.plotDamBreakSummary(avalancheDir, timeList, fieldsList, fieldHeader, solDam, hErrorL2Array, hErrorLMaxArray, vhErrorL2Array, vhErrorLMaxArray, outDirTest, simDFrow, simHash, cfg) return hErrorL2Array, hErrorLMaxArray, vhErrorL2Array, vhErrorLMaxArray, tSave
[docs]def getDamExtend(fieldHeader, xM, cfgDam): """ Get the extend where the analytic and simulation should be compared Parameters ----------- fieldHeader: dict header dictionary with info about the extend and cell size xM: float x coordinate ot the start of the comparizon domain cfgDam: configParser object configuration setting for the DAMBREAK section Returns -------- xDamPlus: 1D numpy array x array corresponding to the comparizon domain nColMid: int index of the column corresponding to xM nColMax: int index of the column corresponding to xEnd nRowMin: int index of the row corresponding to yStart nRowMax: int index of the row corresponding to yEnd """ cellSize = fieldHeader['cellsize'] xllc = fieldHeader['xllcenter'] yllc = fieldHeader['yllcenter'] nCols = fieldHeader['ncols'] xEnd = cfgDam.getfloat('xEnd') yStart = cfgDam.getfloat('yStart') yEnd = cfgDam.getfloat('yEnd') xArray = np.arange(nCols)*cellSize + xllc nColMid = round((xM-xllc)/cellSize) nColMax = round((xEnd-xllc)/cellSize) nRowMin = round((yStart-yllc)/cellSize) nRowMax = round((yEnd-yllc)/cellSize) xDamPlus = xArray[nColMid:nColMax] return xDamPlus, nColMid, nColMax, nRowMin, nRowMax