Source code for cheta.fetch

#!/usr/bin/env python
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Fetch values from the Ska engineering telemetry archive.
"""
import collections
import contextlib
import fnmatch
import logging
import operator
import os
import pickle
import re
import sys
import time
import warnings
from pathlib import Path

import numpy as np
import pyyaks.context
from astropy.io import ascii
from Chandra.Time import DateTime
from ska_helpers.utils import lru_cache_timed

from . import __version__  # noqa
from . import cache, file_defs, remote_access
from .derived.comps import ComputedMsid
from .lazy import LazyDict
from .remote_access import ENG_ARCHIVE
from .units import Units

# Module-level units, defaults to CXC units (e.g. Kelvins etc)
UNITS = Units(system="cxc")

# Module-level control of whether MSID.fetch will cache the last 30 results
CACHE = False

IGNORE_COLNAMES = ("TIME", "MJF", "MNF", "TLM_FMT")
DIR_PATH = os.path.dirname(os.path.abspath(__file__))

# Dates near the start of 2000 that demarcates the split between the 1999 data
# and post-2000 data.  The 1999 data goes out to at least 2000:005:13:00:00,
# while post-2000 data starts as late as 2000:001:11:58:59.  Dates between LO
# and HI get taken from either 1999 or post-2000.  The times are 4 millisec before
# a minor frame boundary to avoid collisions.
DATE2000_LO = DateTime("2000:001:00:00:00.090").date
DATE2000_HI = DateTime("2000:003:00:00:00.234").date

# Launch date (earliest possible date for telemetry)
LAUNCH_DATE = "1999:204"

# Maximum number of MSIDs that should ever match an input MSID spec
# (to prevent accidentally selecting a very large number of MSIDs)
MAX_GLOB_MATCHES = 10

# Special-case state codes that override those in the TDB
STATE_CODES = {
    # SIMDIAG
    "3SDSWELF": [(0, "F"), (1, "T")],
    "3SDSYRS": [(0, "F"), (1, "T")],
    "3SDWMRS": [(0, "F"), (1, "T")],
    # SIM_MRG
    "3TSCMOVE": [(0, "F"), (1, "T")],
    "3FAMOVE": [(0, "F"), (1, "T")],
    "3SEAID": [(0, "SEA-A"), (1, "SEA-B")],
    "3SEARSET": [(0, "F"), (1, "T")],
    "3SEAROMF": [(0, "F"), (1, "T")],
    "3SEAINCM": [(0, "F"), (1, "T")],
    "3STAB2EN": [(0, "DISABLE"), (1, "ENABLE")],
    "3SMOTPEN": [(0, "ENABLE"), (1, "DISABLE")],
    "3SMOTSEL": [(0, "TSC"), (1, "FA")],
    "3SHTREN": [(0, "DISABLE"), (1, "ENABLE")],
    "3SEARAMF": [(0, "F"), (1, "T")],
}

# Cached version (by content type) of first and last available times in archive
CONTENT_TIME_RANGES = {}

# Default source of data.
DEFAULT_DATA_SOURCE = "cxc"


class _DataSource(object):
    """
    Context manager and quasi-singleton configuration object for managing the
    data_source(s) used for fetching telemetry.
    """

    _data_sources = (DEFAULT_DATA_SOURCE,)
    _allowed = ("cxc", "maude", "test-drop-half")

    def __init__(self, *data_sources):
        self._new_data_sources = data_sources

    def __enter__(self):
        self._orig_data_sources = self.__class__._data_sources
        self.set(*self._new_data_sources)

    def __exit__(self, type, value, traceback):
        self.__class__._data_sources = self._orig_data_sources

    @classmethod
    def set(cls, *data_sources):
        """
        Set current data sources.

        :param *data_sources: one or more sources (str)
        """
        if any(
            data_source.split()[0] not in cls._allowed for data_source in data_sources
        ):
            raise ValueError(
                "data_sources {} not in allowed set {}".format(
                    data_sources, cls._allowed
                )
            )

        if len(data_sources) == 0:
            raise ValueError(
                "must select at least one data source in {}".format(cls._allowed)
            )

        cls._data_sources = data_sources

    @classmethod
    def sources(cls, include_test=True):
        """
        Get tuple of current data sources names.

        :param include_test: include sources that start with 'test'
        :returns: tuple of data source names
        """
        if include_test:
            sources = cls._data_sources
        else:
            sources = [x for x in cls._data_sources if not x.startswith("test")]

        return tuple(source.split()[0] for source in sources)

    @classmethod
    def get_msids(cls, source):
        """
        Get the set of MSID names corresponding to ``source`` (e.g. 'cxc' or 'maude')

        :param source: str
        :returns: set of MSIDs
        """
        source = source.split()[0]

        if source == "cxc":
            out = list(content.keys())
        elif source == "maude":
            import maude

            out = list(maude.MSIDS.keys())
        else:
            raise ValueError('source must be "cxc" or "msid"')

        return set(out)

    @classmethod
    def options(cls):
        """
        Get the data sources and corresponding options as a dict.

        Example::

          >>> data_source.set('cxc', 'maude allow_subset=False')
          >>> data_source.options()
          {'cxc': {}, 'maude': {'allow_subset': False}}

        :returns: dict of data source options
        """
        import ast

        out = {}
        for source in cls._data_sources:
            vals = source.split()
            name, opts = vals[0], vals[1:]
            out[name] = {}
            for opt in opts:
                key, val = opt.split("=")
                val = ast.literal_eval(val)
                out[name][key] = val

        return out


# Public interface is a "data_source" module attribute
data_source = _DataSource


def local_or_remote_function(remote_print_output):
    """
    Decorator maker so that a function gets run either locally or remotely
    depending on the state of remote_access.access_remotely.  This decorator
    maker takes an optional remote_print_output argument that will be
    be printed (locally) if the function is executed remotely,

    For functions that are decorated using this wrapper:

    Every path that may be generated locally but used remotely should be
    split with _split_path(). Conversely the functions that use
    the resultant path should re-join them with os.path.join. In the
    remote case the join will happen using the remote rules.
    """

    def the_decorator(func):
        def wrapper(*args, **kwargs):
            if remote_access.access_remotely:
                # If accessing a remote archive, establish the connection (if
                # necessary)
                if not remote_access.connection_is_established():
                    try:
                        if not remote_access.establish_connection():
                            raise remote_access.RemoteConnectionError(
                                "Unable to establish connection for remote fetch."
                            )
                    except EOFError:
                        # An EOF error can be raised if the python interpreter is being
                        # called in such a way that input cannot be received from the
                        # user (e.g. when the python interpreter is called from MATLAB)
                        # If that is the case (and remote access is enabled), then
                        # raise an import error
                        raise ImportError(
                            "Unable to interactively get remote access info from user."
                        )
                # Print the output, if specified
                if remote_access.show_print_output and remote_print_output is not None:
                    print(remote_print_output)
                    sys.stdout.flush()
                # Execute the function remotely and return the result
                return remote_access.execute_remotely(func, *args, **kwargs)
            else:
                return func(*args, **kwargs)

        return wrapper

    return the_decorator


def _split_path(path):
    """
    Return a tuple of the components for ``path``. Strip off the drive if
    it exists AND access is remote. This works correctly for the local OS
    (linux / windows).
    """
    path = Path(path)
    parts = path.parts
    if remote_access.access_remotely and path.drive:
        parts = ("/",) + parts[1:]
    return parts


def _get_start_stop_dates(times):
    if len(times) == 0:
        return {}
    else:
        return {"start": DateTime(times[0]).date, "stop": DateTime(times[-1]).date}


# Context dictionary to provide context for msid_files
ft = pyyaks.context.ContextDict("ft")

# Global (eng_archive) definition of file names
msid_files = pyyaks.context.ContextDict("msid_files", basedir=ENG_ARCHIVE)
msid_files.update(file_defs.msid_files)

# Module-level values defining available content types and column (MSID) names.
# Then convert from astropy Table to recarray for API stability.
# Note that filetypes.as_array().view(np.recarray) does not quite work...
filetypes = ascii.read(os.path.join(DIR_PATH, "filetypes.dat"))
filetypes_arr = filetypes.as_array()
filetypes = np.recarray(len(filetypes_arr), dtype=filetypes_arr.dtype)
filetypes[()] = filetypes_arr

# Get the list of filenames (an array is built to pass all the filenames at
# once to the remote machine since passing them one at a time is rather slow)
all_msid_names_files = dict()
for filetype in filetypes:
    ft["content"] = filetype["content"].lower()
    all_msid_names_files[str(ft["content"])] = _split_path(msid_files["colnames"].abs)


# Function to load MSID names from the files (executed remotely, if necessary)
@local_or_remote_function("Loading MSID names from Ska eng archive server...")
def load_msid_names(all_msid_names_files):
    import pickle

    all_colnames = dict()
    for k, msid_names_file in all_msid_names_files.items():
        try:
            all_colnames[k] = pickle.load(open(os.path.join(*msid_names_file), "rb"))
        except IOError:
            pass
    return all_colnames


def load_content(all_colnames):
    out = {}
    # Save the names
    for content_type, msid_names in all_colnames.items():
        out.update(
            (name, content_type)
            for name in sorted(msid_names)
            if name not in IGNORE_COLNAMES
        )
    return out


# Define MSID names as a dict of content_type: [MSID_names_for_content_type].
# This is a LazyDict so nothing happens until a value is requested.
all_colnames = LazyDict(load_msid_names, all_msid_names_files)

# Define MSID content definition as dict of MSID_name: content_type
content = LazyDict(load_content, all_colnames)


# Cache of the most-recently used TIME array and associated bad values mask.
# The key is (content_type, tstart, tstop).
times_cache = dict(key=None)


# Set up logging.
class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger("cheta.fetch")
logger.addHandler(NullHandler())
logger.propagate = False


[docs]def get_units(): """Get the unit system currently being used for conversions.""" return UNITS["system"]
[docs]def set_units(unit_system): """Set the unit system used for output telemetry values. The default is "cxc". Allowed values for ``unit_system`` are: ==== ============================================================== cxc FITS standard units used in CXC archive files (basically MKS) sci Same as "cxc" but with temperatures in degC instead of Kelvins eng OCC engineering units (TDB P009, e.g. degF, ft-lb-sec, PSI) ==== ============================================================== :param unit_system: system of units (cxc, sci, eng) """ UNITS.set_units(unit_system)
[docs]def read_bad_times(table): """Include a list of bad times from ``table`` in the fetch module ``bad_times`` registry. This routine can be called multiple times with different tables and the bad times will be appended to the registry. The table can include any number of bad time interval specifications, one per line. A bad time interval line has three columns separated by whitespace, e.g.:: aogbias1 2008:292:00:00:00 2008:297:00:00:00 The MSID name is not case sensitive and the time values can be in any ``DateTime`` format. Blank lines and any line starting with the # character are ignored. """ bad_times = ascii.read(table, format="no_header", names=["msid", "start", "stop"]) for msid, start, stop in bad_times: msid_bad_times.setdefault(msid.upper(), []).append((start, stop))
# Set up bad times dict msid_bad_times = dict() read_bad_times(os.path.join(DIR_PATH, "msid_bad_times.dat"))
[docs]def msid_glob(msid): """Get the archive MSIDs matching ``msid``. The function returns a tuple of (msids, MSIDs) where ``msids`` is a list of MSIDs that is all lower case and (where possible) matches the input ``msid``. The output ``MSIDs`` is all upper case and corresponds to the exact MSID names stored in the archive HDF5 files. :param msid: input MSID glob :returns: tuple (msids, MSIDs) """ msids = {} MSIDS = {} # First check if `msid` matches a computed class. This does not allow # for globs, and here the output MSIDs is the single computed class. if ComputedMsid.get_matching_comp_cls(msid) is not None: return [msid], [msid.upper()] sources = data_source.sources(include_test=False) for source in sources: ms, MS = _msid_glob(msid, source) msids.update((m, None) for m in ms) MSIDS.update((m, None) for m in MS) if not msids: raise ValueError( "MSID {!r} is not in {} data source(s)".format( msid, " or ".join(x.upper() for x in sources) ) ) return list(msids), list(MSIDS)
def _msid_glob(msid, source): """Get the archive MSIDs matching ``msid``. The function returns a tuple of (msids, MSIDs) where ``msids`` is a list of MSIDs that is all lower case and (where possible) matches the input ``msid``. The output ``MSIDs`` is all upper case and corresponds to the exact MSID names stored in the archive HDF5 files. :param msid: input MSID glob :returns: tuple (msids, MSIDs) """ source_msids = data_source.get_msids(source) # MSID is the upper-case version of the MSID name that is actually used in # backend queries. ``msid`` is the user-supplied version. MSID = msid.upper() # CALC_ is a synonym for DP_ which works in both CXC and MAUDE archives, so # swap to DP_ if CALC_ is found. These are calculated pseudo-MSIDs. if MSID.startswith("CALC_"): MSID = "DP_" + MSID[5:] # matches_msid is a list of MSIDs that match the input MSID. Providing the # initial DP_ is optional so we try both if the MSID doesn't already start # with DP_ (i.e. PITCH or DP_PITCH). matches_msid = (MSID,) if not MSID.startswith("DP_"): matches_msid += ("DP_" + MSID,) # If one of matches_msid is in the source then return the upper # case version and whatever the user supplied (could be any case). for match in matches_msid: if match in source_msids: return [msid], [match] # Next try as a file glob. If there is a match then return a # list of matches, all lower case and all upper case. Since the # input was a glob the returned msids are just lower case versions # of the matched upper case MSIDs. for match in matches_msid: matches = fnmatch.filter(source_msids, match) if matches: if len(matches) > MAX_GLOB_MATCHES: raise ValueError( "MSID spec {} matches more than {} MSIDs. " "Refine the spec or increase fetch.MAX_GLOB_MATCHES".format( msid, MAX_GLOB_MATCHES ) ) return [x.lower() for x in matches], matches # msid not found for this data source return [], [] def _get_table_intervals_as_list(table, check_overlaps=True): """ Determine if the input ``table`` looks like a table of intervals. This can either be a structured array / Table with datestart / datestop or tstart / tstop columns, OR a list of lists. If so, return a list of corresponding start/stop tuples, otherwise return None. If ``check_overlaps`` is True then a check is made to assure that the supplied intervals do not overlap. This is needed when reading multiple intervals with a single call to fetch, but not for bad times filtering. """ intervals = None if isinstance(table, (list, tuple)): try: intervals = [ (DateTime(row[0]).secs, DateTime(row[1]).secs) for row in table ] except Exception: pass else: for prefix in ("date", "t"): start = prefix + "start" stop = prefix + "stop" try: intervals = [ (DateTime(row[start]).secs, DateTime(row[stop]).secs) for row in table ] except Exception: pass else: break # Got an intervals list, now sort if check_overlaps and intervals is not None: intervals = sorted(intervals, key=lambda x: x[0]) # Check for overlaps if any(i0[1] > i1[0] for i0, i1 in zip(intervals[:-1], intervals[1:])): raise ValueError("Input intervals overlap") return intervals
[docs]class MSID(object): """Fetch data from the engineering telemetry archive into an MSID object. The input ``msid`` is case-insensitive and can include linux file "glob" patterns, for instance ``orb*1*_x`` (ORBITEPHEM1_X) or ``*pcadmd`` (AOPCADMD). For derived parameters the initial ``DP_`` is optional, for instance ``dpa_pow*`` (DP_DPA_POWER). :param msid: name of MSID (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :returns: MSID instance """ units = UNITS fetch = sys.modules[__name__] def __init__(self, msid, start=LAUNCH_DATE, stop=None, filter_bad=False, stat=None): msids, MSIDs = msid_glob(msid) if len(MSIDs) > 1: raise ValueError("Multiple matches for {} in Eng Archive".format(msid)) else: self.msid = msids[0] self.MSID = MSIDs[0] # Capture the current module units self.units = Units(self.units["system"]) self.unit = self.units.get_msid_unit(self.MSID) self.stat = stat if stat: self.dt = {"5min": 328, "daily": 86400}[stat] # If ``start`` is actually a table of intervals then fetch # each interval separately and concatenate the results intervals = _get_table_intervals_as_list(start, check_overlaps=True) if intervals is not None: start, stop = intervals[0][0], intervals[-1][1] self.tstart = DateTime(start).secs self.tstop = ( DateTime(stop).secs if stop else DateTime(time.time(), format="unix").secs ) self.datestart = DateTime(self.tstart).date self.datestop = DateTime(self.tstop).date self.data_source = {} self.content = content.get(self.MSID) if self.datestart < DATE2000_LO and self.datestop > DATE2000_HI: intervals = [(self.datestart, DATE2000_HI), (DATE2000_HI, self.datestop)] # Get the times, values, bad values mask from the HDF5 files archive if intervals is None: self._get_data() else: self._get_data_over_intervals(intervals) # If requested filter out bad values and set self.bad = None if filter_bad: self.filter_bad() if "CHETA_FETCH_DATA_GAP" in os.environ: create_msid_data_gap(self, os.environ["CHETA_FETCH_DATA_GAP"]) def __len__(self): return len(self.vals) @property def dtype(self): return self.vals.dtype def __repr__(self): attrs = [self.__class__.__name__, self.MSID] for name, val in ( ("start", self.datestart), ("stop", self.datestop), ("len", len(self)), ("dtype", self.dtype.name), ("unit", self.unit), ("stat", self.stat), ): if val is not None: attrs.append("{}={}".format(name, val)) return "<" + " ".join(attrs) + ">" def _get_data_over_intervals(self, intervals): """ Fetch intervals separately and concatenate the results. """ msids = [] for start, stop in intervals: msids.append( self.fetch.MSID( self.msid, start, stop, filter_bad=False, stat=self.stat ) ) # No bad values column for stat='5min' or 'daily', but still need this attribute. if self.stat: self.bads = None self.colnames = msids[0].colnames for attr in self.colnames: vals = np.concatenate([getattr(msid, attr) for msid in msids]) setattr(self, attr, vals) def _get_data(self): """Get data from the Eng archive""" logger.info( "Getting data for %s between %s to %s", self.msid, self.datestart, self.datestop, ) comp_cls = ComputedMsid.get_matching_comp_cls(self.msid) if comp_cls: self._get_comp_data(comp_cls) return # Avoid stomping on caller's filetype 'ft' values with _cache_ft() with _cache_ft(): ft["content"] = self.content ft["msid"] = self.MSID with _set_msid_files_basedir(self.datestart): if self.stat: if "maude" in data_source.sources(): raise ValueError( "MAUDE data source does not support telemetry statistics" ) ft["interval"] = self.stat self._get_stat_data() else: self.colnames = ["vals", "times", "bads"] args = ( self.content, self.tstart, self.tstop, self.MSID, self.units["system"], ) if ( "cxc" in data_source.sources() and self.MSID in data_source.get_msids("cxc") ): # CACHE is normally True only when doing ingest processing. Note # also that to support caching the get_msid_data_from_cxc_cached # method must be static. get_msid_data = ( self._get_msid_data_from_cxc_cached if CACHE else self._get_msid_data_from_cxc ) self.vals, self.times, self.bads = get_msid_data(*args) self.data_source["cxc"] = _get_start_stop_dates(self.times) if "test-drop-half" in data_source.sources() and hasattr( self, "vals" ): # For testing purposes drop half the data off the end. This assumes another # data_source like 'cxc' has been selected. idx = len(self.vals) // 2 self.vals = self.vals[:idx] self.times = self.times[:idx] self.bads = self.bads[:idx] # Following assumes only one prior data source but ok for controlled testing for source in self.data_source: self.data_source[source] = _get_start_stop_dates(self.times) if ( "maude" in data_source.sources() and self.MSID in data_source.get_msids("maude") ): # Update self.vals, times, bads in place. This might concatenate MAUDE # telemetry to existing CXC values. self._get_msid_data_from_maude(*args) def _get_comp_data(self, comp_cls): logger.info(f"Getting computed values for {self.msid}") # Do computation. This returns a dict of MSID attribute values. attrs = comp_cls(self.units["system"])( self.tstart, self.tstop, self.msid, self.stat ) # Allow upstream class to be a bit sloppy on times and include samples # outside the time range. This can happen with classes that inherit # from DerivedParameter. ok = (attrs["times"] >= self.tstart) & (attrs["times"] <= self.tstop) all_ok = np.all(ok) # List of "colnames", which is the ndarray attributes. There can be # non-ndarray attributes that get returned, including typically 'unit'. self.colnames = [ attr for attr, val in attrs.items() if (isinstance(val, np.ndarray) and len(val) == len(attrs["times"])) ] # Apply attributes to self for attr, val in attrs.items(): if ( not all_ok and isinstance(val, np.ndarray) and len(val) == len(attrs["times"]) ): val = val[ok] setattr(self, attr, val) def _get_stat_data(self): """Do the actual work of getting stats values for an MSID from HDF5 files""" filename = msid_files["stats"].abs logger.info("Opening %s", filename) @local_or_remote_function( "Getting stat data for " + self.MSID + " from Ska eng archive server..." ) def get_stat_data_from_server(filename, dt, tstart, tstop): import tables open_file = getattr(tables, "open_file", None) or tables.openFile h5 = open_file(os.path.join(*filename)) table = h5.root.data times = (table.col("index") + 0.5) * dt row0, row1 = np.searchsorted(times, [tstart, tstop]) table_rows = table[row0:row1] # returns np.ndarray (structured array) h5.close() return (times[row0:row1], table_rows, row0, row1) times, table_rows, row0, row1 = get_stat_data_from_server( _split_path(filename), self.dt, self.tstart, self.tstop ) logger.info("Closed %s", filename) self.bads = None self.times = times self.colnames = ["times"] for colname in table_rows.dtype.names: # Don't like the way columns were named in the stats tables. # Fix that here. colname_out = _plural(colname) if colname != "n" else "samples" if colname_out in ( "vals", "mins", "maxes", "means", "p01s", "p05s", "p16s", "p50s", "p84s", "p95s", "p99s", ): vals = self.units.convert(self.MSID, table_rows[colname]) elif colname_out == "stds": vals = self.units.convert( self.MSID, table_rows[colname], delta_val=True ) else: vals = table_rows[colname] setattr(self, colname_out, vals) self.colnames.append(colname_out) # Redefine the 'vals' attribute to be 'means' if it exists. This is a # more consistent use of the 'vals' attribute and there is little use # for the original sampled version. if hasattr(self, "means"): # Create new attribute midvals and add as a column (fixes kadi#17) self.colnames.append("midvals") self.midvals = self.vals self.vals = self.means # Convert vals to unicode for Python 3+. If this MSID is a # state-valued MSID (with string value) then `vals` is the only possible # string attribute. None of the others like mins/maxes etc will exist. for colname in self.colnames: vals = getattr(self, colname) if vals.dtype.kind == "S": setattr(self, colname, vals.astype("U")) @staticmethod @cache.lru_cache(30) def _get_msid_data_from_cxc_cached(content, tstart, tstop, msid, unit_system): """Do the actual work of getting time and values for an MSID from HDF5 files and cache recent results. Caching is very beneficial for derived parameter updates but not desirable for normal fetch usage.""" return MSID._get_msid_data_from_cxc(content, tstart, tstop, msid, unit_system) @staticmethod def _get_msid_data_from_cxc(content, tstart, tstop, msid, unit_system): """Do the actual work of getting time and values for an MSID from HDF5 files""" # Get a row slice into HDF5 file for this content type that picks out # the required time range plus a little padding on each end. h5_slice = get_interval(content, tstart, tstop) # Cache the last set of TIME values so repeated queries from within a # content type use the already-available times. Use the content, start # row and stop row as key. This guarantees that the times array matches # the subsequent values. cache_key = (content, h5_slice.start, h5_slice.stop) # Read the TIME values either from cache or from disk. if times_cache["key"] == cache_key: logger.info("Using times_cache for %s %s to %s", content, tstart, tstop) times = times_cache["val"] # Already filtered on times_ok times_ok = times_cache["ok"] # For filtering MSID.val and MSID.bad times_all_ok = times_cache["all_ok"] else: ft["msid"] = "time" filename = msid_files["msid"].abs logger.info("Reading %s", filename) @local_or_remote_function( "Getting time data from Ska eng archive server..." ) def get_time_data_from_server(h5_slice, filename): import tables open_file = getattr(tables, "open_file", None) or tables.openFile h5 = open_file(os.path.join(*filename)) times_ok = ~h5.root.quality[h5_slice] times = h5.root.data[h5_slice] h5.close() return (times_ok, times) times_ok, times = get_time_data_from_server(h5_slice, _split_path(filename)) # Filter bad times. Last instance of bad times in archive is 2004 # so don't do this unless needed. Creating a new 'times' array is # much more expensive than checking for np.all(times_ok). times_all_ok = np.all(times_ok) if not times_all_ok: times = times[times_ok] times_cache.update( dict(key=cache_key, val=times, ok=times_ok, all_ok=times_all_ok) ) # Extract the actual MSID values and bad values mask ft["msid"] = msid filename = msid_files["msid"].abs logger.info("Reading %s", filename) @local_or_remote_function( "Getting msid data for " + msid + " from Ska eng archive server..." ) def get_msid_data_from_server(h5_slice, filename): import tables open_file = getattr(tables, "open_file", None) or tables.openFile h5 = open_file(os.path.join(*filename)) vals = h5.root.data[h5_slice] bads = h5.root.quality[h5_slice] h5.close() return (vals, bads) vals, bads = get_msid_data_from_server(h5_slice, _split_path(filename)) # Remote access will return arrays that don't own their data, see #150. # For an explanation see: # https://ipyparallel.readthedocs.io/en/latest/details.html#non-copying-sends-and-numpy-arrays try: bads.flags.writeable = True vals.flags.writeable = True except ValueError: bads = bads.copy() vals = vals.copy() # Filter bad times rows if needed if not times_all_ok: logger.info("Filtering bad times values for %s", msid) bads = bads[times_ok] vals = vals[times_ok] # Slice down to exact requested time range row0, row1 = np.searchsorted(times, [tstart, tstop]) logger.info("Slicing %s arrays [%d:%d]", msid, row0, row1) vals = Units(unit_system).convert(msid.upper(), vals[row0:row1]) times = times[row0:row1] bads = bads[row0:row1] # Possibly expand the bads list for a set of about 30 MSIDs which # have incorrect values in CXCDS telemetry bads = _fix_ctu_dwell_mode_bads(msid, bads) # Change bytestring to (unicode) string if vals.dtype.kind == "S": vals = vals.astype("U") return (vals, times, bads) def _get_msid_data_from_maude(self, content, tstart, tstop, msid, unit_system): """ Get time and values for an MSID from MAUDE. Returned values are (for now) all assumed to be good. """ import maude # Telemetry values from another data_source may already be available. If # so then only query MAUDE from after the last available point. telem_already = hasattr(self, "times") and len(self.times) > 2 if telem_already: tstart = self.times[-1] + 0.001 # Don't fetch the last point again dt = self.times[-1] - self.times[-2] if tstop - tstart < dt * 2: # Already got enough data from the original query, no need to hit MAUDE return # Actually query MAUDE options = data_source.options()["maude"] try: out = maude.get_msids(msids=msid, start=tstart, stop=tstop, **options) except Exception as e: raise Exception("MAUDE query failed: {}".format(e)) # Only one MSID is queried from MAUDE but maude.get_msids() already returns # a list of results, so select the first element. out = out["data"][0] vals = Units(unit_system).convert( msid.upper(), out["values"], from_system="eng" ) times = out["times"] bads = np.zeros(len(vals), dtype=bool) # No 'bad' values from MAUDE self.data_source["maude"] = _get_start_stop_dates(times) self.data_source["maude"]["flags"] = out["flags"] if telem_already: vals = np.concatenate([self.vals, vals]) times = np.concatenate([self.times, times]) bads = np.concatenate([self.bads, bads]) self.vals = vals self.times = times self.bads = bads @property def state_codes(self): """List of state codes tuples (raw_count, state_code) for state-valued MSIDs """ if self.vals.dtype.kind not in ("S", "U"): self._state_codes = None if self.MSID in STATE_CODES: self._state_codes = STATE_CODES[self.MSID] if not hasattr(self, "_state_codes"): import Ska.tdb try: states = Ska.tdb.msids[self.MSID].Tsc except Exception: self._state_codes = None else: if states is None or len(set(states["CALIBRATION_SET_NUM"])) != 1: warnings.warn( "MSID {} has string vals but no state codes " "or multiple calibration sets".format(self.msid) ) self._state_codes = None else: states = np.sort(states.data, order="LOW_RAW_COUNT") self._state_codes = [ (state["LOW_RAW_COUNT"], state["STATE_CODE"]) for state in states ] return self._state_codes @property def raw_vals(self): """Raw counts corresponding to the string state-code values that are stored in ``self.vals`` """ # If this is not a string-type value then there are no raw values if self.vals.dtype.kind not in ("S", "U") or self.state_codes is None: self._raw_vals = None if not hasattr(self, "_raw_vals"): self._raw_vals = np.zeros(len(self.vals), dtype="int8") - 1 # CXC state code telem all has same length with trailing spaces # so find max length for formatting below. max_len = max(len(x[1]) for x in self.state_codes) fmtstr = "{:" + str(max_len) + "s}" for raw_val, state_code in self.state_codes: ok = self.vals == fmtstr.format(state_code) self._raw_vals[ok] = raw_val return self._raw_vals @property def tdb(self): """Access the Telemetry database entries for this MSID""" import Ska.tdb return Ska.tdb.msids[self.MSID]
[docs] def interpolate(self, dt=None, start=None, stop=None, times=None): """Perform nearest-neighbor interpolation of the MSID to the specified time sequence. The time sequence steps uniformly by ``dt`` seconds starting at the ``start`` time and ending at the ``stop`` time. If not provided the times default to the first and last times for the MSID. The MSID ``times`` attribute is set to the common time sequence. In addition a new attribute ``times0`` is defined that stores the nearest neighbor interpolated time, providing the *original* timestamps of each new interpolated value for that MSID. If ``times`` is provided then this gets used instead of the default linear progression from ``start`` and ``dt``. :param dt: time step (sec, default=328.0) :param start: start of interpolation period (DateTime format) :param stop: end of interpolation period (DateTime format) :param times: array of times for interpolation (default=None) """ import Ska.Numpy if times is not None: if any(kwarg is not None for kwarg in (dt, start, stop)): raise ValueError( 'If "times" keyword is set then "dt", "start", ' 'and "stop" cannot be set' ) # Use user-supplied times that are within the range of telemetry. ok = (times >= self.times[0]) & (times <= self.times[-1]) times = times[ok] else: dt = 328.0 if dt is None else dt tstart = DateTime(start).secs if start else self.times[0] tstop = DateTime(stop).secs if stop else self.times[-1] # Legacy method for backward compatibility. Note that the np.arange() # call accrues floating point error. tstart = max(tstart, self.times[0]) tstop = min(tstop, self.times[-1]) times = np.arange(tstart, tstop, dt) logger.info("Interpolating index for %s", self.msid) indexes = Ska.Numpy.interpolate( np.arange(len(self.times)), self.times, times, method="nearest", sorted=True ) logger.info("Slicing on indexes") for colname in self.colnames: colvals = getattr(self, colname) if colvals is not None: setattr(self, colname, colvals[indexes]) # Make a new attribute times0 that stores the nearest neighbor # interpolated times. Then set the MSID times to be the common # interpolation times. self.times0 = self.times self.times = times
def copy(self): from copy import deepcopy return deepcopy(self)
[docs] def filter_bad(self, bads=None, copy=False): """Filter out any bad values. After applying this method the "bads" column will be set to None to indicate that there are no bad values. :param bads: Bad values mask. If not supplied then self.bads is used. :param copy: return a copy of MSID object with bad values filtered """ obj = self.copy() if copy else self # If bad mask is provided then override any existing bad mask for MSID if bads is not None: obj.bads = bads # Nothing to do if bads is None (i.e. bad values already filtered) if obj.bads is None: return if np.any(obj.bads): logger.info("Filtering bad values for %s", obj.msid) ok = ~obj.bads colnames = (x for x in obj.colnames if x != "bads") for colname in colnames: setattr(obj, colname, getattr(obj, colname)[ok]) obj.bads = None if copy: return obj
[docs] def filter_bad_times(self, start=None, stop=None, table=None, copy=False): """Filter out intervals of bad data in the MSID object. There are three usage options: - Supply no arguments. This will use the global list of bad times read in with fetch.read_bad_times(). - Supply both ``start`` and ``stop`` values where each is a single value in a valid DateTime format. - Supply a ``table`` parameter with one of two forms: (1) list of bad time interval strings, where each string contains the start and stop dates separated by a space; (2) string with the name of a file in the same format. Examples:: bad_times = ['2008:292:00:00:00 2008:297:00:00:00', '2008:305:00:12:00 2008:305:00:12:03', '2010:101:00:01:12 2010:101:00:01:25'] msid.filter_bad_times(table=bad_times) msid.filter_bad_times(table='msid_bad_times.dat') :param start: Start of time interval to exclude (any DateTime format) :param stop: End of time interval to exclude (any DateTime format) :param table: List of str with start and stop for bad time intervals or str with name of file containing list of bad time intervals :param copy: return a copy of MSID object with bad times filtered """ if table is not None: bad_times = ascii.read(table, format="no_header", names=["start", "stop"]) elif start is None and stop is None: bad_times = [] for msid_glob, times in msid_bad_times.items(): if fnmatch.fnmatch(self.MSID, msid_glob): bad_times.extend(times) elif start is None or stop is None: raise ValueError( "filter_times requires either 2 args (start, stop) or no args" ) else: bad_times = [(start, stop)] obj = self.copy() if copy else self obj._filter_times(bad_times, exclude=True) if copy: return obj
[docs] def remove_intervals(self, intervals, copy=False): """ Remove telemetry points that occur within the specified ``intervals`` This method is the converse of select_intervals(). The ``intervals`` argument can be either a list of (start, stop) tuples or an EventQuery object from kadi. If ``copy`` is set to True then a copy of the MSID object is made prior to removing intervals, and that copy is returned. The default is to remove intervals in place. This example shows fetching the pitch component of the spacecraft rate. After examining the rates, the samples during maneuvers are then removed and the standard deviation is recomputed. This filters out the large rates during maneuvers:: >>> aorate2 = fetch.Msid('aorate2', '2011:001', '2011:002') >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec 3.9969393528801782 >>> figure(1) >>> aorate2.plot(',') >>> from kadi import events >>> aorate2.remove_intervals(events.manvrs) >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec -0.0003688639491030978 >>> figure(2) >>> aorate2.plot(',') :param intervals: EventQuery or iterable (N x 2) with start, stop dates/times :param copy: return a copy of MSID object with intervals removed """ obj = self.copy() if copy else self obj._filter_times(intervals, exclude=True) if copy: return obj
[docs] def select_intervals(self, intervals, copy=False): """ Select telemetry points that occur within the specified ``intervals`` This method is the converse of remove_intervals(). The ``intervals`` argument can be either a list of (start, stop) tuples or an EventQuery object from kadi. If ``copy`` is set to True then a copy of the MSID object is made prior to selecting intervals, and that copy is returned. The default is to selecte intervals in place. This example shows fetching the pitch component of the spacecraft rate. After examining the rates, the samples during maneuvers are then selected and the mean is recomputed. This highlights the large rates during maneuvers:: >>> aorate2 = fetch.Msid('aorate2', '2011:001', '2011:002') >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec 3.9969393528801782 >>> figure(1) >>> aorate2.plot(',') >>> from kadi import events >>> aorate2.select_intervals(events.manvrs) >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec 24.764309542605481 >>> figure(2) >>> aorate2.plot(',') :param intervals: EventQuery or iterable (N x 2) with start, stop dates/times :param copy: return a copy of MSID object with intervals selected """ obj = self.copy() if copy else self obj._filter_times(intervals, exclude=False) if copy: return obj
def _filter_times(self, intervals, exclude=True): """ Filter the times of self based on ``intervals``. :param intervals: iterable (N x 2) with tstart, tstop in seconds :param exclude: exclude intervals if True, else include intervals """ # Make an initial acceptance mask. If exclude is True then initially # all values are allowed (ok=True). If exclude is False (i.e. only # include the interval times) then ok=False everywhere. ok = np.empty(len(self.times), dtype=bool) ok[:] = exclude # See if the input intervals is actually a table of intervals intervals_list = _get_table_intervals_as_list(intervals, check_overlaps=False) if intervals_list is not None: intervals = intervals_list # Check if this is an EventQuery. Would rather not import EventQuery # because this is expensive (django), so just look at the names in # object MRO. if "EventQuery" in (cls.__name__ for cls in intervals.__class__.__mro__): intervals = intervals.intervals(self.datestart, self.datestop) intervals = [ (DateTime(start).secs, DateTime(stop).secs) for start, stop in intervals ] for tstart, tstop in intervals: if tstart > tstop: raise ValueError( "Start time %s must be less than stop time %s" % (tstart, tstop) ) if tstop < self.times[0] or tstart > self.times[-1]: continue # Find the indexes of bad data. Using side=left,right respectively # will exclude points exactly equal to the bad_times values # (though in reality an exact tie is extremely unlikely). i0 = np.searchsorted(self.times, tstart, side="left") i1 = np.searchsorted(self.times, tstop, side="right") ok[i0:i1] = not exclude colnames = (x for x in self.colnames) for colname in colnames: attr = getattr(self, colname) if isinstance(attr, np.ndarray): setattr(self, colname, attr[ok])
[docs] def write_zip(self, filename, append=False): """Write MSID to a zip file named ``filename`` Within the zip archive the data for this MSID will be stored in csv format with the name <msid_name>.csv. :param filename: output zipfile name :param append: append to an existing zipfile """ import zipfile colnames = self.colnames[:] if self.bads is None and "bads" in colnames: colnames.remove("bads") if self.state_codes: colnames.append("raw_vals") # Indexes value is not interesting for output if "indexes" in colnames: colnames.remove("indexes") colvals = tuple(getattr(self, x) for x in colnames) fmt = ",".join("%s" for x in colnames) f = zipfile.ZipFile( filename, ("a" if append and os.path.exists(filename) else "w") ) info = zipfile.ZipInfo(self.msid + ".csv") info.external_attr = 0o664 << 16 # Set permissions info.date_time = time.localtime()[:7] info.compress_type = zipfile.ZIP_DEFLATED f.writestr( info, ",".join(colnames) + "\n" + "\n".join(fmt % x for x in zip(*colvals)) + "\n", ) f.close()
[docs] def logical_intervals(self, op, val, complete_intervals=False, max_gap=None): """Determine contiguous intervals during which the logical comparison expression "MSID.vals op val" is True. Allowed values for ``op`` are:: == != > < >= <= If ``complete_intervals`` is True (default is False) then the intervals are guaranteed to be complete so that the all reported intervals had a transition before and after within the telemetry interval. If ``max_gap`` is specified then any time gaps longer than ``max_gap`` are filled with a fictitious False value to create an artificial interval boundary at ``max_gap / 2`` seconds from the nearest data value. Returns a structured array table with a row for each interval. Columns are: * datestart: date of interval start * datestop: date of interval stop * duration: duration of interval (sec) * tstart: time of interval start (CXC sec) * tstop: time of interval stop (CXC sec) Examples:: >>> dat = fetch.MSID('aomanend', '2010:001', '2010:005') >>> manvs = dat.logical_intervals('==', 'NEND', complete_intervals=True) >>> dat = fetch.MSID('61PSTS02', '1999:200', '2000:001') >>> safe_suns = dat.logical_intervals('==', 'SSM', max_gap=66) :param op: logical operator, one of == != > < >= <= :param val: comparison value :param complete_intervals: return only complete intervals (default=True) :param max_gap: max allowed gap between time stamps (sec, default=None) :returns: structured array table of intervals """ from . import utils ops = { "==": operator.eq, "!=": operator.ne, ">": operator.gt, "<": operator.lt, ">=": operator.ge, "<=": operator.le, } try: op = ops[op] except KeyError: raise ValueError( 'op = "{}" is not in allowed values: {}'.format(op, sorted(ops.keys())) ) # Do local version of bad value filtering if self.bads is not None and np.any(self.bads): ok = ~self.bads vals = self.vals[ok] times = self.times[ok] else: vals = self.vals times = self.times bools = op(vals, val) return utils.logical_intervals(times, bools, complete_intervals, max_gap)
[docs] def state_intervals(self): """Determine contiguous intervals during which the MSID value is unchanged. Returns a structured array table with a row for each interval. Columns are: * datestart: date of interval start * datestop: date of interval stop * duration: duration of interval (sec) * tstart: time of interval start (CXC sec) * tstop: time of interval stop (CXC sec) * val: MSID value during the interval Example:: dat = fetch.MSID('cobsrqid', '2010:001', '2010:005') obsids = dat.state_intervals() :param val: state value for which intervals are returned. :returns: structured array table of intervals """ from . import utils # Do local version of bad value filtering if self.bads is not None and np.any(self.bads): ok = ~self.bads vals = self.vals[ok] times = self.times[ok] else: vals = self.vals times = self.times if len(self.vals) < 2: raise ValueError("Filtered data length must be at least 2") return utils.state_intervals(times, vals)
[docs] def iplot(self, fmt="-b", fmt_minmax="-c", **plot_kwargs): """Make an interactive plot for exploring the MSID data. This method opens a new plot figure (or clears the current figure) and plots the MSID ``vals`` versus ``times``. This plot can be panned or zoomed arbitrarily and the data values will be fetched from the archive as needed. Depending on the time scale, ``iplot`` displays either full resolution, 5-minute, or daily values. For 5-minute and daily values the min and max values are also plotted. Once the plot is displayed and the window is selected by clicking in it, the following key commands are recognized:: a: autoscale for full data range in x and y m: toggle plotting of min/max values p: pan at cursor x y: toggle autoscaling of y-axis z: zoom at cursor x ?: print help Example:: dat = fetch.Msid('aoattqt1', '2011:001', '2012:001', stat='5min') dat.iplot() dat.iplot('.b', '.c', markersize=0.5) Caveat: the ``iplot()`` method is not meant for use within scripts, and may give unexpected results if used in combination with other plotting commands directed at the same plot figure. :param fmt: plot format for values (default="-b") :param fmt_minmax: plot format for mins and maxes (default="-c") :param plot_kwargs: additional plotting keyword args """ from .plot import MsidPlot self._iplot = MsidPlot(self, fmt, fmt_minmax, **plot_kwargs)
[docs] def plot(self, *args, **kwargs): """Plot the MSID ``vals`` using Ska.Matplotlib.plot_cxctime() This is a convenience function for plotting the MSID values. It is equivalent to:: plot_cxctime(self.times, self.vals, *args, **kwargs) where ``*args`` are additional arguments and ``**kwargs`` are additional keyword arguments that are accepted by ``plot_cxctime()``. Example:: dat = fetch.Msid('tephin', '2011:001', '2012:001', stat='5min') dat.plot('-r', linewidth=2) """ import matplotlib.pyplot as plt from Ska.Matplotlib import plot_cxctime vals = self.raw_vals if self.state_codes else self.vals plot_cxctime(self.times, vals, *args, state_codes=self.state_codes, **kwargs) plt.margins(0.02, 0.05) # Upper-cased version of msid name from user title = self.msid.upper() if self.stat: title = f"{title} ({self.stat})" plt.title(title) if self.unit: plt.ylabel(self.unit)
[docs]class MSIDset(collections.OrderedDict): """Fetch a set of MSIDs from the engineering telemetry archive. Each input ``msid`` is case-insensitive and can include linux file "glob" patterns, for instance ``orb*1*_?`` (ORBITEPHEM1_X, Y and Z) or ``aoattqt[1234]`` (AOATTQT1, 2, 3, and 4). For derived parameters the initial ``DP_`` is optional, for instance ``dpa_pow*`` (DP_DPA_POWER). :param msids: list of MSID names (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :returns: Dict-like object containing MSID instances keyed by MSID name """ MSID = MSID def __init__( self, msids=None, start=LAUNCH_DATE, stop=None, filter_bad=False, stat=None ): if msids is None: msids = [] super(MSIDset, self).__init__() intervals = _get_table_intervals_as_list(start, check_overlaps=True) if intervals is not None: start, stop = intervals[0][0], intervals[-1][1] self.tstart = DateTime(start).secs self.tstop = DateTime(stop).secs if stop else DateTime().secs self.datestart = DateTime(self.tstart).date self.datestop = DateTime(self.tstop).date # Input ``msids`` may contain globs, so expand each and add to new list new_msids = [] for msid in msids: new_msids.extend(msid_glob(msid)[0]) for msid in new_msids: if intervals is None: self[msid] = self.MSID( msid, self.tstart, self.tstop, filter_bad=False, stat=stat ) else: self[msid] = self.MSID(msid, intervals, filter_bad=False, stat=stat) if filter_bad: self.filter_bad() def __deepcopy__(self, memo=None): out = self.__class__([], None) for attr in ("tstart", "tstop", "datestart", "datestop"): setattr(out, attr, getattr(self, attr)) for msid in self: out[msid] = self[msid].copy() return out
[docs] def copy(self): return self.__deepcopy__()
[docs] def filter_bad(self, copy=False, union=False): """Filter bad values for the MSID set. By default (``union=False``) the bad values are filtered individually for each MSID. If ``union=True`` this method applies the union (logical-OR) of bad value masks for all MSIDs in the set with the same content type. The result is that the filtered MSID samples are valid for *all* MSIDs within the content type and the arrays all match up. For example:: msids = fetch.MSIDset(['aorate1', 'aorate2', 'aogyrct1', 'aogyrct2'], '2009:001', '2009:002') msids.filter_bad() Since ``aorate1`` and ``aorate2`` both have content type of ``pcad3eng`` they will be filtered as a group and will remain with the same sampling. This will allow something like:: plot(msids['aorate1'].vals, msids['aorate2'].vals) Likewise the two gyro count MSIDs would be filtered as a group. If this group-filtering is not the desired behavior one can always call the individual MSID.filter_bad() function for each MSID in the set:: for msid in msids.values(): msid.filter_bad() :param copy: return a copy of MSID object with intervals selected """ obj = self.copy() if copy else self if not union: # Filter bad values individually for each MSID for msid in obj.values(): msid.filter_bad() # Maintain existing function API to return None for copy=False return obj if copy else None # Union of bad value masks for all MSIDs in the set with the same # content type. for content in set(x.content for x in obj.values()): bads = None msids = [ x for x in obj.values() if x.content == content and x.bads is not None ] for msid in msids: if bads is None: bads = msid.bads.copy() else: bads |= msid.bads for msid in msids: msid.filter_bad(bads) if copy: return obj
[docs] def filter_bad_times(self, start=None, stop=None, table=None, copy=False): """Filter out intervals of bad data in the MSIDset object. There are three usage options: - Supply no arguments. This will use the global list of bad times read in with fetch.read_bad_times(). - Supply both ``start`` and ``stop`` values where each is a single value in a valid DateTime format. - Supply a ``table`` parameter with one of two forms: (1) list of bad time interval strings, where each string contains the start and stop dates separated by a space; (2) string with the name of a file in the same format. Examples:: msidset.filter_bad_times() bad_times = ['2008:292:00:00:00 2008:297:00:00:00', '2008:305:00:12:00 2008:305:00:12:03', '2010:101:00:01:12 2010:101:00:01:25'] msidset.filter_bad_times(table=bad_times) msidset.filter_bad_times(table='msid_bad_times.dat') :param start: Start of time interval to exclude (any DateTime format) :param stop: End of time interval to exclude (any DateTime format) :param table: List of str with start and stop for bad time intervals or str with name of file containing list of bad time intervals :param copy: return a copy of MSID object with intervals selected """ obj = self.copy() if copy else self for msid in obj.values(): msid.filter_bad_times(start, stop, table) if copy: return obj
[docs] def interpolate( self, dt=None, start=None, stop=None, filter_bad=True, times=None, bad_union=False, copy=False, ): """ Perform nearest-neighbor interpolation of all MSID values in the set to a common time sequence. The values are updated in-place. **Times** The time sequence steps uniformly by ``dt`` seconds starting at the ``start`` time and ending at the ``stop`` time. If not provided the times default to the ``start`` and ``stop`` times for the MSID set. If ``times`` is provided then this gets used instead of the default linear progression from ``start`` and ``dt``. For each MSID in the set the ``times`` attribute is set to the common time sequence. In addition a new attribute ``times0`` is defined that stores the nearest neighbor interpolated time, providing the *original* timestamps of each new interpolated value for that MSID. **Filtering and bad values** If ``filter_bad`` is True (default) then bad values are filtered from the interpolated MSID set. There are two strategies for doing this: 1) ``bad_union = False`` Remove the bad values in each MSID prior to interpolating the set to a common time series. This essentially says to use all the available data individually. Each MSID has bad data filtered individually *before* interpolation so that the nearest neighbor interpolation only finds good data. This strategy is done when ``filter_union = False``, which is the default setting. 2) ``bad_union = True`` Mark every MSID in the set as bad at the interpolated time if *any* of them are bad at that time. This stricter version is required when it is important that the MSIDs be truly correlated in time. For instance this is needed for attitude quaternions since all four values must be from the exact same telemetry sample. If you are not sure, this is the safer option. :param dt: time step (sec, default=328.0) :param start: start of interpolation period (DateTime format) :param stop: end of interpolation period (DateTime format) :param filter_bad: filter bad values :param times: array of times for interpolation (default=None) :param bad_union: filter union of bad values after interpolating :param copy: return a new copy instead of in-place update (default=False) """ import Ska.Numpy obj = self.copy() if copy else self msids = list(obj.values()) # MSID objects in the MSIDset # Ensure that tstart / tstop is entirely within the range of available # data fetched from the archive. max_fetch_tstart = max(msid.times[0] for msid in msids) min_fetch_tstop = min(msid.times[-1] for msid in msids) if times is not None: if any(kwarg is not None for kwarg in (dt, start, stop)): raise ValueError( 'If "times" keyword is set then "dt", "start", ' 'and "stop" cannot be set' ) # Use user-supplied times that are within the range of telemetry. ok = (times >= max_fetch_tstart) & (times <= min_fetch_tstop) obj.times = times[ok] else: # Get the nominal tstart / tstop range dt = 328.0 if dt is None else dt tstart = DateTime(start).secs if start else obj.tstart tstop = DateTime(stop).secs if stop else obj.tstop tstart = max(tstart, max_fetch_tstart) tstop = min(tstop, min_fetch_tstop) obj.times = np.arange((tstop - tstart) // dt + 1) * dt + tstart for msid in msids: if filter_bad and not bad_union: msid.filter_bad() logger.info("Interpolating index for %s", msid.msid) indexes = Ska.Numpy.interpolate( np.arange(len(msid.times)), msid.times, obj.times, method="nearest", sorted=True, ) logger.info("Slicing on indexes") for colname in msid.colnames: colvals = getattr(msid, colname) if colvals is not None: setattr(msid, colname, colvals[indexes]) # Make a new attribute times0 that stores the nearest neighbor # interpolated times. Then set the MSID times to be the common # interpolation times. msid.times0 = msid.times msid.times = obj.times if bad_union: common_bads = np.zeros(len(obj.times), dtype=bool) for msid in msids: if msid.stat is None and msid.bads is None: warnings.warn( "WARNING: {!r} MSID has bad values already filtered.\n" "This prevents `filter_bad_union` from working as expected.\n" "Use MSIDset (not Msidset) with filter_bad=False.\n" ) if msid.bads is not None: # 5min and daily stats have no bad values common_bads |= msid.bads # Apply the common bads array and optional filter out these bad values for msid in msids: msid.bads = common_bads if filter_bad: msid.filter_bad() # Filter MSIDset-level times attr to match invididual MSIDs if filter_bad is True if filter_bad: obj.times = obj.times[~common_bads] if copy: return obj
[docs] def write_zip(self, filename): """Write MSIDset to a zip file named ``filename`` Within the zip archive the data for each MSID in the set will be stored in csv format with the name <msid_name>.csv. :param filename: output zipfile name """ append = False for msid in self.values(): msid.write_zip(filename, append=append) append = True
[docs]class Msid(MSID): """ Fetch data from the engineering telemetry archive into an MSID object. Same as MSID class but with filter_bad=True by default. :param msid: name of MSID (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :param unit_system: Unit system (cxc|eng|sci, default=current units) :returns: MSID instance """ units = UNITS def __init__(self, msid, start=LAUNCH_DATE, stop=None, filter_bad=True, stat=None): super(Msid, self).__init__( msid, start=start, stop=stop, filter_bad=filter_bad, stat=stat )
[docs]class Msidset(MSIDset): """Fetch a set of MSIDs from the engineering telemetry archive. Same as MSIDset class but with filter_bad=True by default. :param msids: list of MSID names (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :param unit_system: Unit system (cxc|eng|sci, default=current units) :returns: Dict-like object containing MSID instances keyed by MSID name """ MSID = MSID def __init__(self, msids, start=LAUNCH_DATE, stop=None, filter_bad=True, stat=None): super(Msidset, self).__init__( msids, start=start, stop=stop, filter_bad=filter_bad, stat=stat )
class HrcSsMsid(Msid): """ Fetch data from the engineering telemetry archive into an MSID object. Same as MSID class but with filter_bad=True by default. :param msid: name of MSID (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :param unit_system: Unit system (cxc|eng|sci, default=current units) :returns: MSID instance """ units = UNITS def __new__(self, msid, start=LAUNCH_DATE, stop=None, stat=None): ss_msids = "2TLEV1RT 2VLEV1RT 2SHEV1RT 2TLEV2RT 2VLEV2RT 2SHEV2RT" if msid.upper() not in ss_msids.split(): raise ValueError( "MSID {} is not in HRC secondary science ({})".format(msid, ss_msids) ) # If this is not full-resolution then add boolean bads mask to individual MSIDs msids = [msid, "HRC_SS_HK_BAD"] out = MSIDset(msids, start=start, stop=stop, stat=stat) if stat is not None: for m in msids: out[m].bads = np.zeros(len(out[m].vals), dtype=np.bool) # Set bad mask i_bads = np.flatnonzero(out["HRC_SS_HK_BAD"].vals > 0) out["HRC_SS_HK_BAD"].bads[i_bads] = True # For full-resolution smear the bad mask out by +/- 5 samples if stat is None: for i_bad in i_bads: i0 = max(0, i_bad - 5) i1 = i_bad + 5 out["HRC_SS_HK_BAD"].bads[i0:i1] = True # Finally interpolate and filter out bad values out.interpolate(times=out[msid].times, bad_union=True, filter_bad=True) return out[msid] class memoized(object): """Decorator that caches a function's return value each time it is called. If called later with the same arguments, the cached value is returned, and not re-evaluated. """ def __init__(self, func): self.func = func self.cache = {} def __call__(self, *args): try: return self.cache[args] except KeyError: self.cache[args] = value = self.func(*args) return value except TypeError: # uncachable -- for instance, passing a list as an argument. # Better to not cache than to blow up entirely. return self.func(*args) def __repr__(self): """Return the function's docstring.""" return self.func.__doc__
[docs]def get_time_range(msid, format=None): """ Get the time range for the given ``msid``. :param msid: MSID name :param format: Output format (DateTime format, e.g. 'secs', 'date', 'greta') :returns: (tstart, tstop) in CXC seconds """ MSID = msid.upper() with _cache_ft(): ft["content"] = content[MSID] ft["msid"] = "time" filename = msid_files["msid"].abs logger.info("Reading %s", filename) @local_or_remote_function("Getting time range from Ska eng archive server...") def get_time_data_from_server(filename): import tables open_file = getattr(tables, "open_file", None) or tables.openFile h5 = open_file(os.path.join(*filename)) tstart = h5.root.data[0] tstop = h5.root.data[-1] h5.close() return tstart, tstop if filename in CONTENT_TIME_RANGES: tstart, tstop = CONTENT_TIME_RANGES[filename] else: tstart, tstop = get_time_data_from_server(_split_path(filename)) CONTENT_TIME_RANGES[filename] = (tstart, tstop) if format is not None: tstart = getattr(DateTime(tstart), format) tstop = getattr(DateTime(tstop), format) return tstart, tstop
[docs]def get_telem( msids, start=None, stop=None, sampling="full", unit_system="eng", interpolate_dt=None, remove_events=None, select_events=None, time_format=None, outfile=None, quiet=False, max_fetch_Mb=1000, max_output_Mb=100, ): """ High-level routine to get telemetry for one or more MSIDs and perform common processing functions: - Fetch a set of MSIDs over a time range, specifying the sampling as either full-resolution, 5-minute, or daily data. - Filter out bad or missing data. - Interpolate (resample) all MSID values to a common uniformly-spaced time sequence. - Remove or select time intervals corresponding to specified Kadi event types. - Change the time format from CXC seconds (seconds since 1998.0) to something more convenient like GRETA time. - Write the MSID telemetry data to a zipfile. :param msids: MSID(s) to fetch (string or list of strings)') :param start: Start time for data fetch (default=<stop> - 30 days) :param stop: Stop time for data fetch (default=NOW) :param sampling: Data sampling (full | 5min | daily) (default=full) :param unit_system: Unit system for data (eng | sci | cxc) (default=eng) :param interpolate_dt: Interpolate to uniform time steps (secs, default=None) :param remove_events: Remove kadi events expression (default=None) :param select_events: Select kadi events expression (default=None) :param time_format: Output time format (secs|date|greta|jd|..., default=secs) :param outfile: Output file name (default=None) :param quiet: Suppress run-time logging output (default=False) :param max_fetch_Mb: Max allowed memory (Mb) for fetching (default=1000) :param max_output_Mb: Max allowed memory (Mb) for file output (default=100) :returns: MSIDset object """ from .get_telem import get_telem return get_telem( msids, start, stop, sampling, unit_system, interpolate_dt, remove_events, select_events, time_format, outfile, quiet, max_fetch_Mb, max_output_Mb, )
@lru_cache_timed(maxsize=1000, timeout=600) def get_interval(content, tstart, tstop): """ Get the approximate row intervals that enclose the specified ``tstart`` and ``tstop`` times for the ``content`` type. The output of this function is cached with an LRU cache of the most recent 1000 results. The cache expires every 10 minutes to ensure that a persistent session will get new data if the archive gets updated. :param content: content type (e.g. 'pcad3eng', 'thm1eng') :param tstart: start time (CXC seconds) :param tstop: stop time (CXC seconds) :returns: rowslice """ ft["content"] = content @local_or_remote_function( "Getting interval data from " + "DB on Ska eng archive server..." ) def get_interval_from_db(tstart, tstop, server): import Ska.DBI db = Ska.DBI.DBI(dbi="sqlite", server=os.path.join(*server)) query_row = db.fetchone( "SELECT tstart, rowstart FROM archfiles " "WHERE filetime < ? order by filetime desc", (tstart,), ) if not query_row: query_row = db.fetchone( "SELECT tstart, rowstart FROM archfiles order by filetime asc" ) rowstart = query_row["rowstart"] query_row = db.fetchone( "SELECT tstop, rowstop FROM archfiles " "WHERE filetime > ? order by filetime asc", (tstop,), ) if not query_row: query_row = db.fetchone( "SELECT tstop, rowstop FROM archfiles order by filetime desc" ) rowstop = query_row["rowstop"] return slice(rowstart, rowstop) return get_interval_from_db(tstart, tstop, _split_path(msid_files["archfiles"].abs)) @contextlib.contextmanager def _cache_ft(): """ Cache the global filetype ``ft`` context variable so that fetch operations do not corrupt user values of ``ft``. """ ft_cache_pickle = pickle.dumps(ft) try: yield finally: ft_cache = pickle.loads(ft_cache_pickle) ft.update(ft_cache) delkeys = [x for x in ft if x not in ft_cache] for key in delkeys: del ft[key] @contextlib.contextmanager def _set_msid_files_basedir(datestart, msid_files=msid_files): """ If datestart is before 2000:001:00:00:00 then use the 1999 archive files. """ try: cache_basedir = msid_files.basedir if datestart < DATE2000_LO: # Note: don't use os.path.join because ENG_ARCHIVE and basedir must # use linux '/' convention but this might be running on Windows. dirs = msid_files.basedir.split(os.pathsep) msid_files.basedir = os.pathsep.join(dir_ + "/1999" for dir_ in dirs) yield finally: msid_files.basedir = cache_basedir def _fix_ctu_dwell_mode_bads(msid, bads): """ Because of an issue related to the placement of the dwell mode flag, MSIDs that get stepped on in dwell mode get a bad value at the beginning of a dwell mode, while the dwell mode values (DWELLnn) get a bad value at the end. This does a simple brute-force fix of expanding any section of bad values by ones sample in the appropriate direction. """ MSID = msid.upper() stepped_on_msids = ( "4PRT5BT", "4RT585T", "AFLCA3BI", "AIRU1BT", "CSITB5V", "CUSOAOVN", "ECNV3V", "PLAED4ET", "PR1TV01T", "TCYLFMZM", "TOXTSUPN", "ES1P5CV", "ES2P5CV", ) if MSID in stepped_on_msids or re.match(r"DWELL\d\d", MSID): # Find transitions from good value to bad value. Turn that # good value to bad to extend the badness by one sample. ok = (bads[:-1] == False) & (bads[1:] == True) # noqa bads[:-1][ok] = True return bads def add_logging_handler(level=logging.INFO, formatter=None, handler=None): """Configure logging for fetch module. :param level: logging level (logging.DEBUG, logging.INFO, etc) :param formatter: logging.Formatter (default: Formatter('%(funcName)s: %(message)s')) :param handler: logging.Handler (default: StreamHandler()) """ if formatter is None: formatter = logging.Formatter("%(funcName)s: %(message)s") if handler is None: handler = logging.StreamHandler() handler.setFormatter(formatter) logger.setLevel(level) logger.addHandler(handler) def _plural(x): """Return English plural of ``x``. Super-simple and only valid for the known small set of cases within fetch where it will get applied. """ return x + "es" if (x.endswith("x") or x.endswith("s")) else x + "s" def get_data_gap_spec_parser(): """ Get parser for fetch data gap specification. This env var is in the form of a command line argument string with the following command line options:: --include INCLUDE Include MSIDs matching glob (default="*", can be repeated) --exclude EXCLUDE Exclude MSIDs matching glob (default=None, can be repeated) --start START Gap start (relative seconds or abs time) --stop STOP Gap stop (relative seconds or abs time) For example:: "--exclude=*EPHEM* --start=-25000 --stop=-2000" """ import argparse parser = argparse.ArgumentParser() parser.add_argument( "--include", default=[], action="append", help="Include MSIDs matching glob (default='*', can be repeated)", ) parser.add_argument( "--exclude", default=[], action="append", help="Exclude MSIDs matching glob (default=None, can be repeated)", ) parser.add_argument( "--start", default=-30000, help="Gap start (relative seconds or abs time)" ) parser.add_argument( "--stop", default=-3000, help="Gap stop (relative seconds or abs time)" ) return parser def msid_matches_data_gap_spec(msid, includes, excludes): from fnmatch import fnmatch msid = msid.upper() includes = includes or ["*"] match = any(fnmatch(msid, include.upper()) for include in includes) and not any( fnmatch(msid, exclude.upper()) for exclude in excludes ) return match
[docs]def create_msid_data_gap(msid_obj: MSID, data_gap_spec: str): """ Make a data gap in the ``msid_obj`` (in-place) by removing data points. This is mostly useful for testing via setting the ``CHETA_FETCH_DATA_GAP`` environment variable. The ``data_gap_spec`` string corresponds to a command line argument string with the following options:: --include INCLUDE Include MSIDs matching glob (default="*", can be repeated) --exclude EXCLUDE Exclude MSIDs matching glob (default=None, can be repeated) --start START Gap start (CxoTimeLike) --stop STOP Gap stop (CxoTimeLike) For example:: >>> dat = fetch.MSID('aopcadmd', '2010:001', '2010:002') >>> gap_spec = "--start=-2010:001:12:00:00 --stop=2010:001:12:30:00" >>> fetch.create_msid_data_gap(dat, gap_spec) :param msid_obj: MSID object :param data_gap_spec: data gap specification """ import shlex from cxotime import CxoTime parser = get_data_gap_spec_parser() args = parser.parse_args(shlex.split(data_gap_spec)) if msid_matches_data_gap_spec(msid_obj.MSID, args.include, args.exclude): start = CxoTime(args.start) stop = CxoTime(args.stop) logger.info( f"Creating data gap for {msid_obj.MSID} " f"from {start.date} to {stop.date}" ) i0, i1 = np.searchsorted(msid_obj.times, [start.secs, stop.secs]) for attr in msid_obj.colnames: val = getattr(msid_obj, attr) if val is not None: val_new = np.concatenate([val[:i0], val[i1:]]) setattr(msid_obj, attr, val_new)