Source code for obsplus.utils.mseed
"""
Slimmed down bits of obspy's mseed module.
Copyrights to ObsPy developers still apply.
"""
import os
import numpy as np
from obspy.io.mseed.core import DATATYPES, C, clibmseed
def _get_lil(mseed_object):
"""
Slimmed down functions for indexing miniseed files.
Contains hacked bits from obspy's mseed module.
Copyrights to ObsPy developers still apply.
"""
# Parse the headonly and reclen flags.
unpack_data = 0
details = False
reclen = -1
header_byteorder = -1
verbose = 0
selections = None
length = os.path.getsize(mseed_object)
assert length > 128, "length too small"
assert length < 2147483648, "length too big"
# Assume a file was passed
bfr_np = np.fromfile(mseed_object, dtype=np.int8)
buflen = len(bfr_np)
all_data = []
# Use a callback function to allocate the memory and keep track of the
# data.
def allocate_data(samplecount, sampletype):
# Enhanced sanity checking for libmseed 2.10 can result in the
# sampletype not being set. Just return an empty array in this case.
if sampletype == b"\x00":
data = np.empty(0)
else:
data = np.empty(samplecount, dtype=DATATYPES[sampletype])
all_data.append(data)
return data.ctypes.data
alloc_data = C.CFUNCTYPE(C.c_longlong, C.c_int, C.c_char)(allocate_data)
clibmseed.verbose = bool(verbose)
# note: normally this would be in a try except, but we just fal back to
# obspy read if an exception happens (this only needs to work most the time)
lil = clibmseed.readMSEEDBuffer(
bfr_np,
buflen,
selections,
C.c_int8(unpack_data),
reclen,
C.c_int8(verbose),
C.c_int8(details),
header_byteorder,
alloc_data,
)
clibmseed.verbose = True
return lil
[docs]
def summarize_mseed(mseed_object):
"""
get a summary of an mseed file.
Note: we cannot simply use obspy.io.mseed.get_record_information because it
returns info only about the first trace.
"""
lil = _get_lil(mseed_object)
traces = []
current_id = lil.contents
while True:
# Init header with the essential information.
header = {
"network": current_id.network.strip(),
"station": current_id.station.strip(),
"location": current_id.location.strip(),
"channel": current_id.channel.strip(),
"path": mseed_object,
}
# Loop over segments.
try:
current_segment = current_id.firstSegment.contents
except ValueError:
break
while True:
header["starttime"] = current_segment.starttime * 1_000
header["endtime"] = current_segment.endtime * 1_000
header["sampling_period"] = current_segment.hpdelta * 1_000
traces.append(dict(header))
try:
current_segment = current_segment.next.contents
except ValueError:
break
try:
current_id = current_id.next.contents
except ValueError:
break
clibmseed.lil_free(lil) # NOQA
del lil # NOQA
if not traces:
raise IOError(f"could not read {mseed_object}")
return traces