"""
Class for interacting with events on a filesystem.
"""
import inspect
import os
import time
from concurrent.futures import Executor
from functools import reduce, partial
from operator import add
from os.path import getmtime
from pathlib import Path
from typing import Optional, Union, Sequence, Set
import numpy as np
import obspy
import obspy.core.event as ev
import pandas as pd
import obsplus
import obsplus.events.pd
from obsplus.bank.core import _Bank
from obsplus.utils.bank import (
sql_connection,
_read_table,
_get_tables,
_drop_rows,
_remove_base_path,
_natify_paths,
)
from obsplus.utils.pd import (
cast_dtypes,
order_columns,
_time_cols_to_ints,
_ints_to_time_columns,
)
from obsplus.utils.events import _summarize_event, get_event_client
from obsplus.constants import (
EVENT_PATH_STRUCTURE,
EVENT_NAME_STRUCTURE,
get_events_parameters,
bar_parameter_description,
EVENT_TYPES_OUTPUT,
EVENT_TYPES_INPUT,
bank_subpaths_type,
paths_description,
)
from obsplus.events.get_events import (
_sanitize_circular_search,
_get_ids,
_validate_get_event_kwargs,
)
from obsplus.exceptions import BankDoesNotExistError
from obsplus.interfaces import ProgressBar, EventClient
from obsplus.utils import iterate
from obsplus.utils.misc import try_read_catalog, suppress_warnings
from obsplus.utils.docs import compose_docstring
from obsplus.utils.time import _dict_times_to_npdatetimes, to_datetime64
# --- define static types

# The column sets below are derived from EVENT_TYPES_OUTPUT, i.e. the types
# of the columns as returned by ``read_index``.

# Columns whose declared output type is ``str`` or a subclass of ``str``.
STR_COLUMNS = {
    name
    for name, kind in EVENT_TYPES_OUTPUT.items()
    if inspect.isclass(kind) and issubclass(kind, str)
}

# Columns whose declared output type is exactly ``int``.
INT_COLUMNS = {name for name, kind in EVENT_TYPES_OUTPUT.items() if kind is int}

# Keyword arguments accepted by ``read_index``: every output column plus the
# two control keywords.
SUPPORTED_KWARGS = {"columns", "_allow_update"}.union(EVENT_TYPES_OUTPUT)
[docs]
class EventBank(_Bank):
    """
    A class to interact with a directory of event files.

    EventBank recursively reads each event file in a directory and creates
    an index to allow the files to be efficiently queried.

    Implements a superset of the :class:`~obsplus.interfaces.EventClient`
    interface.

    Parameters
    ----------
    base_path
        The path to the directory containing event files. If it does not
        exist an empty directory will be created.
    path_structure
        Defines the directory structure used by the event bank. Characters
        are separated by /, regardless of operating system. The following
        words can be used in curly braces as data specific variables:
            year, month, day, julday, hour, minute, second, event_id,
            event_id_short
        If no structure is provided it will be read from the index, if no
        index exists the default is {year}/{month}/{day}.
    name_structure
        The same as path structure but for the file name. Supports the same
        variables and a slash cannot be used in a file name on most operating
        systems. The default extension (.xml) will be added.
        The default is {time}_{event_id_short}.
    format
        The anticipated format of the event files. Any format supported by the
        obspy.read_events function is permitted.
    ext
        The extension on the files. Can be used to avoid parsing non-event
        files.
    executor
        An executor with the same interface as
        :py:class:`concurrent.futures.Executor`, the map method of the executor
        will be used for reading files and updating indices.

    Attributes
    ----------
    allow_update_timestamp
        If True, allow the bank to update its index timestamp. The default value
        of True is appropriate for all but some very complicated setups.

    Examples
    --------
    >>> # --- Create an `EventBank` from a path to a directory with quakeml files.
    >>> import obsplus
    >>> event_path = obsplus.copy_dataset('default_test').event_path
    >>> # init an EventBank and index the event files.
    >>> ebank = obsplus.EventBank(event_path).update_index()

    >>> # --- Retrieve catalog objects from the bank.
    >>> cat = ebank.get_events(minmagnitude=4.3, minlatitude=40.12)
    >>> print(cat)
    1 Event(s) in Catalog:...

    >>> # --- Put events into the bank.
    >>> # get an event from another dataset, keep track of its id
    >>> ds = obsplus.load_dataset('bingham_test')
    >>> new_events = ds.event_client.get_events(limit=1)
    >>> new_event_id = str(new_events[0].resource_id)
    >>> # put the event into the EventBank
    >>> _ = ebank.put_events(new_events)
    >>> print(ebank.get_events(eventid=new_event_id))
    1 Event(s) in Catalog:...

    >>> # --- Read the index used by EventBank as a DataFrame.
    >>> df = ebank.read_index()
    >>> assert len(df) == 4, 'there should now be 4 events in the bank.'
    """

    # NOTE(review): presumably the namespace/prefix used by the base class to
    # derive the index, meta and time table names - confirm against _Bank.
    namespace = "/events"
    index_name = ".index.db"  # name of index file
    # When True, _write_update stores the update time in the time table.
    allow_update_timestamp = True
    # NOTE(review): looks like the minimum file count before a progress bar is
    # shown; it is consumed outside this class - confirm against _Bank.
    _min_files_for_bar = 50
    # Column type tables for the index as read (output) and as written (input).
    _dtypes_output = EVENT_TYPES_OUTPUT
    _dtypes_input = EVENT_TYPES_INPUT
    # Number of events _index_from_iterable accumulates before flushing to the
    # index database.
    _max_events_in_memory = 2000
