Source code for obsplus.utils.stations

"""
Utilities for working with inventories.
"""
import json
import warnings
from functools import singledispatch, lru_cache
from pathlib import Path
from typing import Optional

import numpy as np
import obspy
import pandas as pd
from obspy.core.inventory import Channel, Station, Network

import obsplus
import obsplus.utils.misc
from obsplus.constants import (
    station_clientable_type,
    SMALLDT64,
    LARGEDT64,
    SMALL_UTC,
    BIG_UTC,
    NSLC,
    STATION_DTYPES,
)
from obsplus.exceptions import AmbiguousResponseError
from obsplus.interfaces import StationClient
from obsplus.utils import get_nslc_series
from obsplus.utils.docs import compose_docstring, format_dtypes
from obsplus.utils.time import to_utc

LARGE_NUMBER = obspy.UTCDateTime("3000-01-01").timestamp

# mappings from obspy class constructor kwargs to stations dataframe columns
# (used to create inventory objects from the stations dataframe).
mapping_keys = {
    Channel: {"code": "channel", "location_code": "location"},
    Station: {"code": "station"},
    Network: {"code": "network"},
}

type_mappings = {"start_date": to_utc, "end_date": to_utc}
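# Illustrative sketch: to_utc converts various time representations to
# obspy.UTCDateTime, e.g. type_mappings["start_date"](pd.Timestamp("2017-01-02")).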
station_col_str = format_dtypes(STATION_DTYPES)


class _InventoryConstructor:
    """A private helper class for constructing inventories from dataframes."""

    _client_col = "get_station_kwargs"
    _nrl_response_cols = frozenset({"datalogger_keys", "sensor_keys"})
    # define expected kwargs for inventory class constructors
    net_map = {"code": "network", "operator": "operator"}
    sta_map = {
        "code": "station",
        "equipment": "equipment",
        "creation_date": "creation_date",
        "latitude": "latitude",
        "longitude": "longitude",
        "elevation": "elevation",
    }
    cha_map = {
        "response": "response",
        "start_date": "start_date",
        "end_date": "end_date",
        "code": "channel",
        "location_code": "location",
        "sample_rate": "sample_rate",
        "sensor": "sensor",
        "equipment": "equipment",
        "elevation": "elevation",
        "azimuth": "azimuth",
        "dip": "dip",
        "data_logger": "data_logger",
        "latitude": "latitude",
        "longitude": "longitude",
        "depth": "depth",
    }
    # A set of expected columns
    _expected_cols = (
        set(net_map.values()) | set(cha_map.values()) | set(sta_map.values())
    )
    # A set of columns to drop before building inventory
    _drop_cols = list(_nrl_response_cols) + [_client_col, "index", "seed_id"]
    # columns for performing groupby at various levels
    _gb_cols = dict(
        network=("network",),
        station=("station", "latitude", "longitude", "elevation"),
        channel=("channel", "location", "start_date", "end_date"),
    )

    def __init__(self, station_client=None):
        self._client = station_client

    def _groupby_if_exists(self, df, level):
        """Groupby columns if they exist on dataframe, else return empty."""
        columns = list(self._gb_cols[level])
        cols = list(obsplus.utils.misc.iterate(columns))
        # copy df and set missing start/end times to reasonable values
        # this is needed so they get included in a groupby
        df = df.copy()
        isnan = df.isna()
        default_start = pd.Timestamp(SMALLDT64)
        default_end = pd.Timestamp(LARGEDT64)

        if "start_date" in columns:
            df["start_date"] = df["start_date"].fillna(default_start)
        if "end_date" in columns:
            df["end_date"] = df["end_date"].fillna(default_end)

        for ind, df_sub in df.groupby(cols):
            # Pandas warns when iterating a groupby keyed on a length-one
            # list; in the future ind will always be a tuple, so future-proof
            # here. See pandas/issues/42795.
            ind = ind[0] if isinstance(ind, tuple) and len(ind) == 1 else ind
            # restore NaN values that were filled above for grouping
            sub_nan = isnan.loc[df_sub.index]
            has_nan = sub_nan.any(axis=0)
            cols_with_nan = has_nan[has_nan].index
            for col in cols_with_nan:
                df_sub.loc[sub_nan[col].values, col] = np.nan
            yield ind, df_sub

    def _get_kwargs(self, series, key_mapping):
        """create the kwargs from a series and key mapping."""
        out = {}
        for k, v in key_mapping.items():
            # skip if requested kwarg is not in the series
            if v not in series:
                continue
            value = series[v]
            value = value if not pd.isnull(value) else None
            # if the type needs to be cast to something else
            if k in type_mappings and value is not None:
                value = type_mappings[k](value)
            out[k] = value

        return out

    @property
    @lru_cache()
    def nrl_client(self):
        """Initiate a nominal response library object."""
        from obspy.clients.nrl import NRL

        return NRL()

    @property
    @lru_cache()
    def station_client(self):
        """
        Instantiate an IRIS FDSN client or return the provided client.
        """
        client = self._client
        if self._client is None:
            # importing Client can make a network call; only do it if needed
            from obspy.clients.fdsn import Client

            client = Client()
        return client

    @lru_cache()
    def get_nrl_response(self, datalogger_keys, sensor_keys):
        """Get a response from the NRL based on datalogger and sensor keys."""
        nrl = self.nrl_client
        kwargs = dict(datalogger_keys=datalogger_keys, sensor_keys=sensor_keys)
        return nrl.get_response(**kwargs)
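
    # A hedged illustration: NRL keys are ordered sequences that walk the
    # NRL choice tree. The sensor keys below mirror the example in the
    # df_to_inventory docstring; the datalogger keys are hypothetical, and
    # the actual key depth depends on the NRL tree, e.g.:
    # self.get_nrl_response(
    #     datalogger_keys=("REF TEK", "RT 130 & 130-SMA", "1", "40"),
    #     sensor_keys=("Nanometrics", "Trillium 120 Horizon"),
    # )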

    def _get_station_client_response(self, kwargs):
        """Use the client to get station responses."""
        client = self.station_client
        sub_inv = client.get_stations(level="response", **kwargs)
        channels = sub_inv.get_contents()["channels"]
        if not len(channels) == 1:
            msg = (
                f"More than one channel returned by client with kwargs: "
                f"{kwargs}, add constraints to resolve the issue"
            )
            raise AmbiguousResponseError(msg)
        return sub_inv[0][0][0].response

    def _update_nrl_response(self, response, df):
        """Update the responses with NRL."""
        # rows with both datalogger and sensor keys get NRL-derived responses
        logger_keys, sensor_keys = df["datalogger_keys"], df["sensor_keys"]
        valid = (~logger_keys.isna()) & (~sensor_keys.isna())
        response[valid] = [
            self.get_nrl_response(tuple(x), tuple(y))
            for x, y in zip(logger_keys[valid], sensor_keys[valid])
        ]

    def _update_client_responses(self, response, df):
        """Update the client responses."""
        # infer get_stations kwargs from existing columns
        provided_kwargs = df[self._client_col]
        # only try for chans that have kwargs and no response (yet)
        is_valid = (~pd.isnull(provided_kwargs)) & pd.isnull(response)
        base_kwargs = df[is_valid].apply(self.infer_get_station_kwargs, axis=1)
        for ind in base_kwargs.index:
            # get input
            input_kwargs = base_kwargs[ind]
            input_kwargs.update(provided_kwargs[ind])
            response[ind] = self._get_station_client_response(input_kwargs)

    def infer_get_station_kwargs(self, ser):
        """Infer values for get stations kwargs from a series."""
        out = {x: ser[x] for x in NSLC}
        return out
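
    # For example (sketch), a row with network="UU", station="TMU",
    # location="01", channel="HHZ" yields:
    # {"network": "UU", "station": "TMU", "location": "01", "channel": "HHZ"}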

    def get_valid_json_keys(self, value):
        """
        Iterate a series, load its contents using json module if str, set
        to None if null-ish value is found.
        """
        if not value:
            value = None
        if isinstance(value, str):
            # replace single quotes with double to make json str more flexible
            value = json.loads(value.replace("'", '"'))
        return value
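
    # For example (sketch):
    #   get_valid_json_keys("{'network': 'UU'}") -> {"network": "UU"}
    #   get_valid_json_keys("") -> None
    #   get_valid_json_keys(["Nanometrics"]) -> ["Nanometrics"]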

    def _check_only_one_response_method(self, df):
        """Raise if both response methods are specified."""
        valid_nrl_cols = ~df[list(self._nrl_response_cols)].isnull().all(axis=1)
        valid_client_cols = ~df[self._client_col].isnull()
        both_types_used = valid_nrl_cols & valid_client_cols
        if both_types_used.any():
            bad_nslc = get_nslc_series(df[both_types_used])
            msg = (
                f"The following channels specify both a NRL and station "
                f"client response methods, choose one or the other:\n "
                f"{bad_nslc}."
            )
            raise AmbiguousResponseError(msg)

    def _load_response_columns(self, df):
        """Else raise an AmbiguousResponseError"""
        # Load any json in any of the response columns
        # Note: we copy the df at the start so mutation is ok
        for col in [self._client_col] + list(self._nrl_response_cols):
            if col not in df.columns:
                df[col] = [None] * len(df)
            else:
                df[col] = df[col].apply(self.get_valid_json_keys)
        self._check_only_one_response_method(df)
        return df

    def _get_responses(self, df):
        """Return a series of response objects."""
        # init empty series of None for storing responses
        responses = pd.Series(index=df.index, dtype=object)
        responses.loc[responses.isnull()] = None
        # early return if none of the response columns are present
        resp_cols = [self._client_col] + list(self._nrl_response_cols)
        if set(resp_cols).isdisjoint(set(df.columns)):
            return responses
        # Ensure both methods are not requested for any rows
        self._load_response_columns(df)
        # update responses
        self._update_nrl_response(responses, df)
        self._update_client_responses(responses, df)
        return responses

    def _add_dates(self, kwargs, some_list):
        """Add min start_date and max end_date to kwargs."""
        if not some_list:
            return
        start_list = [x.start_date or SMALL_UTC for x in some_list]
        end_list = [x.end_date or BIG_UTC for x in some_list]
        kwargs["start_date"] = np.min(start_list)
        kwargs["end_date"] = np.max(end_list)
        return kwargs
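
    # For example (sketch): given two channels starting 2010-01-01 and
    # 2012-06-01 with open end dates, the parent station gets
    # start_date=2010-01-01 and end_date=BIG_UTC (the "open" sentinel).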

    def _maybe_warn_on_unexpected_columns(self, df):
        """Issues a UserWarning if unexpected columns are found."""
        col_set = set(df.columns)
        diff = col_set - self._expected_cols
        if diff:
            msg = f"df_to_inventory found unexpected columns: {diff}"
            warnings.warn(msg)

    def _make_inventory(self, df: pd.DataFrame):
        """
        Loopy logic for creating the inventory from a dataframe.
        """
        # get dataframe with correct columns/conditioning from input
        df = obsplus.stations_to_df(df).copy()
        # add responses (if requested) and drop response cols
        df["response"] = self._get_responses(df)
        df = df.drop(columns=self._drop_cols, errors="ignore")
        # warn if any unexpected columns are found in df
        self._maybe_warn_on_unexpected_columns(df)
        # Iterate networks and create stations
        networks = []
        for net_code, net_df in self._groupby_if_exists(df, "network"):
            stations = []
            for st_code, sta_df in self._groupby_if_exists(net_df, "station"):
                if not st_code[0]:
                    continue
                channels = []
                for ch_code, ch_df in self._groupby_if_exists(sta_df, "channel"):
                    if not ch_code[0]:  # skip empty channel lines
                        continue
                    chan_series = ch_df.iloc[0]
                    kwargs = self._get_kwargs(chan_series, self.cha_map)
                    # create the channel and append it
                    channels.append(Channel(**kwargs))
                kwargs = self._get_kwargs(sta_df.iloc[0], self.sta_map)
                self._add_dates(kwargs, channels)
                stations.append(Station(channels=channels, **kwargs))
            kwargs = self._get_kwargs(net_df.iloc[0], self.net_map)
            self._add_dates(kwargs, stations)
            networks.append(Network(stations=stations, **kwargs))

        return obspy.Inventory(
            networks=networks, source=f"ObsPlus_v{obsplus.__version__}"
        )

    __call__ = _make_inventory


@compose_docstring(station_columns=station_col_str)
def df_to_inventory(
    df: pd.DataFrame, client: Optional[StationClient] = None
) -> obspy.Inventory:
    """
    Create an inventory from a dataframe.

    Parameters
    ----------
    df
        A dataframe with the same columns and dtypes as the one returned by
        :func:`obsplus.stations_to_df`, which are:
            {station_columns}
        Extra columns, except for those mentioned in the notes, are ignored.
    client
        Any client with a `get_stations` method. Only used if the dataframe
        contains special columns for retrieving channel responses.

    Notes
    -----
    There are two ways to include response information:

    1. Using the Nominal Response Library (NRL):
       (https://docs.obspy.org/master/packages/obspy.clients.nrl.html)
       If the dataframe has columns named "sensor_keys" and
       "datalogger_keys" these indicate that the response information
       should be fetched using ObsPy's NRL client. Each of these columns
       should contain either lists or json-loadable strings. For example,
       to specify sensor keys:
       ["Nanometrics", "Trillium 120 Horizon"]
       or
       '["Nanometrics", "Trillium 120 Horizon"]'
       are both valid.

    2. Using a station client:
       If the dataframe contains a column get_station_kwargs it indicates
       that either a client was passed as the client argument or the IRIS
       FDSN client should be used to download at least *some* channel
       response information. The contents of this column must be a
       dictionary or json string of acceptable keyword arguments for the
       client's `get_stations` method. All time values must be provided as
       iso8601 strings. For example,
       {'network': 'UU', 'station': 'TMU', 'location': '01',
        'channel': 'HHZ', 'starttime': '2017-01-02',}
       would be passed to the provided client to download a response for
       the corresponding channel.

       - If more than one channel is returned by the get_stations call an
         AmbiguousResponseError will be raised and a more specific query
         will be required.
       - If not all the required seed_id information is provided it will
         be ascertained from the appropriate columns.
       - To simply fetch a response using only the info provided in other
         columns use an empty dict, or json string (eg '{}').
       - No responses will be fetched for rows with empty strings or null
         values in the get_station_kwargs column.
       - If both NRL and client methods are indicated by column contents
         an AmbiguousResponseError will be raised.
    """
    inv_constructor = _InventoryConstructor(station_client=client)
    return inv_constructor(df)
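
# A minimal usage sketch: build an inventory from the dataframe of ObsPy's
# bundled example inventory. No response columns are present, so no network
# calls should be made.
#
# >>> import obspy
# >>> import obsplus
# >>> df = obsplus.stations_to_df(obspy.read_inventory())
# >>> inv = df_to_inventory(df)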


@singledispatch
def get_station_client(stations: station_clientable_type) -> StationClient:
    """
    Extract a station client from various inputs.

    Parameters
    ----------
    stations
        Any of the following:
            * A path to an obspy-readable station file
            * A path to a directory of obspy-readable station files
            * An `obspy.Inventory` instance
            * An instance of :class:`~obsplus.EventBank`
            * Any other object that has a `get_stations` method

    Raises
    ------
    TypeError
        If a station client cannot be determined from the input.
    """
    if not isinstance(stations, StationClient):
        msg = f"a station client could not be extracted from {stations}"
        raise TypeError(msg)
    return stations
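
# Usage sketch: the fallback above accepts anything that already satisfies
# the StationClient interface. An obspy Inventory qualifies (assuming
# ObsPlus has patched a ``get_stations`` method onto it), so it simply
# passes through:
#
# >>> client = get_station_client(obspy.read_inventory())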


@get_station_client.register(Path)
@get_station_client.register(str)
def _read_inventory(path) -> StationClient:
    path = Path(path)
    inv = None
    if path.is_dir():
        for inv_path in path.rglob("*.xml"):
            if inv is None:
                inv = obspy.read_inventory(str(inv_path))
            else:
                inv += obspy.read_inventory(str(inv_path))
    else:
        inv = obspy.read_inventory(str(path))
    return get_station_client(inv)
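
# Usage sketch with a hypothetical directory: every *.xml file found under
# the directory is read with obspy.read_inventory and merged into a single
# Inventory before being dispatched back through get_station_client.
#
# >>> client = get_station_client("path/to/stationxml_dir")  # hypothetical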