"""
Conversion functions to read/ write Shape files
"""
import pathlib
import shapefile
import copy
import numpy as np
import logging
# create local logger
log = logging.getLogger(__name__)
[docs]def SHP2Array(infile, defname=None):
""" Read shapefile and convert it to a python dictionary
The dictionary contains the name of the paths in the shape file, the np array with
the coordinates of the feature points (all stacked in the same array)
and information about the starting index and length of each feature
Parameters
----------
infile: str
path to shape file
defname: str
name to give to the feature in the shape file
Returns
-------
SHPdata : dict
sks :
projection information
Name : str
list of feature names
x : 1D numpy array
np array of the x coord of the points in the features
y : 1D numpy array
np array of the y coord of the points in the features
z : 1D numpy array
np array of zeros and same size as the x an y coordinates array
Start : 1D numpy array
np array with the starting index of each feature in the coordinates
arrays (as many indexes as features)
Length : 1D numpy array
np array with the length of each feature in the coordinates
arrays (as many indexes as features)
thickness (optional) : 1D numpy array
np array with the (release or entrainment) thickness of each feature (as many values as features)
id : list
list of oid as string for each feature
ci95: list
list of 95% confidence interval of thickness value
nParts: list
list of parts of polygon (added the total number of points as list item, so if multiple parts len>2)
nFeatures: int
number of features per line (parts)
"""
# Input shapefile
sf = shapefile.Reader(str(infile))
infile = pathlib.Path(infile)
# set defaults for variables
layername = None
thickness = None
slope = None
rho = None
sks = None
iso = None
id = None
ci95 = None
layerN = None
# get coordinate system
sks = getSHPProjection(infile)
# Start reading the shapefile
records = sf.shapeRecords()
shps = sf.shapes()
SHPdata = {}
SHPdata['sks'] = sks
Name = []
thicknessList = []
idList = []
ci95List = []
layerNameList = []
Length = np.empty((0))
Start = np.empty((0))
Coordx = np.empty((0))
Coordy = np.empty((0))
Coordz = np.empty((0))
start = 0
nParts = []
for n, item in enumerate(shps):
pts = item.points
# is there a z coordinate?
try:
# for dams
zs = item.z
except AttributeError:
zs = [0.0] * len(pts)
# add info on number of parts
nParts.append(list(item.parts) + [len(item.points)])
# check if records are available and extract
if records:
# loop through fields
for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record):
# get entity name
name = name.lower()
if (name == 'name'):
layername = str(value)
if (name == 'thickness') or (name == 'd0'):
thickness = value
if (name == 'ci95'):
ci95 = value
if (name == 'slope'):
# for dams
slope = value
if (name == 'rho'):
rho = value
if (name == 'sks'):
sks = value
if (name == 'iso'):
iso = value
if (name == 'layer'):
layerN = value
# if name is still empty go through file again and take Layer instead
if ((type(layername) is bytes) or (layername is None)):
for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record):
if (name == 'Layer'):
layername = value
# if layer still not defined, use generic
if layername is None:
layername = defname
Name.append(layername)
log.debug('SHPConv: Found layer %s', layername)
thicknessList.append(str(thickness))
ci95List.append(str(ci95))
layerNameList.append(layerN)
Start = np.append(Start, start)
length = len(pts)
Length = np.append(Length, length)
start += length
for (pt, z) in zip(pts, zs):
Coordx = np.append(Coordx, pt[0])
Coordy = np.append(Coordy, pt[1])
Coordz = np.append(Coordz, z)
# get unique ID of features in shapefile
for rec in sf.records():
id = rec.oid
idList.append(str(id))
SHPdata['Name'] = Name
SHPdata['thickness'] = thicknessList
SHPdata['slope'] = slope
SHPdata['Start'] = Start
SHPdata['Length'] = Length
SHPdata['x'] = Coordx
SHPdata['y'] = Coordy
SHPdata['z'] = Coordz
SHPdata['id'] = idList
SHPdata['ci95'] = ci95List
SHPdata['layerName'] = layerNameList
SHPdata['nParts'] = nParts
SHPdata['nFeatures'] = len(Start)
sf.close()
return SHPdata
[docs]def getSHPProjection(infile):
""" Fetch projection from shp file
Parameters
----------
infile: str
path to shape file
Returns
-------
sks: str
projection string (if available, None if not)
"""
# get coordinate system
prjfile = infile.with_suffix('.prj')
if prjfile.is_file():
prjf = open(prjfile, 'r')
sks = prjf.readline()
prjf.close()
return sks
else:
message = ('No projection layer for shp file %s' % infile)
log.warning(message)
return None
[docs]def readThickness(infile, defname=None):
""" Read shapefile and fetch info on features' ids and thickness values
Parameters
----------
infile: str
path to shape file
defname: str
name to give to the feature in the shape file
Returns
-------
thickness: list
list of strings with the (release or entrainment) thickness of each feature (as many values as features)
id : list
list of strings for oid of each feature in shp file
ci95: list
list of all ci95 values if provided
"""
# Input shapefile
sf = shapefile.Reader(str(infile))
infile = pathlib.Path(infile)
thickness = None
id = None
ci95 = None
# Start reading the shapefile
records = sf.shapeRecords()
shps = sf.shapes()
thicknessList = []
idList = []
ci95List = []
for n, item in enumerate(shps):
pts = item.points
zs = [0.0] * len(pts)
# check if records are available and extract
if records:
# loop through fields
for (name, typ, size, deci), value in zip(sf.fields[1:], records[n].record):
# get entity name
name = name.lower()
if (name == 'thickness') or (name == 'd0'):
thickness = value
if (name == 'ci95'):
ci95 = value
thicknessList.append(str(thickness))
ci95List.append(str(ci95))
# get unique ID of features in shapefile
for rec in sf.records():
id = rec.oid
idList.append(str(id))
sf.close()
return thicknessList, idList, ci95List
[docs]def readLine(fname, defname, dem):
""" Read line from .shp
Use SHP2Array to read the shape file.
Check if the lines are laying inside the dem extend
Parameters
----------
fname: str
path to shape file
defname: str
name to give to the line in the shape file
dem: dict
dem dictionary
Returns
-------
Line : dict
Line['Name'] : list of lines names
Line['Coord'] : np array of the coords of points in lines
Line['Start'] : list of starting index of each line in Coord
Line['Length'] : list of length of each line in Coord
"""
log.debug('Reading avalanche path : %s ', str(fname))
header = dem['header']
rasterDEM = dem['rasterData']
Line = SHP2Array(fname, defname)
coordx = Line['x']
coordy = Line['y']
for i in range(len(coordx)):
Lx = (coordx[i] - header['xllcenter']) / header['cellsize']
Ly = (coordy[i] - header['yllcenter']) / header['cellsize']
if ((Ly < 0) or (Ly > header['nrows']-1) or (Lx < 0) or (Lx > header['ncols']-1)):
log.error(fname)
raise ValueError('This shape file exceeds dem extent. This is not allowed')
elif np.isnan(rasterDEM[int(np.floor(Ly)), int(np.floor(Lx))]):
log.error(fname)
raise ValueError('This shape file is at least partially outside of DEM, this is not allowed!')
return Line
[docs]def readPoints(fname, dem):
""" Read points from .shp
Use SHP2Array to read the shape file.
Check if the points are laying inside the dem extend
Parameters
----------
fname: str
path to shape file
defname: str
name to give to the points in the shape file
dem: dict
dem dictionary
Returns
-------
Line : dict
Line['Name'] : list of points names
Line['Coord'] : np array of the coords of points in points
Line['Start'] : list of starting index of each point in Coord
Line['Length'] : list of length of each point in Coord
"""
log.debug('Reading split point : %s ', str(fname))
header = dem['header']
rasterDEM = dem['rasterData']
defname = 'SHP'
Points = SHP2Array(fname, defname)
Pointx = Points['x']
Pointy = Points['y']
for i in range(len(Pointx)):
Lx = (Pointx[i] - header['xllcenter']) / header['cellsize']
Ly = (Pointy[i] - header['yllcenter']) / header['cellsize']
if (Ly < 0 or Ly > header['nrows']-1 or Lx < 0 or Lx > header['ncols']-1):
raise ValueError('The split point is not on the dem. Try with another split point')
elif np.isnan(rasterDEM[int(np.floor(Ly)), int(np.floor(Lx))]):
raise ValueError('Nan Value encountered. Try with another split point')
return Points
[docs]def removeFeature(featureIn, nFeature2Remove):
""" Remove feature number nFeature2Remove from featureIn
Parameters
----------
featureIn: dict
shape file dicionary (structure produced by SHP2Array, readLine or readPoint)
nFeature2Remove: int
index of feature to remove from featureIn
Returns
-------
featureOut : dict
shape file dicionary without feature nFeature2Remove
"""
StartRel = featureIn['Start']
LengthRel = featureIn['Length']
thickness = featureIn['thickness']
featureOut = copy.deepcopy(featureIn)
start = StartRel[nFeature2Remove]
end = start + LengthRel[nFeature2Remove]
# remove feature
featureOut['x'] = np.delete(featureIn['x'], np.arange(int(start), int(end)))
featureOut['y'] = np.delete(featureIn['y'], np.arange(int(start), int(end)))
if 'z' in featureIn.keys():
featureOut['z'] = np.delete(featureIn['z'], np.arange(int(start), int(end)))
del featureOut['Name'][nFeature2Remove]
StartRel = featureOut['Start']
StartRel[nFeature2Remove:] = StartRel[nFeature2Remove:] - LengthRel[nFeature2Remove]
featureOut['Start'] = np.delete(StartRel, nFeature2Remove)
featureOut['Length'] = np.delete(LengthRel, nFeature2Remove)
featureOut['thickness'] = np.delete(thickness, nFeature2Remove)
return featureOut
[docs]def writeLine2SHPfile(lineDict, lineName, fileName, header=''):
""" write a line to shapefile
Parameters
----------
lineDict: dict
line dict
lineName: str
line name
fileName: str or pathlib path
path where the line will be saved
line name
header: dict
optional argument ('' by default). If provided, header dictionary with 'xllcenter' and 'yllcenter' to add to the
line
Returns
-------
fileName : str
path where the line has been saved
"""
fileName = str(fileName)
line = np.zeros((np.size(lineDict['x']), 2))
line[:, 0] = lineDict['x']
line[:, 1] = lineDict['y']
if header:
line[:, 0] = line[:, 0] + header['xllcenter']
line[:, 1] = line[:, 1] + header['yllcenter']
w = shapefile.Writer(fileName)
w.field('name', 'C')
w.line([line])
w.record(lineName)
w.close()
return fileName
[docs]def writePoint2SHPfile(pointDict, pointName, fileName):
""" write a point to shapefile
Parameters
----------
pointDict: dict
point dict
pointName: str
point name
fileName: str or pathlib path
path where the point will be saved
Returns
-------
fileName : str
path where the point has been saved
"""
fileName = str(fileName)
w = shapefile.Writer(fileName)
w.field('name', 'C')
if len(pointDict['x']) > 1 or len(pointDict['y']) > 1:
message = 'Length of pointDict is not allowed to exceed one'
log.error(message)
raise ValueError(message)
w.point(pointDict['x'][0], pointDict['y'][0])
w.record(pointName)
w.close()
return fileName