Fetcher

The Fetcher class builds on the unified data request interfaces to provide convenient methods for working with whole datasets.

Listed below are the two methods which are currently available:

#1 - Get continuous data using channels contained in an inventory (or returned from a station_client).


#2 - Iterate over events and their corresponding waveforms that have channels defined by the station_client.


Note: Additional methods may be implemented in the future.

Setup

The examples below use Fetchers created from the TA and Crandall test datasets to demonstrate different aspects of the Fetcher functionality.

[1]:
import obsplus

# ta_test contains continuous data; crandall_test contains only event data
ta_dataset = obsplus.load_dataset('ta_test')
crandall = obsplus.load_dataset('crandall_test')

# create a fetcher from each dataset
ta_fetcher = ta_dataset.get_fetcher()
crandall_fetcher = crandall.get_fetcher()
downloading waveform data for ta_test dataset ...
finished downloading waveform data for ta_test
downloading station data for ta_test dataset ...
finished downloading station data for ta_test
downloading event data for ta_test dataset ...
finished downloading event data for ta_test

The Fetcher can be initialized with any objects from which the appropriate clients can be obtained. It is commonly used with a WaveBank, a Catalog (or EventBank), and an Inventory.

The following example would also be valid:

[2]:
cat = crandall.event_client.get_events()
inv = crandall.station_client.get_stations()
wavebank = crandall.waveform_client

crandall_fetcher = obsplus.Fetcher(waveforms=wavebank, stations=inv, events=cat)

The Fetcher constructor can also take obsplus-created CSV files or dataframes for the stations and events arguments. However, some of the Fetcher’s functionality, such as getting a dataframe of picks, will raise an exception unless full station/event clients are used.

Quick fetch

The easiest way to get data out of a Fetcher is to call it. The first argument is a reference time that tells the fetcher where the stream should be anchored; it can be one of several types (float, UTCDateTime, Catalog, Event). The time before and the time after the reference time must also be provided, either in the call or when the Fetcher is constructed.

The fetcher uses the inventory or station_client to know which channels to request from the waveform_client.

[3]:
import obspy
reference_time = obspy.UTCDateTime('2007-02-15T06')
time_before = 1
time_after = 30
stream = ta_fetcher(reference_time, time_before, time_after)
print(stream)
6 Trace(s) in Stream:
TA.M11A..VHE | 2007-02-15T05:59:59.999998Z - 2007-02-15T06:00:29.999998Z | 0.1 Hz, 4 samples
TA.M11A..VHN | 2007-02-15T05:59:59.999998Z - 2007-02-15T06:00:29.999998Z | 0.1 Hz, 4 samples
TA.M11A..VHZ | 2007-02-15T05:59:59.999998Z - 2007-02-15T06:00:29.999998Z | 0.1 Hz, 4 samples
TA.M14A..VHE | 2007-02-15T06:00:00.000003Z - 2007-02-15T06:00:30.000003Z | 0.1 Hz, 4 samples
TA.M14A..VHN | 2007-02-15T06:00:00.000003Z - 2007-02-15T06:00:30.000003Z | 0.1 Hz, 4 samples
TA.M14A..VHZ | 2007-02-15T06:00:00.000004Z - 2007-02-15T06:00:30.000004Z | 0.1 Hz, 4 samples
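
The window arithmetic behind the call above can be sketched with the standard library alone. This is a minimal illustration, assuming the requested window simply runs from the reference time minus time_before to the reference time plus time_after; `request_window` is a hypothetical helper and not part of obsplus.

```python
from datetime import datetime, timedelta, timezone


def request_window(reference_time, time_before, time_after):
    """Return (start, end) around reference_time (hypothetical helper)."""
    # Both offsets are given in seconds, as in the fetcher call above.
    start = reference_time - timedelta(seconds=time_before)
    end = reference_time + timedelta(seconds=time_after)
    return start, end


reference = datetime(2007, 2, 15, 6, tzinfo=timezone.utc)
start, end = request_window(reference, time_before=1, time_after=30)
print(start.isoformat(), end.isoformat())
```

The resulting window is 31 seconds long. A 0.1 Hz channel has one sample every 10 seconds, so a window of that length holds at most four samples, which is consistent with the trace summary above.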

Continuous data

Continuous data can be requested from the Fetcher, which uses the station_client to know which channels to pull from the waveform_client. This lets users skip much of the boilerplate associated with the normal get_waveforms interface.

The example below iterates over the continuous data while running a simple STA/LTA detector.

[4]:
from obspy.signal.trigger import classic_sta_lta

# first define a function for doing the sta/lta
def print_sta_lta(tr: obspy.Trace):
    """ prints the sta/lta """
    sr = tr.stats.sampling_rate
    cft = classic_sta_lta(tr.data, int(20 * sr), int(60 * sr))
    print(f'{tr.id} starting at {tr.stats.starttime}, has a max sta/lta of {max(cft):0.2f}')
[5]:
# starttime for the continuous data
t1 = obspy.UTCDateTime('2007-02-16')

# endtime for the continuous data
t2 = t1 + 36000 * 10  # use 100 hours

# duration of each chunk returned (in seconds)
duration = 72000

# overlap (added to the end of the duration)
overlap = 60

# iterate over each chunk
kwargs = dict(starttime=t1, endtime=t2, duration=duration, overlap=overlap)
for st in ta_fetcher.yield_waveforms(**kwargs):
    # select only z component and perform preprocessing
    st = st.select(component='Z')
    st.detrend('linear')
    # do the sta/lta
    for tr in st:
        print_sta_lta(tr)
TA.M11A..VHZ starting at 2007-02-15T23:59:59.999998Z, has a max sta/lta of 2.88
TA.M14A..VHZ starting at 2007-02-15T23:59:59.999998Z, has a max sta/lta of 2.98
TA.M11A..VHZ starting at 2007-02-16T19:59:59.999998Z, has a max sta/lta of 2.98
TA.M14A..VHZ starting at 2007-02-16T19:59:59.999998Z, has a max sta/lta of 2.97
TA.M11A..VHZ starting at 2007-02-17T15:59:59.999998Z, has a max sta/lta of 2.95
TA.M14A..VHZ starting at 2007-02-17T15:59:59.999998Z, has a max sta/lta of 2.84
TA.M11A..VHZ starting at 2007-02-18T11:59:59.999998Z, has a max sta/lta of 2.98
TA.M14A..VHZ starting at 2007-02-18T11:59:59.999998Z, has a max sta/lta of 3.00
TA.M11A..VHZ starting at 2007-02-19T07:59:59.999998Z, has a max sta/lta of 2.93
TA.M14A..VHZ starting at 2007-02-19T07:59:59.999998Z, has a max sta/lta of 2.93
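
The chunking implied by duration and overlap can be sketched in plain Python. This is an illustration of the arithmetic only, not the obsplus implementation: `chunk_windows` is a hypothetical helper, and clipping the final chunk at the end time is an assumption.

```python
def chunk_windows(start, end, duration, overlap=0):
    """Yield (chunk_start, chunk_end) pairs covering start..end.

    Assumptions (illustrative only): chunks begin every `duration` seconds,
    `overlap` is added to the end of each chunk, and the last chunk is
    clipped so it never runs past `end`.
    """
    if duration <= 0:
        raise ValueError("duration must be positive")
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + duration + overlap, end)
        yield chunk_start, chunk_end
        chunk_start += duration


# 360000 s split into 72000 s chunks with 60 s of overlap
windows = list(chunk_windows(0, 360000, duration=72000, overlap=60))
for window in windows:
    print(window)
```

With the values used in the cell above (a 360000-second span and a 72000-second duration), the sketch produces five windows whose starts are 20 hours apart, matching the five start times printed per station.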

Stream processors

It can be useful to define a stream-processing function that is called on each stream before it is yielded. This lets the user define flexible, custom processing without cluttering the function calls with processing parameters.

[6]:
# define a function that will be called on the stream before returning it.
def stream_processor(st: obspy.Stream) -> obspy.Stream:
    """ select the z component, detrend, and filter a stream """
    st = st.select(component='Z')
    st.detrend('linear')
    st.filter('bandpass', freqmin=.005, freqmax=.04)
    return st

# attach the stream processor to the fetcher
ta_fetcher.stream_processor = stream_processor

kwargs = dict(starttime=t1, endtime=t2, duration=duration, overlap=overlap)
for st in ta_fetcher.yield_waveforms(**kwargs):
    for tr in st:
        print_sta_lta(tr)
TA.M11A..VHZ starting at 2007-02-15T23:59:59.999998Z, has a max sta/lta of 2.96
TA.M14A..VHZ starting at 2007-02-15T23:59:59.999998Z, has a max sta/lta of 2.95
TA.M11A..VHZ starting at 2007-02-16T19:59:59.999998Z, has a max sta/lta of 2.95
TA.M14A..VHZ starting at 2007-02-16T19:59:59.999998Z, has a max sta/lta of 3.00
TA.M11A..VHZ starting at 2007-02-17T15:59:59.999998Z, has a max sta/lta of 2.98
TA.M14A..VHZ starting at 2007-02-17T15:59:59.999998Z, has a max sta/lta of 2.96
TA.M11A..VHZ starting at 2007-02-18T11:59:59.999998Z, has a max sta/lta of 2.99
TA.M14A..VHZ starting at 2007-02-18T11:59:59.999998Z, has a max sta/lta of 2.97
TA.M11A..VHZ starting at 2007-02-19T07:59:59.999998Z, has a max sta/lta of 3.00
TA.M14A..VHZ starting at 2007-02-19T07:59:59.999998Z, has a max sta/lta of 2.99
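
The hook pattern used above, an optional callable applied to each chunk before it is yielded, can be sketched without any seismology libraries. `MiniFetcher` and `remove_mean` are hypothetical names for illustration; lists of floats stand in for streams, and nothing here reflects how obsplus is implemented internally.

```python
from typing import Callable, Iterable, Iterator, List, Optional


class MiniFetcher:
    """Hypothetical stand-in showing the processor-hook pattern."""

    def __init__(self, chunks: Iterable[List[float]]):
        self._chunks = list(chunks)
        # Optional callable applied to every chunk before it is yielded.
        self.stream_processor: Optional[Callable[[List[float]], List[float]]] = None

    def yield_waveforms(self) -> Iterator[List[float]]:
        for chunk in self._chunks:
            if self.stream_processor is not None:
                chunk = self.stream_processor(chunk)
            yield chunk


def remove_mean(samples: List[float]) -> List[float]:
    """Toy processor: subtract the mean from a list of samples."""
    mean = sum(samples) / len(samples)
    return [x - mean for x in samples]


fetcher = MiniFetcher([[1.0, 2.0, 3.0], [10.0, 20.0]])
fetcher.stream_processor = remove_mean
processed = list(fetcher.yield_waveforms())
print(processed)
```

Keeping the processing in one attached function means the iteration loop stays the same regardless of which preprocessing steps are applied.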

Event data

When the Fetcher is provided with an event client, it can iterate over the events and yield the waveforms for each one.

[7]:
time_before = 1
time_after = 3
iterator = crandall_fetcher.yield_event_waveforms(time_before, time_after)
for event_id, st in iterator:
    print(f'fetched waveform data for {event_id} which has {len(st)} traces')
fetched waveform data for smi:local/248828 which has 48 traces
fetched waveform data for smi:local/248839 which has 48 traces
fetched waveform data for smi:local/248843 which has 48 traces
fetched waveform data for smi:local/248882 which has 51 traces
fetched waveform data for smi:local/248883 which has 51 traces
fetched waveform data for smi:local/248887 which has 51 traces
fetched waveform data for smi:local/248891 which has 51 traces
fetched waveform data for smi:local/248925 which has 51 traces

A dict of {event_id: stream} can be created using the following code.

[8]:
st_dict = dict(crandall_fetcher.yield_event_waveforms(time_before, time_after))

for event_id, st in st_dict.items():
    print(event_id, len(st))
smi:local/248828 48
smi:local/248839 48
smi:local/248843 48
smi:local/248882 51
smi:local/248883 51
smi:local/248887 51
smi:local/248891 51
smi:local/248925 51
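
Passing the generator to dict works because dict accepts any iterable of (key, value) pairs. The sketch below shows this with a hypothetical generator, `yield_pairs`, and made-up keys; it also shows that a generator is exhausted once it has been consumed.

```python
def yield_pairs():
    """Hypothetical generator yielding (event_id, trace_count) pairs."""
    yield "event-a", 48
    yield "event-b", 51


pairs = yield_pairs()
counts = dict(pairs)    # consumes every pair from the generator
leftover = list(pairs)  # the generator is now exhausted
print(counts, leftover)
```

Because the generator can only be consumed once, building the dict is a convenient way to keep the results around for repeated access.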

Different events/inventories

The clients can be swapped out on each method call. This may be useful for getting a subset of the events or channels by providing a filtered catalog/inventory.

The example below makes a single call for one station, M11A.

Note: This will not modify the original Fetcher.

[9]:
# get a subset of the original inventory (station M11A only)
inv = ta_dataset.station_client.get_stations()
inv2 = inv.select(station='M11A')

# iterate and print
for st in ta_fetcher.yield_waveforms(t1, t2, duration, overlap, stations=inv2):
    for tr in st:
        print_sta_lta(tr)

The same call applies for swapping out events:

[10]:
# read in the catalog and get a subset as a dataframe
cat = crandall.event_client.get_events()
cat_df = obsplus.events_to_df(cat)[:2]

# iterate the events and print
iterator = crandall_fetcher.yield_event_waveforms(time_before, time_after, events=cat_df)
for event_id, st in iterator:
    print(f'fetching {event_id}, got {len(st)} traces')
fetching smi:local/248839, got 48 traces
fetching smi:local/248883, got 51 traces
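
The per-call override used in the two examples above can be sketched as a keyword argument that falls back to a stored default. `OverridableFetcher` and `yield_station_names` are hypothetical names for illustration only; this shows the general pattern and is not obsplus code.

```python
class OverridableFetcher:
    """Hypothetical sketch of a per-call override."""

    def __init__(self, stations):
        # Default set of stations, stored once at construction.
        self.stations = tuple(stations)

    def yield_station_names(self, stations=None):
        # Use the per-call value when given; otherwise fall back to the default.
        active = self.stations if stations is None else tuple(stations)
        for name in active:
            yield name


fetcher = OverridableFetcher(["M11A", "M14A"])
subset = list(fetcher.yield_station_names(stations=["M11A"]))
default = list(fetcher.yield_station_names())
print(subset, default)
```

The override applies only to the call that receives it, so later calls without the argument still see the full default set.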