# Licensed under a 3-clause BSD style license - see LICENSE.rst
import contextlib
import functools
import logging
import os
import re
from enum import Enum
from pathlib import Path
from typing import Optional
import numexpr
import numpy as np
import numpy.typing as npt
import tables
from astropy.table import Column, Table
from Chandra.Time import DateTime
from cxotime import CxoTimeLike, convert_time_format
from packaging.version import Version
from .healpix import get_healpix_index_table, get_stars_from_healpix_h5, is_healpix
from .paths import default_agasc_dir
from .supplement.utils import get_supplement_table
__all__ = [
"sphere_dist",
"get_agasc_cone",
"get_star",
"get_stars",
"read_h5_table",
"get_agasc_filename",
"MAG_CATID_SUPPLEMENT",
"BAD_CLASS_SUPPLEMENT",
"set_supplement_enabled",
"SUPPLEMENT_ENABLED_ENV",
"write_agasc",
"TABLE_DTYPE",
"TableOrder",
"add_pmcorr_columns",
]
logger = logging.getLogger("agasc")
SUPPLEMENT_ENABLED_ENV = "AGASC_SUPPLEMENT_ENABLED"
SUPPLEMENT_ENABLED_DEFAULT = "True"
MAG_CATID_SUPPLEMENT = 100
BAD_CLASS_SUPPLEMENT = 100
# Columns that are required for calls to get_agasc_cone
COLUMNS_REQUIRED = {
"RA",
"DEC",
"EPOCH",
"PM_DEC",
"PM_RA",
}
RA_DECS_CACHE = {}
COMMON_AGASC_FILE_DOC = """\
If ``agasc_file`` is not specified or is None then return either
``default_agasc_dir()/${AGASC_HDF5_FILE}`` if ``${AGASC_HDF5_FILE}`` is defined;
or return the latest version of ``proseco_agasc`` in ``default_agasc_dir()``.
If ``agasc_file`` ends with the suffix ``.h5`` then it is used as-is.
If ``agasc_file`` ends with ``*`` then the latest version of the matching AGASC file
in ``default_agasc_dir()`` is used. For example, ``proseco_agasc_*`` could return
``${SKA}/data/agasc/proseco_agasc_1p7.h5``.
Any other ending for ``agasc_file`` raises a ``ValueError``.
The default AGASC directory is the environment variable ``${AGASC_DIR}`` if defined,
otherwise ``${SKA}/data/agasc``."""
COMMON_DOC = f"""By default, stars with available mag estimates or bad star entries
in the AGASC supplement are updated in-place in the output ``stars`` Table:
- ``MAG_ACA`` and ``MAG_ACA_ERR`` are set according to the supplement.
- ``MAG_CATID`` (mag catalog ID) is set to ``agasc.MAG_CATID_SUPPLEMENT``.
- If ``COLOR1`` is 0.7 or 1.5 then it is changed to 0.69 or 1.49 respectively.
Those particular values do not matter except that they are different from
the "status flag" values of 0.7 (no color => very unreliable mag estimate)
or 1.5 (red star => somewhat unreliable mag estimate) that have special
meaning based on deep heritage in the AGASC catalog itself.
- If ``AGASC_ID`` is in the supplement bad stars table then CLASS is set to
``agasc.BAD_CLASS_SUPPLEMENT``.
To disable the magnitude / bad star updates from the AGASC supplement, see
the ``set_supplement_enabled`` context manager / decorator.
{COMMON_AGASC_FILE_DOC.replace("return", "use")}
The default AGASC supplement file is ``<AGASC_DIR>/agasc_supplement.h5``.
"""
@contextlib.contextmanager
def set_supplement_enabled(value: bool):
"""Decorator / context manager to temporarily set the default for use of
AGASC supplement in query functions.
This sets the default for the ``use_supplement`` argument in AGASC function
calls if the user does not supply that argument.
This is mostly for testing or legacy applications to override the
default behavior to use the AGASC supplement star mags when available.
Examples::
import agasc
# Disable use of the supplement for the context block
with agasc.set_supplement_enabled(False):
aca = proseco.get_aca_catalog(obsid=8008)
# Disable use of the supplement for the function
@agasc.set_supplement_enabled(False)
def test_get_aca_catalog():
aca = proseco.get_aca_catalog(obsid=8008)
...
# Globally disable use of the supplement everywhere
        os.environ[agasc.SUPPLEMENT_ENABLED_ENV] = 'False'
Parameters
----------
value : bool
Whether to use the AGASC supplement in the context / decorator
"""
if not isinstance(value, bool):
raise TypeError("value must be bool (True|False)")
orig = os.environ.get(SUPPLEMENT_ENABLED_ENV)
os.environ[SUPPLEMENT_ENABLED_ENV] = str(value)
yield
if orig is None:
del os.environ[SUPPLEMENT_ENABLED_ENV]
else:
os.environ[SUPPLEMENT_ENABLED_ENV] = orig
class IdNotFound(LookupError):
pass
class InconsistentCatalogError(Exception):
pass
class RaDec(object):
def __init__(self, agasc_file):
self._agasc_file = agasc_file
@property
def agasc_file(self):
return self._agasc_file
@property
def ra(self):
if not hasattr(self, "_ra"):
self._ra, self._dec = self.read_ra_dec()
return self._ra
@property
def dec(self):
if not hasattr(self, "_dec"):
self._ra, self._dec = self.read_ra_dec()
return self._dec
def read_ra_dec(self):
        # Read the RA and DEC columns from the AGASC file
with tables.open_file(self.agasc_file) as h5:
ras = h5.root.data.read(field="RA")
decs = h5.root.data.read(field="DEC")
return ras, decs
def get_ra_decs(agasc_file):
agasc_file = os.path.abspath(agasc_file)
if agasc_file not in RA_DECS_CACHE:
RA_DECS_CACHE[agasc_file] = RaDec(agasc_file)
return RA_DECS_CACHE[agasc_file]
def read_h5_table(
h5_file: str | Path | tables.file.File,
row0: Optional[int] = None,
row1: Optional[int] = None,
path="data",
cache=False,
columns: Optional[list | tuple] = None,
) -> np.ndarray:
"""
Read HDF5 table ``columns`` from group ``path`` in ``h5_file``.
If ``row0`` and ``row1`` are specified then only the rows in that range are read,
e.g. ``data[row0:row1]``.
If ``cache`` is ``True`` then the data for the last read is cached in memory. The
cache key is ``(h5_file, path, columns)`` and up to 128 cache entries are
kept.
Parameters
----------
h5_file : str, Path, tables.file.File
Path to the HDF5 file to read.
row0 : int, optional
First row to read. Default is None (read from first row).
row1 : int, optional
        Last row to read, exclusive as in ``data[row0:row1]``. Default is None
        (read to the last row).
path : str, optional
Path to the data table in the HDF5 file. Default is 'data'.
cache : bool, optional
Whether to cache the read data. Default is False.
columns : list or tuple, optional
Column names to read from the file. If not specified, all columns are read.
Returns
-------
out : np.ndarray
The HDF5 data as a numpy structured array
"""
if columns is not None:
columns = tuple(columns)
if cache:
if isinstance(h5_file, tables.file.File):
h5_file = h5_file.filename
data = _read_h5_table_cached(h5_file, path, columns)
out = data[row0:row1]
else:
out = _read_h5_table(h5_file, path, row0, row1, columns)
return out
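# Usage sketch for ``read_h5_table`` (the file name below is hypothetical). With
# ``cache=True`` the full table for the (file, path, columns) key is held in memory,
# so repeated row-range reads avoid re-opening the HDF5 file:
#
# >>> stars = read_h5_table("proseco_agasc_1p7.h5", row0=1000, row1=2000,
# ...                       columns=("AGASC_ID", "RA", "DEC"), cache=True)
# >>> stars.dtype.names
# ('AGASC_ID', 'RA', 'DEC')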
@functools.lru_cache
def _read_h5_table_cached(
h5_file: str | Path,
path: str,
columns: tuple | None = None,
) -> np.ndarray:
return _read_h5_table(h5_file, path, row0=None, row1=None, columns=columns)
def _read_h5_table(
h5_file: str | Path | tables.file.File,
path: str,
row0: None | int,
row1: None | int,
columns: tuple | None = None,
) -> np.ndarray:
if isinstance(h5_file, tables.file.File):
out = _read_h5_table_from_open_h5_file(h5_file, path, row0, row1, columns)
else:
with tables.open_file(h5_file) as h5:
out = _read_h5_table_from_open_h5_file(h5, path, row0, row1, columns)
out = np.asarray(out) # Convert to structured ndarray (not recarray)
return out
def _read_h5_table_from_open_h5_file(
h5: tables.file.File,
path: str,
row0: int,
row1: int,
columns: tuple | None = None,
):
data = getattr(h5.root, path)
out = data.read(start=row0, stop=row1)
if columns:
out = np.rec.fromarrays([out[col] for col in columns], names=columns)
return np.asarray(out)
def get_agasc_filename(
agasc_file: Optional[str | Path] = None,
allow_rc: bool = False,
version: Optional[str] = None,
) -> str:
"""Get a matching AGASC file name from ``agasc_file``.
{common_agasc_file_doc}
Parameters
----------
agasc_file : str, Path, optional
AGASC file name (default=None)
allow_rc : bool, optional
Allow AGASC release candidate files (default=False)
version : str, optional
Version number to match (e.g. "1p8" or "1p8rc4", default=None)
Returns
-------
filename : str
Matching AGASC file name
Examples
--------
Setup:
>>> from agasc import get_agasc_filename
Selecting files in the default AGASC directory:
>>> get_agasc_filename()
'/Users/aldcroft/ska/data/agasc/proseco_agasc_1p7.h5'
>>> get_agasc_filename("proseco_agasc_*")
'/Users/aldcroft/ska/data/agasc/proseco_agasc_1p7.h5'
>>> get_agasc_filename("proseco_agasc_*", version="1p8", allow_rc=True)
'/Users/aldcroft/ska/data/agasc/proseco_agasc_1p8rc4.h5'
>>> get_agasc_filename("agas*")
Traceback (most recent call last):
...
FileNotFoundError: No AGASC files in /Users/aldcroft/ska/data/agasc found matching
agas*_?1p([0-9]+).h5
Selecting non-default AGASC file in the default directory:
>>> os.environ["AGASC_HDF5_FILE"] = "proseco_agasc_1p6.h5"
>>> get_agasc_filename()
'/Users/aldcroft/ska/data/agasc/proseco_agasc_1p6.h5'
Changing the default AGASC directory:
>>> os.environ["AGASC_DIR"] = "."
>>> get_agasc_filename()
'proseco_agasc_1p7.h5'
Selecting an arbitrary AGASC file name either directly or with the AGASC_HDF5_FILE
environment variable:
>>> get_agasc_filename("any_agasc.h5")
'any_agasc.h5'
>>> os.environ["AGASC_HDF5_FILE"] = "whatever.h5"
>>> get_agasc_filename()
'/Users/aldcroft/ska/data/agasc/whatever.h5'
"""
if agasc_file is None:
if "AGASC_HDF5_FILE" in os.environ:
return str(default_agasc_dir() / os.environ["AGASC_HDF5_FILE"])
else:
agasc_file = "proseco_agasc_*"
agasc_file = str(agasc_file)
if agasc_file.endswith(".h5"):
return agasc_file
# Get latest version of file matching agasc_file in the default AGASC dir
agasc_dir = default_agasc_dir()
if not agasc_file.endswith("*"):
raise ValueError("agasc_file must end with '*' or '.h5'")
agasc_file_re = agasc_file[:-1] + r"(1p[0-9]+) (rc[1-9][0-9]*)? \.h5$"
matches = []
for path in agasc_dir.glob("*.h5"):
name = path.name
if match := re.match(agasc_file_re, name, re.VERBOSE):
if not allow_rc and match.group(2):
continue
version_str = match.group(1)
rc_str = match.group(2) or ""
if version is not None and version not in (
version_str,
version_str + rc_str,
):
continue
matches.append((Version(version_str.replace("p", ".") + rc_str), path))
if len(matches) == 0:
with_version = f" with {version=}" if version is not None else ""
raise FileNotFoundError(
f"No AGASC files in {agasc_dir}{with_version} matching {agasc_file_re}"
)
    # Get the candidate with the highest version number. Tuples sort lexicographically
    # starting with the first element, which here is the version number.
out = sorted(matches)[-1][1]
return str(out)
def sphere_dist(
ra1: npt.ArrayLike,
dec1: npt.ArrayLike,
ra2: npt.ArrayLike,
dec2: npt.ArrayLike,
) -> np.ndarray:
"""
Haversine formula for angular distance on a sphere: more stable at poles.
This version uses arctan instead of arcsin and thus does better with sign
conventions. This uses numexpr to speed expression evaluation by a factor
of 2 to 3.
Parameters
----------
ra1 : array_like
first RA (deg)
dec1 : array_like
first Dec (deg)
ra2 : array_like
second RA (deg)
dec2 : array_like
second Dec (deg)
Returns
-------
angular separation distance (deg) : array_like
"""
ra1 = np.radians(ra1).astype(np.float64)
ra2 = np.radians(ra2).astype(np.float64)
dec1 = np.radians(dec1).astype(np.float64)
dec2 = np.radians(dec2).astype(np.float64)
numerator = numexpr.evaluate( # noqa: F841
"sin((dec2 - dec1) / 2) ** 2 + "
"cos(dec1) * cos(dec2) * sin((ra2 - ra1) / 2) ** 2"
)
dists = numexpr.evaluate("2 * arctan2(numerator ** 0.5, (1 - numerator) ** 0.5)")
return np.degrees(dists)
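# Quick sanity check for ``sphere_dist`` (an illustrative sketch, not part of the API):
# two points one degree apart along a meridian are exactly one degree apart on the sky,
# while one degree of RA at Dec = 60 deg spans only about cos(60 deg) = 0.5 deg:
#
# >>> round(float(sphere_dist(10.0, 0.0, 10.0, 1.0)), 6)
# 1.0
# >>> round(float(sphere_dist(10.0, 60.0, 11.0, 60.0)), 3)
# 0.5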
def update_color1_column(stars: Table):
"""
For any stars which have a V-I color (RSV3 > 0) and COLOR1 == 1.5
then set COLOR1 = COLOR2 * 0.850. For such stars the MAG_ACA / MAG_ACA_ERR
values are reliable and they should not be flagged with a COLOR1 = 1.5,
which generally implies to downstream tools that the mag is unreliable.
Also ensure that COLOR2 values that happen to be exactly 1.5 are shifted a bit.
The 0.850 factor is because COLOR1 = B-V while COLOR2 = BT-VT. See
https://heasarc.nasa.gov/W3Browse/all/tycho2.html for a reminder of the
scaling between the two.
This updates ``stars`` in place if the COLOR1 column is present.
"""
if "COLOR1" not in stars.columns:
return
# Select red stars that have a reliable mag in AGASC 1.7 and later.
color15 = np.isclose(stars["COLOR1"], 1.5) & (stars["RSV3"] > 0)
new_color1 = stars["COLOR2"][color15] * 0.850
if len(new_color1) > 0:
# Ensure no new COLOR1 are within 0.001 of 1.5, so downstream tests of
# COLOR1 == 1.5 or np.isclose(COLOR1, 1.5) will not accidentally succeed.
fix15 = np.isclose(new_color1, 1.5, rtol=0, atol=0.0005)
new_color1[fix15] = 1.499 # Insignificantly different from 1.50
# For stars with a reliable mag, now COLOR1 is really the B-V color.
stars["COLOR1"][color15] = new_color1
def add_pmcorr_columns(stars, date):
"""Add proper-motion corrected columns RA_PMCORR and DEC_PMCORR to Table ``stars``.
This computes the PM-corrected RA and Dec of all ``stars`` using the supplied
``date``, which may be a scalar or an array-valued object (np.array or list).
The ``stars`` table is updated in-place.
Parameters
----------
stars
astropy Table of stars from the AGASC
date
scalar, list, array of date(s) in DateTime-compatible format
Returns
-------
None
"""
# Convert date to DateTime ensuring it can broadcast to stars table. Since
# DateTime is slow keep it as a scalar if possible.
if np.asarray(date).shape == ():
dates = DateTime(date)
else:
dates = DateTime(np.broadcast_to(date, len(stars)))
# Compute delta year. stars['EPOCH'] is Column, float32. Need to coerce to
# ndarray float64 for consistent results between scalar and array cases.
dyear = dates.frac_year - stars["EPOCH"].view(np.ndarray).astype(np.float64)
pm_to_degrees = dyear / (3600.0 * 1000.0)
dec_pmcorr = np.where(
stars["PM_DEC"] != -9999,
stars["DEC"] + stars["PM_DEC"] * pm_to_degrees,
stars["DEC"],
)
ra_scale = np.cos(np.radians(stars["DEC"]))
ra_pmcorr = np.where(
stars["PM_RA"] != -9999,
stars["RA"] + stars["PM_RA"] * pm_to_degrees / ra_scale,
stars["RA"],
)
# Add the proper-motion corrected columns to table using astropy.table.Table
stars.add_columns(
[
Column(data=ra_pmcorr, name="RA_PMCORR"),
Column(data=dec_pmcorr, name="DEC_PMCORR"),
]
)
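# Worked example of the proper-motion arithmetic above (hypothetical values): proper
# motions are stored in milliarcsec / year, so a star with PM_DEC = 100 mas/yr observed
# 10 years after its catalog EPOCH moves 100 * 10 / (3600 * 1000) deg ~= 0.000278 deg
# (about 1 arcsec) in Dec. The RA offset is additionally divided by cos(Dec), and PM
# values of -9999 mean "no proper motion available" and are left uncorrected.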
def get_agasc_cone(
ra: float,
dec: float,
radius: float = 1.5,
date: Optional[CxoTimeLike] = None,
agasc_file: Optional[str] = None,
pm_filter: bool = True,
fix_color1: bool = True,
use_supplement: Optional[bool] = None,
matlab_pm_bug: bool = False,
columns: Optional[tuple[str]] = None,
cache: bool = False,
) -> Table:
"""
Get AGASC catalog entries within ``radius`` degrees of ``ra``, ``dec``.
The star positions are corrected for proper motion using the ``date``
argument, which defaults to the current date if not supplied. The
corrected positions are available in the ``RA_PMCORR`` and ``DEC_PMCORR``
columns, respectively.
{common_doc}
Example::
>>> import agasc
>>> stars = agasc.get_agasc_cone(10.0, 20.0, 1.5)
>>> plt.plot(stars['RA'], stars['DEC'], '.')
Parameters
----------
ra : float
RA (deg)
dec : float
Declination (deg)
radius : float
Cone search radius (deg)
date : CxoTimeLike, optional
Date for proper motion (default=Now)
agasc_file : str, optional
AGASC file (optional)
pm_filter : bool
Use PM-corrected positions in filtering
fix_color1 : bool
set COLOR1=COLOR2 * 0.85 for stars with V-I color
use_supplement : bool, optional
Use estimated mag from AGASC supplement where available (default=value of
AGASC_SUPPLEMENT_ENABLED env var, or True if not defined)
matlab_pm_bug : bool
Apply MATLAB proper motion bug prior to the MAY2118A loads (default=False)
columns : tuple[str], optional
Columns to return (default=all)
cache : bool
Cache the AGASC data in memory (default=False)
Returns
-------
astropy.table.Table
Table of AGASC entries
"""
agasc_file = get_agasc_filename(agasc_file)
get_stars_func = (
get_stars_from_healpix_h5
if is_healpix(agasc_file)
else get_stars_from_dec_sorted_h5
)
    # Possibly expand the initial radius to allow for slop due to proper motion
rad_pm = radius + (0.1 if pm_filter else 0.0)
# Ensure that the columns we need are read from the AGASC file, excluding PMCORR
# columns if supplied since they are not in the HDF5.
columns_query = (
None
if columns is None
else tuple(
column for column in columns if column not in ("RA_PMCORR", "DEC_PMCORR")
)
)
# Minimal columns to compute PM-corrected positions and do filtering.
if columns and COLUMNS_REQUIRED - set(columns):
raise ValueError(f"columns must include all of {COLUMNS_REQUIRED}")
stars = get_stars_func(
ra, dec, rad_pm, agasc_file=agasc_file, columns=columns_query, cache=cache
)
if matlab_pm_bug:
apply_matlab_pm_bug(stars, date)
add_pmcorr_columns(stars, date)
if fix_color1:
update_color1_column(stars)
# Final filtering using proper-motion corrected positions
if pm_filter:
dists = sphere_dist(ra, dec, stars["RA_PMCORR"], stars["DEC_PMCORR"])
ok = dists <= radius
stars = stars[ok]
update_from_supplement(stars, use_supplement)
stars.meta["agasc_file"] = agasc_file
return stars
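# Usage sketch for the ``columns`` argument of ``get_agasc_cone`` (a hedged example, not
# from the docstring above): any column subset must include the ``COLUMNS_REQUIRED`` set
# needed for the proper-motion correction and radius filtering:
#
# >>> cols = ("AGASC_ID", "RA", "DEC", "EPOCH", "PM_RA", "PM_DEC", "MAG_ACA")
# >>> stars = get_agasc_cone(10.0, 20.0, radius=1.0, date="2024:001",
# ...                        columns=cols, cache=True)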
def apply_matlab_pm_bug(stars: Table, date: CxoTimeLike):
"""Account for a bug in MATLAB proper motion correction.
The bug was not dividing the RA proper motion by cos(dec), so here we premultiply by
that factor so that add_pmcorr_columns() will match MATLAB. This is mostly for use
in kadi.commands.observations.set_star_ids() to match flight catalogs created with
MATLAB.
This bug was fixed starting with the MAY2118A loads (MATLAB Tools 2018115).
Parameters
----------
stars : astropy.table.Table
Table of stars from the AGASC
date : CxoTimeLike
Date for proper motion correction
"""
if convert_time_format(date, "date") < "2018:141:03:35:03.000":
ok = stars["PM_RA"] != -9999
# Note this is an int16 field so there is some rounding error, but for
# the purpose of star identification this is fine.
stars["PM_RA"][ok] = np.round(
stars["PM_RA"][ok] * np.cos(np.deg2rad(stars["DEC"][ok]))
)
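# Worked example of the bug emulation above (hypothetical values): for a star at
# Dec = 60 deg with PM_RA = 100 mas/yr, the pre-multiplication stores
# round(100 * cos(60 deg)) = 50, which add_pmcorr_columns() then divides by cos(60 deg)
# again. The net RA correction uses the raw 100 mas/yr with no cos(Dec) factor, matching
# the MATLAB behavior prior to the fix.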
def get_stars_from_dec_sorted_h5(
ra: float,
dec: float,
radius: float,
agasc_file: str | Path,
columns: Optional[list[str] | tuple[str]] = None,
cache: bool = False,
) -> Table:
"""
Returns a table of stars within a given radius of a given RA and Dec.
Parameters
----------
ra : float
The right ascension of the center of the search radius, in degrees.
dec : float
The declination of the center of the search radius, in degrees.
radius : float
The radius of the search circle, in degrees.
agasc_file : str or Path
The path to the AGASC HDF5 file.
columns : list or tuple, optional
The columns to read from the AGASC file. If not specified, all columns are read.
cache : bool, optional
Whether to cache the AGASC data in memory. Default is False.
Returns
-------
stars : astropy.table.Table
        Table of stars within the search radius, sorted by declination.
"""
ra_decs = get_ra_decs(agasc_file)
idx0, idx1 = np.searchsorted(ra_decs.dec, [dec - radius, dec + radius])
dists = sphere_dist(ra, dec, ra_decs.ra[idx0:idx1], ra_decs.dec[idx0:idx1])
ok = dists <= radius
stars = read_h5_table(
agasc_file, row0=idx0, row1=idx1, cache=cache, columns=columns
)
stars = Table(stars[ok])
return stars
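# Sketch of the Dec-window selection used above: with the catalog sorted by DEC,
# ``np.searchsorted`` gives the contiguous row range whose declinations fall within
# ``dec +/- radius``, so only that slice is read from disk before the exact
# ``sphere_dist`` cut. For example:
#
# >>> decs = np.array([-5.0, -1.0, 0.0, 2.0, 30.0])  # already sorted by declination
# >>> np.searchsorted(decs, [0.0 - 1.5, 0.0 + 1.5])  # rows with dec in [-1.5, 1.5]
# array([1, 3])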
def get_star(
id: int,
agasc_file: Optional[str] = None,
date: Optional[CxoTimeLike] = None,
fix_color1: bool = True,
use_supplement: Optional[bool] = None,
) -> Table:
"""
Get AGASC catalog entry for star with requested id.
{common_doc}
Example::
>>> import agasc
>>> star = agasc.get_star(636629880)
>>> for name in star.colnames:
... print('{{:12s}} : {{}}'.format(name, star[name]))
AGASC_ID : 636629880
RA : 125.64184
DEC : -4.23235
POS_ERR : 300
POS_CATID : 6
EPOCH : 1983.0
PM_RA : -9999
PM_DEC : -9999
PM_CATID : 0
PLX : -9999
PLX_ERR : -9999
PLX_CATID : 0
MAG_ACA : 12.1160011292
MAG_ACA_ERR : 45
CLASS : 0
MAG : 13.2700004578
...
Parameters
----------
id : int
AGASC id
agasc_file : Optional[str]
Path to the AGASC file (default=None)
date : Optional[CxoTimeLike]
Date for proper motion (default=Now)
fix_color1 : bool
Set COLOR1=COLOR2 * 0.85 for stars with V-I color (default=True)
use_supplement : Optional[bool]
Use estimated mag from AGASC supplement where available
(default=value of AGASC_SUPPLEMENT_ENABLED env var, or True if not defined)
Returns
-------
astropy.table.Table
Row of entry for id
"""
agasc_file = get_agasc_filename(agasc_file)
with tables.open_file(agasc_file) as h5:
tbl = h5.root.data
id_rows = tbl.read_where("(AGASC_ID == {})".format(id))
if len(id_rows) > 1:
raise InconsistentCatalogError(
"More than one entry found for {} in AGASC".format(id)
)
if id_rows is None or len(id_rows) == 0:
raise IdNotFound()
t = Table(id_rows)
add_pmcorr_columns(t, date)
if fix_color1:
update_color1_column(t)
update_from_supplement(t, use_supplement)
return t[0]
def _get_rows_read_where(ids_1d, dates_1d, agasc_file):
rows = []
with tables.open_file(agasc_file) as h5:
tbl = h5.root.data
for id, _date in zip(ids_1d, dates_1d):
id_rows = tbl.read_where("(AGASC_ID == {})".format(id))
if len(id_rows) > 1:
raise InconsistentCatalogError(
f"More than one entry found for {id} in AGASC"
)
if id_rows is None or len(id_rows) == 0:
raise IdNotFound(f"No entry found for {id} in AGASC")
rows.append(id_rows[0])
return rows
def _get_rows_read_entire(ids_1d, dates_1d, agasc_file):
with tables.open_file(agasc_file) as h5:
tbl = h5.root.data[:]
agasc_idx = {agasc_id: idx for idx, agasc_id in enumerate(tbl["AGASC_ID"])}
rows = []
for agasc_id, _date in zip(ids_1d, dates_1d):
if agasc_id not in agasc_idx:
raise IdNotFound(f"No entry found for {agasc_id} in AGASC")
rows.append(tbl[agasc_idx[agasc_id]])
return rows
def get_stars(
ids: int | npt.ArrayLike,
agasc_file: Optional[str] = None,
dates: Optional[CxoTimeLike] = None,
method_threshold: int = 5000,
fix_color1: bool = True,
use_supplement: Optional[bool] = None,
) -> Table | Table.Row:
"""
Get AGASC catalog entries for star ``ids`` at ``dates``.
The input ``ids`` and ``dates`` are broadcast together for the output shape
(though note that the result is flattened in the end). If both are scalar
inputs then the output is a Table Row, otherwise the output is a Table.
This function has two possible methods for getting stars, either by using
the HDF5 ``tables.read_where()`` function to get one star at a time from the
HDF5 file, or by reading the entire table into memory and doing the search
    by making a dict indexed by AGASC ID. Tests indicate that the latter is faster
for about 5000 or more stars, so this function will read the entire AGASC if
the number of stars is greater than ``method_threshold``.
Unlike the similar ``get_star`` function, this adds a ``DATE`` column
indicating the date at which the star coordinates (RA_PMCORR, DEC_PMCORR)
are computed.
{common_doc}
Example::
>>> import agasc
>>> star = agasc.get_stars(636629880)
>>> for name in star.colnames:
... print('{{:12s}} : {{}}'.format(name, star[name]))
AGASC_ID : 636629880
RA : 125.64184
DEC : -4.23235
POS_ERR : 300
POS_CATID : 6
EPOCH : 1983.0
PM_RA : -9999
PM_DEC : -9999
PM_CATID : 0
PLX : -9999
PLX_ERR : -9999
PLX_CATID : 0
MAG_ACA : 12.1160011292
MAG_ACA_ERR : 45
CLASS : 0
MAG : 13.2700004578
...
Parameters
----------
ids : int or array_like
AGASC ids (scalar or array)
agasc_file : str, optional
AGASC file (optional)
dates : CxoTimeLike, optional
Dates for proper motion (scalar or array) (default=Now)
method_threshold : int
Number of stars above which to use the "read_entire_agasc" method
fix_color1 : bool
set COLOR1=COLOR2 * 0.85 for stars with V-I color (default=True)
use_supplement : bool, optional
Use estimated mag from AGASC supplement where available (default=value of
AGASC_SUPPLEMENT_ENABLED env var, or True if not defined)
Returns
-------
astropy Table of AGASC entries, or Table Row of one entry for scalar input
"""
agasc_file = get_agasc_filename(agasc_file)
dates_in = DateTime(dates).date
dates_is_scalar = np.asarray(dates_in).shape == ()
ids, dates = np.broadcast_arrays(ids, dates_in)
ids_1d, dates_1d = np.atleast_1d(ids), np.atleast_1d(dates)
if len(ids_1d) < method_threshold:
rows = _get_rows_read_where(ids_1d, dates_1d, agasc_file)
method = "tables_read_where"
else:
rows = _get_rows_read_entire(ids_1d, dates_1d, agasc_file)
method = "read_entire_agasc"
t = Table(np.vstack(rows).flatten())
# Define a temporary attribute indicating get_stars method, mostly for testing
t.get_stars_method = method
add_pmcorr_columns(t, dates_in if dates_is_scalar else dates)
if fix_color1:
update_color1_column(t)
t["DATE"] = dates
update_from_supplement(t, use_supplement)
return t if ids.shape else t[0]
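# Usage sketch for array-valued input to ``get_stars`` (the AGASC ID is the one from the
# docstring example; the dates are arbitrary): ``ids`` and ``dates`` broadcast together
# following normal numpy rules, giving one output row per broadcast element:
#
# >>> stars = get_stars([636629880], dates=["2024:001", "2025:001"])
# >>> len(stars)  # one ID broadcast against two dates
# 2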
# Interpolate common docs into function docstrings. Using f-string interpolation in the
# docstring itself does not work.
for func in get_stars, get_star, get_agasc_cone:
func.__doc__ = func.__doc__.format(common_doc=COMMON_DOC)
get_agasc_filename.__doc__ = get_agasc_filename.__doc__.format(
common_agasc_file_doc=COMMON_AGASC_FILE_DOC
)
def update_from_supplement(stars, use_supplement=None):
"""Update mag, color1 and class information from AGASC supplement in ``stars``.
Stars with available mag estimates in the AGASC supplement are updated
in-place in the input ``stars`` Table:
- ``MAG_ACA`` and ``MAG_ACA_ERR`` are set according to the supplement.
- ``MAG_CATID`` (mag catalog ID) is set to ``MAG_CATID_SUPPLEMENT``.
- If COLOR1 is 0.7 or 1.5 then it is changed to 0.69 or 1.49 respectively.
Those particular values do not matter except that they are different from
the "status flag" values of 0.7 (no color => very unreliable mag estimate)
or 1.5 (red star => somewhat unreliable mag estimate) that have special
meaning based on deep heritage in the AGASC catalog itself.
Stars which are in the bad stars table are updated as follows:
- ``CLASS = BAD_CLASS_SUPPLEMENT``
Whether to actually apply the update is set by a combination of the
``use_supplement`` argument, which has priority, and the
``AGASC_SUPPLEMENT_ENABLED`` environment variable.
Parameters
----------
stars
astropy.table.Table of stars
use_supplement : bool, None
Use the supplement (default=None, see above)
"""
if use_supplement is None:
supplement_enabled_env = os.environ.get(
SUPPLEMENT_ENABLED_ENV, SUPPLEMENT_ENABLED_DEFAULT
)
if supplement_enabled_env not in ("True", "False"):
raise ValueError(
f'{SUPPLEMENT_ENABLED_ENV} env var must be either "True" or "False" '
f"got {supplement_enabled_env}"
)
supplement_enabled = supplement_enabled_env == "True"
else:
supplement_enabled = use_supplement
if not supplement_enabled or len(stars) == 0:
return
def set_star(star, name, value):
"""Set star[name] = value if ``name`` is a column in the table"""
try:
star[name] = value
except KeyError:
pass
    # Get estimated mags and errs from supplement as a dict of dict
# agasc_id : {mag_aca: .., mag_aca_err: ..}.
supplement_mags = get_supplement_table("mags", agasc_dir=default_agasc_dir())
supplement_mags_index = supplement_mags.meta["index"]
# Get bad stars as {agasc_id: {source: ..}}
bad_stars = get_supplement_table("bad", agasc_dir=default_agasc_dir())
bad_stars_index = bad_stars.meta["index"]
for star in stars:
agasc_id = int(star["AGASC_ID"])
if agasc_id in supplement_mags_index:
idx = supplement_mags_index[agasc_id]
mag_est = supplement_mags["mag_aca"][idx]
mag_est_err = supplement_mags["mag_aca_err"][idx]
set_star(star, "MAG_ACA", mag_est)
# Mag err is stored as int16 in units of 0.01 mag. Use same convention here.
set_star(star, "MAG_ACA_ERR", round(mag_est_err * 100))
set_star(star, "MAG_CATID", MAG_CATID_SUPPLEMENT)
if "COLOR1" in stars.colnames:
color1 = star["COLOR1"]
if np.isclose(color1, 0.7) or np.isclose(color1, 1.5):
star["COLOR1"] = color1 - 0.01
if agasc_id in bad_stars_index:
set_star(star, "CLASS", BAD_CLASS_SUPPLEMENT)
def write_healpix_index_table(filename: str, healpix_index: Table, nside: int):
"""
Write a HEALPix index table to an HDF5 file.
Parameters
----------
filename : str
The path to the HDF5 file to write to.
healpix_index : astropy.table.Table
The HEALPix index table to write.
nside : int
The NSIDE parameter used to generate the HEALPix index.
Returns
-------
None
"""
healpix_index_np = healpix_index.as_array()
with tables.open_file(filename, mode="a") as h5:
h5.create_table("/", "healpix_index", healpix_index_np, title="HEALPix index")
h5.root.healpix_index.attrs["nside"] = nside
TABLE_DTYPES: dict[str, npt.DTypeLike] = {
"AGASC_ID": np.int32,
"RA": np.float64,
"DEC": np.float64,
"POS_ERR": np.int16,
"POS_CATID": np.uint8,
"EPOCH": np.float32,
"PM_RA": np.int16,
"PM_DEC": np.int16,
"PM_CATID": np.uint8,
"PLX": np.int16,
"PLX_ERR": np.int16,
"PLX_CATID": np.uint8,
"MAG_ACA": np.float32,
"MAG_ACA_ERR": np.int16,
"CLASS": np.int16,
"MAG": np.float32,
"MAG_ERR": np.int16,
"MAG_BAND": np.int16,
"MAG_CATID": np.uint8,
"COLOR1": np.float32,
"COLOR1_ERR": np.int16,
"C1_CATID": np.uint8,
"COLOR2": np.float32,
"COLOR2_ERR": np.int16,
"C2_CATID": np.uint8,
"RSV1": np.float32,
"RSV2": np.int16,
"RSV3": np.uint8,
"VAR": np.int16,
"VAR_CATID": np.uint8,
"ASPQ1": np.int16,
"ASPQ2": np.int16,
"ASPQ3": np.int16,
"ACQQ1": np.int16,
"ACQQ2": np.int16,
"ACQQ3": np.int16,
"ACQQ4": np.int16,
"ACQQ5": np.int16,
"ACQQ6": np.int16,
"XREF_ID1": np.int32,
"XREF_ID2": np.int32,
"XREF_ID3": np.int32,
"XREF_ID4": np.int32,
"XREF_ID5": np.int32,
"RSV4": np.int16,
"RSV5": np.int16,
"RSV6": np.int16,
}
TABLE_DTYPE = np.dtype(list(TABLE_DTYPES.items()))
"""Standard dtypes for AGASC table."""
class TableOrder(Enum):
"""
Enumeration type to specify the AGASC table ordering:
- TableOrder.NONE. The stars are not sorted.
- TableOrder.HEALPIX. The stars are sorted using a HEALPix index.
- TableOrder.DEC. The stars are sorted by declination.
"""
NONE = 1
DEC = 2
HEALPIX = 3
def write_agasc(
filename: str,
stars: np.ndarray,
version: str,
nside=64,
order=TableOrder.HEALPIX,
full_agasc=True,
):
"""
Write AGASC stars to a new HDF5 file.
The table is coerced to the correct dtype if necessary (:any:`TABLE_DTYPE`).
Parameters
----------
filename : str
The path to the HDF5 file to write to.
stars : np.ndarray
The AGASC stars to write.
version : str
The AGASC version number. This sets an attribute in the table.
nside : int, optional
The HEALPix NSIDE parameter to use for the HEALPix index table.
Default is 64.
order : :any:`TableOrder`, optional
The order of the stars in the AGASC file (Default is TableOrder.HEALPIX).
The options are:
- TableOrder.HEALPIX. The stars are sorted using a HEALPix index.
- TableOrder.DEC. The stars are sorted by declination.
full_agasc : bool, optional
        Whether to write a full AGASC table with all columns or a column subset
        (normally for proseco_agasc). Default is True, in which case all AGASC
        columns are required.
"""
star_cols_set = set(stars.dtype.names)
table_dtypes_list = list(TABLE_DTYPES.items())
if stars.dtype != np.dtype(table_dtypes_list):
if disallowed_keys := star_cols_set - set(TABLE_DTYPES):
# Stars has keys that are not in allowed set
raise ValueError(f"stars has disallowed keys: {disallowed_keys}")
if full_agasc:
if missing_keys := set(TABLE_DTYPES) - star_cols_set:
# Stars is missing some keys in required set
raise ValueError(f"missing keys in stars: {missing_keys}")
stars_out_dtypes = table_dtypes_list
else:
# Allow for a subset of AGASC columns in stars (e.g. proseco_agasc) making
# sure to preserve standard AGASC column ordering and dtype.
stars_out_dtypes = [
(col, dtype) for col, dtype in table_dtypes_list if col in star_cols_set
]
# Coerce stars structured ndarray to the correct column ordering and dtype with
# a side trip through astropy Table.
cols = [col for col, _ in stars_out_dtypes]
stars = Table(stars)[cols].as_array().astype(stars_out_dtypes)
match order:
case TableOrder.DEC:
logger.info("Sorting on DEC column")
idx_sort = np.argsort(stars["DEC"])
case TableOrder.HEALPIX:
logger.info(
f"Creating healpix_index table for nside={nside} "
"and sorting by healpix index"
)
            healpix_index, idx_sort = get_healpix_index_table(stars, nside)
        case _:
            # Guard against unsupported orders (e.g. TableOrder.NONE), which would
            # otherwise fail below with ``idx_sort`` undefined.
            raise ValueError(f"order must be TableOrder.DEC or TableOrder.HEALPIX, got {order}")
stars = stars.take(idx_sort)
_write_agasc(filename, stars, version)
if order == TableOrder.HEALPIX:
write_healpix_index_table(filename, healpix_index, nside)
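# End-to-end sketch (the file name and star values are hypothetical): write a small
# Dec-sorted catalog from a synthetic TABLE_DTYPE array, then read it back:
#
# >>> stars = np.zeros(100, dtype=TABLE_DTYPE)
# >>> stars["AGASC_ID"] = np.arange(100)
# >>> write_agasc("tiny_agasc_1p8.h5", stars, version="1p8", order=TableOrder.DEC)
# >>> len(read_h5_table("tiny_agasc_1p8.h5"))
# 100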
def _write_agasc(filename, stars, version):
"""Do the actual writing to HDF file."""
logger.info(f"Creating {filename}")
with tables.open_file(filename, mode="w") as h5:
data = h5.create_table("/", "data", stars, title=f"AGASC {version}")
data.attrs["version"] = version
data.flush()
logger.info(" Creating AGASC_ID index")
data.cols.AGASC_ID.create_csindex()
logger.info(f" Flush and close {filename}")
data.flush()
h5.flush()