[docs]
def __init__(
    self,
    base_path: Union[str, Path, "EventBank"] = ".",
    path_structure: Optional[str] = None,
    name_structure: Optional[str] = None,
    format="quakeml",
    ext=".xml",
    executor: Optional[Executor] = None,
):
    """
    Initialize an instance.

    If ``base_path`` is itself an ``EventBank`` its instance state is copied
    into this instance and every other argument is ignored.
    """
    # Copy-constructor shortcut: adopt the other bank's attributes verbatim.
    if isinstance(base_path, EventBank):
        self.__dict__.update(base_path.__dict__)
        return

    self.bank_path = Path(base_path).absolute()
    self._index = None
    self.format = format
    self.ext = ext

    # Resolve the directory structure: an explicit argument wins, then the
    # value stored in the index metadata, then the package default.
    if path_structure is None:
        path_structure = self._path_structure or EVENT_PATH_STRUCTURE
    self.path_structure = path_structure

    # Resolve the file-name structure the same way; any falsy argument
    # (None or an empty string) falls through to the stored/default value.
    if not name_structure:
        name_structure = self._name_structure or EVENT_NAME_STRUCTURE
    self.name_structure = name_structure

    self.executor = executor

    # enforce min version and warn on newer
    self._enforce_min_version()
    self._warn_on_newer_version()
@property
def last_updated_timestamp(self):
    """
    Return the timestamp stored in the index's time table.

    Returns 0.0 when the time table cannot be read or has no first row.
    """
    with sql_connection(self.index_path) as con:
        try:
            time_table = _read_table(self._time_node, con)
            return time_table.loc[0, "time"]
        except (pd.io.sql.DatabaseError, KeyError):
            # The table is missing or empty.
            return 0.0
@property
def _path_structure(self):
    """
    Return the path structure recorded in the index metadata table.

    Returns None when the metadata cannot be read (database error or the
    bank does not exist).
    """
    try:
        metadata = self._read_metadata()
        return metadata["path_structure"][0]
    except (pd.io.sql.DatabaseError, BankDoesNotExistError):
        return None
@property
def _name_structure(self):
    """
    Return the file-name structure recorded in the index metadata table.

    Returns None when the metadata cannot be read (database error or the
    bank does not exist).
    """
    try:
        metadata = self._read_metadata()
        return metadata["name_structure"][0]
    except (pd.io.sql.DatabaseError, BankDoesNotExistError):
        return None
