# -*- coding: utf-8 -*-
"""
Misc utility module.
"""
from obspy.geodetics.base import gps2dist_azimuth
from obspy.core.event import Catalog
from obspy import read_events, UTCDateTime
from os.path import exists
import subprocess
import numpy
from fnmatch import fnmatch
try:
from shapely.geometry import LineString, Polygon
from shapely.ops import nearest_points
except Exception as e:
print('shapely skipped')
print(e)
try:
import cartopy.io.shapereader as shpreader
except Exception as e:
print('cartopy skipped')
print(e)
from pyshake import gm
[docs]
def inv2coord(inventory,
declust=0.01,
flat_latency={'*':0}):
"""
Returns the list of stations latitudes and longitudes,
omiting old stations being too close to another one
"""
statlats=[s.latitude for n in inventory for s in n ]
statlons=[s.longitude for n in inventory for s in n ]
statoff=[]
statid=[]
for n in inventory:
for s in n :
for c in s :
mem=0
mseedid = '%s.%s.%s.%s'%(n.code,s.code,c.location_code,c.code)
for k in flat_latency:
if (len(k)>mem and
fnmatch(mseedid,k)):
mem=len(k)
delay=flat_latency[k]
memid=mseedid
statoff+=[delay]
statid+=[mseedid]
tooclose=[]
statindexes=range(len(statlats))
for i in statindexes:
for j in range(i+1,len(statlats)):
if (abs(statlats[i]-statlats[j])<declust and
abs(statlons[i]-statlons[j])<declust):
tooclose+=[i]
if statoff[i]>statoff[j]:
statoff[i]=statoff[j]
statlats = [statlats[i] for i in statindexes if i not in tooclose]
statlons = [statlons[i] for i in statindexes if i not in tooclose]
statoff = [statoff[i] for i in statindexes if i not in tooclose]
return statlats,statlons,statoff
from numpy import deg2rad,sin,cos,sin,arcsin,sqrt,abs
[docs]
def haversine(lon1, lat1, lon2, lat2,
r = 6371 # Radius of earth in kilometers. Use 3956 for miles
):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(deg2rad, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * arcsin(sqrt(a))
return abs(c * r)
taup=True
try:
from obspy.taup import TauPyModel
except Exception as e:
print(e)
taup=False
from scipy.interpolate import interp1d
from numpy import linspace
[docs]
def tttinterp(model='iasp91',
depth=5,
ttt={'p':None,'s':None},
dmax=3):
"""
Return the dict of interpolants for p and s-phase
travel times modeling at any distance such as:
`time = ttt[phases](dist)`
"""
if not taup:
print('TauP cannot be imported, this does not work')
return ttt
mod = TauPyModel(model=model)
distances=linspace(.001,dmax,64)
for phases in ttt.keys():
arrivals=[]
for d in distances:
opt={'source_depth_in_km':depth,
'distance_in_degree':d,
'phase_list':['tt'+phases],
'receiver_depth_in_km':0.0}
arrivals+=[min([a.time for a in mod.get_travel_times(**opt)])]
ttt[phases] = interp1d(distances,arrivals)
#time = ttt[phases](dist)
return ttt
from geopip import search
[docs]
def isincountries(la, lo, countrycodes,
dthresh=1.4):
"""
The function `isincountries` checks if a given latitude and longitude pair
is within a specified distance threshold of any country in a list of ISO2
country codes. The function uses the `geopip` library to determine the country
code for the given coordinates
:param la: Input latitude.
:type la: :py:class:`float`
:param lo: Input longitude.
:type lo: :py:class:`float`
:param countrycodes: A comma separated list of ISO2 country codes.
:type countrycodes: :py:class:`str`
:param dthresh: The distance threshold within which the function will search
for countries.
:type dthresh: :py:class:`float`
:return: The function will return `True` if the coordinates are within any of the
specified countries, and `False` otherwise.
:rtype: :py:class:`bool`
"""
if countrycodes is None:
return True
inside=False
lalo=[la,lo]
for lad in [la+dthresh,la-dthresh,la]:
for lod in [lo+dthresh,lo-dthresh,lo]:
gcode=search(lng=lod, lat=lad)
if gcode is None or gcode['ISO2'] is None:
continue
if gcode['ISO2'].lower() in countrycodes:
return True
if gcode is None or gcode['ISO2'] is None:#gcode.country_code is None:
return
[docs]
def eventdistance(la, lo, de,
refla, reflo, refde,
v=1,
meters=800):
"""
The function `eventdistance` calculates the distance between two geographical
coordinates, taking into account the depth of the locations if provided.
:param la: Latitude of the first location.
:param la: :py:class:`float`
:param lo: Longitude of the first location.
:param lo: :py:class:`float`
:param de: Depth of the first location (set to None if unknown).
:param de: :py:class:`float`
:param refla: Latitude of the second location.
:param refla: :py:class:`float`
:param reflo: Longitude of the second location.
:param reflo: :py:class:`float`
:param refde: Depth of the second location (set to None if unknown).
:param refde: :py:class:`float`
:param v: Divider of the horizontal distance output in case depth is unknown.
:param v: :py:class:`float`
:param meters: If the absolute value of the depth (`de` or `refde`) is
greater than this threshold, the function assumes that the depth is
in kilometers instead of meters.
:param meters: :py:class:`float`
:return: The distance in kilometers between the two geographical coordinates.
:rtype: :py:class:`float`
"""
laloref=[la,lo]
lalo = [refla, reflo]
#d = 111.2*locations2degrees(lalo[0],lalo[1],laloref[0],laloref[1])
d = gps2dist_azimuth(lalo[0],lalo[1],laloref[0],laloref[1])[0]/1000
if refde is None or de is None:
return d/v
if abs(de)>meters:
print('Assuming depth %s in m'%de)
de = de/1000
if abs(refde)>meters:
print('Assuming depth %s in m'%refde)
refde = refde/1000
return ((d**2 + (de-refde)**2)**.5)/v
[docs]
def eventcountrydistance(countryname,
events,
res=1,
defaultdepth=5,
defaultepd=2,
quickndirty=False):
"""Compute the distance from given hypocenter(s) to given country.
In case the hypocenter is within the country, the output distance is taken from hypocenter depth (default to 5 km).
.. code:: python
from obspy.clients.fdsn.client import Client
cat = Client('ETH').get_events(limit=1,orderby='magnitude',maxdepth=10000)
from pyshake.util import eventcountrydistance
events = [[ e.preferred_origin().longitude,
e.preferred_origin().latitude,
e.preferred_origin().depth/1000 ] for e in cat]
countrydists = eventcountrydistance(country, events)
:param countryname: The country name.
:type countryname: :py:class:`str`
:param events: The event(s) hypocenter coordinates.
:type events: :py:class:`list` of :py:class:`list` e.g., [[lo1,la1],[lo2,la2,z2]].
:return: Country distance for each event in the input list.
:rtype: :py:class:`list`
"""
single=False
if not isinstance(events[0],(list,tuple)) :
single=True
events=[events]
if False in [len(e)>2 for e in events]:
print('ERROR: Input(s) are not at least 2d:')
print(events)
for e,event in enumerate(events):
if len(event)==5:
elon = event[0] + event[3]/111.2 * numpy.sin(numpy.deg2rad(event[4]))
elat = event[1] + event[3]/111.2 * numpy.cos(numpy.deg2rad(event[4]))
elon = [event[0]]+[l for l in numpy.linspace(event[0],elon,int(event[3]/res))]+[elon]
elat = [event[1]]+[l for l in numpy.linspace(event[1],elat,int(event[3]/res))]+[elat]
events[e] = { 'line' : LineString([(elon[p],elat[p]) for p in range(len(elon))]),
'depth' : event[2] }
elif len(event)==3:
events[e] = { 'line' : LineString([(event[0],event[1]), (event[0],event[1])]),
'depth' : event[2] }
elif len(event)==2:
events[e] = { 'line' : LineString([(event[0],event[1]), (event[0],event[1])]),
'depth' :defaultdepth }
else:
print('ERROR: Input are not at least 2d:')
print(event)
if events[e]['depth'] is None:
print('WARNING: Event (%s) depth is None, using %s default depth)'%(','.join(['%s'%f for f in event]), defaultdepth))
events[e]['depth'] = defaultdepth
distances=[None for e in events]
url = shpreader.natural_earth(resolution='110m',
category='cultural',
name='admin_0_countries')
reader = shpreader.Reader(url)
countries = reader.records()
country = [country for country in countries if countryname.lower() in country.attributes['NAME'].lower() ]
if not len(country):
print('ERROR: Could not find %s in [%s]'%(countryname,','.join([country.attributes['NAME'] for country in countries])))
else:
if len(country)>1:
print('WARNING: Considering the first of [%s] as matching [%s]'%(','.join([c.attributes['NAME'] for c in country]),countryname))
country=country[0]
for e,event in enumerate(events):
if country.geometry.intersects(event['line']):
#print('INFO: Event intersects %s using depth %d km as country distance'%(countryname, event.depth))
distances[e] = ( event['depth']**2 + defaultepd**2 )**.5
else:
if quickndirty:
print('WARNING: Event outside %s using shapely distance as country distance (can be VERY incorrect)'%(countryname))
distances[e] = ((country.geometry.distance(event['line'])*111.2)**2 + event['depth']**2)**.5
else:
impact_trajectory = nearest_points(event['line'],country.geometry)
impact_trajectory = [c for p in impact_trajectory for c in p.coords[:][0][::-1]]
distances[e] = ((gps2dist_azimuth(*impact_trajectory)[0]/1000)**2 + event['depth']**2)**.5
if single:
return distances[0]
return distances
[docs]
def polygonthreshold(polygon,
countryname='Switzerland',
minintensity=4,
mineventdepth=1,
minepicentraldistance=1,
groundmotionmodel=gm.gmm,
quickndirty=False,
**kwargs):
"""
The function `polygonthreshold` calculates the minimum magnitude of an earthquake
required to reach a given shaking intensity level anywhere inside a given country.
.. code:: python
from pyshake.util import polygonthreshold
# a polygon over Zurich
polygon=[(8,47),
(9,47),
(9,47.5),
(8,47.5)]
# the minimum magnitude to reach MMI 3 in Switzerland
# from any event in the polygon over Zurich
# `s = 0` : No amplification
polygonthreshold(polygon,
countryname = 'Switzerland',
minintensity = 3,
s = 0)
# 0.97
# the minimum magnitude to reach MMI 3 in Italy
# from any event in the polygon over Zurich
# `s = 1` : An amplification of 1 intensity unit
polygonthreshold(polygon,
countryname = 'Italy',
minintensity = 3,
s = 1)
# 3.99
:param polygon: The polygon parameter is a set of coordinates that define the shape
of the polygon. It is used to determine if the event intersects with the specified
country
:type polygon: :py:class:`list` of :py:class:`list` e.g., [[lo1,la1],[lo2,la2]].
:param countryname: The name of the country where the intensity is evaluated. The
default value is 'Switzerland', defaults to Switzerland (optional)
:type countryname: :py:class:`str`
:param minintensity: The minimum intensity threshold for the polygon. Any earthquake
with an intensity below this threshold will not be considered, defaults to 4
(optional)
:type minintensity: :py:class:`float`
:param mineventdepth: The mineventdepth parameter represents the depth of the event
in kilometers. It is used to calculate the distance between the event and the
polygon, defaults to 1 (optional)
:type mineventdepth: :py:class:`float`
:param minepicentraldistance: The parameter "minepicentraldistance" represents the
minimum epicentral distance in kilometers. It is used in the calculation of the
distance between the event and the country, defaults to 1 (optional)
:type minepicentraldistance: :py:class:`float`
:param groundmotionmodel: The groundmotionmodel parameter is a function that
calculates the intensity of ground motion given the distance and magnitude of an
earthquake. In this code, it is set to gmm.gm, which is a reference to a default
ground motion model function by Allen et al. (2012)
:type groundmotionmodel: :class:`pyshake.gm.gmm`
:param quickndirty: The parameter "quickndirty" is a boolean flag that determines
whether a quick and dirty calculation should be used for the distance calculation
if the event is outside the specified country. If set to True, the shapely
distance between the event and the country will be used, which can be very
incorrect, defaults to False (optional)
:type quickndirty: :py:class:`bool`
:return: the minimum magnitude of an earthquake that would produce a ground motion
intensity greater than or equal to the specified minimum intensity, given the
parameters and inputs provided.
:rtype: :py:class:`float`
"""
polygon = Polygon(polygon)
#polygon.depth = mineventdepth
url = shpreader.natural_earth(resolution='110m',
category='cultural',
name='admin_0_countries')
reader = shpreader.Reader(url)
countries = reader.records()
country = [country for country in countries if countryname.lower() in country.attributes['NAME'].lower() ]
if not len(country):
print('ERROR: Could not find %s in [%s]'%(countryname,','.join([country.attributes['NAME'] for country in countries])))
else:
if len(country)>1:
print('WARNING: Considering the first of [%s] as matching [%s]'%(','.join([c.attributes['NAME'] for c in country]),countryname))
country=country[0]
if country.geometry.intersects(polygon):
print('INFO: Event intersects %s using depth %.1f km and minimal epicentral distance of %.1f km as distance'%(countryname, mineventdepth, minepicentraldistance))
distance = ( mineventdepth**2 + minepicentraldistance**2 )**.5
else:
if quickndirty:
print('WARNING: Event outside %s using shapely distance as country distance (can be VERY incorrect)'%(countryname))
distance = ((country.geometry.distance(polygon)*111.2)**2 + mineventdepth**2)**.5
else:
impact_trajectory = nearest_points(polygon,country.geometry)
impact_trajectory = [c for p in impact_trajectory for c in p.coords[:][0][::-1]]
distance = ((gps2dist_azimuth(*impact_trajectory)[0]/1000)**2 + mineventdepth**2)**.5
intensity = 99
for minmag in numpy.arange(10,-10,-0.01):
if intensity <= minintensity:
return minmag
intensity = groundmotionmodel.get_intensity(distance,minmag, **kwargs)
return None
[docs]
def ssh_event(sshuserhost='user@host',
dbplugin='dbmysql',
db='localhost/seiscomp3',
opt='-fma',
seiscomp='',
eventid=None,
minlatitude=-90,
maxlatitude=90,
minlongitude=-180,
maxlongitude=180,
minmagnitude=0,
maxmagnitude=10,
mintime=UTCDateTime()-60*60,
maxtime=UTCDateTime()):
ssh = subprocess.Popen(["ssh",
"-i ~/.ssh/id_rsa",
sshuserhost],
stdin =subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
bufsize=0)
if eventid is None:
queryparam = (minlatitude,
maxlatitude,
minlongitude,
maxlongitude,
minmagnitude,
maxmagnitude,
mintime.isoformat().replace('T',' ')[:19],
maxtime.isoformat().replace('T',' ')[:19])
query="SELECT PEvent.publicID, Origin.time_value AS OT, Origin.latitude_value,Origin.longitude_value, Origin.depth_value, Magnitude.magnitude_value, Magnitude.type FROM Origin,PublicObject as POrigin, Event, PublicObject AS PEvent, Magnitude, PublicObject as PMagnitude WHERE Event._oid = PEvent._oid AND Origin._oid = POrigin._oid AND Magnitude._oid = PMagnitude._oid AND PMagnitude.publicID=Event.preferredMagnitudeID AND POrigin.publicID = Event.preferredOriginID AND Origin.latitude_value >= %s AND Origin.latitude_value <= %s AND Origin.longitude_value >= %s AND Origin.longitude_value <= %s AND Magnitude.magnitude_value >= %s AND Magnitude.magnitude_value <= %s AND Origin.time_value >= '%s' AND Origin.time_value <= '%s';"%queryparam
scquery = '%s scquery --debug --plugins %s -d %s -Q \"%s\" |grep -v "^$"| while read ID TRASH;do %s scxmldump %s --debug --plugins %s -d %s -E $ID ;done' % (seiscomp, dbplugin, db, query, seiscomp, opt, dbplugin, db)
ssh.stdin.write(scquery)
else :
#%s scquery --debug --plugins %s -d %s -Q \"%s\" |grep -v "^$"| while read ID TRASH;do % seiscomp, dbplugin, db, query,
scdump = '%s scxmldump %s --debug --plugins %s -d %s -E %s ' % (seiscomp, opt, dbplugin, db, eventid)
ssh.stdin.write(scdump)
ssh.stdin.close()
# Fetch output
string = ''.join([line.replace('>\n','>') for line in ssh.stdout if ('<' in line and '>' in line)])
if len(string):
string=string.replace('0.11" version="0.11','0.10" version="0.10')
string=string.replace('0.12" version="0.12','0.10" version="0.10')
f=open('/tmp/tmp.xml', 'w+')
f.write(string)
f.close()
try:
return read_events('/tmp/tmp.xml',format='SC3ML')
except:
for n in range(9999):
path_to_file='invalid.%s.xml'%n
if not exists(path_to_file):
break
f=open(path_to_file, 'w+')
f.write(string)
f.close()
print('Cannot read',path_to_file)
return Catalog()
else:
#print('Failed:')
#print(scquery)
#print([line for line in ssh.stderr])
string = ''.join([line for line in ssh.stderr])
f=open('.stderr', 'w+')
f.write(string)
f.close()
print('Cannot get event from %s .stderr'%sshuserhost)
return Catalog()