Source code for parse_cm.or_list

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import csv
import re
import warnings
from itertools import count

from astropy.table import Table
from ska_helpers.utils import LazyDict, convert_to_int_float_str

from parse_cm import or_list_comments

from .common import _get_content_as_lines

# Public API: the names exported by ``from parse_cm.or_list import *``.
__all__ = ['read_or_list', 'read_or_list_full', 'get_zero_offset_table']


def get_zero_offset_table(content):
    """
    Extract zero offset table from OR filename or OR text as list of strings

    The table is the block of lines found between the
    ``***ZERO-OFFSET AIMPOINTS START`` and ``***ZERO-OFFSET AIMPOINTS END``
    marker lines. This is a best-effort reader: if the content cannot be read,
    the markers are missing, the table cannot be parsed, or the parsed columns
    are not the expected zero-offset columns, then ``None`` is returned.

    :param content: OR list filename or OR list as list of strings
    :returns: table of zero offset entries, or None if no valid table is found
    """
    expected_colnames = [
        'obsid', 'cycle', 'date', 'detector', 'chipx', 'chipy', 'chip_id'
    ]

    try:
        lines = _get_content_as_lines(content)
        i_start = lines.index('***ZERO-OFFSET AIMPOINTS START') + 1
        i_stop = lines.index('***ZERO-OFFSET AIMPOINTS END')
        out = Table.read(lines[i_start:i_stop], format='ascii.basic', guess=False)
    except Exception:
        # Deliberately broad: any failure means there is no usable table.
        return None

    # Ensure the Table is a valid zero-offsets table. SOT MP appears to
    # generate non-operational OR lists with invalid tables embedded. This is
    # an explicit comparison (not an ``assert``) so the validation still runs
    # when Python is started with -O.
    if out.colnames != expected_colnames:
        return None

    return out
def set_zero_offset_values(obss, lines, op19):
    """Copy zero offset table values into the matching ORs in ``obss`` (in place).

    Nothing is done if ``lines`` has no valid zero offset table.

    :param obss: list of OR dicts, updated in place
    :param lines: OR list content passed on to ``get_zero_offset_table``
    :param op19: True if ``obss`` came from the full (OP19) parser, where the
        obsid is stored as 'id' and the values go into a 'zero_offset' sub-dict.
        False for the legacy parser, where the obsid is stored as 'obsid' and
        the values are written directly into the OR dict.
    """
    table = get_zero_offset_table(lines)
    if table is None:
        return

    obsid_key = 'id' if op19 else 'obsid'
    rows_by_obsid = {row['obsid']: row for row in table}

    # For back compatibility the legacy parser output does not include 'date'.
    if op19:
        columns = ('cycle', 'date', 'detector', 'chipx', 'chipy', 'chip_id')
    else:
        columns = ('cycle', 'detector', 'chipx', 'chipy', 'chip_id')

    for obs in obss:
        row = rows_by_obsid.get(obs[obsid_key])
        if row is None:
            continue

        # Output dict is either a 'zero_offset' dict within obs (full reader)
        # or obs itself (legacy reader).
        target = obs.setdefault('zero_offset', {}) if op19 else obs
        for col in columns:
            target[col] = row[col]


def get_comments(lines):
    """Collect every comment block in the OR list, grouped by comment ID.

    Parameters
    ----------
    lines : list of str
        Lines of the OR list file

    Returns
    -------
    dict
        Dict of list of comments (str) keyed by COMMENT ID (49999 or ObsID).
    """
    blocks = or_list_comments.get_comment_blocks(lines, include_49999=True)

    comments = {}
    for block in blocks:
        obsid = block["obsid"]
        if obsid not in comments:
            comments[obsid] = []
        comments[obsid].append(block["comment"])

    return comments
