import logging
import numpy as np
from astropy.table import MaskedColumn
from chandra_aca import transform
from Quaternion import Quat
from aca_view.utils import ExceptionFreeDecorator
from . import ska_data
logger = logging.getLogger("aca_view.data")
[docs]
@ExceptionFreeDecorator(logger="aca_view.data", log_level=logging.DEBUG)
def add_residuals(data, settings):
"""
Add residuals for each slot in `data['slot_data']`.
Input data should be a dictionary of the form::
{'slot_data': Table, 'non_slot_data': Table}
This adds the following masked columns: PRED_YAGS, PRED_ZAGS, DYAGS, DZAGS, AGASC_ID, RA, DEC.
If there is no catalog associated to a given time, the corresponding row is masked.
This does not use chandra_aca.centroid_resid, because:
- chandra_aca.centroid_resid uses cheta for attitudes (not available in real-time)
- chandra_aca.centroid_resid does a bunch of things we do not need
(such as shifting times, using other aspect solutions)
- if there is no star or no catalog, a sort of generic exception is raised
This function should never raise an exception.
:param data: dict
:param settings: dict
"""
non_slot_data, slot_data = data["non_slot_data"], data["slot_data"]
if "starcat_date" not in non_slot_data.colnames:
logger.debug("No starcat_date column in slot_data")
return
logger.debug("add_residuals")
starcat_dates = np.unique(non_slot_data["starcat_date"])
scenario = settings.get("kadi_commands_scenario", ska_data.DEFAULT_SKA_DATA_SOURCES)
ska_data_sources = ",".join(
settings.get("ska_data_sources", ska_data.DEFAULT_SKA_DATA_SOURCES)
)
cats = {
starcat_date: ska_data.get_starcat(
starcat_date,
ska_api_url=settings.get("ska_api_url", ska_data.DEFAULT_API_URL),
ska_data_sources=ska_data_sources,
scenario=scenario,
)
for starcat_date in starcat_dates
if starcat_date
}
logger.debug(f"starcats: {list(cats.keys())}")
img_starcat_date = ska_data.get_starcats(
slot_data["TIME"],
ska_api_url=settings.get("ska_api_url", ska_data.DEFAULT_API_URL),
ska_data_sources=ska_data_sources,
scenario=scenario,
)
slot_data["PRED_YAGS"] = MaskedColumn(
name="PRED_YAGS", dtype=float, length=len(slot_data)
)
slot_data["PRED_ZAGS"] = MaskedColumn(
name="PRED_ZAGS", dtype=float, length=len(slot_data)
)
slot_data["DYAGS"] = MaskedColumn(name="DYAGS", dtype=float, length=len(slot_data))
slot_data["DZAGS"] = MaskedColumn(name="DZAGS", dtype=float, length=len(slot_data))
slot_data["AGASC_ID"] = MaskedColumn(
name="AGASC_ID", dtype=int, length=len(slot_data)
)
slot_data["RA"] = MaskedColumn(name="RA", dtype=np.float64, length=len(slot_data))
slot_data["DEC"] = MaskedColumn(name="DEC", dtype=np.float64, length=len(slot_data))
# I use the time to line samples up, but time might differ slightly due to floating point
# precision. Just rounding does not work always work, so I round in multiples of 1.025.
# this can still fail if there is a time shift within this interval.
slot_data_t = slot_data["TIME"]
non_slot_data_t = non_slot_data["TIME"]
tmin = slot_data_t.min()
slot_data_t = np.round((slot_data_t - tmin) / 0.25625)
non_slot_data_t = np.round((non_slot_data_t - tmin) / 0.25625)
for slot in np.unique(slot_data["IMGNUM"]):
for starcat_date, cat in cats.items():
stars = cat[
(cat["slot"] == slot)
& ((cat["type"] == "GUI") | (cat["type"] == "BOT"))
]
if len(stars) == 0:
logger.debug(f"no star in slot {slot} for starcat {starcat_date}")
continue
sel = (slot_data["IMGNUM"] == slot) & (img_starcat_date == starcat_date)
if not np.count_nonzero(sel):
logger.debug(
f"no star in slot {slot} for starcat {starcat_date} in this sample"
)
continue
star = ska_data.get_star(stars[0]["id"], date=slot_data["TIME"][0])
slot_data["AGASC_ID"][sel] = star["AGASC_ID"]
slot_data["RA"][sel] = star["RA_PMCORR"]
slot_data["DEC"][sel] = star["DEC_PMCORR"]
eci = transform.radec_to_eci(slot_data["RA"][sel], slot_data["DEC"][sel])
idx = np.searchsorted(non_slot_data_t, slot_data_t[sel])
attitude = np.vstack(
[
np.array(non_slot_data[k][idx])
for k in ["AOATTQT1", "AOATTQT2", "AOATTQT3", "AOATTQT4"]
]
)
attitude = attitude.T
d_aca = np.einsum("ijk,ij->ik", Quat(q=attitude).transform, eci).T
slot_data["PRED_YAGS"][sel] = (
np.rad2deg(np.arctan2(d_aca[1], d_aca[0])) * 3600
)
slot_data["PRED_ZAGS"][sel] = (
np.rad2deg(np.arctan2(d_aca[2], d_aca[0])) * 3600
)
slot_data["DYAGS"][sel] = (
slot_data["YAGS"][sel] - slot_data["PRED_YAGS"][sel]
)
slot_data["DZAGS"][sel] = (
slot_data["ZAGS"][sel] - slot_data["PRED_ZAGS"][sel]
)