# --- index stuff
[docs]
@compose_docstring(get_events_params=get_events_parameters)
def read_index(self, **kwargs) -> pd.DataFrame:
    """
    Read the index and return a dataframe containing the event info.

    Parameters
    ----------
    {get_events_params}

    Notes
    -----
    If the index table cannot be read and the bank has never been indexed,
    the index is built once and the query is repeated with the caller's
    original arguments, including any circular-search parameters.
    """
    self.ensure_bank_path_exists()
    # make sure all times are numpy datetime64
    kwargs = _dict_times_to_npdatetimes(kwargs)
    # a simple switch to prevent infinite recursion
    allow_update = kwargs.pop("_allow_update", True)
    # validate kwargs
    _validate_get_event_kwargs(kwargs, extra=SUPPORTED_KWARGS)
    # Keep the caller's query intact. The sanitizing step below returns a
    # separate dict for the table read, and the retry after a first-time
    # index build must reuse the full request or the circular filter is lost.
    requested_kwargs = dict(kwargs)
    # Circular search requires work to be done on the dataframe - we need
    # to get the whole dataframe then calculate the distances and search in
    # that
    circular_kwargs, table_kwargs = _sanitize_circular_search(**kwargs)
    with sql_connection(self.index_path) as con:
        try:
            df = _read_table(self._index_node, con, **table_kwargs)
        except pd.io.sql.DatabaseError:
            # if this database has never been updated, update now
            if allow_update and self.last_updated_timestamp < 1:
                self.update_index()
                return self.read_index(_allow_update=False, **requested_kwargs)
            # else return empty index
            df = pd.DataFrame(columns=list(EVENT_TYPES_OUTPUT))
    df = _ints_to_time_columns(df, columns=INT_COLUMNS).pipe(
        self._prepare_dataframe, dtypes=EVENT_TYPES_OUTPUT
    )
    if len(circular_kwargs) >= 3:
        # Requires at least latitude, longitude and min or max radius
        circular_ids = _get_ids(df, circular_kwargs)
        df = df[df.event_id.isin(circular_ids)]
    return df
[docs]
@compose_docstring(
    bar_parameter_description=bar_parameter_description,
    paths_description=paths_description,
)
def update_index(
    self,
    bar: Optional[ProgressBar] = None,
    paths: Optional[bank_subpaths_type] = None,
) -> "EventBank":
    """
    Iterate files in bank and add any modified since last update to index.

    Parameters
    ----------
    {bar_parameter_description}
    {paths_description}

    Returns
    -------
    The bank itself, so calls can be chained.
    """
    # NOTE: the keyword names passed to compose_docstring above must match
    # the {placeholders} in this docstring so they can be substituted.
    self._enforce_min_version()  # delete index if schema has changed
    # One timestamp is recorded for the whole update, taken before any file
    # is read.
    update_time = time.time()
    # create an iterator which yields files to update and updates bar
    file_yielder = self._unindexed_iterator(paths=paths)
    update_file_feeder = self._measure_iterator(file_yielder, bar)
    new_func = partial(self._get_cat_update_time_path, format=self.format)
    # create iterator, loop over it in chunks until it is exhausted
    iterator = self._map(new_func, update_file_feeder)
    events_remain = True
    while events_remain:
        # Each call consumes part of the shared iterator and writes it to the
        # index; it returns True while it stopped early on the memory limit.
        events_remain = self._index_from_iterable(iterator, update_time)
    return self
@staticmethod
def _get_cat_update_time_path(path, format):
    """
    Read one event file and return ``(catalog, modification_time, path)``.

    NOTE: This function must be static to avoid pickling the attached
    executor. (see #158).
    """
    catalog = try_read_catalog(path, format=format)
    modified = getmtime(path)
    return catalog, modified, path
def _index_from_iterable(self, iterable, update_time):
    """
    Consume catalogs from ``iterable`` and write their events to the index.

    ``iterable`` yields ``(catalog, mtime, path)`` tuples; entries whose
    catalog is None are skipped. Consumption stops early once the number of
    collected events reaches ``_max_events_in_memory``.

    Returns True if it stopped early (the iterable may hold more events),
    otherwise False.
    """
    limit = self._max_events_in_memory  # looked up once, outside the loop
    events, update_times, paths = [], [], []
    events_remain = False
    for cat, mtime, path in iterable:
        if cat is None:
            continue
        # Record the file's mtime and path once per event it contains.
        n_before = len(events)
        events.extend(cat)
        n_added = len(events) - n_before
        update_times.extend([mtime] * n_added)
        paths.extend([path] * n_added)
        # Checked after a whole catalog is taken, so one file's events are
        # never split across two database writes.
        if len(events) >= limit:
            events_remain = True
            break
    # Build the index rows for the collected events.
    df = obsplus.events.pd._default_cat_to_df(events)
    df["updated"] = to_datetime64(update_times)
    df["path"] = _remove_base_path(pd.Series(paths, dtype=object), self.bank_path)
    if not len(df):
        return events_remain
    prepared = self._prepare_dataframe(_time_cols_to_ints(df), EVENT_TYPES_INPUT)
    self._write_update(prepared, update_time)
    return events_remain