def read_or_list(content, asdict=False):
    """
    Read an OR list file or content (DEPRECATED, use read_or_list_full instead)

    This relies on a legacy parser which is incomplete: it does not parse all OR
    parameters or arguments, and the output parameter names are adhoc. Calling
    this function emits a ``FutureWarning``.

    An example output is::

      {
          "chip_id": 7,
          "chipx": 200.7,
          "chipy": 476.9,
          "cycle": 17,
          "detector": "ACIS-S",
          "dither_on": "ON",
          "dither_y_amp": 0.002222,
          "dither_y_freq": 0.36,
          "dither_y_phase": 0.0,
          "dither_z_amp": 0.002222,
          "dither_z_freq": 0.5091,
          "dither_z_phase": 0.0,
          "duration": 25000.0,
          "grating": "HETG",
          "mon_dec": -7.735581,
          "mon_ra": 111.463705,
          "obsid": 18799,
          "priority": 5,
          "si": "ACIS-S",
          "si_mode": "TE_0088E",
          "sim_offset_x": 0,
          "sim_offset_z": 0,
          "target_dec": 65.719444,
          "target_name": "Mkn 876",
          "target_offset_y": 0.0,
          "target_offset_z": 0.0,
          "target_ra": 243.488333,
      }

    :param content: OR list file name or lines
    :param asdict: return dict instead of list
    :returns: list of dict ORs or dict of ORs keyed by obsid (if asdict is True)
    """
    warnings.warn(
        "read_or_list is deprecated and will be removed, use read_or_list_full instead",
        FutureWarning,
        stacklevel=2,
    )

    lines = _get_content_as_lines(content)

    obss = []
    for start, line in enumerate(lines):
        if not line.startswith('OBS,'):
            continue

        # An OBS record runs through the first line without a trailing comma.
        stop = start
        while lines[stop].endswith(','):
            stop += 1
        obss.append(parse_obs(lines[start:stop + 1]))

    set_zero_offset_values(obss, lines, op19=False)

    # Order the keys alphabetically so the output is nicer to look at
    obss = [dict(sorted(obs.items())) for obs in obss]

    if asdict:
        return {obs['obsid']: obs for obs in obss}
    return obss
def read_or_list_full(content):
    """
    Read an OR list file or list of lines for an OR.

    This returns a nested dict of parameter values keyed by obsid. Parameters
    that have multiple arguments are represented as a dict.

    An example output is::

      {
          "aca_mode": "DEFAULT",
          "comment": "The proposed observation is an ACIS-HETG observation.",
          "dither": {
              "status": "ON",
              "y_amp": 0.002222,
              "y_freq": 0.36,
              "y_phase": 0.0,
              "z_amp": 0.002222,
              "z_freq": 0.5091,
              "z_phase": 0.0,
          },
          "duration": {"nominal": 25000.0},
          "grating": "HETG",
          "id": 18799,
          "min_acq": 1,
          "min_guide": 1,
          "preceding": {"reqid": 18798},
          "priority": 5,
          "roll": {"roll_angle": 105.0, "roll_tolerance": 0.0},
          "segment": {"max_number": 1, "min_duration": 22800.0},
          "si": "ACIS-S",
          "si_mode": "TE_0088E",
          "sim_offset": {"focus_offset": 0, "trans_offset": 0},
          "star": {"dec": -7.735581, "mag": 9.0, "ra": 111.463705, "type": "MON"},
          "target": {"dec": 65.719444, "name": "Mkn 876", "ra": 243.488333},
          "target_offset": {"y_offset": 0.0, "z_offset": 0.0},
          "window": {
              "window_end_time": "2016:067:01:47:50.562",
              "window_start_time": "2016:067:00:08:52.887",
          },
          "zero_offset": {
              "chip_id": 7,
              "chipx": 200.7,
              "chipy": 476.9,
              "cycle": 17,
              "date": "2016-03-05",
              "detector": "ACIS-S",
          },
      }

    In addition to the formal OR parameters, this function puts additional
    values into the dict for each OR:

    - 'comment': OR comment if available
    - 'info': dict of key/value pairs from OR comment if it conforms to the
      OR list machine-readable comments specification.
    - 'zero_offset': dict of zero_offset values from the zero-offsets table

    The second return value is a list of the comments in the OR list that
    have ID=49999.

    :param content: OR list file name or list of lines
    :returns: (dict, list) Dict of ORs keyed by obsid, list of comments (ID=49999)
    """
    # YAML machine readable comments depend on indentation, so the comments are
    # collected from lines that have only been stripped on the right.
    raw_lines = _get_content_as_lines(content, rstrip_only=True)

    # All comments in the OR list file as a dict (keyed by ID) of list.
    comments_by_id = get_comments(raw_lines)

    # The remaining parsing needs lines stripped of all surrounding whitespace.
    lines = [raw_line.strip() for raw_line in raw_lines]

    obss = []
    for start, line in enumerate(lines):
        if not line.startswith('OBS,'):
            continue

        # An OBS record runs through the first line without a trailing comma.
        stop = start
        while lines[stop].endswith(','):
            stop += 1
        obss.append(parse_obs_op19(lines[start:stop + 1]))

    # Include the zero offset table values in each OR output
    set_zero_offset_values(obss, lines, op19=True)

    # Attach the applicable comments to the individual ORs
    for obs in obss:
        if obs['id'] in comments_by_id:
            add_comment_to_obs(comments_by_id[obs['id']], obs)

    # Order the keys alphabetically so the output is nicer to look at
    obss = [dict(sorted(obs.items())) for obs in obss]

    # Only the ID=49999 comments are returned separately
    general_comments = comments_by_id.get(49999, [])

    return {obs['id']: obs for obs in obss}, general_comments
