"""
Conversion functions to read/ write Shape files
"""
import pathlib
import shapefile
import copy
import numpy as np
import logging
# create local logger
log = logging.getLogger(__name__)
[docs]def SHP2Array(infile, defname=None):
""" Read shapefile and convert it to a python dictionary
The dictionary contains the name of the paths in the shape file, the np array with
the coordinates of the feature points (all stacked in the same array)
and information about the starting index and length of each feature
Parameters
----------
infile: str
path to shape file
defname: str
name to give to the feature in the shape file
Returns
-------
SHPdata : dict
sks :
projection information
Name : str
list of feature names
x : 1D numpy array
np array of the x coord of the points in the features
y : 1D numpy array
np array of the y coord of the points in the features
z : 1D numpy array
np array of zeros and same size as the x an y coordinates array
Start : 1D numpy array
np array with the starting index of each feature in the coordinates
arrays (as many indexes as features)
Lengt : 1D numpy array
np array with the length of each feature in the coordinates
arrays (as many indexes as features)
thickness (optional) : 1D numpy array
np array with the (release or entrainment) thickness of each feature (as many values as features)
id : list
list of oid as string for each feature
"""
# Input shapefile
sf = shapefile.Reader(str(infile))
infile = pathlib.Path(infile)
# set defaults for variables
layername = None
thickness = None
rho = None
sks = None
iso = None
id = None
# get coordinate system
prjfile = infile.with_suffix('.prj')
if prjfile.is_file():
prjf = open(prjfile, 'r')
sks = prjf.readline()
prjf.close()
# Start reading the shapefile
records = sf.shapeRecords()
shps = sf.shapes()
SHPdata = {}
SHPdata['sks'] = sks
Name = []
thicknessList = []
idList = []
Length = np.empty((0))
Start = np.empty((0))
Coordx = np.empty((0))
Coordy = np.empty((0))
Coordz = np.empty((0))
start = 0
for n, item in enumerate(shps):
pts = item.points
zs = [0.0] * len(pts)
# check if records are available and extract
if records:
# loop through fields
for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record):
# get entity name
name = name.lower()
if (name == 'name'):
layername = str(value)
if (name == 'thickness') or (name == 'd0'):
thickness = value
if (name == 'rho'):
rho = value
if (name == 'sks'):
sks = value
if (name == 'iso'):
iso = value
# if name is still empty go through file again and take Layer instead
if ((type(layername) is bytes) or (layername is None)):
for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record):
if (name == 'Layer'):
layername = value
# if layer still not defined, use generic
if layername is None:
layername = defname
Name.append(layername)
log.debug('SHPConv: Found layer %s', layername)
thicknessList.append(str(thickness))
Start = np.append(Start, start)
length = len(pts)
Length = np.append(Length, length)
start += length
for (pt, z) in zip(pts, zs):
Coordx = np.append(Coordx, pt[0])
Coordy = np.append(Coordy, pt[1])
Coordz = np.append(Coordz, z)
# get unique ID of features in shapefile
for rec in sf.records():
id = rec.oid
idList.append(str(id))
SHPdata['Name'] = Name
SHPdata['thickness'] = thicknessList
SHPdata['Start'] = Start
SHPdata['Length'] = Length
SHPdata['x'] = Coordx
SHPdata['y'] = Coordy
SHPdata['z'] = Coordz
SHPdata['id'] = idList
sf.close()
return SHPdata
[docs]def readThickness(infile, defname=None):
""" Read shapefile and fetch info on features' ids and thickness values
Parameters
----------
infile: str
path to shape file
defname: str
name to give to the feature in the shape file
Returns
-------
thickness: list
list of strings with the (release or entrainment) thickness of each feature (as many values as features)
id : list
list of strings for oid of each feature in shp file
"""
# Input shapefile
sf = shapefile.Reader(str(infile))
infile = pathlib.Path(infile)
thickness = None
id = None
# Start reading the shapefile
records = sf.shapeRecords()
shps = sf.shapes()
thicknessList = []
idList = []
for n, item in enumerate(shps):
pts = item.points
zs = [0.0] * len(pts)
# check if records are available and extract
if records:
# loop through fields
for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record):
# get entity name
name = name.lower()
if (name == 'thickness') or (name == 'd0'):
thickness = value
thicknessList.append(str(thickness))
# get unique ID of features in shapefile
for rec in sf.records():
id = rec.oid
idList.append(str(id))
sf.close()
return thicknessList, idList
[docs]def readLine(fname, defname, dem):
""" Read line from .shp
Use SHP2Array to read the shape file.
Check if the lines are laying inside the dem extend
Parameters
----------
fname: str
path to shape file
defname: str
name to give to the line in the shape file
dem: dict
dem dictionary
Returns
-------
Line : dict
Line['Name'] : list of lines names
Line['Coord'] : np array of the coords of points in lines
Line['Start'] : list of starting index of each line in Coord
Line['Length'] : list of length of each line in Coord
"""
log.debug('Reading avalanche path : %s ', str(fname))
header = dem['header']
rasterDEM = dem['rasterData']
Line = SHP2Array(fname, defname)
coordx = Line['x']
coordy = Line['y']
for i in range(len(coordx)):
Lx = (coordx[i] - header['xllcenter']) / header['cellsize']
Ly = (coordy[i] - header['yllcenter']) / header['cellsize']
if ((Ly < 0) or (Ly > header['nrows']-1) or (Lx < 0) or (Lx > header['ncols']-1)):
raise ValueError('The avalanche path exceeds dem extent. Try with another path')
elif np.isnan(rasterDEM[int(np.floor(Ly)), int(np.floor(Lx))]):
raise ValueError('Nan Value encountered. Try with another path')
return Line
[docs]def readPoints(fname, dem):
""" Read points from .shp
Use SHP2Array to read the shape file.
Check if the points are laying inside the dem extend
Parameters
----------
fname: str
path to shape file
defname: str
name to give to the points in the shape file
dem: dict
dem dictionary
Returns
-------
Line : dict
Line['Name'] : list of points names
Line['Coord'] : np array of the coords of points in points
Line['Start'] : list of starting index of each point in Coord
Line['Length'] : list of length of each point in Coord
"""
log.debug('Reading split point : %s ', str(fname))
header = dem['header']
rasterDEM = dem['rasterData']
defname = 'SHP'
Points = SHP2Array(fname, defname)
Pointx = Points['x']
Pointy = Points['y']
for i in range(len(Pointx)):
Lx = (Pointx[i] - header['xllcenter']) / header['cellsize']
Ly = (Pointy[i] - header['yllcenter']) / header['cellsize']
if (Ly < 0 or Ly > header['nrows']-1 or Lx < 0 or Lx > header['ncols']-1):
raise ValueError('The split point is not on the dem. Try with another split point')
elif np.isnan(rasterDEM[int(np.floor(Ly)), int(np.floor(Lx))]):
raise ValueError('Nan Value encountered. Try with another split point')
return Points
[docs]def removeFeature(featureIn, nFeature2Remove):
""" Remove feature number nFeature2Remove from featureIn
Parameters
----------
featureIn: dict
shape file dicionary (structure produced by SHP2Array, readLine or readPoint)
nFeature2Remove: int
index of feature to remove from featureIn
Returns
-------
featureOut : dict
shape file dicionary without feature nFeature2Remove
"""
StartRel = featureIn['Start']
LengthRel = featureIn['Length']
thickness = featureIn['thickness']
featureOut = copy.deepcopy(featureIn)
start = StartRel[nFeature2Remove]
end = start + LengthRel[nFeature2Remove]
# remove feature
featureOut['x'] = np.delete(featureIn['x'], np.arange(int(start), int(end)))
featureOut['y'] = np.delete(featureIn['y'], np.arange(int(start), int(end)))
if 'z' in featureIn.keys():
featureOut['z'] = np.delete(featureIn['z'], np.arange(int(start), int(end)))
del featureOut['Name'][nFeature2Remove]
StartRel = featureOut['Start']
StartRel[nFeature2Remove:] = StartRel[nFeature2Remove:] - LengthRel[nFeature2Remove]
featureOut['Start'] = np.delete(StartRel, nFeature2Remove)
featureOut['Length'] = np.delete(LengthRel, nFeature2Remove)
featureOut['thickness'] = np.delete(thickness, nFeature2Remove)
return featureOut
[docs]def writeLine2SHPfile(lineDict, lineName, fileName):
""" write a line to shapefile
Parameters
----------
lineDict: dict
line dict
lineName: str
line name
fileName: str or pathlib path
path where the line will be saved
Returns
-------
fileName : str
path where the line has been saved
"""
fileName = str(fileName)
line = np.zeros((np.size(lineDict['x']), 2))
line[:, 0] = lineDict['x']
line[:, 1] = lineDict['y']
w = shapefile.Writer(fileName)
w.field('name', 'C')
w.line([line])
w.record(lineName)
w.close()
return fileName