# Source code for aca_view.data.fetch
#!/usr/bin/env python
"""
Utility function to run the ACA-view data service in standalone mode.
This is NOT called from aca_view, but it accomplishes the same.
"""
import logging
from PyQt5 import QtCore as QtC
from aca_view.aca_telemetry_timeline import AcaTelemetryTimeline
from aca_view.data import config as ds_config
from aca_view.data import multiprocess_service
from aca_view.logging import LoggingSetup
# Logger registered under the name "aca_view"; used below to report fetched batches.
LOGGER = logging.getLogger("aca_view")
# Cached Qt application instance. Starts as None and is filled in by get_app().
_APP = None
def get_app():
    """
    Return the singleton Qt core application, creating it on first use.

    There can be only one Qt application, so the instance is cached in the module-level
    ``_APP`` and the same object is handed back on every later call.
    """
    global _APP  # noqa: PLW0603
    if _APP is not None:
        # Already created by an earlier call: reuse it.
        return _APP
    _APP = QtC.QCoreApplication([])
    return _APP
def low_level_fetch(settings, callback=None):
    """
    Run the data fetch in a Qt thread, block until it is done, and return the result.

    A ``MultiProcessWorker`` built from ``settings`` is moved to a ``QThread``. Every batch
    it emits through ``update_timeline`` is collected (and also handed to ``callback`` when
    one is given). When the worker emits ``finished`` the thread is asked to quit, and when
    the thread finishes the Qt event loop exits. The collected batches are then appended,
    in the order they were received, to a new ``AcaTelemetryTimeline``.

    .. warning::

        This function creates its own Qt App, therefore it cannot be called from aca_view.

    :param settings:
        Settings passed to ``multiprocess_service.MultiProcessWorker``.
    :param callback: callable
        Optional extra slot connected to the worker's ``update_timeline`` signal (optional)
    :returns: AcaTelemetryTimeline
        Timeline holding all the batches that were received.
    :raises RuntimeError:
        If the Qt event loop returns a non-zero exit code.
    """
    app = get_app()

    # All batches received during this call (a closure-local list, not a module global).
    batches = []

    def _collect_batch(data):
        """
        Store one batch of data. This is called repeatedly, once for each batch.
        """
        LOGGER.info(f'Got {len(data["common_timeline"])} rows')
        batches.append(data)

    thread = QtC.QThread()
    worker = multiprocess_service.MultiProcessWorker(settings)
    # Move the worker before wiring any of its slots/signals, as in the original sequence.
    worker.moveToThread(thread)

    # Thread life cycle: start the worker when the thread starts, stop the thread when the
    # worker is done, and leave the event loop when the thread has finished.
    thread.started.connect(worker.start)
    worker.finished.connect(thread.quit)
    thread.finished.connect(app.exit)

    # Batch delivery: the internal collector is connected first, then the optional callback.
    batch_slots = [_collect_batch]
    if callback is not None:
        batch_slots.append(callback)
    for slot in batch_slots:
        worker.update_timeline.connect(slot)

    thread.start()

    # Blocks here until app.exit is called; a non-zero exit code is treated as a failure.
    exit_code = app.exec_()
    if exit_code:
        raise RuntimeError("There was some error fetching.")

    timeline = AcaTelemetryTimeline()
    for batch in batches:
        timeline.append(batch)
    return timeline
# [docs]
def fetch(
    obsid=None,
    start=None,
    stop=None,
    filenames=None,
    mica=None,
    maude=None,
    maude_channel="FLIGHT",
    multi_process=True,
    ska_data_sources=None,
    real_time=None,
    real_time_offset=0,
    max_workers=None,
):
    """
    Run the ACA-view data services in standalone mode (no GUI) and return the fetched data.

    This is NOT called from aca_view, but it is equivalent. It is intended as a way to test
    data utilities without having to start aca_view. The arguments are validated with
    ``ds_config.validate_config`` and the resulting settings are passed to
    ``low_level_fetch``.

    :param obsid: int
        The observation ID (optional)
    :param start: CxoTime
        Get data after this time (optional)
    :param stop: CxoTime
        Get data before this time (optional)
    :param filenames: list
        List of filenames (optional)
    :param mica: bool
        Flag to enable mica (if neither MAUDE nor mica are enabled explicitly, both are
        enabled)
    :param maude: bool
        Flag to enable MAUDE (if neither MAUDE nor mica are enabled explicitly, both are
        enabled)
    :param maude_channel: str
        MAUDE channel (default: "FLIGHT")
    :param multi_process: bool
        Flag to enable/disable fetching in multiple processes
    :param ska_data_sources: list
        Ska data sources forwarded to the configuration
        (default: ``["local", "ska_api"]`` when None)
    :param real_time: bool
        Flag to enable real-time mode
    :param real_time_offset: int
        Time offset between 'now' and the time to query MAUDE
        (for testing real-time-like behavior with old data)
    :param max_workers: int
        Maximum number of workers in multi-process mode
    :returns:
        The value returned by ``low_level_fetch`` for the validated settings.
    """
    if ska_data_sources is None:
        ska_data_sources = ["local", "ska_api"]

    # Keyword arguments for the configuration validator, gathered in one place.
    options = {
        "obsid": obsid,
        "start": start,
        "stop": stop,
        "filenames": filenames,
        "mica": mica,
        "maude": maude,
        "maude_channel": maude_channel,
        "multi_process": multi_process,
        "real_time": real_time,
        "real_time_offset": real_time_offset,
        "max_workers": max_workers,
        "ska_data_sources": ska_data_sources,
    }

    with LoggingSetup():  # this configures logging from aca_view.config.SETTINGS
        settings = ds_config.validate_config(**options)
        return low_level_fetch(settings)