"""
Utilities for calculating spatial relationships.
"""
from typing import Union
import numpy as np
import pandas as pd
from obspy.geodetics import gps2dist_azimuth
from obspy.geodetics.base import WGS84_A, WGS84_F
from obspy.taup.taup_geo import calc_dist
import obsplus
from obsplus.constants import (
event_type,
inventory_type,
DISTANCE_COLUMN_DTYPES,
LOCATION_DTYPE,
ALT_DISTANCE_COLUMN_DTYPES,
)
from obsplus.utils.docs import compose_docstring
from obsplus.exceptions import DataFrameContentError
[docs]
class SpatialCalculator:
"""
Class for calculating spatial relationships between two entities.
The default values are appropriate for Earth.
Parameters
----------
radius
The radius of the planetary body in question.
flattening
The flattening coefficient of the planetary body in question.
Examples
--------
>>> import obsplus
>>> import obspy
>>> from obsplus.utils import SpatialCalculator
>>> calc = SpatialCalculator() # SpatialCalculator for Earth
>>> # Get the distance and azimuth between two points
>>> p1 = (45.55, -111.21, 1000) # format is lat, lon, elevation (m asl)
>>> p2 = (40.22, -115, 10)
>>> df = calc(p1, p2)
>>> # Get the distance between each event and each station
>>> events = obspy.read_events()
>>> station = obspy.read_inventory()
>>> df = calc(events, station)
>>> # index 1 is the event_id and index 2 is the seed id
"""
expected_exceptions = (TypeError, ValueError, AttributeError)
def __init__(self, radius: float = WGS84_A, flattening: float = WGS84_F):
self.radius = radius
self.flattening = flattening
# --- methods for getting input dataframe from various objects
def _get_dataframe(self, obj) -> pd.DataFrame:
"""
Return a dataframe with latitude, longitude, elevation, and id.
"""
cols = list(LOCATION_DTYPE)
cols1 = list(ALT_DISTANCE_COLUMN_DTYPES)
# if a dataframe is used
if isinstance(obj, pd.DataFrame):
if not (
set(cols).issubset(obj.columns) or set(cols1).issubset(obj.columns)
):
raise DataFrameContentError(
"SpatialCalculator input dataframe must have the following "
f"columns: {cols} or {cols1}"
)
return self._validate_dataframe(obj)
try: # first try events
df = self._df_from_events(obj)
except self.expected_exceptions: # then stations
try:
df = self._df_from_stations(obj)
except self.expected_exceptions:
# and lastly any sequence.
df = self._df_from_sequences(obj)
return self._validate_dataframe(df)
def _df_from_events(self, obj):
"""Get the needed dataframe from some objects with event data."""
df = obsplus.events_to_df(obj).set_index("event_id")
df["elevation"] = -df["depth"]
return df
def _df_from_stations(self, obj):
"""Get the needed dataframe from some object with station data."""
df = obsplus.stations_to_df(obj).set_index("seed_id")
return df
def _df_from_sequences(self, obj):
"""Get the dataframe from generic sequences."""
ar = np.atleast_2d(obj)
if ar.shape[1] == 3: # need to add index columns
id = np.arange(len(ar))
elif ar.shape[1] == 4:
ar, id = ar[:, :3].astype(float), ar[:, 3]
else:
msg = "A sequence must have either 3 or 4 elements."
raise ValueError(msg)
cols = list(LOCATION_DTYPE)
df = pd.DataFrame(ar, index=id, columns=cols)
return df
def _de_duplicate_df_index(self, df) -> pd.DataFrame:
"""
Remove any duplicated indices, raise an exception if any of the
duplicated indices have differing coordinates.
"""
# determine if there are any duplicates with different coords
duplicate_indices = df.index.duplicated()
duplicated_data = df.duplicated(list(LOCATION_DTYPE))
# if so raise an exception as this will not produced desired result.
if (duplicate_indices & (~duplicated_data)).any():
dup_ids = set(df.index[(duplicate_indices & (~duplicated_data))])
msg = (
f"There are multiple coordinates for ids: {dup_ids} "
"cannot reliable calculate distance relationships."
)
raise ValueError(msg)
return df[~duplicate_indices]
def _validate_dataframe(self, df) -> pd.DataFrame:
"""Ensure all the parameters of the dataframe are reasonable."""
# first cull out columns that aren't needed and de-dup index
if ("depth" in df.columns) and ("elevation" not in df.columns):
# Make sure that the df has an elevation column
out = df[list(ALT_DISTANCE_COLUMN_DTYPES)].astype(
ALT_DISTANCE_COLUMN_DTYPES
)
out["elevation"] = -1 * out["depth"]
out.drop("depth", inplace=True, axis=1)
self._de_duplicate_df_index(out)
else:
out = (
df[list(LOCATION_DTYPE)]
.astype(LOCATION_DTYPE)
.pipe(self._de_duplicate_df_index)
)
# sanity checks on lat/lon
lat_valid = abs(df["latitude"]) <= 90.0
lons_valid = abs(df["longitude"]) <= 180.0
if not (lat_valid.all() & lons_valid.all()):
msg = f"invalid lat/lon values found in {df}"
raise DataFrameContentError(msg)
return out
# --- methods for calculating spatial relationships
def _get_spatial_relations(self, array):
"""Calculate distances and azimuths."""
# TODO this may be vectorizable, look into it
# get planet radius and flattening
a, f = self.radius, self.flattening
rad_km = a / 1000.0
# first calculate the great circle distance in m, forward az and back az
m_az1_az2 = [gps2dist_azimuth(x[0], x[1], x[3], x[4], a=a, f=f) for x in array]
# then calculate distance in degrees
dist_degrees = [calc_dist(x[0], x[1], x[3], x[4], rad_km, f) for x in array]
# vertical distances
vert = array[:, 2] - array[:, 5]
# stack and return
out1 = np.array(m_az1_az2)
out2 = np.array(dist_degrees).reshape(-1, 1)
return np.hstack([out1, out2, vert.reshape(-1, 1)])
@compose_docstring(
out_columns=str(tuple(DISTANCE_COLUMN_DTYPES)),
in_columns=str(tuple(LOCATION_DTYPE)),
)
def __call__(
self,
entity_1: Union[event_type, inventory_type, pd.DataFrame, tuple],
entity_2: Union[event_type, inventory_type, pd.DataFrame, tuple],
) -> pd.DataFrame:
"""
Calculate spatial relationship(s) between two entities.
Parameters
----------
entity_1
A variety of obspy/obsplus types which have linked geographic info.
entity_2
A variety of obspy/obsplus types which have linked geographic info.
Notes
-----
If a dataframe is used for input it must have the following columns:
{in_columns}
Returns
-------
A dataframe with rows for each combination of entities with columns:
{out_columns}
"""
# first convert each entity to dataframe with lat, lon, elevation as
# columns and a meaningful id as the index.
obj1 = self._get_dataframe(entity_1)
obj2 = self._get_dataframe(entity_2)
# first get cartesian product from both indices and make aligned dfs
mesh = np.meshgrid(obj1.index.values, obj2.index.values)
ind1, ind2 = mesh[0].flatten(), mesh[1].flatten()
df1, df2 = obj1.loc[ind1], obj2.loc[ind2]
assert len(df1) == len(df2)
# create array of lat1, lon1, ele1, lat2, lon2, ele2
array = np.hstack((df2.values, df1.values))
# get spatial relationships, create index and return df
out = self._get_spatial_relations(array)
index = pd.MultiIndex.from_arrays([ind1, ind2], names=["id1", "id2"])
return pd.DataFrame(out, columns=list(DISTANCE_COLUMN_DTYPES), index=index)
[docs]
def map_longitudes(angle_array: Union[np.ndarray, pd.Series]) -> pd.Series:
"""
Map longitudes to -180 to 180 domain.
Parameters
----------
angle_array
An array or series with real numeric values representing angles in
degrees.
Examples
--------
>>> import pandas as pd
>>> import numpy as np
>>> angles = np.array([35, -45, 340, -721])
>>> out = map_longitudes(angles)
>>> expected = np.array([35, -45, -20, -1])
>>> assert np.allclose(out, expected)
"""
out = angle_array.astype(float) % 360
gt_180 = out > 180
out[gt_180] = out[gt_180] - 360
return out
if __name__ == "__main__":
import doctest
doctest.testmod()