# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desitransfer.daemon
===================
Entry point for :command:`desi_transfer_daemon`.
"""
import datetime as dt
import hashlib
import importlib.resources as ir
import json
import logging
import os
import re
import shutil
import stat
import subprocess as sub
import time
import traceback
import requests
from argparse import ArgumentParser
from collections import namedtuple
from configparser import ConfigParser, ExtendedInterpolation
from logging.handlers import RotatingFileHandler, SMTPHandler
from socket import getfqdn
from tempfile import TemporaryFile
from desiutil.log import get_logger
from .common import dir_perm, file_perm, rsync, yesterday, empty_rsync, new_exposures, ensure_scratch
from .status import TransferStatus
from . import __version__ as dtVersion

log = None


def _options():
    """Parse command-line options for :command:`desi_transfer_daemon`.

    Returns
    -------
    :class:`argparse.Namespace`
        The parsed command-line options.
    """
desc = "Transfer DESI raw data files."
prsr = ArgumentParser(description=desc)
prsr.add_argument('-B', '--no-backup', action='store_false', dest='backup',
help="Skip NERSC HPSS backups.")
prsr.add_argument('-c', '--configuration', metavar='FILE',
help="Read configuration from FILE.")
prsr.add_argument('-d', '--debug', action='store_true',
help='Set log level to DEBUG.')
prsr.add_argument('-k', '--kill', metavar='FILE',
default=os.path.join(os.environ['HOME'], 'stop_desi_transfer'),
help="Exit the script when FILE is detected (default %(default)s).")
prsr.add_argument('-t', '--test', action='store_true',
help='Observe the actions of another data transfer script but do not make any changes.')
prsr.add_argument('-V', '--version', action='version',
version='%(prog)s {0}'.format(dtVersion))
return prsr.parse_args()


class TransferDaemon(object):
    """Manage data transfer configuration, options, and operations.

    Parameters
    ----------
    options : :class:`argparse.Namespace`
        The parsed command-line options.
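
    Notes
    -----
    A minimal, illustrative construction (not part of the package itself).
    It assumes the packaged default configuration file and whatever
    environment variables that file interpolates (such as :envvar:`DESI_ROOT`)
    are available; calling :meth:`transfer` additionally requires SSH access
    to the ``dts`` host::

        from argparse import Namespace
        from desitransfer.daemon import TransferDaemon

        opts = Namespace(configuration=None, test=True, backup=False, debug=True)
        daemon = TransferDaemon(opts)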
"""
_link_re = re.compile(r'[0-9]{8}/[0-9]{8}$')
_directory = namedtuple('_directory', 'source, staging, destination, hpss, checksum')
_default_configuration = os.path.join(str(ir.files('desitransfer')), 'data', 'desi_transfer_daemon.ini')
def __init__(self, options):
if options.configuration is None:
self._ini = self._default_configuration
else:
self._ini = options.configuration
self.test = options.test
self.tape = options.backup
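        #
        # The 'list' converter splits a comma-separated option into a list;
        # the 'dict' converter parses strings like 'key1:value1,key2:value2'
        # into a dictionary.  Environment variables are supplied as defaults
        # for ${...} interpolation.
        #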
self.conf = ConfigParser(defaults=os.environ, strict=True,
interpolation=ExtendedInterpolation(),
converters={'list': lambda x: x.split(','),
'dict': lambda x: dict([tuple(i.split(':')) for i in x.split(',')])})
files = self.conf.read(self._ini)
# assert files[0] == self._ini
self.sections = [s for s in self.conf.sections()
if s not in ('common', 'logging')]
self.directories = [self._directory(self.conf[s]['source'],
self.conf[s]['staging'],
self.conf[s]['destination'],
self.conf[s]['hpss'],
self.conf[s]['checksum_file'])
for s in self.sections]
self.scratch = ensure_scratch(self.conf['common'].getlist('temporary'))
self._configure_log(options.debug)
return

    def transfer(self):
"""Loop over and transfer all configured directories.
"""
if self.checksum_lock():
return
for d in self.directories:
log.info('Looking for new data in %s.', d.source)
try:
self.directory(d)
except Exception as e:
log.critical("Exception detected in transfer of %s!\n\n%s",
d.source, traceback.format_exc())

    def checksum_lock(self):
        """See if checksums are being computed at KPNO.

        Returns
-------
:class:`bool`
``True`` if checksums are being computed.
"""
cmd = [self.conf['common']['ssh'], '-q', 'dts',
'/bin/ls', self.conf['common']['checksum_lock']]
log.debug(' '.join(cmd))
_, out, err = _popen(cmd)
if out:
log.info('Checksums are being computed at KPNO.')
return True
return False

    def directory(self, d):
        """Data transfer operations for a single destination directory.

        Parameters
----------
d : :func:`collections.namedtuple`
Configuration for the destination directory.
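
        Notes
        -----
        The fields of `d` correspond to the options in one section of the
        configuration file.  A hypothetical example (the values shown are
        illustrative, not the packaged defaults)::

            d = TransferDaemon._directory('/data/dts/exposures/raw',
                                          '/desi/spectro/staging/raw',
                                          '/desi/spectro/data',
                                          'desi/spectro/data',
                                          'checksum-{night}-{exposure}.sha256sum')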
"""
status = TransferStatus(os.path.join(os.path.dirname(d.staging),
'status'))
#
# Find symlinks at KPNO.
#
cmd = [self.conf['common']['ssh'], '-q', 'dts',
'/bin/find', d.source, '-type', 'l']
log.debug(' '.join(cmd))
_, out, err = _popen(cmd)
links = sorted([x for x in out.split('\n') if x])
if links:
for link in links:
if self._link_re.search(link) is None:
log.warning("Malformed symlink detected: %s. Skipping.", link)
else:
self.exposure(d, link, status)
else:
log.warning('No links found, check connection.')
#
# Check for delayed files.
#
yst = yesterday()
now = int(dt.datetime.utcnow().strftime('%H'))
if now >= self.conf['common'].getint('catchup'):
self.catchup(d, yst, status)
#
# Are any nights eligible for backup?
#
if now >= self.conf['common'].getint('backup'):
s = self.backup(d, yst, status)
if s and self.tape:
log.debug("status.update('%s', 'all', 'backup')", yst)
status.update(yst, 'all', 'backup')

    def hpss_status(self):
        """Check HPSS availability.

        Returns
-------
:class:`bool`
``True`` if HPSS is available.
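
        Notes
        -----
        Only the ``status`` key of the returned JSON document is examined;
        a payload equivalent to the Python dictionary ``{'status': 'active'}``
        indicates availability.  Connection errors and JSON decoding errors
        are treated as "HPSS unavailable".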
"""
try:
log.debug("requests.get('%s')", self.conf['common']['hpss_status'])
r = requests.get(self.conf['common']['hpss_status'])
except requests.exceptions.ConnectionError:
log.critical("Error while determining HPSS availability!")
return False
else:
try:
status = r.json()
except json.decoder.JSONDecodeError:
log.critical("Error while decoding HPSS status!")
return False
else:
return status['status'] == 'active'

    def exposure(self, d, link, status):
        """Data transfer operations for a single exposure.

        This method will unconditionally install an exposure directory
        in the destination, regardless of any transfer or checksum errors.

        Parameters
----------
d : :func:`collections.namedtuple`
Configuration for the destination directory.
link : :class:`str`
The exposure path.
status : :class:`desitransfer.status.TransferStatus`
The status object associated with `d`.
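
        Notes
        -----
        The night and exposure are taken from the last two components of
        `link`, which is expected to match ``[0-9]{8}/[0-9]{8}$``.  For a
        hypothetical `link` ending in ``.../20210101/00123456``, this yields
        ``night == '20210101'`` and ``exposure == '00123456'``.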
"""
exposure = os.path.basename(link)
night = os.path.basename(os.path.dirname(link))
staging_night = os.path.join(d.staging, night)
destination_night = os.path.join(d.destination, night)
staging_exposure = os.path.join(staging_night, exposure)
destination_exposure = os.path.join(destination_night, exposure)
#
# New night detected?
#
if not os.path.isdir(staging_night):
log.debug("os.makedirs('%s', exist_ok=True)", staging_night)
if not self.test:
os.makedirs(staging_night, exist_ok=True)
#
# Set up DESI_SPECTRO_DATA.
#
if not os.path.isdir(destination_night):
log.debug("os.makedirs('%s', exist_ok=True)", destination_night)
log.debug("os.chmod('%s', 0o%o)", destination_night, dir_perm)
if not self.test:
os.makedirs(destination_night, exist_ok=True)
os.chmod(destination_night, dir_perm)
#
# Has exposure already been transferred?
#
if not os.path.isdir(staging_exposure) and not os.path.isdir(destination_exposure):
cmd = rsync(os.path.join(d.source, night, exposure), staging_exposure)
log.debug(' '.join(cmd))
if self.test:
rsync_status = '0'
else:
rsync_status, out, err = _popen(cmd)
else:
log.debug('%s already transferred.', staging_exposure)
return
#
# Transfer complete.
#
if rsync_status == '0':
log.debug("status.update('%s', '%s', 'rsync')", night, exposure)
if not self.test:
status.update(night, exposure, 'rsync')
else:
log.critical('rsync problem (status = %s) detected for %s/%s, check logs!',
rsync_status, night, exposure)
log.error('rsync STDOUT = %s', out)
log.error('rsync STDERR = %s', err)
log.debug("status.update('%s', '%s', 'rsync', failure=True)", night, exposure)
status.update(night, exposure, 'rsync', failure=True)
#
# Check permissions.
#
log.debug("lock_directory('%s', %s)", staging_exposure, str(self.test))
lock_directory(staging_exposure, self.test)
#
# Verify checksums.
#
checksum_file = os.path.join(staging_exposure,
d.checksum.format(night=night, exposure=exposure))
self.checksum(checksum_file, status)
#
# Move data into DESI_SPECTRO_DATA.
#
if not os.path.isdir(destination_exposure):
log.debug("shutil.move('%s', '%s')", staging_exposure, destination_night)
if not self.test:
shutil.move(staging_exposure, destination_night)

    def checksum(self, checksum_file, status):
        """Verify the checksum(s) associated with `checksum_file` and report the result.

        The result is reported via log messages and updates to the
        `status` object, not via a return value.

        Parameters
----------
checksum_file : :class:`str`
The checksum file.
status : :class:`desitransfer.status.TransferStatus`
The associated status object.
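
        Notes
        -----
        The night and exposure are recovered from the directory containing
        `checksum_file`.  For a hypothetical path
        ``/desi/spectro/staging/raw/20210101/00123456/checksum-20210101-00123456.sha256sum``,
        this yields ``night == '20210101'`` and ``exposure == '00123456'``.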
"""
exposure = os.path.basename(os.path.dirname(checksum_file))
night = os.path.basename(os.path.dirname(os.path.dirname(checksum_file)))
log.debug("verify_checksum('%s')", checksum_file)
if not self.test:
if os.path.exists(checksum_file):
checksum_status = verify_checksum(checksum_file)
#
# Did we pass checksums?
#
if checksum_status == "":
log.debug("status.update('%s', '%s', 'checksum')", night, exposure)
status.update(night, exposure, 'checksum')
else:
msg = "The following checksum error(s) detected for %s/%s:\n\n" + checksum_status
log.critical(msg, night, exposure)
log.debug("status.update('%s', '%s', 'checksum', failure=True)", night, exposure)
status.update(night, exposure, 'checksum', failure=True)
else:
log.critical("No checksum file for %s/%s!", night, exposure)
log.debug("status.update('%s', '%s', 'checksum', failure=True)", night, exposure)
status.update(night, exposure, 'checksum', failure=True)

    def catchup(self, d, night, status, backup=False):
        """Do a "catch-up" transfer to catch delayed files in the morning, rather than at noon.

        Parameters
----------
d : :func:`collections.namedtuple`
Configuration for the destination directory.
night : :class:`str`
Night to check.
status : :class:`desitransfer.status.TransferStatus`
The status object associated with `d`.
backup : :class:`bool`
If ``True``, this catch-up is happening immediately prior to tape backup.

        Notes
-----
* 07:00 MST = 14:00 UTC.
* This script can do nothing about exposures that were never linked
into the DTS area at KPNO in the first place.
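
        The marker file written to scratch encodes the destination and the
        night.  For a hypothetical destination ``/desi/spectro/data`` and night
        ``20210101``, the file would be ``ketchup__desi_spectro_data_20210101.txt``;
        ``ketchup`` is replaced by ``backup`` when `backup` is ``True``, and
        ``.txt`` becomes ``.test.txt`` in test mode.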
"""
if os.path.isdir(os.path.join(d.destination, night)):
ketchup_file = d.destination.replace('/', '_')
sync_file = os.path.join(self.scratch,
'ketchup_{0}_{1}.txt'.format(ketchup_file, night))
if self.test:
sync_file = sync_file.replace('.txt', '.test.txt')
if backup:
sync_file = sync_file.replace('ketchup', 'backup')
if os.path.exists(sync_file):
log.debug("%s detected, catch-up transfer is done.", sync_file)
else:
cmd = rsync(os.path.join(d.source, night),
os.path.join(d.destination, night), test=True)
log.debug(' '.join(cmd))
rsync_status, out, err = _popen(cmd)
with open(sync_file, 'w') as sf:
sf.write(out)
if empty_rsync(out):
log.info('No files appear to have changed in %s.', night)
else:
log.warning('New files detected in %s!', night)
rsync_night(d.source, d.destination, night, self.test)
#
# Re-check the checksums for exposures that changed.
#
e = new_exposures(out)
if len(e) == 0:
log.warning('No updated exposures in night %s detected.', night)
else:
for exposure in e:
                        checksum_file = os.path.join(d.destination, night, exposure,
                                                     d.checksum.format(night=night, exposure=exposure))
self.checksum(checksum_file, status)
else:
log.warning("No data from %s detected, skipping catch-up transfer.", night)

    def backup(self, d, night, status):
        """Final sync and backup for a specific night.

        Parameters
        ----------
        d : :func:`collections.namedtuple`
            Configuration for the destination directory.
        night : :class:`str`
            Night to check.
        status : :class:`desitransfer.status.TransferStatus`
            The status object associated with `d`.

        Returns
        -------
        :class:`bool`
            ``True`` indicates the backup ran to completion and the
            transfer status should be updated to reflect that.

        Notes
        -----
        * 12:00 MST = 19:00 UTC, plus one hour just to be safe, so after 20:00 UTC.
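
        The backup file name is derived from ``d.hpss`` and the night.  For a
        hypothetical ``d.hpss`` of ``desi/spectro/data`` and night ``20210101``,
        the HTAR file would be ``desi_spectro_data_20210101.tar``, and the
        backup is considered complete only when both that file and the
        corresponding ``.tar.idx`` index appear in the :command:`hsi ls -l`
        listing.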
"""
if os.path.isdir(os.path.join(d.destination, night)):
hpss_file = d.hpss.replace('/', '_')
ls_file = os.path.join(self.scratch, hpss_file + '.txt')
if self.test:
ls_file = ls_file.replace('.txt', '.test.txt')
log.debug("os.remove('%s')", ls_file)
try:
os.remove(ls_file)
except FileNotFoundError:
log.debug("Failed to remove %s because it didn't exist. That's OK.", ls_file)
cmd = [os.path.join(self.conf['common']['hpss'], 'hsi'),
'-O', ls_file, 'ls', '-l', d.hpss]
if self.tape:
log.debug(' '.join(cmd))
_, out, err = _popen(cmd)
with open(ls_file) as ls_fileobj:
data = ls_fileobj.read()
backup_files = [ls_out.split()[-1] for ls_out in data.split('\n') if ls_out]
else:
backup_files = []
backup_file = hpss_file + '_' + night + '.tar'
#
# Both a .tar and a .tar.idx file should be present.
#
if backup_file in backup_files and backup_file + '.idx' in backup_files:
log.debug("Backup of %s already complete.", night)
return False
else:
self.catchup(d, night, status, backup=True)
#
# Final permission lock: remove user-write permission from directories.
#
for dirpath, dirnames, filenames in os.walk(os.path.join(d.destination, night)):
log.debug("os.chmod('%s', 0o%o)", dirpath, dir_perm ^ stat.S_IWUSR)
if not self.test:
os.chmod(dirpath, dir_perm ^ stat.S_IWUSR)
#
# Issue HTAR command.
#
if self.tape:
start_dir = os.getcwd()
log.debug("os.chdir('%s')", d.destination)
os.chdir(d.destination)
cmd = [os.path.join(self.conf['common']['hpss'], 'htar'),
'-cvhf', os.path.join(d.hpss, backup_file),
'-H', 'crc:verify=all',
night]
log.debug(' '.join(cmd))
if not self.test:
                        htar_status, out, err = _popen(cmd)
                        if htar_status != '0' or err:
msg = "HTAR Backup failed! Command was: {0}.".format(' '.join(cmd))
if err:
msg += "\nHTAR error message was: " + err
log.critical(msg)
log.debug("os.chdir('%s')", start_dir)
os.chdir(start_dir)
else:
log.info('Tape backup disabled by user request.')
return True
else:
log.warning("No data from %s detected, skipping HPSS backup.", night)
return False


def _popen(command):
    """Simple wrapper for :class:`subprocess.Popen` to avoid repeated code.

    Parameters
    ----------
    command : :class:`list`
        Command to pass to :class:`subprocess.Popen`.

    Returns
-------
:func:`tuple`
The returncode, standard output and standard error.
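
    Examples
    --------
    A simple, illustrative call (the directory is arbitrary)::

        returncode, out, err = _popen(['/bin/ls', '/tmp'])
        # returncode == '0' on success; out holds the decoded standard output.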
"""
with TemporaryFile() as tout, TemporaryFile() as terr:
p = sub.Popen(command, stdout=tout, stderr=terr)
p.wait()
tout.seek(0)
out = tout.read()
terr.seek(0)
err = terr.read()
return (str(p.returncode), out.decode('utf-8'), err.decode('utf-8'))


def verify_checksum(checksum_file):
    """Verify checksums supplied with the raw data.

    Parameters
    ----------
    checksum_file : :class:`str`
        The checksum file.

    Returns
    -------
    :class:`str`
        A string describing any errors encountered while verifying
        the checksums. In addition to mismatches, there can be missing files,
        extraneous files, etc. An empty string indicates no errors.
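
    Examples
    --------
    The checksum file is expected to be in :command:`sha256sum` format, one
    "``<hexdigest>  <filename>``" pair per line, and to live in the same
    directory as the files it describes.  A hypothetical call (this assumes
    the module-level ``log`` object has already been configured, as the
    daemon does at startup)::

        errors = verify_checksum('/path/to/20210101/00123456/checksum-20210101-00123456.sha256sum')
        if errors:
            print(errors)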
"""
with open(checksum_file) as c:
data = c.read()
#
# The trailing \n at the end of the file should make the length of
# lines equal to the length of files.
#
lines = data.split('\n')
d = os.path.dirname(checksum_file)
files = os.listdir(d)
errors = ""
n_lines = len(lines) - len(files)
if n_lines > 0:
log.error("%s lists %d file(s) that are not present!",
checksum_file, n_lines)
errors += "{0:d} file(s) listed but not downloaded.\n".format(n_lines)
if n_lines < 0:
log.error("%d files are not listed in %s!", -1 * n_lines, checksum_file)
errors += "{0:d} file(s) downloaded but not listed.\n".format(-1 * n_lines)
digest = dict([(cl.split()[1], cl.split()[0]) for cl in lines if cl])
for f in files:
ff = os.path.join(d, f)
if ff != checksum_file:
with open(ff, 'rb') as fp:
h = hashlib.sha256(fp.read()).hexdigest()
try:
hh = digest[f]
except KeyError:
hh = ''
log.error("%s does not appear in %s!", ff, checksum_file)
errors += "{0} not listed in checksum file.\n".format(f)
if hh == h:
log.debug("%s is valid.", ff)
elif hh == '':
pass
else:
log.error("Checksum mismatch for %s in %s!", ff, checksum_file)
errors += "{0} had a checksum mismatch.\n".format(f)
return errors


def unlock_directory(directory, test=False):
    """Set a directory and its contents user-writeable.

    Parameters
----------
directory : :class:`str`
Directory to unlock.
test : :class:`bool`, optional
If ``True``, only print the commands.
"""
for dirpath, dirnames, filenames in os.walk(directory):
log.debug("os.chmod('%s', 0o%o)", dirpath, dir_perm | stat.S_IWUSR)
if not test:
os.chmod(dirpath, dir_perm | stat.S_IWUSR)
for f in filenames:
log.debug("os.chmod('%s', 0o%o)", os.path.join(dirpath, f),
file_perm | stat.S_IWUSR)
if not test:
os.chmod(os.path.join(dirpath, f), file_perm | stat.S_IWUSR)


def lock_directory(directory, test=False):
    """Set a directory and its contents read-only.

    Parameters
----------
directory : :class:`str`
Directory to lock.
test : :class:`bool`, optional
If ``True``, only print the commands.
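
    Examples
    --------
    With ``test=True`` the :func:`os.chmod` calls are only logged, so a dry
    run is safe (the path is hypothetical and the module-level ``log`` object
    is assumed to be configured, as the daemon does at startup)::

        lock_directory('/desi/spectro/data/20210101', test=True)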
"""
for dirpath, dirnames, filenames in os.walk(directory):
log.debug("os.chmod('%s', 0o%o)", dirpath, dir_perm)
if not test:
os.chmod(dirpath, dir_perm)
for f in filenames:
log.debug("os.chmod('%s', 0o%o)", os.path.join(dirpath, f), file_perm)
if not test:
os.chmod(os.path.join(dirpath, f), file_perm)


def rsync_night(source, destination, night, test=False):
    """Run an rsync command on an entire `night`, for example, to pick up
    delayed files.

    Parameters
----------
source : :class:`str`
Source directory.
destination : :class:`str`
Destination directory.
night : :class:`str`
Night directory.
test : :class:`bool`, optional
If ``True``, only print the commands.
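
    Examples
    --------
    A hypothetical catch-up of a single night (the paths are illustrative and
    the module-level ``log`` object is assumed to be configured, as the
    daemon does at startup)::

        rsync_night('/data/dts/exposures/raw', '/desi/spectro/data',
                    '20210101', test=True)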
"""
#
# Unlock files.
#
unlock_directory(os.path.join(destination, night), test)
#
# Run rsync.
#
cmd = rsync(os.path.join(source, night),
os.path.join(destination, night))
log.debug(' '.join(cmd))
if test:
rsync_status, out, err = '0', '', ''
else:
rsync_status, out, err = _popen(cmd)
if rsync_status != '0':
log.critical('rsync problem (status = %s) detected on catch-up for %s, check logs!',
rsync_status, night)
log.error('rsync STDOUT = \n%s', out)
log.error('rsync STDERR = \n%s', err)
#
# Lock files.
#
lock_directory(os.path.join(destination, night), test)


def main():
    """Entry point for :command:`desi_transfer_daemon`.

    Returns
-------
:class:`int`
An integer suitable for passing to :func:`sys.exit`.
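
    Notes
    -----
    The transfer loop runs until the kill file named by the ``-k`` option
    appears.  With the default option value it can be created from Python
    with, for example::

        import os
        open(os.path.join(os.environ['HOME'], 'stop_desi_transfer'), 'a').close()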
"""
options = _options()
transfer = TransferDaemon(options)
sleep = transfer.conf['common'].getint('sleep')
while True:
log.info('Starting transfer loop; desitransfer version = %s.',
dtVersion)
if os.path.exists(options.kill):
log.info("%s detected, shutting down transfer daemon.",
options.kill)
return 0
transfer.transfer()
time.sleep(sleep * 60)
return 0