Source code for obsplus.events.merge

"""
functions for merging catalogs together
"""

from __future__ import annotations

import warnings
from collections import OrderedDict

import numpy as np
from obspy.core.event import Catalog, Event, Origin

import obsplus
from obsplus import validate_catalog
from obsplus.utils.events import bump_creation_version


def merge_events(eve1: Event, eve2: Event, reject_old: bool = True) -> Event:
    """
    Merge the picks and amplitudes of two events together.

    The objects of the two events are matched even when their resource_ids
    differ. The second event is imposed on the first, which is modified in
    place.

    Parameters
    ----------
    eve1 : Event
        The first (former) event; it receives the merged picks/amplitudes.
    eve2 : Event
        The second (new) event.
    reject_old : bool
        If True, reject anything in eve1 that is not in eve2.

    Returns
    -------
    Event
        ``eve1``, after merging.
    """
    # Picks are merged before amplitudes, exactly as in a direct call chain.
    for merge_step in (_merge_picks, _merge_amplitudes):
        merge_step(eve1, eve2, reject_old=reject_old)
    return eve1
def _generate_pick_phase_maps(eve1, eve2):
    """
    Build the lookup tables used to match picks and amplitudes of two events.

    Returns a dict with the following keys:

    ``widp_p1`` / ``widp_p2``
        Picks of eve1 / eve2 keyed by their waveform-id + phase_hint hash.
    ``widm_a1`` / ``widm_a2``
        Amplitudes of eve1 / eve2 keyed by their waveform-id +
        magnitude_hint hash.
    ``id_p1`` / ``id_p2``
        Sets of the pick resource id strings of eve1 / eve2.
    ``id1_p2``
        Resource id of each hashed eve1 pick -> eve2 pick with the same
        hash, or None when eve2 has no such pick.
    ``id2_p1``
        Resource id of each hashed eve2 pick -> eve1 pick with the same
        hash, or None when eve1 has no such pick.
    """
    maps = {}
    maps["widp_p1"] = _hash_wids(eve1.picks, "phase_hint")
    maps["widp_p2"] = _hash_wids(eve2.picks, "phase_hint")
    maps["widm_a1"] = _hash_wids(eve1.amplitudes, "magnitude_hint")
    maps["widm_a2"] = _hash_wids(eve2.amplitudes, "magnitude_hint")
    maps["id_p1"] = {x.resource_id.id for x in eve1.picks}
    maps["id_p2"] = {x.resource_id.id for x in eve2.picks}
    maps["id1_p2"] = {
        val.resource_id.id: maps["widp_p2"].get(key, None)
        for key, val in maps["widp_p1"].items()
    }
    maps["id2_p1"] = {
        val.resource_id.id: maps["widp_p1"].get(key, None)
        for key, val in maps["widp_p2"].items()
    }
    return maps


def _merge_picks(eve1, eve2, reject_old=False):
    """
    Merge the picks of eve2 into eve1 (eve1 is modified in place).

    Picks are matched on their waveform-id + phase_hint hash. A matched pick
    of eve1 that differs from its eve2 counterpart takes over all of the
    eve2 pick's attributes except its resource id. Picks found only in eve2
    are appended to eve1. If ``reject_old`` is True, eve1 picks with no
    counterpart in eve2 get evaluation_status "rejected".
    """
    maps = _generate_pick_phase_maps(eve1, eve2)
    # attributes of the old pick that must never be overwritten by the new one
    attrs_no_update = {"resource_id", "force_resource_id"}
    widp_p1 = maps["widp_p1"]
    widp_p2 = maps["widp_p2"]
    # Iterate in eve2 order (not set order) so the result is deterministic.
    for key, new_pick in widp_p2.items():
        old_pick = widp_p1.get(key)
        if old_pick is None or old_pick == new_pick:
            continue
        bump_creation_version(new_pick)  # bump creation info
        update_vals = {
            name: value
            for name, value in new_pick.__dict__.items()
            if name not in attrs_no_update
        }
        old_pick.__dict__.update(update_vals)
    # picks only found in the new event are appended
    for key, new_pick in widp_p2.items():
        if key not in widp_p1:
            eve1.picks.append(new_pick)
    if reject_old:
        _reject_old(eve1.picks, "phase_hint", widp_p2)