def add_comment_to_obs(comment_list: list[str], obs: dict) -> None:
    """Add comment to observation dictionary and extract information from the comment.

    This adds keys in-place to ``obs``:

    - ``comment``: OR comment string (the first entry of ``comment_list``)
    - ``info``: dict of key/value pairs from OR machine readable comment. This
      is only set if the comment parses to a dict that contains all of the
      keys ``obsid``, ``sequence_number`` and ``cycle_number``.

    A ``UserWarning`` is issued if there is more than one comment, or if the
    ``obsid`` in the machine readable comment differs from ``obs['id']``.

    Parameters
    ----------
    comment_list : list
        A list of comments corresponding to the OBS statement
    obs : dict
        A dictionary containing information about the observation.
    """
    if len(comment_list) > 1:
        warnings.warn(
            f"obsid {obs['id']} has multiple comments, using first",
            UserWarning,
            stacklevel=2,
        )
    obs['comment'] = comment_list[0]

    try:
        info = or_list_comments.yaml_loads(obs["comment"])
    except Exception:
        # Comments are not required to be machine readable, so a comment that
        # does not parse is simply kept as plain text.
        return

    # Validate explicitly instead of with ``assert`` so the checks still run
    # under ``python -O``. The dict check rejects comments that parse to a
    # scalar such as a plain string, where ``key in info`` would be a substring
    # test and the later ``info["obsid"]`` lookup would fail.
    required_keys = ("obsid", "sequence_number", "cycle_number")
    if not isinstance(info, dict):
        return
    if any(key not in info for key in required_keys):
        return

    obs["info"] = info
    if info["obsid"] != obs["id"]:
        warnings.warn(
            f"obsid {obs['id']} != obsid in comment {obs['info']['obsid']}",
            UserWarning,
            stacklevel=2,
        )


