Source code for obsplus.events.get_events

"""
Module for adding a get_events method to obspy events.
"""

import inspect
from typing import Tuple, Union

import numpy as np
import obspy
import pandas as pd
from obspy.clients.fdsn import Client
from obspy.geodetics import kilometers2degrees

import obsplus
import obsplus.utils.misc
from obsplus.constants import (
    get_events_parameters,
    CIRCULAR_PARAMS,
    UNSUPPORTED_PARAMS,
    NONCIRCULAR_PARAMS,
)
from obsplus.exceptions import UnsupportedKeyword
from obsplus.utils.docs import compose_docstring
from obsplus.utils.geodetics import SpatialCalculator, map_longitudes
from obsplus.utils.misc import strip_prefix
from obsplus.utils.time import _dict_times_to_npdatetimes


# Names of the query parameters accepted by the FDSN client's get_events.
# The signature also lists ``self`` and the catch-all ``**kwargs``; neither is
# a query parameter, so they are filtered out rather than being reported as
# supported keywords.
CLIENT_SUPPORTED = {
    name
    for name, param in inspect.signature(Client.get_events).parameters.items()
    if name != "self"
    and param.kind
    not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
}
# Everything the client understands plus the circular-search parameters,
# minus whatever obsplus explicitly declares as unsupported.
SUPPORTED_PARAMS = (CLIENT_SUPPORTED | CIRCULAR_PARAMS) - UNSUPPORTED_PARAMS


def _validate_get_event_kwargs(kwargs, extra=frozenset()):
    """
    Ensure every key in kwargs is a parameter understood by get_events.

    Parameters
    ----------
    kwargs
        Mapping (or any iterable of names) of the keyword arguments to check.
    extra
        Additional parameter names to accept on top of SUPPORTED_PARAMS.

    Raises
    ------
    UnsupportedKeyword
        If a name is neither supported directly nor a "min"/"max" prefixed
        form of a supported name. The message lists the offending names with
        their "min"/"max" prefix stripped.
    """
    allowed = set(SUPPORTED_PARAMS).union(extra)
    unknown = set(kwargs).difference(allowed)
    # A min/max version of a supported parameter is fine, so only names that
    # are still unknown once the prefix is stripped count as errors.
    stripped_names = (strip_prefix(name, ["min", "max"]) for name in unknown)
    really_bad = [name for name in stripped_names if name not in allowed]
    if really_bad:
        msg = f"{really_bad} are not supported kwargs by get_events."
        raise UnsupportedKeyword(msg)


def _sanitize_circular_search(**kwargs) -> Tuple[dict, dict]:
    """
    Check for clashes between circular-search and box-search kwargs.

    Parameters
    ----------
    **kwargs
        The query parameters to split. They are received as a fresh dict, so
        the caller's mapping is not modified.

    Returns
    -------
    Two separate dictionaries of the circular kwargs and everything else.
    Circular parameters whose value is None are dropped from both.

    Raises
    ------
    ValueError
        If circular parameters are combined with box parameters, or if a
        circular parameter is used without both latitude and longitude.
    """
    used_circular = CIRCULAR_PARAMS.intersection(kwargs)
    if used_circular:
        used_box = NONCIRCULAR_PARAMS.intersection(kwargs)
        if used_box:
            raise ValueError(
                "{0} cannot be used with {1}".format(used_box, used_circular)
            )
        if not {"latitude", "longitude"}.issubset(kwargs):
            raise ValueError("Circular search requires both longitude and latitude")
        # If neither minradius nor maxradius is given they just want
        # everything. Drop *all* circular parameters, not only latitude and
        # longitude: a leftover one (e.g. ``degrees``) would otherwise start
        # a circular search that has no center to measure from.
        if not {"minradius", "maxradius"}.intersection(kwargs):
            for key in CIRCULAR_PARAMS:
                kwargs.pop(key, None)
    # Split the circular parameters from those handled by the normal query.
    circular_kwargs = {}
    for key in CIRCULAR_PARAMS:
        value = kwargs.pop(key, None)
        if value is not None:
            circular_kwargs[key] = value
    return circular_kwargs, kwargs


