Source code for aca_view.data.core.add_residuals

import logging

import numpy as np
from astropy.table import MaskedColumn
from chandra_aca import transform
from Quaternion import Quat

from aca_view.utils import ExceptionFreeDecorator

from . import ska_data

logger = logging.getLogger("aca_view.data")


[docs] @ExceptionFreeDecorator(logger="aca_view.data", log_level=logging.DEBUG) def add_residuals(data, settings): """ Add residuals for each slot in `data['slot_data']`. Input data should be a dictionary of the form:: {'slot_data': Table, 'non_slot_data': Table} This adds the following masked columns: PRED_YAGS, PRED_ZAGS, DYAGS, DZAGS, AGASC_ID, RA, DEC. If there is no catalog associated to a given time, the corresponding row is masked. This does not use chandra_aca.centroid_resid, because: - chandra_aca.centroid_resid uses cheta for attitudes (not available in real-time) - chandra_aca.centroid_resid does a bunch of things we do not need (such as shifting times, using other aspect solutions) - if there is no star or no catalog, a sort of generic exception is raised This function should never raise an exception. :param data: dict :param settings: dict """ non_slot_data, slot_data = data["non_slot_data"], data["slot_data"] if "starcat_date" not in non_slot_data.colnames: logger.debug("No starcat_date column in slot_data") return logger.debug("add_residuals") starcat_dates = np.unique(non_slot_data["starcat_date"]) scenario = settings.get("kadi_commands_scenario", ska_data.DEFAULT_SKA_DATA_SOURCES) ska_data_sources = ",".join( settings.get("ska_data_sources", ska_data.DEFAULT_SKA_DATA_SOURCES) ) cats = { starcat_date: ska_data.get_starcat( starcat_date, ska_api_url=settings.get("ska_api_url", ska_data.DEFAULT_API_URL), ska_data_sources=ska_data_sources, scenario=scenario, ) for starcat_date in starcat_dates if starcat_date } logger.debug(f"starcats: {list(cats.keys())}") img_starcat_date = ska_data.get_starcats( slot_data["TIME"], ska_api_url=settings.get("ska_api_url", ska_data.DEFAULT_API_URL), ska_data_sources=ska_data_sources, scenario=scenario, ) slot_data["PRED_YAGS"] = MaskedColumn( name="PRED_YAGS", dtype=float, length=len(slot_data) ) slot_data["PRED_ZAGS"] = MaskedColumn( name="PRED_ZAGS", dtype=float, length=len(slot_data) ) slot_data["DYAGS"] = MaskedColumn(name="DYAGS", dtype=float, length=len(slot_data)) slot_data["DZAGS"] = MaskedColumn(name="DZAGS", dtype=float, length=len(slot_data)) slot_data["AGASC_ID"] = MaskedColumn( name="AGASC_ID", dtype=int, length=len(slot_data) ) slot_data["RA"] = MaskedColumn(name="RA", dtype=np.float64, length=len(slot_data)) slot_data["DEC"] = MaskedColumn(name="DEC", dtype=np.float64, length=len(slot_data)) # I use the time to line samples up, but time might differ slightly due to floating point # precision. Just rounding does not work always work, so I round in multiples of 1.025. # this can still fail if there is a time shift within this interval. slot_data_t = slot_data["TIME"] non_slot_data_t = non_slot_data["TIME"] tmin = slot_data_t.min() slot_data_t = np.round((slot_data_t - tmin) / 0.25625) non_slot_data_t = np.round((non_slot_data_t - tmin) / 0.25625) for slot in np.unique(slot_data["IMGNUM"]): for starcat_date, cat in cats.items(): stars = cat[ (cat["slot"] == slot) & ((cat["type"] == "GUI") | (cat["type"] == "BOT")) ] if len(stars) == 0: logger.debug(f"no star in slot {slot} for starcat {starcat_date}") continue sel = (slot_data["IMGNUM"] == slot) & (img_starcat_date == starcat_date) if not np.count_nonzero(sel): logger.debug( f"no star in slot {slot} for starcat {starcat_date} in this sample" ) continue star = ska_data.get_star(stars[0]["id"], date=slot_data["TIME"][0]) slot_data["AGASC_ID"][sel] = star["AGASC_ID"] slot_data["RA"][sel] = star["RA_PMCORR"] slot_data["DEC"][sel] = star["DEC_PMCORR"] eci = transform.radec_to_eci(slot_data["RA"][sel], slot_data["DEC"][sel]) idx = np.searchsorted(non_slot_data_t, slot_data_t[sel]) attitude = np.vstack( [ np.array(non_slot_data[k][idx]) for k in ["AOATTQT1", "AOATTQT2", "AOATTQT3", "AOATTQT4"] ] ) attitude = attitude.T d_aca = np.einsum("ijk,ij->ik", Quat(q=attitude).transform, eci).T slot_data["PRED_YAGS"][sel] = ( np.rad2deg(np.arctan2(d_aca[1], d_aca[0])) * 3600 ) slot_data["PRED_ZAGS"][sel] = ( np.rad2deg(np.arctan2(d_aca[2], d_aca[0])) * 3600 ) slot_data["DYAGS"][sel] = ( slot_data["YAGS"][sel] - slot_data["PRED_YAGS"][sel] ) slot_data["DZAGS"][sel] = ( slot_data["ZAGS"][sel] - slot_data["PRED_ZAGS"][sel] )