def _prepare_dataframe(self, df: pd.DataFrame, dtypes: dict):
    """
    Fill missing string values and cast columns to the expected data types.

    The literal string "None" is replaced in place with an empty string in
    the string columns of ``df``. The returned frame has duplicated columns
    removed, is cast and ordered according to ``dtypes`` and has a fresh
    integer index.
    """
    # replace "None" with empty string for str columns
    present_str_cols = list(STR_COLUMNS & set(df.columns))
    cleaned = df.loc[:, present_str_cols].replace(["None"], [""])
    df.loc[:, present_str_cols] = cleaned
    # every known int/str column must have an entry in the dtype table
    assert (INT_COLUMNS | STR_COLUMNS) <= set(dtypes)
    # keep only dtypes for columns actually present, in ``dtypes`` order
    present = set(df.columns)
    dtype = {name: kind for name, kind in dtypes.items() if name in present}
    # drop duplicated columns, set types, order columns, reset index
    deduplicated = df.loc[:, ~df.columns.duplicated()]
    typed = cast_dtypes(deduplicated, dtype=dtype)
    ordered = order_columns(typed, required_columns=list(dtype), drop_columns=True)
    return ordered.reset_index(drop=True)
def _write_update(self, df: pd.DataFrame, update_time=None):
    """
    Write the rows of ``df`` to the index table, replacing existing entries.

    Parameters
    ----------
    df
        Prepared index rows; must contain an ``event_id`` column and no
        fully duplicated rows.
    update_time
        Timestamp to store in the time table. A falsy value means "use the
        current time".

    Notes
    -----
    The meta table is created only when it is missing, so the cached
    ``_metadata`` attribute is refreshed only in that case. Previously the
    assignment ran unconditionally and referenced an unbound local whenever
    the meta table already existed.
    """
    assert not df.duplicated().any(), "update index has duplicate entries"
    # set both dfs to use index of event_id
    df = df.set_index("event_id")
    # get current events, but dont allow it to update again
    current = self.read_index(event_id=set(df.index), _allow_update=False)
    indicies_to_update = set(current["event_id"]) & set(df.index)
    meta = None  # stays None unless the meta table is created below
    # populate index store and update metadata
    with sql_connection(self.index_path) as con:
        if indicies_to_update:  # delete rows that will be re-entered
            _drop_rows(self._index_node, con, event_id=indicies_to_update)
        node = self._index_node
        df.to_sql(node, con, if_exists="append", index_label="event_id")
        tables = _get_tables(con)
        if self._meta_node not in tables:
            meta = self._make_meta_table()
            meta.to_sql(self._meta_node, con, if_exists="replace")
        # update timestamp
        with suppress_warnings():  # ignore pandas collection warning
            if self.allow_update_timestamp:
                timestamp = update_time or time.time()
                dft = pd.DataFrame(timestamp, index=[0], columns=["time"])
                dft.to_sql(self._time_node, con, if_exists="replace", index=False)
    if meta is not None:
        self._metadata = meta
    # Reset the ``_index`` attribute now that the table has changed.
    self._index = None
# --- meta table
def _read_metadata(self):
    """Read the whole meta table from the index database as a DataFrame."""
    self.ensure_bank_path_exists()
    with sql_connection(self.index_path) as con:
        query = f'SELECT * FROM "{self._meta_node}";'
        return pd.read_sql(query, con)
# --- read events stuff
[docs]
@compose_docstring(get_events_params=get_events_parameters)
def get_events(self, **kwargs) -> obspy.Catalog:
    """
    Read events from bank.

    Parameters
    ----------
    {get_events_params}

    Returns
    -------
    A catalog holding only the events whose ids matched the index query.
    """
    ind = self.read_index(**kwargs)
    eids = ind["event_id"]
    file_paths = ind["path"]
    # Build absolute paths and read each file only once, even when several
    # matching index rows share the same path.
    paths = str(self.bank_path) + os.sep + _natify_paths(file_paths)
    paths = paths.drop_duplicates()
    read_func = partial(try_read_catalog, format=self.format)
    # Divide work evenly between workers, with a min chunksize of 1.
    chunksize = len(paths) // self._max_workers or 1
    map_kwargs = dict(chunksize=chunksize)
    try:
        mapped_values = self._map(read_func, paths.values, **map_kwargs)
        non_none_values = (x for x in mapped_values if x is not None)
        cat = reduce(add, non_none_values)
    except TypeError:  # empty events
        cat = obspy.Catalog()
    # Make sure only the events of interest are included. The ids go into a
    # set once so each membership test is a hash lookup, not an array scan.
    wanted_ids = set(eids.values)
    events = [eve for eve in cat if eve.resource_id.id in wanted_ids]
    return obspy.Catalog(events=events)