def _get_bounding_box(circular_kwargs: dict) -> dict:
    """
    Return a dict containing the bounding box for circular params.

    Parameters
    ----------
    circular_kwargs
        The circular search parameters. "latitude" and "longitude" are
        required whenever "maxradius" is present. "maxradius" is taken to be
        in degrees unless "degrees" is falsy, in which case it is in meters.
        The dict is not modified.

    Returns
    -------
    An empty dict when there is no "maxradius". Otherwise a dict with
    "minlatitude" and "maxlatitude", plus "minlongitude" and "maxlongitude"
    when the search circle does not reach a pole. If it does reach a pole
    every longitude is possible, so no longitude limits are returned.
    """
    if "maxradius" not in circular_kwargs:
        return {}
    maxradius = circular_kwargs["maxradius"]
    if not circular_kwargs.get("degrees", True):
        # If distance is in m we will just assume a spherical earth
        maxradius = kilometers2degrees(maxradius / 1000.0)
    latitude = circular_kwargs["latitude"]
    longitude = circular_kwargs["longitude"]
    # Make the approximated box a bit bigger to cope with flattening.
    padded = 1.2 * maxradius
    out = dict(minlatitude=latitude - padded, maxlatitude=latitude + padded)
    if abs(latitude) + padded < 90:
        # Meridians converge towards the poles, so a circle of angular radius
        # r centered at latitude lat spans asin(sin(r) / cos(lat)) degrees of
        # longitude on each side, which is always >= r. Using r itself would
        # cut off events that are inside the circle at mid/high latitudes.
        ratio = np.sin(np.radians(padded)) / np.cos(np.radians(latitude))
        half_width = float(np.degrees(np.arcsin(min(ratio, 1.0))))
        out.update(
            minlongitude=longitude - half_width,
            maxlongitude=longitude + half_width,
        )
    return out


def _get_ids(df, kwargs) -> set:
    """
    Return a set of event_ids that meet filter requirements.

    Parameters
    ----------
    df
        Event dataframe; the code reads the "event_id" column and one column
        per filtered quantity (e.g. "longitude", "updated").
    kwargs
        The query parameters. The dict passed in is not modified.

    Returns
    -------
    The event ids of the rows passing every filter, truncated to the first
    "limit" matching rows when a limit is given.
    """
    # Separate kwargs used in circular searches (this also copies kwargs).
    circular_kwargs, kwargs = _sanitize_circular_search(**kwargs)
    # The limit has to be applied after *all* filtering. Pull it out now so
    # the rectangular pre-filter of a circular search does not truncate the
    # candidates before they are trimmed by radius.
    limit = kwargs.pop("limit", None)
    if circular_kwargs:
        # Circular kwargs are used, first apply non-circular query then trim
        kwargs.update(_get_bounding_box(circular_kwargs))
        df = get_event_summary(df, **kwargs)
        if len(df) == 0:  # If there are no events in the rectangular region.
            return set()
        # Trim based on circular kwargs, first get distance dataframe.
        center = (circular_kwargs["latitude"], circular_kwargs["longitude"], 0)
        dist_df = SpatialCalculator()(center, df)
        # "degrees" is the key _get_bounding_box reads for the unit flag;
        # "distance_degrees" is still honored as a fallback because that is
        # the name this function used to look up.
        use_degrees = circular_kwargs.get(
            "degrees", circular_kwargs.get("distance_degrees", True)
        )
        radius = dist_df["distance_degrees" if use_degrees else "distance_m"].values
        filt = np.ones(len(df), dtype=bool)
        if "minradius" in circular_kwargs:
            filt &= radius > circular_kwargs["minradius"]
        if "maxradius" in circular_kwargs:
            filt &= radius < circular_kwargs["maxradius"]
        df = df[filt]
    else:  # No circular kwargs are being used; normal query
        filt = np.ones(len(df), dtype=bool)
        filt, kwargs = _handle_dateline_transversal(filt, df, kwargs)
        for item, value in kwargs.items():
            if value is None:
                continue
            # start*/end* are aliases of min*/max* (starttime -> mintime).
            # Only the prefix is rewritten so the column name stays intact.
            if item.startswith("start"):
                item = "min" + item[len("start"):]
            elif item.startswith("end"):
                item = "max" + item[len("end"):]
            if item.startswith("min"):
                filt &= df[item[len("min"):]] > value
            elif item.startswith("max"):
                filt &= df[item[len("max"):]] < value
            elif item == "updatedafter":
                filt &= df["updated"] > value
            elif item == "eventid":
                filt &= df["event_id"] == str(value)
        df = df[filt]
    # A limit of None slices everything.
    return set(df["event_id"].values[:limit])


