# -*- coding: utf-8 -*-
"""
Catalog module.
"""
import io, glob
from numpy import argmin, asarray, sqrt, argsort, nan, sort, linspace
from pandas import read_csv, DataFrame, concat
import cartopy.crs as ccrs
from obspy.core import UTCDateTime
from obspy.clients.fdsn import Client
from obspy.geodetics.base import gps2dist_azimuth
from pyshake.util import isincountries, eventdistance, eventcountrydistance
from pyshake.plotting import plot, bmap, size2mag, mag2size
from pyshake.gm import gmm
import importlib as imp
import pyshake.plotting
imp.reload(pyshake.plotting)
plot = pyshake.plotting.plot
from matplotlib.pyplot import figure
from matplotlib import colors
from matplotlib.ticker import AutoMinorLocator
from matplotlib import cm
from matplotlib.colors import NoNorm
from matplotlib.legend_handler import HandlerPathCollection
import matplotlib.patheffects as PathEffects
stroke_effect = [PathEffects.withStroke(linewidth=2, foreground="w")]
#from mpl_toolkits.axes_grid1.inset_locator import inset_axes
#print(Tp)
# 0 1 2 3 4 5 6 7 8 9 10 11
#[-91.3759, 13.988, 78.238, 5.4, UTCDateTime(2023, 12, 14, 17, 55, 31, 836000), -9.836, -91.58, 13.99, 62.51, 5.15, UTCDateTime(2023, 12, 14, 17, 55), 28.93],
# lon lat dep ma time bad dt lon lat dep ma time ref dt
# rf ref ref ref ref ? EEW eew eew eew eew eew
#print(Fp)
#print(Fn)
[docs]
def sortarg(x):
s = sorted(x)
return [s.index(d) for d in x]
[docs]
def get_eventindex(url,method='pandas',**opt):
"""
Rapid event index list from FDSN web service. Uses the event
FDSN web-service, returns the list of events.
.. code:: python
from pyshake.catalog import get_eventindex
get_eventindex('http://eida.ethz.ch', limit=100)
:param url: The fdsnws base url (e.g. 'http://eida.ethz.ch').
:type string: :py:class:`string`.
:param method: 'pandas' or 'obspy' defines whether the event list
should returned as a :external:py:mod:`pandas.DataFrame` or a
:external:py:mod:`obspy.core.event.Catalog`.
:type string: :py:class:`string`.
.. Note::
Any additional keyword arguments will be passed to the web-service as
additional arguments. Passing any non-default parameters that the
web-service does not support will raise an error.
:return: The list of events.
:rtype: :external:py:mod:`pandas.DataFrame` or :external:py:mod:`obspy.core.event.Catalog`
"""
if method == 'pandas':
url += '/fdsnws/event/1/query?format=text&'
url += '&'.join(['%s=%s'%(k,opt[k])for k in opt])
print(url)
return read_csv(url, delimiter="|")
elif method == 'obspy':
return Client("IRIS").get_events(format='text',**opt)
[docs]
def eew_report2dict(files='.seiscomp/EEW_reports/*txt'):
"""
Reads earthquake early warning (EEW) report files from a specified
file-path pattern, and returns a dictionary of the extracted information.
It supports the following formats:
.. code:: shell
Tdiff |Type|Mag.|Lat. |Lon. |Depth |origin time (UTC) |Lik.|Or.|Ma.|Str.|Len. |Author |Creation t. |Tdiff(current o.)
------------------------------------------------------------------------------------------------------------------------------------------
80.60| MVS|3.42| 15.00| -92.89| 5.00|2019-09-24T20:21:01.06Z|0.99| 8| 4| | |scvsmag@s|2019-09-24T20:22:21.66Z| 80.60
Mag.|Lat. |Lon. |tdiff |Depth |creation time (UTC) |origin time (UTC) |likeh.|#st.(org.) |#st.(mag.)
------------------------------------------------------------------------------------------------------------------
5.15| 10.31| -86.27| 47.14| 10.00|2017-03-17T10:32:56.1282Z|2017-03-17T10:32:08.9848Z| 0.30| 19| 7
:param files: A string that represents the file path pattern for the EEW report
files to be processed.
:type files: :py:class:`str`
:return: A dictionary where the keys are event identifiers (from file paths of EEW report
files from the specified file path pattern), and the values are dictionaries
representing the contents of the EEW reports.
:rtype: :py:class:`dict`
"""
reports={}
for reportfile in glob.glob(files):
with open(reportfile) as lines:
lines = [[elt.replace(' ','')[:19] for elt in line.split('|')] for line in lines.read().splitlines() if '--' not in line and ' |#St. | ' not in line]
if not len(lines):
print(reportfile, 'empty')
continue
reports[reportfile] = {}
report = {k:[l[i] for l in lines[2:]] for i,k in enumerate(lines[0])}
for k in report:
if 'time' in k or 'reation' in k or 'origin' in k:
report[k] = [UTCDateTime(e) for e in report[k]]
elif k in ['Type' , 'Author'] :
continue
else:
try :
report[k] = [e.isspace() or not len(e) or float(e) for e in report[k]]
except:
print('invalid!!!',reportfile)
print(lines[:3],k)
for k in report:
for i,e in enumerate(report[k]):
if isinstance(e,bool) and e==True:
report[k][i]=None
if 'creation time (UTC)' in report:#'creationtime(UTC)', 'origintime(UTC)'
report['Creation t.']=report['creation time (UTC)']
if 'creationtime(UTC)' in report:#'creationtime(UTC)', 'origintime(UTC)'
report['Creation t.']=report['creationtime(UTC)']
if 'Creationt.' in report:
report['Creation t.']=report['Creationt.']
if 'origintime(UTC)' in report:#'creationtime(UTC)', 'origintime(UTC)'
report['origin time (UTC)']=report['origintime(UTC)']
if 'Type' not in report:
report['Type']=['MVS' for e in report['origintime(UTC)']]
if '#st.(org.)' in report:
try:
report['Tdiff']=report['tdiff']
report['Lik.']=report['likeh.']
report['Or.']=report['#st.(org.)']
report['Ma.']=report['#st.(mag.)']
report['Str.']=[None for e in report['#st.(mag.)']]
report['Len.']=[None for e in report['#st.(mag.)']]
except:
print(report.keys())
print('aliases undone')
reports[reportfile] = report
return reports
[docs]
def reportfilter(reports,
countrycodes='ni',
dthresh=1.4
):
"""
Filters out reports based on specified country codes and distance
threshold, removing duplicate locations and empty reports.
:param reports: A dictionary of EEW reports as provided by
:func:`pyshake.catalog.eew_report2dict`.
:type reports: :py:class:`dict`
:param countrycodes: A string that specifies the country code or
codes to filter the reports by. By default, it is set to 'ni',
which represents the country code for Nicaragua.
:type countrycodes: :py:class:`str`, optional
:param dthresh: The distance threshold (in degrees) used to determine
if a given (latitude and longitude) point is close enough to the
specified country to be kept.
:type dthresh: :py:class:`float`, optional
:return: A filtered version of the input `reports` dictionary based
on the specified criteria.
:rtype: :py:class:`dict`
"""
notin=[]
for eventid in reports:
report = reports[eventid]
llin=[]
llout=[]
out=[]
for i,lo in enumerate(report['Lon.']):
la = report['Lat.'][i]
if [la,lo] in llin:
continue
elif [la,lo] in llout:
out+=[i]
continue
if isincountries(la,lo,countrycodes,dthresh=dthresh):
llin+=[[la,lo]]
else:
llout+=[[la,lo]]
out+=[i]
for k in report:
reports[eventid][k] = [e for j,e in enumerate(reports[eventid][k]) if j not in out]
empty=[]
for eventid in reports:
if len(reports[eventid]['Lon.'])==0:
empty+=[eventid]
for eventid in empty:
reports.pop(eventid)
return reports
[docs]
def getref(reports,
url='USGS',
kind='us',
dt=60*4,
dla=1.5,
dlo=1.5,
f=None,
names=[],
sep=','):
"""
Retrieves earthquake event data based on the input EEW report time-period and area,
and returns the corresponding earthquake catalog.
:param reports: A dictionary of EEW reports as provided by
:func:`pyshake.catalog.eew_report2dict` or
:func:`pyshake.catalog.reportfilter`.
:type reports: :py:class:`dict`
:param url: The source FDSN web service URL for retrieving reference earthquake
reports (see :external:py:mod:`obspy.clients.fdsn`).
:type url: :py:class:`str`, optional
:param kind: The type of web service to use for retrieving earthquake data. The
following are implemented: sc (SeisComP implementation of FDSN web service), or
us (USGS implementation of FDSN web service)
:type kind: :py:class:`str`, optional
:param dt: The time window in seconds that is added before and after the event
origin times for querying seismic events FDSN web service.
:type dt: :py:class:`float`, optional
:param dla: The latitude range in degrees that is added and subtracted to the event
latitudes for querying seismic events FDSN web service.
:type dla: :py:class:`float`, optional
:param dlo: The latitude range in degrees that is added and subtracted to the event
latitudes for querying seismic events FDSN web service.
:type dlo: :py:class:`float`, optional
:return: A reference catalog of seismic events covering the time-period and the area
of the input catalog.
:rtype: :external:py:class:`pandas.DataFrame`
"""
starttime = UTCDateTime()
endtime = UTCDateTime('1970-01-01')
minlon = 360
maxlon = -360
minlat = 90
maxlat= -90
for eventid in reports:
report = reports[eventid]
if not len(report['Lon.']):
continue
starttime = min([ starttime, min(report['origin time (UTC)'])])
endtime = max([endtime, max(report['origin time (UTC)'])])
minlon = min([ minlon, min(report['Lon.'])])
maxlon = max([ maxlon, max(report['Lon.'])])
minlat = min([ minlat, min(report['Lat.'])])
maxlat = max([ maxlat, max(report['Lat.'])])
if f is None:
f=io.BytesIO()
opt={'starttime':starttime-dt,
'endtime':endtime+dt,
'minlongitude':minlon-dlo,
'maxlongitude':maxlon+dlo,
'minlatitude':minlat-dla,
'maxlatitude':maxlat+dla,
'format':'csv',
#'limit':10,
'filename':f}
if kind=='us': # USGS web service
names=['time','latitude','longitude','depth','mag',
'magType','nst','gap','dmin','rms','net','id','updated','place','type',
'horizontalError','depthError','magError','magNst','status',
'locationSource','magSource']
sep = ','
elif kind == 'sc': # SeisComP web service
names=['id','time','latitude','longitude','depth',
'locationSource','Catalog','Contributor','ContributorID',
'MagType','mag','magSource',
'place']
opt['format']='text'
sep = '|'
print(opt)
Client(url).get_events(**opt)
f.seek(0)
else:
print(f'Using existing {f}')
cat = read_csv(f,header=0,names=names,sep=sep, index_col=False)
if f is not None:
cat.time=[UTCDateTime(e)+30 for e in cat.time.values]
cat = cat[cat['time']>=starttime-dt]
cat = cat[cat['time']<=endtime+dt]
cat = cat[cat['longitude']>=minlon-dlo]
cat = cat[cat['longitude']<=maxlon+dlo]
cat = cat[cat['latitude']>=minlat-dla]
cat = cat[cat['latitude']<=maxlat+dlo]
else:
if kind=='sc': # SeisComP web service
cat = cat[cat['locationSource'].isnull() == False]
cat = cat[cat['mag'].isnull() == False]
cat = cat[cat['locationSource'].str.contains('scautoloc') == False]
cat = cat[cat['locationSource'].str.contains('screloc') == False]
cat = cat[cat['locationSource'].str.contains('scanloc') == False]
cat = cat[cat['locationSource'].str.contains('scvsloc') == False]
cat = cat[cat['locationSource'].str.contains('scvsnloc') == False]
cat.time=[UTCDateTime(e) for e in cat.time.values]
return cat
[docs]
def matchreports(reports, cat,
maxhypocentraldist=150,
maxtimediff=100,
v=(5.5+3.3)/2):
"""
Compares earthquake reports with a reference catalog using event sources parameters
to find matching events within the specified maximum hypocentral distance and
origin time difference.
:param reports: A dictionary of EEW reports as provided by
:func:`pyshake.catalog.eew_report2dict` or
:func:`pyshake.catalog.reportfilter`.
:type reports: :py:class:`dict`
:param cat: The reference catalog of seismic event as provided by
:func:`pyshake.catalog.getref`
:type: :external:py:class:`pandas.DataFrame`
:param maxhypocentraldist: The maximum hypocentral distance in kilometers allowed
between two seismic events to be considered as possibly matching.
:type maxhypocentraldist: :py:class:`float`, optional
:param maxtimediff: The maximum time difference in seconds allowed between the origin
times of two seismic events to be considered as possibly matching.
:type maxtimediff: :py:class:`float`, optional
:param v: The average seismic velocity used in the calculation of reduced event distance
based on their origin time difference.
:type v: :py:class:`float`, optional
:return: The dictionary of EEW reports updated with the reference source parameters from
the matching reference seismic events.
:rtype: :py:class:`dict`
"""
if True:
print('Similar origin times')
ots={str(ot):[ot] for eventid in reports for ot in reports[eventid]['origin time (UTC)']}
n=0
for ot in ots:
ots[ot] = cat.loc[ abs(ots[ot]-cat.time)<maxtimediff ] # () & ((ots[ot]-cat.time)>-10)
if len(ots[ot].longitude):
n+=1
print(n,'reports with matching origin time')
else:
print('Similar creation times')
cts={str(ct):[ct] for eventid in reports for ct in reports[eventid]['Creation t.']}
n=0
for ct in cts:
cts[ct] = cat.loc[ ( (cts[ct]-cat.time)<maxtimediff ) & ((cat.time-cts[ct])>5) ]
if len(cts[ct].longitude):
n+=1
print(n,'reports with matching creation time')
print('Similar origin locations')
n1=0
n2=0
for eventid in reports:
reports[eventid]['reference']=[None for e in reports[eventid]['Lon.']]
reports[eventid]['reference solution']=[None for e in reports[eventid]['Lon.']]
for i,ot in enumerate(reports[eventid]['origin time (UTC)']):
eew=[reports[eventid]['Lat.'][i],reports[eventid]['Lon.'][i],reports[eventid]['Depth'][i]]
tmp=ots[str(ot)]
#for i,ct in enumerate(reports[eventid]['Creation t.']):
# ot=reports[eventid]['origin time (UTC)'][i]
# eew=[reports[eventid]['Lat.'][i],reports[eventid]['Lon.'][i],reports[eventid]['Depth'][i]]
# tmp=cts[str(ct)]
if not len(tmp.longitude):
continue
hyperdistances=[]
for j in range(len(tmp.longitude)):
#if reports[eventid]['Creation t.'][i] < tmp.time.values[j]-5:
# print('EEW before origin:', reports[eventid]['Creation t.'][i], '<', tmp.time.values[j], '- 5s')
# continue
ref=[tmp.latitude.values[j],tmp.longitude.values[j],tmp.depth.values[j]]
hypocentral_distance = eventdistance(*ref,*eew)
if hypocentral_distance>maxhypocentraldist:
n1+=1
continue
reduced_distance = abs(tmp.time.values[j]-ot)*v
hyper_distance = (hypocentral_distance**2+reduced_distance**2)**.5
if False:#hyper_distance>maxhyperdist:
n2+=1
continue
hyperdistances += [hyper_distance]
if not len(hyperdistances):
if reports[eventid]['Mag.'][i]>5:
print('Warning: missing ref for')
print(eventid)
continue
reports[eventid]['reference'][i] = tmp.id.values[argmin(hyperdistances)]
reports[eventid]['reference solution'][i] = tmp.iloc[argmin(hyperdistances)]
print(n1,'origin loc mismatchs')
#print(n2,'origin loc+time mismatchs')
return reports
[docs]
def alert_accuracy(reports, references,
MMIcountry='Costa Rica',
Mtypes='Mfd,MVS',
Mmin=5.5,
Lmin=0.8,
Dmin=0.1,
MMImin=4,
ipm=gmm,
stationmin=2,
final=False):
"""
Evaluates the accuracy of earthquake reports by comparing their alert
parameters (from the event first solution over alerting criteria) with the
corresponding reference values based on the input reference catalog of
seismic event source parameters.
The alert parameters (maximal shaking intensity, MMI) are inferred from the
source parameters and the alerting parameters (Mmin, Lmin, MMImin, MMIcountry,
ipm, and Mtypes) provided as options.
:param reports: A dictionary of EEW reports with reference solutions as
provided by :func:`pyshake.catalog.matchreports`.
:type reports: :py:class:`dict`
:param references: The reference catalog of seismic events as provided by
:func:`pyshake.catalog.getref` to find events missing from reports.
:type references: :external:py:class:`pandas.DataFrame`
:param MMIcountry: The country for which the maximal Modified Mercalli Intensity
(MMI) is calculated.
:type MMIcountry: :py:class:`str`
:param Mtypes: The comma-separated list of types of magnitudes that are
considered for alerting in the event reports.
:type Mtypes: :py:class:`str`
:param Mmin: The minimum magnitude threshold for an earthquake event to be
considered in the analysis.
:type Mmin: :py:class:`float`, optional
:param Lmin: The minimum likelihood value for a seismic event reports to be
considered in the analysis.
:type Lmin: :py:class:`float`, optional
:param stationmin: The minimum number of station magnitude contributions for
a seismic event reports to be considered in the analysis.
:type stationmin: :py:class:`int`, optional
:param Dmin: The minimum allowable difference in longitude and latitude values
when comparing locations.
:type Dmin: :py:class:`float`, optional
:param MMImin: The minimum value of the Modified Mercalli Intensity (MMI) that
an earthquake must reach to be considered in the evaluation.
:type MMImin: :py:class:`float`, optional
:param ipm: A prediction model to calculate the shaking intensity of a seismic event
based on the country distance and magnitude of the event. See :mod:`pyshake.gm`.
:type ipm: :class:`pyshake.gm.gmm`, optional
:param final: If True, only the final event solutions are considered
for the analysis.
:type final: :py:class:`bool`, optional
:return: Four dictionaries with magnitude type keys include EEW and reference
source parameters of Tp (true positives), Fp (false positives), Fn (false
negatives), and bounds.
:rtype: :py:class:`tuple`
"""
bounds = [180,-180,90,-90]
Fp={mtype:[] for mtype in Mtypes.split(',')}
Fn={'None':[] }
Tp={mtype:[] for mtype in Mtypes.split(',')}
FPids=[]
for eventid in reports:
FP=True
for ref in reports[eventid]['reference']:
if ref is not None:
FP=False
break
if FP and eventid not in FPids :
FPids+=[eventid]
FNids=[]
for ref_id in references.id.values:
FN=True
for eventid in reports:
for ref in reports[eventid]['reference']:
if ref == ref_id:
FN=False
break
if not FN:
break
if FN and ref_id not in FNids :
FNids+=[ref_id]
print('%d/%d F+ and %d/%d F-'%(len(FPids),len(reports),len(FNids),len(reports)))
for evtid in reports:
tmp_report = reports[evtid]
if final:
tmp_report = { k: reports[evtid][k][::-1] for k in reports[evtid] }
for i,mtype in enumerate(tmp_report['Type']):
if Mtypes is not None and mtype not in Mtypes:
continue
nmag = tmp_report['Ma.'][i]
if nmag is not None and nmag != 0 and nmag<stationmin:
print('!!!!!',nmag,'station only in',evtid)
continue
norg = tmp_report['Or.'][i]
if norg is not None and norg != 0 and norg<stationmin:
print('!!!!!',norg,'station only in',evtid)
continue
m = tmp_report['Mag.'][i]
if m is None :
continue
if Mmin is not None and m < Mmin:
continue
if tmp_report['Lik.'][i] is None :
continue
if Lmin is not None and tmp_report['Lik.'][i] < Lmin:
continue
lo = tmp_report['Lon.'][i]
la = tmp_report['Lat.'][i]
de = tmp_report['Depth'][i]
di = eventcountrydistance(MMIcountry,
(lo,la,de))
mmi = ipm.get_intensity(di,m)
if MMImin is not None and mmi < MMImin:
continue
dt = tmp_report['Tdiff'][i]
ot = tmp_report['origin time (UTC)'][i]
eew = [lo,la,de,m,ot,dt,evtid]
refsol = tmp_report['reference solution'][i]
if evtid in FPids :
Fp[mtype] += [eew]
elif refsol is None or len(refsol)==0:
print('Extra False positive: There is no reference solution for the following EEW')
print(','.join(['%s: %s'%(k,tmp_report[k][i]) for k in tmp_report]))
Fp[mtype] += [eew]
else:
lo = refsol.longitude
la = refsol.latitude
de = refsol.depth
m = refsol.mag
ot = refsol.time
dt = tmp_report['Creation t.'][i] - refsol.time
Tp[mtype] += [[lo,la,de,m,ot,dt]+eew]
bounds = [min([bounds[0],lo]),
max([bounds[1],lo]),
min([bounds[2],la]),
max([bounds[3],la])]
break
milo = bounds[0]
malo = bounds[1]
mila = bounds[2]
mala = bounds[3]
for ref in range(len(references.longitude)):
if references.id.values[ref] not in FNids :
continue
m = references.mag.values[ref]
if Mmin is not None and m < Mmin:
continue
lo = references.longitude.values[ref]
la = references.latitude.values[ref]
if (lo<(milo-Dmin)
or lo>(malo+Dmin)
or la<(mila-Dmin)
or la>(mala+Dmin)):
continue
de = references.depth.values[ref]
di = eventcountrydistance(MMIcountry,
(lo,la,de))
mmi = ipm.get_intensity(di,m)
if MMImin is not None and mmi < MMImin:
continue
ot = references.time.values[ref]
Fn['None'] += [[lo,la,de,m,ot,None]]
bounds = [min([bounds[0],lo]),
max([bounds[1],lo]),
min([bounds[2],la]),
max([bounds[3],la])]
return Tp, Fp, Fn, bounds
[docs]
def map(reports,references,
TFpn=None,
rightside=True,
top=False,
title=None,
fig=None,ax=None,
figsize=(8,8),
legendsize=True,
legendsizenum=4,
legendloc=None,
legendfaults=True,
subtitle=False,
colorbar=True,
bg=True,
**options):
"""
Run :func:`pyshake.catalog.alert_accuracy` (supporting all its related
parameters) and generate the map of results, including legends and title
with specified options.
:param reports: A dictionary of EEW reports with reference solutions as
provided by :func:`pyshake.catalog.matchreports`.
:type reports: :py:class:`dict`
:param references: The reference catalog of seismic events as provided by
:func:`pyshake.catalog.getref` to find events missing from reports.
:type references: :external:py:class:`pandas.DataFrame`
:param TFpn: Pre-computer event classification as provided by
:func:`pyshake.catalog.alert_accuracy`, making table output faster.
:type TFpn: :py:class:`tuple`, optional
:param rightside: Determines whether the legend should be displayed on the
right side of the plot or not (left side).
:type rightside: :py:class:`bool`, optional
:param title: Provide a descriptive title for the map.
:type title: :py:class:`str`, optional
:param fig: Specify the figure object where the map will be plotted.
:type fig: :external:py:class:`matplotlib.figure.Figure`, optional
:param ax: Specify the axe object where the map will be plotted.
:type ax: :external:py:class:`matplotlib.axes.Axes`, optional
:param figsize: Specify the size of the figure (width, height) that will be
created for the plot. See :external:py:class:`matplotlib.figure.Figure`.
:type figsize: :py:class:`tuple`, optional
:param legendsize: Whether the legend should include the event size scale.
:type legendsize: :py:class:`bool`, optional
:param legendfaults: Whether the legend should include the fault types.
:type legendfaults: :py:class:`bool`, optional
:return: The matplotlib axis where the plots have been created
:rtype: :external:py:class:`matplotlib.axes.Axes`
"""
if 'Mmin' not in options:
options['Mmin']=None
if 'Lmin' not in options:
options['Lmin']=None
if 'Mtypes' not in options:
options['Mtypes']=None
if 'MMIcountry' not in options:
options['MMIcountry']=None
if 'MMImin' not in options:
options['MMImin']=None
if 'stationmin' not in options:
options['stationmin']=None
if TFpn is None:
Tp, Fp, Fn, bounds = alert_accuracy(reports,references,
Mmin=options['Mmin'],
Lmin=options['Lmin'],
Mtypes=options['Mtypes'],
MMIcountry=options['MMIcountry'],
MMImin=options['MMImin'],
stationmin=options['stationmin'])
else:
Tp, Fp, Fn, bounds = TFpn
n = 0
for tmp in [Tp, Fn]:
for mtype in tmp:
n+=len(tmp[mtype])
ax=bmap(bounds=bounds,
fig=fig,
ax=ax,
figsize=figsize,
legendfaults=legendfaults,
bg=bg,
top=top,
rightside=rightside)
times = [xyz[4] for mtype in Tp for xyz in Tp[mtype]]+\
[xyz[4] for mtype in Fp for xyz in Fp[mtype]]+\
[xyz[4] for mtype in Fn for xyz in Fn[mtype]]
times = [min(times),max(times)]
mtypes = list(set([mtype for mtype in Tp]+[mtype for mtype in Fp])) + \
['None']
scatter,scattercmaps = plot(Tp,
mtypes,
ax,
marker='o',
times=times,
transform=ccrs.Geodetic(),
zorder=97)
plot({mtype: [ xyz[:6] for xyz in Fp[mtype]] for mtype in Fp},
mtypes,
ax,
label='Fp$^{%s}$',
marker='X',
times=times,
transform=ccrs.Geodetic(),
nodt=True,
#facecolors={'Mfd':[1,0,0],'MVS':[0,0,1],'None':[1,0,1]},
#cmaps={'Mfd':None,'MVS':None,'None':None},
zorder=98)
plot(Fn,
mtypes,
ax,
label='Fn$^{%s}$',
marker='s',
times=times,
transform=ccrs.Geodetic(),
nodt=True,
zorder=99)
ax.legend()
h, l = ax.get_legend_handles_labels()
if legendsize:
kw = dict(prop="sizes",
num=legendsizenum,
color='k',#scatter[0].cmap(scatter[0].norm(150)),
fmt="M{x:.2g}",
func=size2mag)
h2, l2 = [l[::-1] for l in scatter[0].legend_elements(**kw)]
h = h2 + h[::-1]
l = l2 + l[::-1]
h = h[::-1]
l = l[::-1]
locbar = 'left'
loclegend = 'right'
bbox_to_anchor = 0
if colorbar:
bbox = ax.get_position()
if not rightside:
locbar = 'right'
loclegend = 'left'
bbox_to_anchor = 1
cax = ax.figure.add_axes([bbox.x0+bbox.width,
bbox.y0+bbox.height-bbox.height/2.5,
bbox.width/1.5,
bbox.height/2.5])
else:
cax = ax.figure.add_axes([bbox.x0-bbox.width/2.5,#-bbox.width/1.5,
bbox.y0+bbox.height/15,#.5bbox.height-bbox.height/2.5,
bbox.width/1.5,
bbox.height/3])
#cax = inset_axes(ax,
# width="5%", # width: 50% of parent_bbox width
# height="5%", # height: 5%
# loc="lower left",
# )
cax.axis('off')
for im in scattercmaps:
cbar = ax.figure.colorbar(im[0],
ax=cax,
location=locbar,
fraction=0.8,
pad=0,
panchor=(0,0),
#use_gridspec=True,
#shrink=0.5
)
cbar.set_label(im[1],
rotation=90)
if locbar == 'left':
locbar = 'right'
else:
locbar = 'left'
# Apply path_effects to the colorbar components
# 1. Ticks and Tick Labels
for tick in cbar.ax.get_yticklines(): # Tick lines
tick.set_path_effects(stroke_effect)
for label in cbar.ax.get_yticklabels(): # Tick labels
label.set_path_effects(stroke_effect)
# 2. Axis Label
cbar.ax.yaxis.label.set_path_effects(stroke_effect)
# 3. Title (if any)
if cbar.ax.title:
cbar.ax.title.set_path_effects(stroke_effect)
# 4. Axis Spine
for spine in cbar.ax.spines.values():
spine.set_path_effects(stroke_effect)
l=ax.legend(h, l,
#ncol=1,
#fontsize='x-small',
#bbox_to_anchor=(bbox_to_anchor,0),
loc=legendloc,
)
t = '%d ev.'%n
if title is not None:
t = '%s %s'%(title,t)
if subtitle:
t += '\n$^{%s}_{%s}$'%(times[0].isoformat()[:16], times[1].isoformat()[:16])
if options['MMImin'] is not None:
t += '\n$MMI$'
if options['MMIcountry'] is not None:
t += '$_{%s}$'%options['MMIcountry'].capitalize()
t += '$>%.1g$'%options['MMImin']
if options['Mmin'] is not None and options['Mtypes'] is not None:
t += '\n$M_{%s}$'%options['Mtypes'].replace('M','')
t += '$>%.1f$'%options['Mmin']
if options['Lmin'] is not None:
t += '\n$Lik.>%.1g$'%options['Lmin']
l.set_title(title=t, prop={'weight':'bold'})
return ax
[docs]
def table(reports=None,references=None,TFpn=None,**options):
"""
Run :func:`pyshake.catalog.alert_accuracy` (supporting all its related
parameters) and generate a table of false events.
:param reports: A dictionary of EEW reports with reference solutions as
provided by :func:`pyshake.catalog.matchreports`.
:type reports: :py:class:`dict`
:param references: The reference catalog of seismic events as provided by
:func:`pyshake.catalog.getref` to find events missing from reports.
:type references: :external:py:class:`pandas.DataFrame`
:param TFpn: Pre-computer event classification as provided by
:func:`pyshake.catalog.alert_accuracy`, making table output faster.
:type TFpn: :py:class:`tuple`, optional
:return: The matplotlib axis where the plots have been created
:rtype: :external:py:class:`matplotlib.axes.Axes`
"""
if 'Mmin' not in options:
options['Mmin']=None
if 'Lmin' not in options:
options['Lmin']=None
if 'Mtypes' not in options:
options['Mtypes']=None
if 'MMIcountry' not in options:
options['MMIcountry']=None
if 'MMImin' not in options:
options['MMImin']=None
if 'stationmin' not in options:
options['stationmin']=None
if TFpn is None:
Tp, Fp, Fn, bounds = alert_accuracy(reports,references,
Mmin=options['Mmin'],
Lmin=options['Lmin'],
Mtypes=options['Mtypes'],
MMIcountry=options['MMIcountry'],
MMImin=options['MMImin'],
stationmin=options['stationmin'])
else:
Tp, Fp, Fn, bounds = TFpn
#print(Fp)
#print(Fn)
columns=('Longitude','Latitude','Depth','Magnitude','Origin time')#,'EEW delay')
dtfs = []
for mtype in Fp:
dtfs += [DataFrame([xyz[:5] for xyz in Fp[mtype]], #Fp[mtype],
columns=columns,
index=[('F+',mtype) for e in Fp[mtype] ])]
for mtype in Fn:
dtfs += [DataFrame([xyz[:5] for xyz in Fn[mtype]], #Fn[mtype],
columns=columns,
index=[('F-','') for e in Fn[mtype] ])]
return concat(dtfs)
[docs]
def errors(reports=None,
references=None,
TFpn=None,
ax=None,
title=None,
subtitle=False,
cmaps={'Mfd':'autumn','MVS':'winter'},
colorbar=True,
rightside=False,
cax=None,
legendsize=True,
lax=None,
final=False,
**options):
"""
Run :func:`pyshake.catalog.alert_accuracy` (supporting all its related
parameters) and generate a location and magnitude error histogram.
:param reports: A dictionary of EEW reports with reference solutions as
provided by :func:`pyshake.catalog.matchreports`.
:type reports: :py:class:`dict`
:param references: The reference catalog of seismic events as provided by
:func:`pyshake.catalog.getref` to find events missing from reports.
:type references: :external:py:class:`pandas.DataFrame`
:param TFpn: Pre-computer event classification as provided by
:func:`pyshake.catalog.alert_accuracy`, making table output faster.
:type TFpn: :py:class:`tuple`, optional
:return: The table of false events that can be viewed with :py:func:`print`
or :py:func:`display`.
:rtype: :external:py:class:`matplotlib.DataFrame`
"""
if ax is None:
ax = figure().gca()
if 'Mmin' not in options:
options['Mmin']=None
if 'Lmin' not in options:
options['Lmin']=None
if 'Mtypes' not in options:
options['Mtypes']=None
if 'MMIcountry' not in options:
options['MMIcountry']=None
if 'MMImin' not in options:
options['MMImin']=None
if 'stationmin' not in options:
options['stationmin']=None
if TFpn is None or final:
Tp, Fp, Fn, bounds = alert_accuracy(reports,references,
Mmin=options['Mmin'],
Lmin=options['Lmin'],
Mtypes=options['Mtypes'],
MMIcountry=options['MMIcountry'],
MMImin=options['MMImin'],
stationmin=options['stationmin'],
final=final)
else:
Tp, Fp, Fn, bounds = TFpn
def _forward(x):
return sqrt(x)
def _inverse(x):
return x**2
norm = colors.FuncNorm((_forward, _inverse),
vmin=0,
vmax=200)
n = 0
scattercmaps={}
beginend = sort([ xyzmte[4] for mtype in Tp for xyzmte in Tp[mtype] ])
for i,mtype in enumerate(Tp):
n += len(Tp[mtype])
dist = [ eventcountrydistance(options['MMIcountry'], (xyzmte[0],xyzmte[1],xyzmte[2]) ) for xyzmte in Tp[mtype] ]
d_epi = [ gps2dist_azimuth(xyzmte[1],xyzmte[0],xyzmte[7],xyzmte[6])[0]/1000.0 for xyzmte in Tp[mtype] ]
d_dep = [ xyzmte[8] - xyzmte[2] for xyzmte in Tp[mtype] ]
d_hyp = [(d_epi[i]**2 + d_dep[i]**2)**.5 for i,x in enumerate(d_dep)]
d_mag = [ xyzmte[9]-xyzmte[3] for xyzmte in Tp[mtype] ]
mag = [ xyzmte[3] for xyzmte in Tp[mtype] ]
times = [ xyzmte[4] for xyzmte in Tp[mtype] ]
alphas = asarray(sortarg(times))
alphas = alphas + max(alphas)/3
alphas = alphas / max(alphas)
c = cm.ScalarMappable(norm=norm, cmap=cmaps[mtype]).to_rgba(dist)#[xyzmte[2] for xyzmte in Tp[mtype]])#
c[:,-1] = alphas
scattercmaps[mtype] = ax.scatter([nan,nan],[nan,nan],
mag2size(asarray([3,7])),
[nan,nan],
cmap=cmaps[mtype],
norm=norm,
alpha=0.5)
ax.scatter(d_hyp,
d_mag,
mag2size(asarray(mag)),
c,#dist,
marker='oo'[i],
linewidths=1.0,
zorder=9999)
locbar = 'left'
bbox = ax.get_position()
if cax is None:
if rightside:
locbar = 'right'
cax = ax.figure.add_axes([bbox.x0+bbox.width,
bbox.y0+bbox.height-bbox.height/2.5,
bbox.width/1.5,
bbox.height/2.5])
else:
cax = ax.figure.add_axes([bbox.x0-bbox.width/1,#.5,
bbox.y0+bbox.height-bbox.height/2.5,
bbox.width/1.5,
bbox.height/2.5])
cax.axis('off')
if len(scattercmaps)==1:
locbar = 'right'
if colorbar:
for mtype in scattercmaps:
cbar = ax.figure.colorbar(scattercmaps[mtype],
ax=cax,
location=locbar,
fraction=.8,
pad=0,
panchor=(0,0),
#use_gridspec=True,
#shrink=0.5
)
cbar.set_label('M$_{%s}$'%mtype[1:],
rotation=90)
if locbar == 'left':
locbar = 'right'
else:
locbar = 'left'
cbar.solids.set(alpha=1)
cax.set_title('Distance\nfrom national\nborder (km)',#Depth (km)',#
loc=['left','center'][len(Tp)>1])
l=ax.legend()
t = '%d Tp'%( n )
if title is not None:
t = '%s%s-%s\n%s'%(title,
str(beginend[0])[:7].replace('-','/'),
str(beginend[-1])[:7].replace('-','/'),
t)
if subtitle and 'MMImin' in options :
t += '\n$^{%s}_{%s}$'%(sorted(times)[0].isoformat()[:16], sorted(times)[1].isoformat()[:16])
if options['MMImin'] is not None:
t += '\n$MMI$'
if options['MMIcountry'] is not None:
t += '$_{%s}$'%options['MMIcountry'].capitalize()
t += '$>%.1g$'%options['MMImin']
if options['Mmin'] is not None and options['Mtypes'] is not None:
t += '\n$M_{%s}$'%options['Mtypes'].replace('M','')
t += '$>%.1f$'%options['Mmin']
if options['Lmin'] is not None:
t += '\n$Lik.>%.1g$'%options['Lmin']
l.set_title(title=t, prop={'weight':'bold'})
if legendsize:
ax.add_artist(l)
kw = dict(prop="sizes",
num=4,#[4.5,5.5,6.5],
alpha=0.8,
color='k',
fmt="M{x:.2g}",
func=size2mag)
if lax is None:
lax = ax
l = lax.legend(*scattercmaps[list(scattercmaps.keys())[0]].legend_elements(**kw),
#loc="lower right",
title="Magnitude",
loc='lower left')
lax.add_artist(l)
lax.scatter([nan],[nan],
mag2size(asarray([5])),
'k',
label='Earliest',
alpha=0.3)
lax.scatter([nan],[nan],
mag2size(asarray([5])),
'k',
label='Latest',
alpha=1)
l = lax.legend(title="Time",
loc='upper left')
lax.add_artist(l)
ax.tick_params(right=True, top=True,
left=True, bottom=True,
which='both')
ax.xaxis.set_minor_locator(AutoMinorLocator())
ax.yaxis.set_minor_locator(AutoMinorLocator())
ax.grid( which='major', color='gray', linestyle='dashdot', zorder=-9999) #b=True,
ax.grid( which='minor', color='beige', ls='-', zorder=-9999) #b=True,
return ax
[docs]
def event_eew_mag(events,
MMIcountry='Costa Rica',
Mtypes='Mfd,MVS',
Mmin=5.5,
Lmin=0.8,
MMImin=4,
ipm=gmm,
debug=False):
"""
Evaluates alert parameters (from the event first solution over alerting criteria)
on the input catalog of seismic event source parameters.
The alert parameters (maximal shaking intensity, MMI) are inferred from the
source parameters and the alerting parameters (Mmin, Lmin, MMImin, MMIcountry,
ipm, and Mtypes) provided as options.
:param events: The reference catalog of seismic events.
:type events: :external:py:class:`obspy.core.event.Catalog`
:param MMIcountry: The country for which the maximal Modified Mercalli Intensity
(MMI) is calculated.
:type MMIcountry: :py:class:`str`
:param Mtypes: The comma-separated list of types of magnitudes that are
considered for alerting in the event reports.
:type Mtypes: :py:class:`str`
:param Mmin: The minimum magnitude threshold for an earthquake event to be
considered in the analysis.
:type Mmin: :py:class:`float`, optional
:param Lmin: The minimum likelihood value for a seismic event reports to be
considered in the analysis.
:type Lmin: :py:class:`float`, optional
:param Dmin: The minimum allowable difference in longitude and latitude values
when comparing locations.
:type Dmin: :py:class:`float`, optional
:param MMImin: The minimum value of the Modified Mercalli Intensity (MMI) that
an earthquake must reach to be considered in the evaluation.
:type MMImin: :py:class:`float`, optional
:param ipm: A prediction model to calculate the shaking intensity of a seismic event
based on the country distance and magnitude of the event. See :mod:`pyshake.gm`.
:type ipm: :class:`pyshake.gm.gmm`, optional
:return: First event magnitudes exceeding the alert thresholds.
:rtype: :py:class:`list`
"""
eew_mags = [ None for e in events]
for evtindex,e in enumerate(events):
if e is None:
continue
ct = [m.creation_info.creation_time for m in e.magnitudes]
for m in argsort(ct):
if eew_mags[evtindex] is not None:
print('This should never append')
continue
magnitude = e.magnitudes[m]
origin = ([None]+[o for o in e.origins if magnitude.origin_id.id == o.resource_id.id])[-1]
if debug:
print(origin.longitude,origin.latitude,origin.depth/1000.,magnitude.mag,origin.time)
if origin is None:
continue
if Mtypes is not None and magnitude.magnitude_type not in Mtypes:
if debug:
print(magnitude.magnitude_type,'not in',Mtypes)
continue
if magnitude.mag is None :
if debug:
print(magnitude.mag,'is', None )
continue
if Mmin is not None and magnitude.mag < Mmin:
if debug:
print(magnitude.mag,'<', Mmin)
continue
likelihood = [None]
likelihood += [float(c.text) for c in magnitude.comments if c.resource_id.id.split('/')[-1] == 'likelihood']
likelihood = likelihood[-1]
if likelihood is None :
if debug:
print(likelihood,'is', None)
continue
if Lmin is not None and likelihood < Lmin:
if debug:
print(likelihood,'<', Lmin)
continue
di = eventcountrydistance(MMIcountry,
(origin.longitude,
origin.latitude,
origin.depth/1000.))
mmi = ipm.get_intensity(di,
magnitude.mag)
if MMImin is not None and mmi < MMImin:
if debug:
print(mmi,' <', MMImin)
continue
eew_mags[evtindex] = [magnitude, origin]
break
return eew_mags
[docs]
def chronology(reports=None,
references=None,
TFpn=None,
ax=None,
title=None,
subtitle=False,
cmaps={'Mfd':'autumn','MVS':'winter','None':None},
colorbar=True,
rightside=False,
cax=None,
legendsize=True,
lax=None,
xml=True,
arrivalrank=4,
arrivalF=False,
legendsizenum=4,
legend_marker_size = 70,
legend_marker_alpha = 2/3,
hlegend_line_length = 1,
annotation = None,
alphathreshold = 70,
eventlines=True,
**options):
"""
Run :func:`pyshake.catalog.alert_accuracy` (supporting all its related
parameters) and generate a chronology plot.
:param reports: A dictionary of EEW reports with reference solutions as
provided by :func:`pyshake.catalog.matchreports`.
:type reports: :py:class:`dict`
:param references: The reference catalog of seismic events as provided by
:func:`pyshake.catalog.getref` to find events missing from reports.
:type references: :external:py:class:`pandas.DataFrame`
:param TFpn: Pre-computer event classification as provided by
:func:`pyshake.catalog.alert_accuracy`, making table output faster.
:type TFpn: :py:class:`tuple`, optional
:return: The table of false events that can be viewed with :py:func:`print`
or :py:func:`display`.
:rtype: :external:py:class:`matplotlib.DataFrame`
"""
if ax is None:
ax = figure().gca()
if 'Mmin' not in options:
options['Mmin']=None
if 'Lmin' not in options:
options['Lmin']=None
if 'Mtypes' not in options:
options['Mtypes']=None
if 'MMIcountry' not in options:
options['MMIcountry']=None
if 'MMImin' not in options:
options['MMImin']=None
if 'stationmin' not in options:
options['stationmin']=None
if TFpn is None:
Tp, Fp, Fn, bounds = alert_accuracy(reports,references,
Mmin=options['Mmin'],
Lmin=options['Lmin'],
Mtypes=options['Mtypes'],
MMIcountry=options['MMIcountry'],
MMImin=options['MMImin'],
stationmin=options['stationmin'])
else:
Tp, Fp, Fn, bounds = TFpn
def _forward(x):
return sqrt(x)
def _inverse(x):
return x**2
norm = colors.FuncNorm((_forward, _inverse),
vmin=0,
vmax=200)
def linnorm(x,themax):
y = 1 - asarray(x)/themax #asarray(sortarg(times)) + 1
if isinstance(y,float):
if y > 1:
y = 1
elif y < 0:
y = 0
elif y>0.5:
y = 1
elif y<=0.5:
y = 0
else:
for i,v in enumerate(y):
if v > 1:
y[i] = 1
elif v < 0:
y[i] = 0
elif v>0.5:
y[i] = 1
elif v<=0.5:
y[i] = 0
y = y * 3/4 + 1/4 #/ max(alphas)
return y
scattercmaps={}
beginend = sort([ xyzmte[4] for mtype in Tp for xyzmte in Tp[mtype] ])
n=0
for j,cl in enumerate([Tp,Fp,Fn]):
for i,mtype in enumerate(cl):
dist = [ eventcountrydistance(options['MMIcountry'], (xyzmte[0],xyzmte[1],xyzmte[2]) ) for xyzmte in cl[mtype] ]
if j==0:
d_epi = [ gps2dist_azimuth(xyzmte[1],xyzmte[0],xyzmte[7],xyzmte[6])[0]/1000.0 for xyzmte in cl[mtype] ]
d_dep = [ xyzmte[8] - xyzmte[2] for xyzmte in cl[mtype] ]
d_hyp = [(d_epi[i]**2 + d_dep[i]**2)**.5 for i,x in enumerate(d_dep)]
else:
d_hyp = [ 0 for xyzmte in cl[mtype] ]
d_time = [ xyzmte[11-(j*6)] for xyzmte in cl[mtype] ]
if j==2 or j==1:
d_time = [ 0 for xyzmte in cl[mtype] ]
mag = [ xyzmte[3] for xyzmte in cl[mtype] ]
times = [ xyzmte[4].datetime for xyzmte in cl[mtype] ]
arrivals = [ nan for xyzmte in cl[mtype] ]
if xml and ( arrivalF or j == 0 ):
for e,xyzmte in enumerate(cl[mtype]):
if xyzmte[14-(j*6)] is None:
print(14-(j*6),xyzmte)
continue
magnitude = xyzmte[14-(j*6)][0]
test = magnitude.creation_info.creation_time - xyzmte[10-(j*6)]
d_time[e] = test
if j==2 or j==1:
continue
try:
event = xyzmte[15-(j*6)][0]
print('OK pref reference origin')
except Exception as e:
print('cannot get pref reference origin for')
print(xyzmte)
continue
if len(event.origins)>1:
print('Not sure to get preferred origin for ')
print(event)
preforgtime = event.origins[0].time
arrivaltimes = {}
for p in event.picks:
station = '%s.%s'%(p.waveform_id['network_code'],p.waveform_id['station_code'])
if station not in arrivaltimes:
arrivaltimes[station] = p.time
if p.time < arrivaltimes[station] :
arrivaltimes[station] = p.time
test = [arrivaltimes[s] for s in arrivaltimes]
if len(test)>=arrivalrank:
arrivals[e] = sort(test)[arrivalrank-1] - preforgtime # xyzmte[10-(j*6)]
if arrivals[e] > d_time[e]:
print('Incomplete picking for event',event)
arrivals[e] = nan
alphas = linnorm(dist,alphathreshold*2) #asarray(sortarg(times)) + 1
c = cm.ScalarMappable(norm=norm, cmap=cmaps[mtype]).to_rgba(d_hyp)#dist)
c[:,-1] = alphas
if j == 2:
c[:,0] = 1
c[:,1] = 0
c[:,2] = 1
fc = c.copy()
for i in range(len(fc)):
if fc[i,-1] != 1:
fc[i] = [1,1,1,1]
c[i,-1] = 1
# order for c[i,-1] ==1 below
dorder = argsort(c[:,-1])[::-1]
if j==0 or j==2:
n += len(cl[mtype])
if j==0 :
scattercmaps[mtype] = ax.scatter([nan,nan],[nan,nan],
mag2size(asarray([3,7])),
[nan,nan],
cmap=cmaps[mtype],
norm=norm,
alpha=0.5)
for i,dt in enumerate(d_time):
if dt < 2 :
d_time[i] = nan
print('Ignoring ref/EEW matching issue with:')
print(cl[mtype][i])
if eventlines:
for k,t in enumerate(times):
ax.plot([t,t],
[arrivals[k],d_time[k]],
color=c[k],
linewidth=0.5,
)
ax.scatter([times[i] for i in dorder],
[d_time[i] for i in dorder],
mag2size(asarray([mag[i] for i in dorder])),
[1,1,1],
marker='oXs'[j],
linewidths=1.0,
clip_on = j==0,
zorder=9999)
ax.scatter([times[i] for i in dorder],
[arrivals[i] for i in dorder],
mag2size(asarray([mag[i] for i in dorder])),
[1,1,1],
marker='___'[j],
linewidths=2.5,
zorder=9999)
sc = ax.scatter([times[i] for i in dorder],
[d_time[i] for i in dorder],
mag2size(asarray([mag[i] for i in dorder])),
facecolors=[fc[i] for i in dorder],
edgecolors=[c[i] for i in dorder],
marker='oXs'[j],
linewidths=1.5,
label=('%s (%d)'%(['Tp$^{%s}$','Fp$^{%s}$','Fn$^{%s}$'][j]%(mtype[1:]), len(cl[mtype]))).replace('$^{one}$',''),
clip_on = j==0,
zorder=9999)
ax.scatter([times[i] for i in dorder],
[arrivals[i] for i in dorder],
mag2size(asarray([mag[i] for i in dorder])),
[c[i] for i in dorder],
marker='___'[j],
linewidths=2.5,
zorder=99999)
ax.scatter([nan,nan],
[nan,nan],
mag2size(asarray([3,7])),
'k',
marker='_',
linewidths=1.0,
label='%d$^{th}$ arrival'%(arrivalrank),
zorder=9999)
ax.set_ylim(bottom=0)
bbox = ax.get_position()
if cax is None:
if rightside:
locbar = 'right'
cax = ax.figure.add_axes([bbox.x0+bbox.width,
bbox.y0+bbox.height-bbox.height/2.5,
bbox.width/1.5,
bbox.height/2.5])
else:
cax = ax.figure.add_axes([bbox.x0-bbox.width/1.5,
bbox.y0+bbox.height-bbox.height/2.5,
bbox.width/1.5,
bbox.height/2.5])
cax.axis('off')
locbar = 'right'
if colorbar:
for mtype in scattercmaps:
cbar = ax.figure.colorbar(scattercmaps[mtype],
ax=cax,
location=locbar,
fraction=.8,
pad=0,
panchor=(0,0),
#use_gridspec=True,
#shrink=0.5
)
cbar.set_label('M$_{%s}$'%mtype[1:],
rotation=90)
if locbar == 'left':
locbar = 'right'
else:
locbar = 'left'
cbar.solids.set(alpha=1)
cax.set_title('Location\nerror (km, hyp.)',
loc=['left','center'][len(Tp)>1])
if annotation is not None:
for k in annotation:
ax.axvline(UTCDateTime(annotation[k]),label=k, color='k', linewidth=3, alpha=0.5, zorder = -99)
l=ax.legend(handlelength=hlegend_line_length)
for lh in l.legendHandles:
lh.set_alpha(legend_marker_alpha)
if hasattr(lh,'set_sizes'):
lh.set_sizes([legend_marker_size])
t = '%d ev.'%( n )
if title is not None:
t = '%s%s-%s\n%s'%(title,
str(beginend[0])[:7].replace('-','/'),
str(beginend[-1])[:7].replace('-','/'),
t)
if subtitle and 'MMImin' in options :
t += '\n$^{%s}_{%s}$'%(sorted(times)[0].isoformat()[:16], sorted(times)[1].isoformat()[:16])
if options['MMImin'] is not None:
t += '\n$MMI$'
if options['MMIcountry'] is not None:
t += '$_{%s}$'%options['MMIcountry'].capitalize()
t += '$>%.1g$'%options['MMImin']
if options['Mmin'] is not None and options['Mtypes'] is not None:
t += '\n$M_{%s}$'%options['Mtypes'].replace('M','')
t += '$>%.1f$'%options['Mmin']
if options['Lmin'] is not None:
t += '\n$Lik.>%.1g$'%options['Lmin']
l.set_title(title=t, prop={'weight':'bold'})
if legendsize:
ax.add_artist(l)
kw = dict(prop="sizes",
num=legendsizenum,
alpha=0.8,
color='k',
fmt="M{x:.2g}",
func=size2mag)
if lax is None:
lax = ax
l = lax.legend(*scattercmaps[list(scattercmaps.keys())[0]].legend_elements(**kw),
#loc="lower right",
title="Magnitude",
loc='lower left')
lax.add_artist(l)
for n,d in enumerate([alphathreshold+1,alphathreshold-1]):
lax.scatter([nan],[nan],
mag2size(asarray([5])),
edgecolors='k',
facecolors='kw'[n],
label='<>'[n]+f'{alphathreshold}',
#alpha=linnorm(d, alphathreshold*2)
)
l = lax.legend(title="Distance\nfrom national\nborder (km)",
loc='upper left')
lax.add_artist(l)
if ax.get_ylim()[1]>100:
ax.set_yscale('symlog',linthresh=100)
ax.tick_params(right=True, top=True,
left=True, bottom=True,
which='both')
ax.xaxis.set_minor_locator(AutoMinorLocator())
ax.yaxis.set_minor_locator(AutoMinorLocator())
ax.grid( which='major', color='gray', linestyle='dashdot', zorder=-9999) #b=True,
ax.grid( which='minor', color='beige', ls='-', zorder=-9999) #b=True,
return ax