[docs]
def ids_in_bank(self, event_id: Union[str, Sequence[str]]) -> Set[str]:
    """
    Return the subset of the given event ids that the bank already uses.

    Only the ``event_id`` column of the index is requested for the check.

    Parameters
    ----------
    event_id
        A single event id or sequence of event ids.

    Returns
    -------
    A set of event_ids which are also found in the bank.
    """
    indexed_ids = self.read_index(columns="event_id").values
    in_bank = set(np.unique(indexed_ids))
    requested = {str(eid) for eid in iterate(event_id)}
    return in_bank & requested
[docs]
@compose_docstring(bar_parameter_description=bar_parameter_description)
def put_events(
    self,
    events: Union[ev.Event, ev.Catalog, EventClient],
    update_index: bool = True,
    bar: Optional[ProgressBar] = None,
    overwrite_existing=True,
) -> "EventBank":
    """
    Write events to the bank's directory and optionally index them.

    An event whose id is already in the index is written to its indexed
    path, replacing the old file, unless ``overwrite_existing`` is False.

    Parameters
    ----------
    events
        An object which contains event data (e.g. Catalog, Event,
        EventBank, etc.)
    update_index
        Flag to indicate whether or not to update the event index after
        writing the new events. Note: Only events added through this
        method call will get indexed. Default is True.
    {bar_parameter_description}
    overwrite_existing
        If True, overwrite any existing events in the EventBank which
        share an event id with the new events.

    Notes
    -----
    If any of the events do not have an extractable reference time a
    ``ValueError`` will be raised.
    """
    self.ensure_bank_path_exists(create=True)
    # Normalise whatever was passed in to a catalog of events.
    events = get_event_client(events).get_events()
    # Only the index rows for the incoming ids are needed to find the
    # events which already have a file in the bank.
    incoming_ids = [str(eve.resource_id) for eve in events]
    known = self.read_index(event_id=incoming_ids).set_index("event_id")
    # Feed the events, optionally through the progress bar, to the writer.
    event_feeder = self._measure_iterator(events, bar)
    writer = partial(
        self._put_event,
        index=known,
        bank_path=self.bank_path,
        path_structure=self.path_structure,
        name_structure=self.name_structure,
        format=self.format,
        overwrite_existing=overwrite_existing,
    )
    # Entries are None for events that were skipped.
    written = list(self._map(writer, event_feeder))
    if update_index:
        # Index only the files that were actually written.
        self.update_index(paths=[p for p in written if p is not None])
    return self
@staticmethod
def _put_event(
    event,
    index,
    bank_path,
    path_structure,
    name_structure,
    format,
    overwrite_existing,
):
    """
    Write a single event to disk and return the path it was written to.

    An event whose id is in ``index`` is written to its indexed path, or
    skipped (returning None) when ``overwrite_existing`` is False. Any other
    event gets a path built from the path and name structures.

    NOTE: This function must be static to avoid pickling the attached
    executor (see #158)
    """
    root = str(bank_path)
    event_id = str(event.resource_id)
    if event_id in index.index:
        # Known event: reuse the path recorded in the index.
        recorded = index.loc[event_id, "path"]
        target = root + os.sep + recorded
        if not overwrite_existing:
            return None
    else:
        # New event: derive its path from the configured structures.
        summary = _summarize_event(
            event, path_struct=path_structure, name_struct=name_structure
        )
        target = (Path(root) / summary["path"]).absolute()
    target = Path(target)
    target.parent.mkdir(exist_ok=True, parents=True)
    event.write(str(target), format=format)
    return target
# Alias: ``get_event_summary`` is bound to the same function as ``read_index``.
get_event_summary = read_index