# Licensed under a 3-clause BSD style license - see LICENSE.rst
import csv
import re
import warnings
from itertools import count
from astropy.table import Table
from ska_helpers.utils import LazyDict, convert_to_int_float_str
from parse_cm import or_list_comments
from .common import _get_content_as_lines
__all__ = ['read_or_list', 'read_or_list_full', 'get_zero_offset_table']
[docs]
def get_zero_offset_table(content):
"""
Extract zero offset table from OR filename or OR text as list of strings
:param content: OR list filename or OR list as list of strings
:returns: table of zero offset entries
"""
try:
lines = _get_content_as_lines(content)
i_start = lines.index('***ZERO-OFFSET AIMPOINTS START') + 1
i_stop = lines.index('***ZERO-OFFSET AIMPOINTS END')
out = Table.read(lines[i_start:i_stop], format='ascii.basic', guess=False)
# Ensure the Table is a valid zero-offsets table. SOT MP appears to
# generate non-operational OR lists with invalid tables embedded.
assert out.colnames == 'obsid cycle date detector chipx chipy chip_id'.split()
except Exception:
out = None
return out
def set_zero_offset_values(obss, lines, op19):
"""Include the zero offset table values in each OR output of obss"""
zero_offset_table = get_zero_offset_table(lines)
if zero_offset_table is None:
return
id_name = 'id' if op19 else 'obsid'
zero_offsets = {row['obsid']: row for row in zero_offset_table}
for obs in obss:
if (obs_zero_offset := zero_offsets.get(obs[id_name])) is not None:
for col in ['cycle', 'date', 'detector', 'chipx', 'chipy', 'chip_id']:
# For back compatibility with legacy parser which skipped this.
if col == 'date' and not op19:
continue
# Define the output dict, either a 'zero_offset' dict in obs
# (for the full reader) or obs itself for the legacy reader.
zo = obs.setdefault('zero_offset', {}) if op19 else obs
zo[col] = obs_zero_offset[col]
def get_comments(lines):
"""Get all the comment blocks in the file as a dict of list.
Parameters
----------
lines : list of str
Lines of the OR list file
Returns
-------
dict
Dict of list of comments (str) keyed by COMMENT ID (49999 or ObsID).
"""
comments = {}
comment_blocks = or_list_comments.get_comment_blocks(lines, include_49999=True)
for comment_block in comment_blocks:
comments.setdefault(comment_block["obsid"], []).append(comment_block["comment"])
return comments
[docs]
def read_or_list(content, asdict=False):
"""
Read an OR list file or content (DEPRECATED, use read_or_list_full instead)
This uses a legacy parser which is incomplete and does not parse all OR
parameters or arguments. The parameter names are adhoc.
An example output is::
{
"chip_id": 7,
"chipx": 200.7,
"chipy": 476.9,
"cycle": 17,
"detector": "ACIS-S",
"dither_on": "ON",
"dither_y_amp": 0.002222,
"dither_y_freq": 0.36,
"dither_y_phase": 0.0,
"dither_z_amp": 0.002222,
"dither_z_freq": 0.5091,
"dither_z_phase": 0.0,
"duration": 25000.0,
"grating": "HETG",
"mon_dec": -7.735581,
"mon_ra": 111.463705,
"obsid": 18799,
"priority": 5,
"si": "ACIS-S",
"si_mode": "TE_0088E",
"sim_offset_x": 0,
"sim_offset_z": 0,
"target_dec": 65.719444,
"target_name": "Mkn 876",
"target_offset_y": 0.0,
"target_offset_z": 0.0,
"target_ra": 243.488333,
}
:param content: OR list file name or lines
:param asdict: return dict instead of list
:returns: list of dict ORs or dict of ORs keyed by obsid (if asdict is True)
"""
warnings.warn(
"read_or_list is deprecated and will be removed, use read_or_list_full instead",
FutureWarning,
stacklevel=2,
)
lines = _get_content_as_lines(content)
i_starts = (i for i, line in enumerate(lines) if line.startswith('OBS,'))
obss = []
for i_start in i_starts:
for i_off in count():
# Parse OBS record when first line without a trailing comma is found
if not lines[i_start + i_off].endswith(','):
obs = parse_obs(lines[i_start:i_start + i_off + 1])
obss.append(obs)
break
set_zero_offset_values(obss, lines, op19=False)
# Sort the keys to make it prettier
obss = [{key: obs[key] for key in sorted(obs)} for obs in obss]
return {obs['obsid']: obs for obs in obss} if asdict else obss
[docs]
def read_or_list_full(content):
"""
Read an OR list file or list of lines for an OR.
This returns a nested dict of parameter values keyed by obsid. Parameters
that have multiple arguments are represented as a dict. An example output is::
{
"aca_mode": "DEFAULT",
"comment": "The proposed observation is an ACIS-HETG observation.",
"dither": {
"status": "ON",
"y_amp": 0.002222,
"y_freq": 0.36,
"y_phase": 0.0,
"z_amp": 0.002222,
"z_freq": 0.5091,
"z_phase": 0.0,
},
"duration": {"nominal": 25000.0},
"grating": "HETG",
"id": 18799,
"min_acq": 1,
"min_guide": 1,
"preceding": {"reqid": 18798},
"priority": 5,
"roll": {"roll_angle": 105.0, "roll_tolerance": 0.0},
"segment": {"max_number": 1, "min_duration": 22800.0},
"si": "ACIS-S",
"si_mode": "TE_0088E",
"sim_offset": {"focus_offset": 0, "trans_offset": 0},
"star": {"dec": -7.735581, "mag": 9.0, "ra": 111.463705, "type": "MON"},
"target": {"dec": 65.719444, "name": "Mkn 876", "ra": 243.488333},
"target_offset": {"y_offset": 0.0, "z_offset": 0.0},
"window": {
"window_end_time": "2016:067:01:47:50.562",
"window_start_time": "2016:067:00:08:52.887",
},
"zero_offset": {
"chip_id": 7,
"chipx": 200.7,
"chipy": 476.9,
"cycle": 17,
"date": "2016-03-05",
"detector": "ACIS-S",
},
}
In addition to the formal OR parameters, this function puts additional
values into the dict for each OR:
- 'comment': OR comment if available
- 'info': dict of key/value pairs from OR comment if it conforms to the OR list
machine-readable comments specification.
- 'zero_offset': dict of zero_offset values from the zero-offsets table
The second return value is a list of the comments in the OR list that have
ID=49999.
:param content: OR list file name or list of lines
:returns: (dict, list)
Dict of ORs keyed by obsid, list of comments (ID=49999)
"""
lines = _get_content_as_lines(content, rstrip_only=True)
# Get all comments in the OR list file as a dict (keyed by ID) of list. Since YAML
# machine readable comments depend on indentation, only strip whitespace from the
# right ends of each line (above).
comments = get_comments(lines)
# Rest of parsing requires the lines to be stripped of all whitespace.
lines = [line.strip() for line in lines]
i_starts = (i for i, line in enumerate(lines) if line.startswith('OBS,'))
obss = []
for i_start in i_starts:
for i_off in count():
# Parse OBS record when first blank line is encountered
if not lines[i_start + i_off].endswith(','):
obs = parse_obs_op19(lines[i_start:i_start + i_off + 1])
obss.append(obs)
break
# Include the zero offset table values in each OR output
set_zero_offset_values(obss, lines, op19=True)
# Put applicable comments into individual ORs
for obs in obss:
if obs['id'] in comments:
add_comment_to_obs(comments[obs['id']], obs)
# Sort the keys to make it prettier
obss = [{key: obs[key] for key in sorted(obs)} for obs in obss]
# Now change comments into just the list of ID=49999 comments
comments = comments.get(49999, [])
# Turn obss list into a dict keyed by ID
obss = {obs['id']: obs for obs in obss}
return obss, comments
def add_comment_to_obs(comment_list : list[str], obs : dict[str]) -> None:
"""Add comment to observation dictionary and extract information from the comment.
This adds keys in-place to ``obs``:
- ``comment``: OR comment string
- ``info``: dict of key/value pairs from OR machine readable comment
Parameters
----------
comment_list : list
A list of comments corresponding to the OBS statement
obs : dict
A dictionary containing information about the observation.
"""
if len(comment_list) > 1:
warnings.warn(
f"obsid {obs['id']} has multiple comments, using first",
UserWarning,
stacklevel=2
)
obs['comment'] = comment_list[0]
try:
info = or_list_comments.yaml_loads(obs["comment"])
for required_key in ["obsid", "sequence_number", "cycle_number"]:
assert required_key in info
except Exception:
pass
else:
obs["info"] = info
if obs["info"]["obsid"] != obs["id"]:
warnings.warn(
f"obsid {obs['id']} != obsid in comment {obs['info']['obsid']}",
UserWarning,
stacklevel=2,
)
def get_op19_params():
# Define the complete list of OP19 fields by copy/paste from the PDF and
# then a bit of editing.
op19_fields_text = """
ID
TARGET=(ra, dec, name)
MANEUVER=(v1, v2, v3, angle, ref)
SS_OBJECT
MT_OBJECT
SL_RATE
DURATION=(nominal,min_duration, max_duration)
PRIORITY
SI
STAR=(ra,dec,mag,type,id)
MIN_GUIDE
MIN_ACQ
FID=(fidid, mag)
GRATING
SI_MODE
BIAS
ACA_MODE
CLASS
ROLL=(roll_angle,roll_tolerance)
TARGET_OFFSET=(y_offset, z_offset)
SIM_OFFSET=(trans_offset, focus_offset)
DITHER=(status, y_amp, y_freq,y_phase , z_amp, z_freq, z_phase)
WINDOW=(window_start_time,window_end_time)
PHASE=(period, epoch, start_range, start_margin, end_range, end_margin)
REPETITION=(period, delta,number_reps)
PRECEDING=(reqid, minimum_lead, maximum_lead)
OVERLAP=(reqid,start_lead, end_lead)
SEGMENT=(max_number, min_duration, max_separation)
MOON=(status, avoid_angle)
SUN=(status, avoid_angle)
FSS=(status, fss_fov_angle)
EARTH=(status, avoid_angle)
PLANET=(id, status, avoid_angle)
OBJECT=(id, status, avoid_angle)
RADIATION=(limit_type, species, delay, status, event_type, limit,energy_range1, energy_units1 ,energy_range2, energy_units2, kp, percentile)
ECLIPSE=(status, day_night)
ALTITUDE=(status, min_altitude, max_altitude)""" # noqa
# Turn the above text definitions into a dict of parameter and argument names
op19_params = {}
for line in op19_fields_text.strip().replace(' ', '').splitlines():
vals = line.split('=')
name = vals[0].lower()
if len(vals) == 1:
# No arguments
op19_params[name] = None
elif len(vals) == 2:
op19_params[name] = vals[1].strip('()').split(',')
else:
raise RuntimeError('bad op19_fields_text')
return op19_params
OP19_PARAMS = LazyDict(get_op19_params)
def parse_obs_op19(lines):
"""Parse a single obs request in ``lines`` following OP19 nomenclature.
This returns a dict of parameter values. Parameters that have multiple
arguments are represented as a dict. For example part of an OR would be::
{
'si': 'ACIS-S',
'si_mode': 'TE_00910',
'target': {'ra': 35.887917, 'dec': 45.821194, 'name': 'V Zw 232 Notes 02'}
}
:param lines: list of lines
:returns: dict of NAME: value or NAME.argument: value pairs
"""
obs = {}
# Name values are always uppercase letters or underscore followed by =.
fields = re.split(r'([A-Z_]+=)', ''.join(lines))
# Clean up the ends of each field
fields = [x.strip(' ,\n\r') for x in fields]
# Apply a couple of sanity checks
if fields[0].strip() != 'OBS':
raise ValueError('OBS statement failed to parse')
if len(fields) % 2 != 1:
raise ValueError('OBS statement split problem')
for name, value in zip(fields[1::2], fields[2::2]):
name = name.lower()
if name[-1] != '=':
raise ValueError(f'OBS statement field {name} missing equals')
name = name[:-1]
if name not in OP19_PARAMS:
raise ValueError(f'field {name} not a valid OP19 parameter')
if value.startswith('(') and value.endswith(')'):
# Get rid of leading/trailing parens
value = value[1:-1]
# Change {param value} to "param value" so csv can parse it.
value = value.replace('{', '"')
value = value.replace('}', '"')
# Parse the comma-delimited line with the csv package.
for values in csv.reader([value]):
for param, value in zip(OP19_PARAMS[name], values):
if value: # Skip empty values
# Could use defaultdict but don't like the repr
if name not in obs:
obs[name] = {}
obs[name][param] = convert_to_int_float_str(value.strip())
else:
obs[name] = convert_to_int_float_str(value.strip())
# Always fill these in for convenience
for name, param, val in (('target_offset', 'y_offset', 0.0),
('target_offset', 'z_offset', 0.0),
('sim_offset', 'trans_offset', 0),
('sim_offset', 'focus_offset', 0)):
if name not in obs:
obs[name] = {}
obs[name].setdefault(param, val)
return obs
def parse_obs(lines):
"""Parse a single obs request in ``lines`` using adhoc names for parameters.
This function is deprecated because it is incomplete and does not parse all
parameters nor all arguments of some parameters (e.g. STAR).
:param lines: list of lines
:returns: dict of NAME: value or NAME.argument: value pairs
"""
line = ''.join(lines)
obs = {}
obs['target_offset_y'], obs['target_offset_z'] = 0.0, 0.0
obs['sim_offset_x'], obs['sim_offset_z'] = 0, 0
def set_vals(attrs, regex):
if isinstance(attrs, str):
attrs = [attrs]
match = re.search(regex, line)
if match:
for attr, val in zip(attrs, match.groups()):
obs[attr] = convert_to_int_float_str(val)
return match
set_vals('obsid', r'ID=(\d+),')
set_vals(['mon_ra', 'mon_dec'], r'STAR=\(([^,]+),([^,\)]+)')
set_vals(['ss_object'], r'SS_OBJECT=([^,\)]+)')
set_vals(['si'], r'SI=([^,]+)')
set_vals(['si_mode'], r'SI_MODE=([^,]+)')
set_vals(['priority'], r'PRIORITY=([^,]+)')
set_vals(['duration'], r'DURATION=\(([^,]+)\)')
if not set_vals(['target_ra', 'target_dec', 'target_name'],
r'TARGET=\(([^,]+),([^,]+),\s*\{([^}]+)\}\),'):
set_vals(['target_ra', 'target_dec'],
r'TARGET=\(([^,]+),([^,\)]+)')
if not set_vals(['target_offset_y', 'target_offset_z'],
r'TARGET_OFFSET=\((-?[\d\.]+),(-?[\d\.]+)\)'):
set_vals(['target_offset_y'],
r'TARGET_OFFSET=\((-?[\d\.]+)\)')
if set_vals(['dither'], r'DITHER=\(([^)]+)\)'):
for name, val in zip(['on', 'y_amp', 'y_freq', 'y_phase', 'z_amp', 'z_freq', 'z_phase'],
obs['dither'].split(',')):
obs['dither_' + name] = convert_to_int_float_str(val)
del obs['dither']
if not set_vals(['sim_offset_x', 'sim_offset_z'],
r'SIM_OFFSET=\(([^,\)]+),([^,]+)\)'):
set_vals(['sim_offset_z'], r'SIM_OFFSET=\(([^,\)]+)')
set_vals(['grating'], r'GRATING=([^,]+)')
return obs