Source code for obsplus.events.merge

"""
functions for merging catalogs together
"""

import warnings
from collections import OrderedDict
from typing import Optional, Union

import numpy as np
from obspy.core.event import Catalog, Origin, Event

import obsplus
from obsplus import validate_catalog
from obsplus.utils.events import bump_creation_version


[docs] def merge_events(eve1: Event, eve2: Event, reject_old: bool = True) -> Event: """ Merge picks and amplitudes of two events together. This function attempts to merge picks and amplitudes of two events together that may have different resource_ids in some attributes. The second event is imposed on the first which is modified in place. Parameters ---------- eve1 : Catalog The first (former) event eve2 : Catalog The second (new) event reject_old : bool If True, reject anything in eve1 not in eve2 Returns ------- Event The merged events """ _merge_picks(eve1, eve2, reject_old=reject_old) _merge_amplitudes(eve1, eve2, reject_old=reject_old) return eve1
def _generate_pick_phase_maps(eve1, eve2): maps = {} maps["widp_p1"] = _hash_wids(eve1.picks, "phase_hint") maps["widp_p2"] = _hash_wids(eve2.picks, "phase_hint") maps["widm_a1"] = _hash_wids(eve1.amplitudes, "magnitude_hint") maps["widm_a2"] = _hash_wids(eve2.amplitudes, "magnitude_hint") maps["id_p1"] = {x.resource_id.id for x in eve1.picks} maps["id_p2"] = {x.resource_id.id for x in eve2.picks} maps["id1_p2"] = { val.resource_id.id: maps["widp_p2"].get(key, None) for key, val in maps["widp_p1"].items() } maps["id2_p1"] = { val.resource_id.id: maps["widp_p1"].get(key, None) for key, val in maps["widp_p2"].items() } return maps def _merge_picks(eve1, eve2, reject_old=False): """ Merge a list of objects that have waveform ids (arrivals, picks, amplitudes) """ maps = _generate_pick_phase_maps(eve1, eve2) # new attributes that should overwrite old ones attrs_no_update = {"resource_id", "force_resource_id"} widp_p1 = maps["widp_p1"] widp_p2 = maps["widp_p2"] # for picks in old and new for key in set(widp_p1) & set(widp_p2): if widp_p1[key] != widp_p2[key]: bump_creation_version(widp_p2[key]) # bump creation info pd2 = widp_p2[key].__dict__ update_vals = {x: pd2[x] for x in pd2 if x not in attrs_no_update} widp_p1[key].__dict__.update(update_vals) # for new picks append for key in set(widp_p2) - set(widp_p1): eve1.picks.append(widp_p2[key]) # reject old if reject_old: _reject_old(eve1.picks, "phase_hint", widp_p2) # eve1.picks = [x for x in eve1.picks if _hash_wid(x, "phase_hint") in widp_p2] def _merge_amplitudes(eve1, eve2, reject_old=False): """Merge the amplitudes together.""" attrs_no_update = {"pick_id", "resource_id", "force_resource_id"} maps = _generate_pick_phase_maps(eve1, eve2) pid1_a1 = {x.pick_id.id: x for x in eve1.amplitudes} pid1_a2 = {} # map all amplitude 2 back to pick ids on first event for amp in eve2.amplitudes: aid = amp.pick_id if aid is not None and maps["id2_p1"][aid.id] is not None: key = maps["id2_p1"][aid.id].resource_id.id pid1_a2[key] = amp # common keys for key in set(pid1_a1) & set(pid1_a2): amp1, amp2 = pid1_a2[key], pid1_a1[key] if amp1 == amp2: continue bump_creation_version(amp2) # bump creation info pd2 = amp2.__dict__ update_vals = {x: pd2[x] for x in pd2 if x not in attrs_no_update} amp1.__dict__.update(update_vals) # for new amplitudes append for key in set(pid1_a2) - set(pid1_a1): eve1.amplitudes.append(pid1_a2[key]) # reject old if reject_old: _reject_old(eve1.amplitudes, "magnitude_hint", pid1_a2) # eve1.amplitudes = [x for x in eve1.amplitudes if x.pick_id.id in pid1_a2] def _reject_old(objs, hash_attr, checklist): """Set the evaluation status of outdated objects to 'rejected'""" for x in objs: try: wid = _hash_wid(x, hash_attr) except AttributeError: # It is not a valid Amplitude... reject it x.evaluation_status = "rejected" else: if wid not in checklist: x.evaluation_status = "rejected"
[docs] def attach_new_origin( old_event: Event, new_event: Event, new_origin: Origin, preferred: bool, index: Optional[int] = None, ) -> Catalog: """ Attach a new origin to an existing events object. Parameters ---------- old_event : obspy.core.event.Event The old event that will receive the new origin new_event : obspy.core.event.Event The new event that contains the origin, needed for merging picks that may not exist in old_event new_origin : obspy.core.event.Origin The new origin that will be attached to old_event preferred : bool If True mark the new origin as the preferred_origin index : int or None The origin index of old_cat that new_origin will overwrite, if None append the new_origin to old_cat.origins Returns ------- obspy.Catalog modifies old_cat in-place, returns old_catalog """ # make sure all the picks/amplitudes in new_event are also in old_event merge_events(old_event, new_event, reject_old=False) # point the arrivals in the new origin at the old picks _associate_picks(old_event, new_event, new_origin) # append the origin if index is not None: # if this origin is to replace another try: old_ori = old_event.origins[index] except IndexError: msg = ("%d is not valid for an origin list of length %d") % ( index, len(old_event.origins), ) msg += " appending new origin to end of list" warnings.warn(msg) old_event.origins.append(new_origin) else: # set resource id and creation info new_origin.resource_id = old_ori.resource_id new_origin.creation_info = old_ori.creation_info old_event.origins[index] = new_origin else: old_event.origins.append(new_origin) # bump origin creation info bump_creation_version(new_origin) # set preferred if preferred: old_event.preferred_origin_id = new_origin.resource_id validate_catalog(old_event) return old_event
def _associate_picks(old_eve, new_event, new_origin): """associate the origin arrivals with correct picks from old event""" picks = old_eve.picks # picks of old cat_name new_resource_pick_dict = {x.resource_id.id: x for x in new_event.picks} old_pick_dict = _hash_wids(picks, "phase_hint") for arrival in new_origin.arrivals: # associate picks together new_pick = new_resource_pick_dict[arrival.pick_id.id] new_pick_hash = _hash_wid(new_pick, "phase_hint") # get corresponding old pick and swap resource id of arrival old_pick = old_pick_dict[new_pick_hash] arrival.pick_id = old_pick.resource_id
[docs] def associate_merge( event: Event, new_catalog: Union[Catalog, Event], median_tolerance: float = 1.0, reject_old: bool = False, ) -> Event: """ Merge the "closest" event in a catalog into an existing event. Finds the closest event in new_catalog to event using median pick times, then calls :func:`obsplus.events.merge.merge_events` to merge the events together. Parameters ---------- event The base event which will be modified in place. new_catalog A new catalog or event which contains picks. median_tolerance The tolerance, in seconds, of the median pick for associating events in new_catalog into event. reject_old Reject any picks/amplitudes in old event if not found in new event. """ def _get_pick_median(time_ser): """ Return a (close enough) approximation of the median for datetimes in ns. """ int_median = int(time_ser.view(np.int64).median()) return int_median def _get_associated_event_id(new_picks, old_picks): """Return the associated event id""" new_med = new_picks.groupby("event_id")["time"].apply(_get_pick_median) old_med = _get_pick_median(old_picks["time"]) diffs = abs(new_med - old_med) # check on min tolerance, if exceeded return empty if diffs.min() / 1_000_000_000 > median_tolerance: return None return diffs.idxmin() # Get list-like of events from new_catalog new_cat = new_catalog if isinstance(new_catalog, Catalog) else [new_catalog] assert len(new_catalog) > 0 # Get dataframes of event info new_pick_df = obsplus.picks_to_df(new_cat) old_pick_df = obsplus.picks_to_df(event) eid = _get_associated_event_id(new_pick_df, old_pick_df) new_event = {str(x.resource_id): x for x in new_catalog}.get(eid) # The association failed, just return original event if new_event is None: return event return merge_events(event, new_event, reject_old=reject_old)
# ---------- silly hash functions for getting around resource_ids (sorta) def _hash_wids(objs, extra_attr=None): out = OrderedDict() for obj in objs: try: key = _hash_wid(obj, extra_attr) except AttributeError: # if cant hash object continue else: out[key] = obj return out def _hash_wid(obj, extra_attr): wid = obj.waveform_id extra = getattr(obj, extra_attr) if extra_attr else "" key = "-".join([wid.network_code, wid.station_code, wid.channel_code, extra]) return key