def get_op19_params():
    """Return the OP19 OR parameter definitions.

    :returns: dict keyed by lower-case parameter name. The value is the list of
        argument names for parameters that take arguments, or None for
        parameters that take a single value.
    """
    # Define the complete list of OP19 fields by copy/paste from the PDF and
    # then a bit of editing.
    op19_fields_text = """
        ID
        TARGET=(ra, dec, name)
        MANEUVER=(v1, v2, v3, angle, ref)
        SS_OBJECT
        MT_OBJECT
        SL_RATE
        DURATION=(nominal,min_duration, max_duration)
        PRIORITY
        SI
        STAR=(ra,dec,mag,type,id)
        MIN_GUIDE
        MIN_ACQ
        FID=(fidid, mag)
        GRATING
        SI_MODE
        BIAS
        ACA_MODE
        CLASS
        ROLL=(roll_angle,roll_tolerance)
        TARGET_OFFSET=(y_offset, z_offset)
        SIM_OFFSET=(trans_offset, focus_offset)
        DITHER=(status, y_amp, y_freq,y_phase , z_amp, z_freq, z_phase)
        WINDOW=(window_start_time,window_end_time)
        PHASE=(period, epoch, start_range, start_margin, end_range, end_margin)
        REPETITION=(period, delta,number_reps)
        PRECEDING=(reqid, minimum_lead, maximum_lead)
        OVERLAP=(reqid,start_lead, end_lead)
        SEGMENT=(max_number, min_duration, max_separation)
        MOON=(status, avoid_angle)
        SUN=(status, avoid_angle)
        FSS=(status, fss_fov_angle)
        EARTH=(status, avoid_angle)
        PLANET=(id, status, avoid_angle)
        OBJECT=(id, status, avoid_angle)
        RADIATION=(limit_type, species, delay, status, event_type, limit,energy_range1, energy_units1 ,energy_range2, energy_units2, kp, percentile)
        ECLIPSE=(status, day_night)
        ALTITUDE=(status, min_altitude, max_altitude)"""  # noqa

    # Turn the above text definitions into a dict of parameter and argument names.
    # All spaces are removed first, so each line is NAME or NAME=(arg1,arg2,...).
    op19_params = {}
    for line in op19_fields_text.strip().replace(' ', '').splitlines():
        vals = line.split('=')
        name = vals[0].lower()
        if len(vals) == 1:
            # No arguments
            op19_params[name] = None
        elif len(vals) == 2:
            op19_params[name] = vals[1].strip('()').split(',')
        else:
            raise RuntimeError('bad op19_fields_text')

    return op19_params


# Parameter definitions, wrapped in LazyDict around ``get_op19_params``.
OP19_PARAMS = LazyDict(get_op19_params)


def parse_obs_op19(lines):
    """Parse a single obs request in ``lines`` following OP19 nomenclature.

    This returns a dict of parameter values. Parameters that have multiple
    arguments are represented as a dict. For example part of an OR would be::

      {
       'si': 'ACIS-S',
       'si_mode': 'TE_00910',
       'target': {'ra': 35.887917,
                  'dec': 45.821194,
                  'name': 'V Zw 232 Notes 02'}
      }

    The ``target_offset`` (``y_offset``, ``z_offset``) and ``sim_offset``
    (``trans_offset``, ``focus_offset``) values are always filled in, using
    zero for any that are not in the OBS statement.

    :param lines: list of lines
    :returns: dict of NAME: value or NAME.argument: value pairs
    :raises ValueError: if the statement does not start with OBS or contains a
        field name that is not a valid OP19 parameter
    """
    obs = {}

    # Name values are always uppercase letters or underscore followed by =.
    # Splitting on a capturing group gives [prefix, name1, value1, name2, ...].
    fields = re.split(r'([A-Z_]+=)', ''.join(lines))

    # Clean up the ends of each field
    fields = [field.strip(' ,\n\r') for field in fields]

    # Apply a couple of sanity checks
    if fields[0].strip() != 'OBS':
        raise ValueError('OBS statement failed to parse')
    if len(fields) % 2 != 1:
        raise ValueError('OBS statement split problem')

    for name, value in zip(fields[1::2], fields[2::2]):
        name = name.lower()
        if name[-1] != '=':
            raise ValueError(f'OBS statement field {name} missing equals')
        name = name[:-1]

        if name not in OP19_PARAMS:
            raise ValueError(f'field {name} not a valid OP19 parameter')

        if value.startswith('(') and value.endswith(')'):
            # Get rid of leading/trailing parens and change {param value} to
            # "param value" so csv can parse it.
            arg_text = value[1:-1].replace('{', '"').replace('}', '"')

            # Parse the comma-delimited line with the csv package.
            for args in csv.reader([arg_text]):
                for param, arg in zip(OP19_PARAMS[name], args):
                    # Strip before testing so that whitespace-only arguments
                    # are skipped the same way as empty ones.
                    arg = arg.strip()
                    if arg:
                        # Could use defaultdict but don't like the repr
                        obs.setdefault(name, {})[param] = convert_to_int_float_str(arg)
        else:
            obs[name] = convert_to_int_float_str(value.strip())

    # Always fill these in for convenience
    defaults = (
        ('target_offset', 'y_offset', 0.0),
        ('target_offset', 'z_offset', 0.0),
        ('sim_offset', 'trans_offset', 0),
        ('sim_offset', 'focus_offset', 0),
    )
    for name, param, val in defaults:
        obs.setdefault(name, {}).setdefault(param, val)

    return obs