def _merge_amplitudes(eve1, eve2, reject_old=False):
    """
    Merge the amplitudes of eve2 into eve1 (eve1 is modified in place).

    Amplitudes are matched through the picks they refer to: each eve2
    amplitude is mapped, via its pick, to the resource id of the matching
    eve1 pick. A matched eve1 amplitude that differs from its eve2
    counterpart takes over the eve2 amplitude's attributes (except pick_id
    and resource id). Amplitudes found only in eve2 are appended to eve1 and
    re-pointed at the eve1 pick. If ``reject_old`` is True, eve1 amplitudes
    whose waveform-id + magnitude_hint hash is not found among the eve2
    amplitudes get evaluation_status "rejected".
    """
    attrs_no_update = {"pick_id", "resource_id", "force_resource_id"}
    maps = _generate_pick_phase_maps(eve1, eve2)
    id2_p1 = maps["id2_p1"]
    # Amplitudes without a pick cannot be matched, so they are left out here
    # (the same guard is applied to the eve2 amplitudes below).
    pid1_a1 = {
        amp.pick_id.id: amp for amp in eve1.amplitudes if amp.pick_id is not None
    }
    pid1_a2 = {}  # eve1 pick id -> eve2 amplitude
    pid1_p1 = {}  # eve1 pick id -> eve1 pick
    # map all amplitude 2 back to pick ids on first event
    for amp in eve2.amplitudes:
        aid = amp.pick_id
        if aid is None:
            continue
        # .get: the referenced pick may be missing or could not be hashed
        old_pick = id2_p1.get(aid.id)
        if old_pick is None:
            continue
        key = old_pick.resource_id.id
        pid1_a2[key] = amp
        pid1_p1[key] = old_pick
    # common keys: impose the new (eve2) amplitude on the old (eve1) one
    for key, new_amp in pid1_a2.items():
        old_amp = pid1_a1.get(key)
        if old_amp is None or old_amp == new_amp:
            continue
        bump_creation_version(new_amp)  # bump creation info
        update_vals = {
            name: value
            for name, value in new_amp.__dict__.items()
            if name not in attrs_no_update
        }
        old_amp.__dict__.update(update_vals)
    # amplitudes only found in the new event are appended
    for key, new_amp in pid1_a2.items():
        if key in pid1_a1:
            continue
        # Point the amplitude at the pick that lives on eve1; otherwise it
        # would keep referring to a pick id that only exists on eve2.
        new_amp.pick_id = pid1_p1[key].resource_id
        eve1.amplitudes.append(new_amp)
    if reject_old:
        # _reject_old hashes by waveform id + magnitude_hint, so the
        # checklist has to be keyed the same way (not by pick id).
        _reject_old(eve1.amplitudes, "magnitude_hint", maps["widm_a2"])


def _reject_old(objs, hash_attr, checklist):
    """
    Set the evaluation status of outdated objects to 'rejected'.

    Parameters
    ----------
    objs
        Iterable of objects (picks or amplitudes) to check.
    hash_attr
        Name of the extra attribute used by ``_hash_wid`` for the hash.
    checklist
        Container of hashes that are still valid; any object whose hash is
        not in it, or which cannot be hashed at all, is rejected.
    """
    for obj in objs:
        try:
            wid = _hash_wid(obj, hash_attr)
        except (AttributeError, TypeError):
            # Missing waveform id, or a None code/hint: the object cannot be
            # matched against anything, so it is rejected.
            obj.evaluation_status = "rejected"
        else:
            if wid not in checklist:
                obj.evaluation_status = "rejected"
def attach_new_origin(
    old_event: Event,
    new_event: Event,
    new_origin: Origin,
    preferred: bool,
    index: int | None = None,
) -> Event:
    """
    Attach a new origin to an existing event object.

    Parameters
    ----------
    old_event : obspy.core.event.Event
        The old event that will receive the new origin.
    new_event : obspy.core.event.Event
        The new event that contains the origin, needed for merging picks
        that may not exist in old_event.
    new_origin : obspy.core.event.Origin
        The new origin that will be attached to old_event.
    preferred : bool
        If True mark the new origin as the preferred_origin.
    index : int or None
        The index in old_event.origins that new_origin will overwrite; if
        None, new_origin is appended to old_event.origins. An index that is
        out of range triggers a warning and the origin is appended instead.

    Returns
    -------
    obspy.core.event.Event
        ``old_event``, which is modified in place.
    """
    # make sure all the picks/amplitudes in new_event are also in old_event
    merge_events(old_event, new_event, reject_old=False)
    # point the arrivals in the new origin at the old picks
    _associate_picks(old_event, new_event, new_origin)
    if index is None:
        old_event.origins.append(new_origin)
    else:
        # this origin is meant to replace an existing one
        try:
            old_ori = old_event.origins[index]
        except IndexError:
            msg = (
                f"{index} is not valid for an origin list of length "
                f"{len(old_event.origins)}"
                " appending new origin to end of list"
            )
            warnings.warn(msg)
            old_event.origins.append(new_origin)
        else:
            # the replacement takes over the identity of the replaced origin
            new_origin.resource_id = old_ori.resource_id
            new_origin.creation_info = old_ori.creation_info
            old_event.origins[index] = new_origin
    # bump origin creation info
    bump_creation_version(new_origin)
    if preferred:
        old_event.preferred_origin_id = new_origin.resource_id
    validate_catalog(old_event)
    return old_event
