# Source code for desitransfer.status

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desitransfer.status
===================

Entry point for :command:`desi_transfer_status`.
"""
import importlib.resources as ir
import json
import os
import shutil
import time
from datetime import date
from argparse import ArgumentParser
from desiutil.log import log, DEBUG
from . import __version__ as dtVersion


class TransferStatus(object):
    """Simple object for interacting with desitransfer status reports.

    Parameters
    ----------
    directory : :class:`str`
        Retrieve and store JSON-encoded transfer status data in `directory`.
    install : :class:`bool`, optional
        If ``True``, install HTML and JS files.
    year : :class:`str` or :class:`int`
        Update records belonging to `year`. If not set, the current year
        is assumed.
    """

    def __init__(self, directory, install=False, year=None):
        # Integer codes written to the JSON file for each transfer stage.
        self._stages = {'rsync': 0, 'checksum': 1, 'backup': 2}
        self.directory = directory
        self.status = dict()
        if year is None:
            self.current_year = str(date.today().year)
        else:
            self.current_year = str(year)
        self.first_year = "2018"
        # Status data are kept in one JSON file per year.
        self.json = os.path.join(self.directory,
                                 f'desi_transfer_status_{self.current_year}.json')
        if not os.path.exists(self.directory) or install:
            self._install()
        try:
            with open(self.json) as j:
                try:
                    self.status = json.load(j)
                except json.JSONDecodeError:
                    self._handle_malformed()
        except FileNotFoundError:
            # Nothing recorded yet for this year; start from an empty dict.
            pass

    def _install(self):
        """Create `directory` and copy the packaged HTML and JS files into it.

        The HTML file is installed as ``index.html``; the JS file keeps its
        original name.
        """
        log.debug("os.makedirs('%s', exist_ok=True)", self.directory)
        os.makedirs(self.directory, exist_ok=True)
        for ext in ('html', 'js'):
            src = os.path.join(str(ir.files('desitransfer')), 'data',
                               'desi_transfer_status.' + ext)
            if ext == 'html':
                dst = os.path.join(self.directory, 'index.html')
                log.debug("shutil.copyfile('%s', '%s')", src, dst)
                shutil.copyfile(src, dst)
            else:
                log.debug("shutil.copy('%s', '%s')", src, self.directory)
                shutil.copy(src, self.directory)

    def _handle_malformed(self):
        """Handle malformed JSON files.

        This function will save the malformed file to a .bad file for later
        analysis, and write an empty JSON object (``{}``) to a new status
        file.
        """
        bad = self.json + '.bad'
        log.error("Malformed JSON file detected: %s; saving original file as %s.",
                  self.json, bad)
        log.debug("shutil.copy2('%s', '%s')", self.json, bad)
        shutil.copy2(self.json, bad)
        # NOTE(review): the message says "array" but an empty JSON object is
        # written; the text is left unchanged in case anything matches on it.
        log.info("Writing empty array to %s.", self.json)
        with open(self.json, 'w') as j:
            j.write('{}')

    def update(self, night, exposure, stage, failure=False):
        """Update the transfer status.

        Parameters
        ----------
        night : :class:`str`
            Night of observation.
        exposure : :class:`str`
            Exposure number, or ``'all'`` to prepend an entry to every
            exposure already recorded for `night`.
        stage : :class:`str`
            Stage of data transfer ('rsync', 'checksum', 'backup', ...).
        failure : :class:`bool`, optional
            Indicate failure.

        Returns
        -------
        :class:`int`
            The number of updates performed. When this is zero the status
            file is not rewritten.
        """
        ts = int(time.time() * 1000)  # Convert to milliseconds for JS.
        success = not failure
        row = [self._stages[stage], int(success), ts]
        if exposure == 'all':
            if night not in self.status:
                log.warning("No exposures recorded for night %s; nothing to update.",
                            night)
                return 0
            rows = list()
            for expid in self.status[night]:
                # Give every exposure its own list so entries never alias.
                new_row = list(row)
                log.debug("self.status['%s']['%s'].insert(0, [%d, %d, %d])",
                          night, expid, new_row[0], new_row[1], new_row[2])
                self.status[night][expid].insert(0, new_row)
                rows.append(new_row)
            if not rows:
                return 0
        else:
            # Normalize the exposure number, e.g. '00123' -> '123'.
            expid = str(int(exposure))
            if night not in self.status:
                log.debug("self.status['%s'] = {'%s': []}", night, expid)
                self.status[night] = {expid: []}
            log.debug("il = self.find('%s', '%s', '%s')", night, expid, stage)
            il = self.find(night, expid, stage)
            if il:
                old_row = self.status[night][expid][il[0]]
                log.debug("self.status['%s']['%s'][%d] = [%d, %d, %d]",
                          night, expid, il[0], old_row[0], old_row[1], old_row[2])
                # Only replace an existing entry for this stage when the new
                # one is not older and its success flag differs.
                newer = (ts >= old_row[2]) and (int(success) != old_row[1])
                if not newer:
                    #
                    # Rare edge case: daemon is in shadow/test mode and there
                    # are untransferred files.
                    #
                    return 0
                log.debug("self.status['%s']['%s'][%d] = [%d, %d, %d]",
                          night, expid, il[0], row[0], row[1], row[2])
                self.status[night][expid][il[0]] = row
            else:
                try:
                    log.debug("self.status['%s']['%s'].insert(0, [%d, %d, %d])",
                              night, expid, row[0], row[1], row[2])
                    self.status[night][expid].insert(0, row)
                except KeyError:
                    log.debug("self.status['%s']['%s'] = [%d, %d, %d]",
                              night, expid, row[0], row[1], row[2])
                    self.status[night][expid] = [row]
            rows = [row]
        #
        # Copy the original file before modifying.
        # This will overwrite any existing .bak file
        #
        log.debug("shutil.copy2('%s', '%s')", self.json, self.json + '.bak')
        try:
            shutil.copy2(self.json, self.json + '.bak')
        except FileNotFoundError:
            pass
        with open(self.json, 'w') as j:
            json.dump(self.status, j, indent=None, separators=(',', ':'))
        return len(rows)

    def find(self, night, exposure=None, stage=None):
        """Find status entries that match `night`, etc.

        Parameters
        ----------
        night : :class:`str`
            Night of observation.
        exposure : :class:`str`, optional
            Exposure number.
        stage : :class:`str`, optional
            Stage of data transfer ('rsync', 'checksum', 'backup', ...).

        Returns
        -------
        :class:`list` or :class:`dict`
            If only `night` is set, return a :class:`dict` containing
            information on all exposures for that `night`. If `exposure` is
            not set, return a :class:`dict` keyed by exposure containing all
            data matching `stage` for that night. If `stage` is not set,
            return the :class:`list` of status entries for that exposure
            (an empty list is inserted and returned if the exposure is not
            yet recorded). If both `exposure` and `stage` are set, return a
            :class:`list` of *indexes* pointing to the data for `exposure`
            filtered on `stage`.

        Raises
        ------
        :exc:`KeyError`
            If `night` is not yet defined.
        """
        if exposure is None and stage is None:
            try:
                return self.status[night]
            except KeyError:
                raise KeyError(f"Undefined night = '{night}'!") from None
        elif exposure is None:
            return {expid: [k for k, entry in enumerate(entries)
                            if entry[0] == self._stages[stage]]
                    for expid, entries in self.status[night].items()}
        elif stage is None:
            try:
                log.debug("e = self.status['%s']['%s']", night, exposure)
                e = self.status[night][exposure]
            except KeyError:
                # Side effect: register the exposure with an empty history.
                log.debug("e = self.status['%s']['%s'] = list()", night, exposure)
                e = self.status[night][exposure] = list()
            return e
        else:
            try:
                return [k for k, entry in enumerate(self.status[night][exposure])
                        if entry[0] == self._stages[stage]]
            except KeyError:
                return list()
[docs]def _options(): """Parse command-line options for :command:`desi_transfer_status`. Returns ------- :class:`argparse.Namespace` The parsed command-line options. """ desc = 'Update the status of DESI raw data transfers.' prsr = ArgumentParser(description=desc) prsr.add_argument('-d', '--directory', dest='directory', metavar='DIR', default=os.path.join(os.environ['DESI_ROOT'], 'spectro', 'staging', 'status'), help="Install and update files in DIR (default %(default)s).") prsr.add_argument('-f', '--failure', action='store_true', dest='failure', help='Indicate that the transfer failed somehow.') prsr.add_argument('-i', '--install', action='store_true', dest='install', help='Ensure that HTML and related files are in place.') prsr.add_argument('-V', '--version', action='version', version='%(prog)s {0}'.format(dtVersion)) prsr.add_argument('-v', '--verbose', action='store_true', help='Print debugging information.') prsr.add_argument('night', type=int, metavar='YYYYMMDD', help="Night of observation.") prsr.add_argument('expid', metavar='EXPID', help="Exposure number, or 'all'.") prsr.add_argument('stage', choices=['rsync', 'checksum', 'backup'], help="Transfer stage.") return prsr.parse_args()
def main():
    """Entry point for :command:`desi_transfer_status`.

    Parses the command line, opens the status file for the year encoded in
    the night (its first four digits), and records one status update.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    options = _options()
    if options.verbose:
        log.setLevel(DEBUG)
    night = str(options.night)
    year = night[0:4]
    log.debug("st = TransferStatus('%s', install=%s, year='%s')",
              options.directory, options.install, year)
    st = TransferStatus(options.directory, install=options.install, year=year)
    log.debug("st.update('%s', '%s', '%s', %s)",
              night, options.expid, options.stage, options.failure)
    st.update(night, options.expid, options.stage, options.failure)
    return 0
# import os # import json # with open('desi_transfer_status.json') as j: # data = json.load(j) # # statuses = {'rsync': 0, 'checksum': 1, 'backup': 2} # for year in ('2022', '2021', '2020', '2019', '2018', '2017'): # nights = dict() # for row in data: # if str(row[0]).startswith(year): # if row[0] in nights: # if row[1] in nights[row[0]]: # nights[row[0]][row[1]].append([statuses[row[2]], int(row[3]), row[5]]) # else: # nights[row[0]][row[1]] = [[statuses[row[2]], int(row[3]), row[5]]] # else: # nights[row[0]] = {row[1]: [[statuses[row[2]], int(row[3]), row[5]]]} # with open(f'nights_{year}.json', 'w') as j: # json.dump(nights, j, indent=None, separators=(',', ':'))