def _handle_dateline_transversal(filt, df, kwargs):
    """
    Check if dateline should be transversed by query.

    Parameters
    ----------
    filt
        Boolean mask over the rows of df; updated in place when the query
        crosses the dateline.
    df
        Event dataframe; only its "longitude" column is read.
    kwargs
        The query parameters. When the query crosses the dateline,
        "minlongitude" and "maxlongitude" are removed from this dict because
        the longitude constraint has already been folded into filt.

    Returns
    -------
    The (possibly updated) mask and the (possibly reduced) kwargs.
    """
    min_in = kwargs.get("minlongitude")
    max_in = kwargs.get("maxlongitude")
    # If both longitudes aren't being used bail out. A value of None counts
    # as "not provided", matching how the other filters skip None values.
    if min_in is None or max_in is None:
        return filt, kwargs
    # if dateline is not to be transversed by query bail out
    minlong, maxlong = map_longitudes(np.array([min_in, max_in]))
    if not minlong > maxlong:
        return filt, kwargs
    long = df["longitude"]
    # remove min/max long from query dict and reform to two queries.
    kwargs.pop("minlongitude")
    kwargs.pop("maxlongitude")
    filt &= (long >= minlong) | (long <= maxlong)
    return filt, kwargs


@compose_docstring(get_events_params=get_events_parameters)
def get_events(cat: obspy.Catalog, **kwargs) -> obspy.Catalog:
    """
    Return a subset of a events filtered on input parameters.

    When no parameters are given the input catalog itself is returned, not a
    copy.

    Parameters
    ----------
    {get_events_params}

    Raises
    ------
    TypeError
        If any keyword is not in SUPPORTED_PARAMS.
    """
    # If no kwargs are passed just return all events
    if not kwargs:
        return cat
    # Make sure all inputs are supported
    bad_params = set(kwargs) - SUPPORTED_PARAMS
    if bad_params:
        msg = f"{bad_params} are not supported get_events parameters"
        raise TypeError(msg)
    # Ensure all times are numpy datetimes
    kwargs = _dict_times_to_npdatetimes(kwargs)
    event_ids = _get_ids(obsplus.events_to_df(cat), kwargs)
    events = [eve for eve in cat if str(eve.resource_id) in event_ids]
    return obspy.Catalog(events=events)
@compose_docstring(get_events_params=get_events_parameters)
def get_event_summary(
    cat: Union[obspy.Catalog, pd.DataFrame], **kwargs
) -> pd.DataFrame:
    """
    Return a dataframe from a events object after applying filters.

    The input is first passed through obsplus.events_to_df; the rows whose
    event_id passes the filters are returned.

    Parameters
    ----------
    {get_events_params}
    """
    df = obsplus.events_to_df(cat)
    event_ids = _get_ids(df, kwargs)
    return df[df["event_id"].isin(event_ids)]
# --------------- monkey patch get_events method to events

# Attach both functions to obspy.Catalog so they can be called as methods on
# any catalog instance.
obspy.Catalog.get_events = get_events
obspy.Catalog.get_event_summary = get_event_summary