def _associate_picks(old_eve, new_event, new_origin):
    """
    Re-point the arrivals of ``new_origin`` at the picks of ``old_eve``.

    Each arrival's pick is looked up in ``new_event`` by resource id, hashed
    on waveform id + phase_hint, and the arrival's ``pick_id`` is replaced by
    the resource id of the ``old_eve`` pick with the same hash.
    """
    new_picks_by_id = {pick.resource_id.id: pick for pick in new_event.picks}
    old_picks_by_hash = _hash_wids(old_eve.picks, "phase_hint")
    for arrival in new_origin.arrivals:
        pick_hash = _hash_wid(new_picks_by_id[arrival.pick_id.id], "phase_hint")
        # swap the arrival over to the corresponding pick of the old event
        arrival.pick_id = old_picks_by_hash[pick_hash].resource_id
def associate_merge(
    event: Event,
    new_catalog: Catalog | Event,
    median_tolerance: float = 1.0,
    reject_old: bool = False,
) -> Event:
    """
    Merge the "closest" event in a catalog into an existing event.

    Finds the closest event in new_catalog to event using median pick times,
    then calls :func:`obsplus.events.merge.merge_events` to merge the events
    together.

    Parameters
    ----------
    event
        The base event which will be modified in place.
    new_catalog
        A new catalog or event which contains picks.
    median_tolerance
        The tolerance, in seconds, of the median pick for associating
        events in new_catalog into event.
    reject_old
        Reject any picks/amplitudes in old event if not found in new event.

    Returns
    -------
    Event
        ``event``; it is returned unmodified when no event of new_catalog
        could be associated with it.
    """

    def _get_pick_median(time_ser):
        """
        Return a (close enough) approximation of the median for datetimes
        in ns.
        """
        return int(time_ser.astype(np.int64).median())

    def _get_associated_event_id(new_picks, old_picks):
        """Return the associated event id, or None if out of tolerance."""
        new_med = new_picks.groupby("event_id")["time"].apply(_get_pick_median)
        old_med = _get_pick_median(old_picks["time"])
        diffs = abs(new_med - old_med)
        # medians are in ns; the tolerance is given in seconds
        if diffs.min() / 1_000_000_000 > median_tolerance:
            return None
        return diffs.idxmin()

    # Get list-like of events from new_catalog
    new_cat = new_catalog if isinstance(new_catalog, Catalog) else [new_catalog]
    # Check the normalized list: the raw argument may be a bare Event.
    assert len(new_cat) > 0
    # Get dataframes of event info
    new_pick_df = obsplus.picks_to_df(new_cat)
    old_pick_df = obsplus.picks_to_df(event)
    # Without picks on either side there is no median to compare (it would
    # be NaN), so the association cannot succeed.
    if new_pick_df.empty or old_pick_df.empty:
        return event
    eid = _get_associated_event_id(new_pick_df, old_pick_df)
    new_event = {str(x.resource_id): x for x in new_cat}.get(eid)
    # The association failed, just return original event
    if new_event is None:
        return event
    return merge_events(event, new_event, reject_old=reject_old)
# ---------- silly hash functions for getting around resource_ids (sorta)


def _hash_wids(objs, extra_attr=None):
    """
    Map the hash of each object (see ``_hash_wid``) to the object itself.

    Objects that cannot be hashed are skipped. Insertion order follows
    ``objs``; when two objects share a hash the later one wins.
    """
    out = OrderedDict()
    for obj in objs:
        try:
            key = _hash_wid(obj, extra_attr)
        except (AttributeError, TypeError):
            # AttributeError: no waveform_id (or the extra attribute is
            # missing); TypeError: one of the codes or the extra attribute
            # is None, which str.join refuses. Either way: cannot hash.
            continue
        else:
            out[key] = obj
    return out


def _hash_wid(obj, extra_attr):
    """
    Return "network-station-channel-extra" for an object with a waveform_id.

    ``extra`` is the value of the attribute named ``extra_attr`` (e.g.
    "phase_hint"), or an empty string when ``extra_attr`` is falsy. The
    location code is not part of the hash.

    Raises
    ------
    AttributeError
        If the object has no usable waveform_id or lacks ``extra_attr``.
    TypeError
        If any of the joined values is not a string (e.g. None).
    """
    wid = obj.waveform_id
    extra = getattr(obj, extra_attr) if extra_attr else ""
    key = "-".join([wid.network_code, wid.station_code, wid.channel_code, extra])
    return key