def parse_obs(lines):
    """Parse a single obs request in ``lines`` using adhoc names for parameters.

    This function is deprecated because it is incomplete and does not parse all
    parameters nor all arguments of some parameters (e.g. STAR).

    The target offsets and SIM offsets default to zero if not found.

    :param lines: list of lines
    :returns: dict of NAME: value or NAME.argument: value pairs
    """
    line = ''.join(lines)

    obs = {}
    obs['target_offset_y'], obs['target_offset_z'] = 0.0, 0.0
    obs['sim_offset_x'], obs['sim_offset_z'] = 0, 0

    def set_vals(attrs, regex):
        """Store the groups of the first ``regex`` match in ``obs`` under ``attrs``.

        Returns the match object, or None if ``regex`` does not match.
        """
        if isinstance(attrs, str):
            attrs = [attrs]
        match = re.search(regex, line)
        if match:
            for attr, val in zip(attrs, match.groups()):
                obs[attr] = convert_to_int_float_str(val)
        return match

    set_vals('obsid', r'ID=(\d+),')
    set_vals(['mon_ra', 'mon_dec'], r'STAR=\(([^,]+),([^,\)]+)')
    set_vals(['ss_object'], r'SS_OBJECT=([^,\)]+)')
    set_vals(['si'], r'SI=([^,]+)')
    set_vals(['si_mode'], r'SI_MODE=([^,]+)')
    set_vals(['priority'], r'PRIORITY=([^,]+)')
    set_vals(['duration'], r'DURATION=\(([^,]+)\)')

    # Prefer the form with a {target name}, else fall back to just RA and Dec.
    if not set_vals(['target_ra', 'target_dec', 'target_name'],
                    r'TARGET=\(([^,]+),([^,]+),\s*\{([^}]+)\}\),'):
        set_vals(['target_ra', 'target_dec'], r'TARGET=\(([^,]+),([^,\)]+)')

    if not set_vals(['target_offset_y', 'target_offset_z'],
                    r'TARGET_OFFSET=\((-?[\d\.]+),(-?[\d\.]+)\)'):
        set_vals(['target_offset_y'], r'TARGET_OFFSET=\((-?[\d\.]+)\)')

    # Split the DITHER argument list directly from the matched text. There is
    # no intermediate obs['dither'] entry, so the split always operates on a str.
    dither_match = re.search(r'DITHER=\(([^)]+)\)', line)
    if dither_match:
        dither_names = ['on', 'y_amp', 'y_freq', 'y_phase', 'z_amp', 'z_freq', 'z_phase']
        for name, val in zip(dither_names, dither_match.group(1).split(',')):
            obs['dither_' + name] = convert_to_int_float_str(val)

    # NOTE(review): the two-argument form stores the first argument as
    # sim_offset_x while the one-argument form stores it as sim_offset_z.
    # Kept as-is for back compatibility of this legacy parser.
    if not set_vals(['sim_offset_x', 'sim_offset_z'],
                    r'SIM_OFFSET=\(([^,\)]+),([^,]+)\)'):
        set_vals(['sim_offset_z'], r'SIM_OFFSET=\(([^,\)]+)')

    set_vals(['grating'], r'GRATING=([^,]+)')

    return obs