#!/usr/local/cpython-3.6/bin/python3

# pylint: disable=global-statement,import-error,wrong-import-order

"""Run n shell commands m at a time."""

# prereqs:
# /usr/local/lib/bashquote.py - first added to tailor 2008-01-02

# Note that waitpid may decide a process has exited well before the corresponding pipe has been fully
# emptied.  For this reason, we may need to wait for the pipes to be drained before closing them.

# When does it make sense to rename a file?  After the process exits, or after the thread has terminated,
# or both?  It probably makes the most sense after the process exits, because that's when we have the
# information, and the thread can continue writing to it via the same file descriptor even after the
# rename.

# thread termination occurs when copy_to_stdout_and_file returns

# Licensing:
# Microsoft Public License (Ms-PL) Mon, 2007-10-15 19:23 [OSI Approved License]
#
# This license governs use of the accompanying software. If you use the software, you accept this license. If you do not accept the
# license, do not use the software.
#
# 1. Definitions
# The terms "reproduce," "reproduction," "derivative works," and "distribution" have the same meaning here as under U.S. copyright
# law.
# A "contribution" is the original software, or any additions or changes to the software.
# A "contributor" is any person that distributes its contribution under this license.
# "Licensed patents" are a contributor's patent claims that read directly on its contribution.
#
# 2. Grant of Rights

# (A) Copyright Grant- Subject to the terms of this license, including the license conditions and limitations in section 3, each
# contributor grants you a non-exclusive, worldwide, royalty-free copyright license to reproduce its contribution, prepare
# derivative works of its contribution, and distribute its contribution or any derivative works that you create.

# (B) Patent Grant- Subject to the terms of this license, including the license conditions and limitations in section 3, each
# contributor grants you a non-exclusive, worldwide, royalty-free license under its licensed patents to make, have made, use, sell,
# offer for sale, import, and/or otherwise dispose of its contribution in the software or derivative works of the contribution in
# the software.
#
# 3. Conditions and Limitations
# (A) No Trademark License- This license does not grant you rights to use any contributors' name, logo, or trademarks.

# (B) If you bring a patent claim against any contributor over patents that you claim are infringed by the software, your patent
# license from such contributor to the software ends automatically.

# (C) If you distribute any portion of the software, you must retain all copyright, patent, trademark, and attribution notices that
# are present in the software.

# (D) If you distribute any portion of the software in source code form, you may do so only under this license by including a
# complete copy of this license with your distribution. If you distribute any portion of the software in compiled or object code
# form, you may only do so under a license that complies with this license.

# (E) The software is licensed "as-is." You bear the risk of using it. The contributors give no express warranties, guarantees or
# conditions. You may have additional consumer rights under your local laws which this license cannot change. To the extent
# permitted under your local laws, the contributors exclude the implied warranties of merchantability, fitness for a particular
# purpose and non-infringement.

import os
import re
import sys
import stat
import time
import errno
import select
import shutil
import signal

try:
    import thread as thread_mod
except ImportError:
    import _thread as thread_mod  # pylint: disable=import-error

sys.path.insert(0, '/usr/local/lib')
sys.path.insert(0, os.path.expanduser('~/lib'))

import bashquote as bashquote_mod  # noqa # pylint: disable=wrong-import-position

# In these variable names:
# "SMS" stands for "Shared Mutable State".
# "NON_SMS" stands for "NonShared Mutable State".  However, these variables technically are SMS; they just don't have a global
#           statement to make them rebindable inside a callable

# Message verbosity: bumped by -v, set by --verbosity.  Code in this file gates extra output on >= 2, >= 3 and >= 4.
VERBOSITY = 0
# Dump mode: 'numeric' (--dump-lines), 'all' (--dump-entire) or 'nothing' (--dump-nothing).
DUMP_LINES_MODE = 'numeric'
# Line count used in 'numeric' mode; overridden by --dump-lines.
DUMP_LINES_FROM_1 = 10
# True after --to-stdout: each command's output is copied by a thread instead of shell redirection.
NON_SMS_TO_STDOUT = False
# Number of live output-copying threads; Command.start increments it and copy_to_stdout_and_file decrements it.
SMS_NUM_THREADS = 0
# Guards SMS_NUM_THREADS.
LOCK_NUM_THREADS = thread_mod.allocate_lock()
# select() timeout handed to readline_with_timeout by the copy threads.
NON_SMS_THREAD_TIMEOUT = 1
# Default pause for --stagger; overridden by --stagger-time.
STAGGER_TIME = 0.25
# Program start time; the baseline for the "abs:" timestamps of --absolute-time.
ABSOLUTE_T0 = time.time()
# Declared global in check_for_termination; presumably the number of commands currently running - TODO confirm.
SMS_CURRENT_CONCURRENCY = 0
# NOTE(review): presumably guards SMS_CURRENT_CONCURRENCY - its use is not visible in this part of the file.
LOCK_CURRENT_CONCURRENCY = thread_mod.allocate_lock()
# Set by --force-cleanup.
FORCE_CLEANUP = False


# Bytes constants used by readline_with_timeout: the default line terminator and the join separator.
EOL = bytes('\n', 'ISO-8859-1')
NULL_STRING = bytes('', 'ISO-8859-1')


def make_used(variable):
    """Do nothing with variable; exists only so linters consider the caller's name "used"."""
    del variable


def usage(retval):
    """Write the full usage message to stderr, then exit the program with status retval."""
    # pylint: disable=too-many-statements,invalid-name
    program = sys.argv[0]
    chunks = (
        "Usage: %s\n" % program,
        '\t--directory                             The directory to use for output files\n',
        '\t--resolution n                          The frequency with which to check for subprocess\n',
        '\t                                        termination\n',
        '\t--exit-early                            Exit immediately upon detecting a subprocess that does\n',
        '\t                                        not yield the expected value (depends on --or-statuses\n',
        '\t                                        and --and-statuses)\n',
        '\t--or-statuses                           Or the exit statuses of the subprocesses when computing exit status\n',
        '\t--and-statuses                          And the exit statuses of the subprocesses when computing exit\n',
        '\t                                        status (default)\n',
        '\t--max-concurrency n                     Start <= n subprocesses concurrently (default: infinite)\n',
        '\t--maxtime n                             Terminate processes that take longer than n seconds\n',
        '\t--derive                                Derive a filename for output from the command run\n',
        '\t--numbered                              Just use numbered filenames\n',
        '\t--command cmd                           Add cmd to the list of subprocesses\n',
        '\t--commands-file filename                Read commands, one per line, from filename\n',
        '\t--commands cmd1 cmd2 ... cmdn           Add commands to list of subprocesses from command line\n',
        '\t--labeled-command label:cmd             Add cmd to list of subprocesses.  Output in label\n',
        '\t--labeled-commands-file filename        Read label:cmd pairs from filename.  cmd output\n',
        '\t                                        in filename "label"\n',
        '\t--labeled-commands                      label1:cmd1 label2:cmd2 ... labeln:cmdn.  cmdn output in labeln\n',
        '\t--parameterized-command                 cmd \'parameter1 parameter2 ... parametern\'\n',
        '\t                                        EG: \'ssh %s hostname\' \'master node1 node2\'\n',
        '\t                                        output would be files master, node1 and node2\n',
        '\t--ssh-command                           cmd \'hostname1 hostname2 ... hostnamen\'\n',
        '\t                                        EG: \'hostname\' \'master node1 node2\'\n',
        '\t                                        output would be files master, node1 and node2\n',
        '\t--scp-command                           "srcfilename" "destfilename" "hostname1 hostname2 ... hostnamen"\n',
        '\t                                        EG: "/etc/hosts" "/etc/hosts" "node1 node2"\n',
        '\t                                        output filenames are always derived\n',
        '\t--rsync1-command                        EG: filename "hostname1 hostname2 ... hostnamen"\n',
        '\t--rsync-command                         EG: srcfn dstfn "hostname1 hostname2 ... hostnamen"\n',
        '\t                                        Note that --rsync2-command is an alias for --rsync-command\n',
        '\t--rsync3-command                        EG: rsyncopts srcfn dstfn "hostname1 hostname2 ... hostnamen"\n',
        '\t--number-padding n                      Pad output file numbering to n characters for ls sorting\n',
        '\t--no-unlink-desired                     Do not unlink output files with desired exit statuses (depends\n',
        '\t                                        on --or-statuses and --and-statuses)\n',
        '\t--stagger                               Pause for %4.2f seconds after each command start\n' % STAGGER_TIME,
        '\t--stagger-time                          Specify how long to stagger process startup\n',
        '\t--dump-lines number-of-lines            (defaults to %d)\n' % DUMP_LINES_FROM_1,
        '\t--dump-nothing\n',
        '\t--dump-entire\n',
        '\t--force-cleanup                         Cleanup output directory, even if some commands did not exit affirmatively\n',
        '\t--to-stdout\n',
        '\t--verbosity levelno\n',
        '\t-v\n',
        '\t--help\n',
        '\t--absolute-time                         output timestamps relative to program start (only in --to-stdout)\n',
        '\t--relative-time                         output timestamps relative to subprogram start (only in --to-stdout)\n',
        '\n',
        'In --or-statuses mode, the exit status of this program will be the minimum of the\n',
        'subprocesses\' exit statuses.\n',
        '\n',
        'Similarly, in --and-statuses mode (the default), the exit status of this program will\n',
        'be the maximum of the subprocesses\' exit statuses.\n',
        '\n',
        'The exit status of a subprocess that timed out is always 254.\n',
        '\n',
        'Output is saved, one file per command (host), in the specified directory.\n',
        'Output files are deleted if the exit status is what was desired (true or\n',
        'false depending on --and-statuses or --or-statuses) by default - see\n',
        '--no-unlink-desired.\n',
        '\n',
        'Don\'t try to --scp-command and --ssh-command in the same %s unless\n' % program,
        'the --ssh-command does not depend on the --scp-command; they may be\n',
        'run in parallel, so use two %s\'s.\n' % program,
        '\n',
        'Comparing --to-stdout and --not-to-stdout:\n',
        '\t--to-stdout is a little slower than --not-to-stdout.\n',
        '\t--to-stdout is not fully binary-safe.  --not-to-stdout is.\n',
        '\t--to-stdout is more informative during a run.  --not-to-stdout is quieter.\n',
        '\t--not-to-stdout is the default.\n',
    )
    emit = sys.stderr.write
    # one write per chunk, in order, exactly as the message is laid out above
    for chunk in chunks:
        emit(chunk)
    sys.exit(retval)


def coordinated_output(line, lock=None):
    """Write one line to file descriptor 1, serialized by lock when one is given.

    line is a bytes object; any trailing newlines are stripped and exactly one is appended.
    lock, when not None, is a lock object (usable as a context manager) held for the duration
    of the write so lines from different threads are not interleaved.

    The lock is always released, even if the write raises - otherwise a single failed write
    would leave every other thread blocked on the lock forever.
    """
    # Build the payload before taking the lock, so a bad argument cannot strand the lock either.
    payload = line.rstrip(b'\n') + b'\n'
    if lock is None:
        os.write(1, payload)
        return
    with lock:
        os.write(1, payload)


class Timeout(Exception):
    """Signal that a wait for input gave up because the time limit elapsed."""


class ForEbadfResult(object):
    """Outcome of a call made through for_ebadf().

    got_ebadf is True when the wrapped call failed with errno.EBADF.
    result is the wrapped call's return value, or None when got_ebadf is True.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, got_ebadf, result):
        """Store the EBADF flag and the wrapped call's result."""
        self.got_ebadf = got_ebadf
        self.result = result


def for_ebadf(command, function):
    """Call function() and turn an EBADF failure into a flagged result instead of an exception.

    command is the object whose close_pipes() method is called when EBADF is seen.
    function is a zero-argument callable (in this file: a select or read on a pipe).

    Returns a ForEbadfResult: (False, function's return value) on success, or (True, None) when
    function raised an OSError whose errno is EBADF.  Any other exception propagates unchanged.
    """
    try:
        result = function()
    except OSError as err:
        # On Python 3 exceptions are not subscriptable, so the errno must be read from the attribute.
        if err.errno != errno.EBADF:
            raise
        # this just means that the main thread realized the process died before we did
        command.close_pipes()
        # function() never returned, so there is no result to hand back
        return ForEbadfResult(True, None)
    return ForEbadfResult(False, result)


def readline_with_timeout(command, file_, terminator=EOL, timeout=0):
    """Generate terminator-delimited lines read from file_, giving up after timeout seconds of silence.

    command is passed to for_ebadf(), which closes its pipes if the descriptor has gone bad.
    file_ is either an object with a read() method or a raw file descriptor (read with os.read).
    terminator is the single byte that ends a line; each yielded line includes it.
    timeout is the select() timeout in seconds.

    Input is read one byte at a time so that a select() precedes every read.

    Yields each complete line as bytes.  At end of file, any bytes already read for an unterminated
    final line are yielded first (previously they were silently dropped).

    Raises:
        Timeout: select() reported nothing ready within timeout.
        EOFError: end of file was reached (after any partial final line has been yielded).
        IOError: select() reported an exceptional condition on file_.
    The generator simply finishes (StopIteration for the consumer) when for_ebadf reports EBADF.

    NOTE: raising Timeout ends this generator; a caller that builds a new generator afterwards starts
    with an empty buffer, so bytes of a partially-read line are lost across a timeout.
    """
    line_so_far = []
    while True:
        fer = for_ebadf(command, lambda: select.select([file_], [], [file_], timeout))
        if fer.got_ebadf:
            return
        (input_ready, output_ready, exception_ready) = fer.result
        if exception_ready:
            line_so_far = []
            raise IOError
        if output_ready:
            # we never ask select() about writability, so this should be impossible
            sys.stderr.write('%s: Internal error: Output ready in readline_with_timeout\n' % sys.argv[0])
            sys.exit(1)
        if input_ready:
            if hasattr(file_, 'read'):
                fer = for_ebadf(command, lambda: file_.read(1))
            else:
                fer = for_ebadf(command, lambda: os.read(file_, 1))
            if fer.got_ebadf:
                return
            character = fer.result
            if not character:
                # End of file.  Hand over an unterminated final line rather than discarding it.
                if line_so_far:
                    remainder = NULL_STRING.join(line_so_far)
                    line_so_far = []
                    yield remainder
                raise EOFError
            line_so_far.append(character)
            if character == terminator:
                result = NULL_STRING.join(line_so_far)
                line_so_far = []
                yield result
        if not (input_ready or output_ready or exception_ready):
            raise Timeout


# quick bashquote
def quick_bashquote(string):
    """Return string quoted for bash, using a throwaway bashquote_mod.BashquoteBytes instance."""
    quoter = bashquote_mod.BashquoteBytes()
    quoter.add(string)
    quoted = quoter.result()
    return quoted


def minutes(seconds):
    """Return a duration given in seconds expressed as (possibly fractional) minutes."""
    seconds_per_minute = 60.0
    return seconds / seconds_per_minute


class Command(object):
    # pylint: disable=too-many-instance-attributes
    """One shell command to be executed, plus the bookkeeping needed to run and judge it.

    A Command is created during argument parsing, started with start(), and then polled with
    is_done() until it reports completion.  Its output goes to a file in DIRECTORY named after
    self.name: via shell redirection normally, or via a copying thread when to_stdout is set.
    Once the command has finished, the output file is either deleted or renamed to include the
    exit status (see ren_or_del_with_exit_status).

    exit_status is only set once the command has finished; other code tests for it with hasattr.
    A command killed for exceeding maxtime gets exit status 254.
    """

    # Sequence number given to the next Command created; used as the output filename prefix.
    COMMAND_NUMBER = 0

    # Names of the pipe-end attributes created by start() in to_stdout mode.
    PIPE_ATTRIBUTES = ('parent_receive', 'child_receive', 'parent_send', 'child_send')

    # pylint: disable=attribute-defined-outside-init,too-many-instance-attributes
    # attribute-defined-outside-init: We hasattr a lot
    def __init__(self, cmd, maxtime, local_to_stdout, lock, numbered=False, derived=False, named=False, name=b''):
        """Initialize.

        cmd is the shell command, as bytes.
        maxtime is the time limit in seconds; values below 1e-5 disable the limit (see is_done).
        local_to_stdout selects thread-copied output (True) or shell redirection (False).
        lock is stored as self.lock.
        Exactly one of numbered, derived and named must be true; it selects how the output
        filename is built.  name is the label used when named is true.
        """
        # pylint: disable=too-many-arguments
        self.cmd = cmd
        self.quoted_cmd = quick_bashquote(cmd)
        self.maxtime = maxtime
        self.to_stdout = local_to_stdout
        self.lock = lock
        self.numbered = numbered
        self.derived = derived
        self.named = named
        self.pid = None
        if self.numbered + self.named + self.derived != 1:
            sys.stderr.write('%s: Error: Command.start must have exactly 1 of numbered, name or derive\n' % sys.argv[0])
            sys.exit(1)
        if self.numbered:
            self.name = b'%*d' % (NUMBER_PADDING, Command.COMMAND_NUMBER)
        elif self.named:
            self.name = b'%*d - %s' % (NUMBER_PADDING, Command.COMMAND_NUMBER, name)
        elif self.derived:
            # every run of non-alphanumeric bytes in the command becomes a single '-'
            self.name = b'%*d - %s' % (NUMBER_PADDING, Command.COMMAND_NUMBER, re.sub(b'[^a-zA-Z0-9]+', b'-', self.cmd))
        else:
            # cannot happen given the check above; report with the program name, not the first argument
            sys.stderr.write('%s: Eh?  warble frob\n' % sys.argv[0])
            sys.exit(1)
        Command.COMMAND_NUMBER += 1

    def start(self):
        """Fork and run the command under /bin/sh in its own process group.

        In to_stdout mode two pipes are created, the child's stdin/stdout/stderr are attached to
        them, and a thread running copy_to_stdout_and_file is started in the parent.  Otherwise the
        shell redirects the command's output straight into DIRECTORY/self.name.
        """
        global SMS_NUM_THREADS

        def via(boolean):
            """Return a string describing how output is to be performed."""
            if boolean:
                return b'via thread'
            return b'via redirection'

        if VERBOSITY >= 2:
            coordinated_output(b'--> %s starting - writing to %s %s' %
                               (self.quoted_cmd, quick_bashquote(self.name), via(self.to_stdout)), lock=OUTPUT_LOCK)
        if self.to_stdout:
            # self.parent_out_fd, self.child_out_fd = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
            self.parent_receive, self.child_send = os.pipe()
            self.child_receive, self.parent_send = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # child process
            try:
                # own process group, so is_done can kill the command and all its descendants at once
                os.setpgid(0, 0)
                if self.to_stdout:
                    # Must not close these before dup2()'ing to them, otherwise you get a silent error - but flush()'ing
                    # is good
                    sys.stdout.flush()
                    sys.stderr.flush()
                    os.dup2(self.child_receive, sys.stdin.fileno())
                    os.dup2(self.child_send, sys.stdout.fileno())
                    os.dup2(self.child_send, sys.stderr.fileno())
                    os.execlp(b'/bin/sh', b'/bin/sh', b'-c', b'eval %s 2>&1' % quick_bashquote(self.cmd))
                else:
                    os.execlp(b'/bin/sh', b'/bin/sh', b'-c', b'eval %s > %s/%s 2>&1' %
                              (quick_bashquote(self.cmd), quick_bashquote(DIRECTORY), quick_bashquote(self.name)))
            except Exception as err:  # pylint: disable=broad-except
                os.write(2, os.fsencode('%s: could not start /bin/sh: %s\n' % (sys.argv[0], err)))
            finally:
                # Only reached when the exec (or the setup before it) failed.  The child must never
                # fall back into the parent's code, or there would be two copies of this program.
                os._exit(127)  # pylint: disable=protected-access
        else:
            # parent process
            self.start_time = time.time()
            if self.to_stdout:
                with LOCK_NUM_THREADS:
                    SMS_NUM_THREADS += 1
                _unused = thread_mod.start_new_thread(self.copy_to_stdout_and_file, ('nothing',))
                make_used(_unused)

    def copy_to_stdout_and_file(self, _unused):
        # pylint: disable=too-many-branches
        """Copy the command's output, line by line, to both its output file and stdout.

        Runs in its own thread in the parent process, one thread per command, which makes it easy
        to treat each line from each child as an atomic unit.  The thread ends when the pipe reaches
        EOF, when the read generator finishes, or when a read times out after exit_status has been
        set.  On the way out it renames or deletes the output file, closes the pipes and the file,
        and decrements SMS_NUM_THREADS.  _unused is ignored.
        """
        global SMS_NUM_THREADS
        got_timeout = False
        relative_t0 = time.time()
        filename = os.path.join(DIRECTORY, self.name)
        file_output = open(filename, 'wb')
        gen = readline_with_timeout(self, self.parent_receive, timeout=NON_SMS_THREAD_TIMEOUT)
        while True:
            try:
                line = next(gen)
            except Timeout:
                if hasattr(self, 'exit_status'):
                    # we have an exit status, so the child process is dead (or perhaps we just tried
                    # unsuccessfully to kill it).  So let this thread die too
                    if self.exit_status == 254:
                        got_timeout = True
                    break
                else:
                    # We don't have an exit status, so go back for more input!  The old generator
                    # ended when it raised Timeout, so a fresh one is needed.
                    gen = readline_with_timeout(self, self.parent_receive, timeout=NON_SMS_THREAD_TIMEOUT)
            except EOFError:
                break
            except StopIteration:
                break
            else:
                time1 = time.time()
                file_output.write(line)
                file_output.flush()
                string = b'%s' % self.name
                # All this +'ing is a little slow
                if ABSOLUTE_TIME:
                    string += b' (abs: %.1fm)' % minutes(time1 - ABSOLUTE_T0)
                if RELATIVE_TIME:
                    string += b' (rel: %.1fm)' % minutes(time1 - relative_t0)
                string += b': %s' % line.rstrip(b'\n')
                coordinated_output(string, lock=OUTPUT_LOCK)
        coordinated_output(b'%s: Letting thread die, renaming file and closing file/pipes' % self.name, lock=OUTPUT_LOCK)
        self.ren_or_del_with_exit_status(natural=not got_timeout, timeout=got_timeout)
        self.close_pipes()
        file_output.close()
        with LOCK_NUM_THREADS:
            SMS_NUM_THREADS -= 1

    def close_pipes(self):
        """Close whichever of our pipe ends are still open.

        Safe to call more than once, and safe when a descriptor has already gone bad: each pipe
        attribute is removed as it is closed, and EBADF from os.close is ignored.  This matters
        because for_ebadf() calls this when it sees EBADF and the copying thread calls it again on
        its way out; a raise here would kill that thread before it decrements SMS_NUM_THREADS.
        """
        for attribute in self.PIPE_ATTRIBUTES:
            descriptor = self.__dict__.pop(attribute, None)
            if descriptor is None:
                continue
            try:
                os.close(descriptor)
            except OSError as err:
                if err.errno != errno.EBADF:
                    raise

    def ren_or_del_with_exit_status(self, natural=False, timeout=False):
        """Delete the output file if the exit status was the desired one; otherwise rename it.

        Exactly one of natural and timeout must be true; it only affects the wording of the
        verbose message.  "Desired" means a zero status when AND_STATUSES is set and a nonzero
        status otherwise, and deletion only happens when UNLINK_DESIRED_STATUS_FILES is set.
        A renamed file gets " - <exit status>" appended, and self.name is updated to match.
        """
        if natural + timeout != 1:
            sys.stderr.write('%s: ren_or_del_with_exit_status: natural + timeout != 1 (%d, %d)' % (sys.argv[0],
                                                                                                   natural, timeout))
            sys.exit(1)
        if natural:
            description = b'naturally'
        elif timeout:
            description = b'by timeout'
        else:
            sys.stderr.write('%s: ren_or_del_with_exit_status 2: natural + timeout != 1' % sys.argv[0])
            sys.exit(1)
        # compute these in case we need them in a moment
        # we won't always need them, but it's better than duplicating code
        old_name = self.name
        new_name = b'%s - %d' % (self.name, self.exit_status)
        if (AND_STATUSES and not self.exit_status or not AND_STATUSES and self.exit_status) and \
                UNLINK_DESIRED_STATUS_FILES:
            # delete the file and tell the user
            os.unlink(os.path.join(DIRECTORY, old_name))
            if VERBOSITY >= 2:
                coordinated_output(b'<-- %s finished %s - deleted outfile' % (self.quoted_cmd, description),
                                   lock=OUTPUT_LOCK)
        else:
            # rename the file and tell the user
            os.rename(os.path.join(DIRECTORY, old_name), os.path.join(DIRECTORY, new_name))
            self.name = new_name
            if VERBOSITY >= 2:
                coordinated_output(b'<-- %s finished %s - written to %s' %
                                   (self.quoted_cmd, description, quick_bashquote(self.name)), lock=OUTPUT_LOCK)

    @staticmethod
    def _exit_status_from_wait(value):
        """Convert a waitpid() status word into the exit status this program reports.

        A normal exit yields the child's exit code.  A child terminated by a signal yields
        128 + the signal number (the shell convention) instead of being mistaken for success,
        which is what a plain "value // 256" does because the signal lives in the low byte.
        """
        if value is None:
            return 0
        if os.WIFEXITED(value):
            return os.WEXITSTATUS(value)
        if os.WIFSIGNALED(value):
            return 128 + os.WTERMSIG(value)
        return value // 256

    def is_done(self):
        """Poll the child without blocking; return True once it has finished or been killed for overrunning.

        While the child is still running, a command past its maxtime has its whole process group
        sent SIGKILL (up to 10 attempts, RESOLUTION / 4 seconds apart) and gets exit_status 254.
        When the child has exited, exit_status is set from its wait status.  In both finishing
        cases, unless output is being copied by a thread (NON_SMS_TO_STDOUT), the output file is
        renamed or deleted here.
        """
        # pylint: disable=too-many-branches
        pid, value = os.waitpid(self.pid, os.WNOHANG)
        if (pid, value) == (0, 0):
            # still running
            difference = time.time() - self.start_time
            if difference > self.maxtime and self.maxtime >= 1e-5:
                if VERBOSITY >= 4:
                    coordinated_output(b'process %d (%s) has been running for %f and has a max of %f' %
                                       (self.pid, self.cmd, difference, self.maxtime), lock=OUTPUT_LOCK)
                killed = False
                for _unused_index in range(10):
                    # negative pid: signal the whole process group created in start()
                    os.kill(-self.pid, signal.SIGKILL)  # pylint: disable=invalid-unary-operand-type
                    time.sleep(RESOLUTION / 4.0)
                    pid, value = os.waitpid(self.pid, os.WNOHANG)
                    if (pid, value) == (0, 0):
                        # It's probably not dead yet - or at least we have no particular reason to believe it is
                        continue
                    else:
                        killed = True
                        break
                if killed:
                    pass
                else:
                    # Sheesh, the process group won't die.  Give up with those deadly signals pending.  The processes
                    # in the process group will probably become zombies when they die until this parent process exits.
                    # In maxtime(1), we fork an assassin process to kill the undead over and over - we're a little
                    # less aggressive (thorough) here with subprocess termination.
                    # sys.argv[0] is a str and must be encoded before going into a bytes format
                    coordinated_output(b'%s: process group -%d would not die' % (os.fsencode(sys.argv[0]), self.pid),
                                       lock=OUTPUT_LOCK)
                self.exit_status = 254
                if not NON_SMS_TO_STDOUT:
                    self.ren_or_del_with_exit_status(natural=False, timeout=True)
                return True
            else:
                return False
        else:
            self.exit_status = self._exit_status_from_wait(value)
            if not NON_SMS_TO_STDOUT:
                self.ren_or_del_with_exit_status(natural=True, timeout=False)
            return True


# Every Command created while parsing the command line, in order.
COMMANDS = []
# Per-command time limit in seconds (--maxtime).  Command.is_done ignores limits below 1e-5, so -1.0 means no limit.
MAXTIME = -1.0
# --max-concurrency; the usage text describes the default as infinite.
MAX_CONCURRENCY = -1
# True: a zero exit status is the desired one (--and-statuses).  False: nonzero is (--or-statuses).
AND_STATUSES = True
# Seconds slept per termination check (--resolution); kill retries in Command.is_done sleep a quarter of it.
RESOLUTION = 0.1
# Output directory as bytes (--directory); when left empty a /tmp/looper-... name is chosen after parsing.
DIRECTORY = b''
# --exit-early
EXIT_EARLY = False
# Output-file naming for --command/--commands/--commands-file: numbered (--numbered) or derived (--derive).
NUMBERED = False
DERIVED = True
# Field width of the numeric prefix in output filenames (--number-padding).
NUMBER_PADDING = 0
# Guards the two exit counters below.
LOCK_NUMBER_OF_EXITS = thread_mod.allocate_lock()
# Counts of commands that finished with a zero / nonzero exit status.
SMS_NUMBER_OF_TRUE_EXITS = 0
SMS_NUMBER_OF_FALSE_EXITS = 0
# Cleared by --no-unlink-desired: keep output files even when the exit status was the desired one.
UNLINK_DESIRED_STATUS_FILES = True
# --stagger / --stagger-time
STAGGER = False
# --absolute-time / --relative-time: timestamp prefixes on lines copied in --to-stdout mode.
ABSOLUTE_TIME = False
RELATIVE_TIME = False
# Lock handed to coordinated_output; a real lock only in --to-stdout mode, otherwise None.
OUTPUT_LOCK = None


# The command line as bytes, so commands and filenames are handled as bytes throughout.
BYTES_ARGV = [os.fsencode(argument) for argument in sys.argv]


def read_lines(file_):
    """Yield the meaningful lines of file_: '#' comments removed, whitespace trimmed, empties skipped."""
    for raw in file_:
        # drop everything from the first '#' to the end of the line, then trim both ends
        stripped = re.sub(b'#.*$', b'', raw).strip()
        if not stripped:
            continue
        yield stripped


# Command-line parsing.  With no arguments at all, just print the usage message.
if not BYTES_ARGV[1:]:
    usage(0)

# Options are consumed from the front of BYTES_ARGV: every branch looks at BYTES_ARGV[1], deletes
# whatever extra arguments it used, and the final "del BYTES_ARGV[1]" at the bottom of the loop
# removes the option itself.  Options are position-sensitive: a command picks up the values of
# MAXTIME, NUMBERED/DERIVED, NON_SMS_TO_STDOUT and OUTPUT_LOCK in effect when it is encountered.
while BYTES_ARGV[1:]:  # noqa
    if BYTES_ARGV[1] in [b'--help', b'-h']:
        usage(0)
    elif BYTES_ARGV[1] == b'--no-unlink-desired':
        UNLINK_DESIRED_STATUS_FILES = False
    elif BYTES_ARGV[1] == b'--directory':
        DIRECTORY = BYTES_ARGV[2]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--number-padding':
        NUMBER_PADDING = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--resolution':
        RESOLUTION = float(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--stagger':
        STAGGER = True
    elif BYTES_ARGV[1] == b'--stagger-time':
        STAGGER = True
        STAGGER_TIME = float(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--exit-early':
        EXIT_EARLY = True
    elif BYTES_ARGV[1] == b'--or-statuses':
        AND_STATUSES = False
    elif BYTES_ARGV[1] == b'--and-statuses':
        AND_STATUSES = True
    elif BYTES_ARGV[1] == b'--max-concurrency':
        MAX_CONCURRENCY = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--maxtime':
        MAXTIME = float(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--derive':
        NUMBERED = False
        DERIVED = True
    elif BYTES_ARGV[1] == b'--numbered':
        NUMBERED = True
        DERIVED = False
    elif BYTES_ARGV[1] == b'--command':
        TEMP_COMMAND = \
            Command(BYTES_ARGV[2].rstrip(b'\n'), MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, numbered=NUMBERED, derived=DERIVED)
        COMMANDS.append(TEMP_COMMAND)
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--commands-file':
        with open(BYTES_ARGV[2], 'rb') as FILE:
            COMMANDS.extend([Command(x.rstrip(b'\n'), MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                     numbered=NUMBERED, derived=DERIVED) for x in read_lines(FILE)])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--commands':
        # takes every remaining argument as a command
        COMMANDS.extend([Command(x, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                 numbered=NUMBERED, derived=DERIVED) for x in BYTES_ARGV[2:]])
        del BYTES_ARGV[2:]
    elif BYTES_ARGV[1] == b'--labeled-command':
        # label:cmd - split on the first ':' only, so the command itself may contain colons
        PARTITIONED = BYTES_ARGV[2].partition(b':')
        COMMANDS.append(Command(PARTITIONED[2], MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, named=True, name=PARTITIONED[0]))
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--labeled-commands-file':
        with open(BYTES_ARGV[2], 'rb') as FILE:
            for LINE in read_lines(FILE):
                PARTITIONED = LINE.partition(b':')
                COMMANDS.append(Command(PARTITIONED[2].rstrip(b'\n'), MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                        named=True, name=PARTITIONED[0]))
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--labeled-commands':
        # takes every remaining argument as a label:cmd pair
        for arg in BYTES_ARGV[2:]:
            PARTITIONED = arg.partition(b':')
            COMMANDS.append(Command(PARTITIONED[2], MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, named=True, name=PARTITIONED[0]))
        del BYTES_ARGV[2:]
    elif BYTES_ARGV[1] == b'--parameterized-command':
        # the command is a %-template; each whitespace-separated parameter yields one command named after it
        COMMAND_STRING = BYTES_ARGV[2]
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            comm = Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, named=True, name=parameter)
            COMMANDS.append(comm)
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--ssh-command':
        COMMAND_STRING_WITHIN_SSH = BYTES_ARGV[2]
        # Sun Nov  5 11:35:53 PST 2017: Tried adding a -t to help with commands left running on remote hosts, but it lead
        # to hangs.
        COMMAND_STRING = b'ssh %%s %s' % quick_bashquote(COMMAND_STRING_WITHIN_SSH)
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            comm = Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, named=True, name=parameter)
            COMMANDS.append(comm)
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--scp1-command':
        # same path on the local and remote side
        FILENAME = BYTES_ARGV[2]
        COMMAND_STRING = b'scp %s %%s:%s' % (FILENAME, FILENAME)
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--scp-command':
        SOURCE_FILENAME = BYTES_ARGV[2]
        DESTINATION_FILENAME = BYTES_ARGV[3]
        COMMAND_STRING = b'scp %s %%s:%s' % (SOURCE_FILENAME, DESTINATION_FILENAME)
        PARAMETERS = BYTES_ARGV[4].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--rsync1-command':
        # same path on the local and remote side
        FILENAME = BYTES_ARGV[2]
        COMMAND_STRING = b'rsync -apl %s %%s:%s' % (FILENAME, FILENAME)
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] in [b'--rsync-command', b'--rsync2-command']:
        SOURCE_FILENAME = BYTES_ARGV[2]
        DESTINATION_FILENAME = BYTES_ARGV[3]
        COMMAND_STRING = b'rsync -apl %s %%s:%s' % (SOURCE_FILENAME, DESTINATION_FILENAME)
        PARAMETERS = BYTES_ARGV[4].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--rsync3-command':
        OPTIONS = BYTES_ARGV[2]
        SOURCE_FILENAME = BYTES_ARGV[3]
        DESTINATION_FILENAME = BYTES_ARGV[4]
        COMMAND_STRING = b'rsync -apl %s %s %%s:%s' % (OPTIONS, SOURCE_FILENAME, DESTINATION_FILENAME)
        PARAMETERS = BYTES_ARGV[5].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK, derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--dump-lines':
        DUMP_LINES_MODE = 'numeric'
        DUMP_LINES_FROM_1 = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--dump-entire':
        DUMP_LINES_MODE = 'all'
    elif BYTES_ARGV[1] == b'--dump-nothing':
        DUMP_LINES_MODE = 'nothing'
    elif BYTES_ARGV[1] == b'--force-cleanup':
        FORCE_CLEANUP = True
    elif BYTES_ARGV[1] == b'--to-stdout':
        NON_SMS_TO_STDOUT = True
        OUTPUT_LOCK = thread_mod.allocate_lock()
    elif BYTES_ARGV[1] == b'--not-to-stdout':
        NON_SMS_TO_STDOUT = False
        OUTPUT_LOCK = None
    elif BYTES_ARGV[1] == b'--verbosity':
        VERBOSITY = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'-v':
        VERBOSITY += 1
    elif BYTES_ARGV[1] == b'--absolute-time':
        ABSOLUTE_TIME = True
    elif BYTES_ARGV[1] == b'--relative-time':
        RELATIVE_TIME = True
    else:
        # BYTES_ARGV holds bytes; decode before formatting into a str so the user sees the argument
        # itself rather than a b'...' repr
        sys.stderr.write("%s: Illegal argument: %s\n" % (os.fsdecode(BYTES_ARGV[0]), os.fsdecode(BYTES_ARGV[1])))
        usage(1)
    del BYTES_ARGV[1]

# Overall exit status, combined from the subprocess exit statuses as they finish: the maximum in
# --and-statuses mode, the minimum in --or-statuses mode.  Guarded by LOCK_OVERALL_RESULT.
LOCK_OVERALL_RESULT = thread_mod.allocate_lock()
with LOCK_OVERALL_RESULT:
    if AND_STATUSES:
        # we and the values, so start out assuming True
        SMS_OVERALL_RESULT = 0
    else:
        # we or the values, so start out assuming False
        SMS_OVERALL_RESULT = 255

# No --directory given: pick a name from the current time and our pid, and tell the user which one.
if DIRECTORY == b'':
    DIRECTORY = b'/tmp/looper-%d-%d' % (time.time(), os.getpid())
    coordinated_output(b'using directory %s' % DIRECTORY, lock=OUTPUT_LOCK)

if (ABSOLUTE_TIME or RELATIVE_TIME) and not NON_SMS_TO_STDOUT:
    # use the str program name: formatting BYTES_ARGV[0] into a str would print a b'...' repr
    sys.stderr.write('%s: Warning: --absolute-time and/or --relative-time specified, but not --to-stdout mode\n' % sys.argv[0])


# Lots of globals.  Sigh.  This started off just part of the main body
# of the program, and then I realized I needed to do this from two
# places.
def check_for_termination():
    """Reap finished commands and fold their exit statuses into the overall result.

    Sleeps for RESOLUTION seconds, then walks ACTIVE_COMMANDS.  Every command whose is_done() is true is
    counted as a true (zero) or false (nonzero) exit, merged into SMS_OVERALL_RESULT (max() in and-mode,
    min() in or-mode), removed from ACTIVE_COMMANDS, and subtracted from SMS_CURRENT_CONCURRENCY.

    Raises SystemExit when EXIT_EARLY is set and the overall result can no longer change, or with status 1
    when the concurrency counter goes negative.
    """
    global SMS_CURRENT_CONCURRENCY
    global SMS_NUMBER_OF_TRUE_EXITS
    global SMS_NUMBER_OF_FALSE_EXITS
    global SMS_OVERALL_RESULT
    time.sleep(RESOLUTION)
    if VERBOSITY >= 3:
        coordinated_output(b'checking for subprocess termination', lock=OUTPUT_LOCK)
    # Not the most pythonic loop, but it works pretty well and is a
    # common way of doing things in other languages.  It's this way
    # because we need to modify what we're iterating over, as we're
    # iterating over it: walking the indices backwards keeps the
    # not-yet-visited indices valid after each del.
    for active_command_no in range(len(ACTIVE_COMMANDS)-1, -1, -1):
        active_command = ACTIVE_COMMANDS[active_command_no]
        if not active_command.is_done():
            continue
        # flip the logic: a zero exit status is falsy in Python but counts as a "true" exit
        with LOCK_NUMBER_OF_EXITS:
            if not active_command.exit_status:
                SMS_NUMBER_OF_TRUE_EXITS += 1
            else:
                SMS_NUMBER_OF_FALSE_EXITS += 1
        with LOCK_OVERALL_RESULT:
            if AND_STATUSES:
                # and-mode: any nonzero status makes the overall result nonzero
                SMS_OVERALL_RESULT = max(active_command.exit_status, SMS_OVERALL_RESULT)
                decided = bool(SMS_OVERALL_RESULT)
                outcome = b'false'
            else:
                # or-mode: any zero status makes the overall result zero
                SMS_OVERALL_RESULT = min(active_command.exit_status, SMS_OVERALL_RESULT)
                decided = not SMS_OVERALL_RESULT
                outcome = b'true'
            if EXIT_EARLY and decided:
                # bytes %-formatting needs one bytes value per %s, so pass the program name explicitly
                # and render the command through str() first.
                # NOTE(review): if Command offers a better bytes rendering, use that here instead.
                command_label = str(active_command).encode('utf-8', 'backslashreplace')
                coordinated_output(
                    b'%s: Terminating early because command %s exited %s' % (BYTES_ARGV[0], command_label, outcome),
                    lock=OUTPUT_LOCK,
                )
                sys.exit(SMS_OVERALL_RESULT)
        del ACTIVE_COMMANDS[active_command_no]
        with LOCK_CURRENT_CONCURRENCY:
            SMS_CURRENT_CONCURRENCY -= 1
            if SMS_CURRENT_CONCURRENCY < 0:
                sys.stderr.write('%s: Internal error: current_concurrency < 0\n' % BYTES_ARGV[0])
                sys.exit(1)


# Make sure the output directory exists.  Anything already at that path must be a directory.
try:
    STATBUF = os.stat(DIRECTORY)
except OSError:
    # the stat failed, so create the directory with mode rwxr-xr-x
    os.makedirs(DIRECTORY, 0o755)
else:
    if not stat.S_ISDIR(STATBUF.st_mode):
        sys.stderr.write('%s: %s already exists and is not a directory.  Terminating.\n' % (BYTES_ARGV[0], DIRECTORY))
        sys.exit(1)
    sys.stderr.write('%s: %s already exists and is a directory.  Continuing.\n' % (BYTES_ARGV[0], DIRECTORY))

# Commands that have been started and not yet reaped.
ACTIVE_COMMANDS = []

# Commands are taken with lst.pop(), which is much faster than v=lst[0]; del lst[0] - so turn the
# list around to keep handing them out in their original order.
COMMANDS.reverse()


def signal_handler(signum, frame):
    """Kill every started command's process group, then exit with status 1.

    signum and frame are the standard signal-handler arguments; neither is needed, so both are only
    passed to make_used() (presumably to keep linters quiet about unused arguments).

    Raises SystemExit(1) after all commands have been attempted.
    """
    make_used(signum)
    make_used(frame)
    for active_command in ACTIVE_COMMANDS:
        if active_command.pid is None:
            continue
        sys.stderr.write('Killing pid {}\n'.format(active_command.pid))
        try:
            # a negative pid addresses the whole process group rather than the single process
            os.kill(-active_command.pid, signal.SIGKILL)
        except OSError as exc:
            # One kill failing (e.g. the group is already gone) must not stop us from killing the
            # remaining commands or from reaching sys.exit() below.
            sys.stderr.write('Could not kill pid {}: {}\n'.format(active_command.pid, exc))
    sys.exit(1)


# Hangup, terminate and interrupt all get the same handler.
for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
    signal.signal(sig, signal_handler)

# Commands move from the "commands" list (still waiting) to the
# ACTIVE_COMMANDS list (started, not yet reaped).
while COMMANDS:
    with LOCK_CURRENT_CONCURRENCY:
        while COMMANDS:
            # a MAX_CONCURRENCY of -1 means there is no limit
            if MAX_CONCURRENCY != -1 and SMS_CURRENT_CONCURRENCY >= MAX_CONCURRENCY:
                break
            INACTIVE_COMMAND = COMMANDS.pop()
            INACTIVE_COMMAND.start()
            if STAGGER:
                time.sleep(STAGGER_TIME)
            SMS_CURRENT_CONCURRENCY += 1
            ACTIVE_COMMANDS.append(INACTIVE_COMMAND)
    check_for_termination()  # beware - globals modified

# Everything has been started; keep reaping until nothing is left running.
while ACTIVE_COMMANDS:
    check_for_termination()  # beware - globals modified

# give the threads time to finish up - otherwise we may lose the end of some of the output streams
GLOBAL_FINISH_TIME_0 = time.time()
GLOBAL_MAX_THREAD_WAIT = NON_SMS_THREAD_TIMEOUT + 0.5
if NON_SMS_TO_STDOUT:
    while True:
        # Only hold the lock long enough to read the counter.  Sleeping or printing with it held would
        # block anything else that needs it (presumably the threads we are waiting on, when they
        # update the counter - verify against the thread code).
        with LOCK_NUM_THREADS:
            GLOBAL_THREADS_REMAINING = SMS_NUM_THREADS
        if GLOBAL_THREADS_REMAINING < 0:
            # Report once and stop waiting; staying in the loop could never make progress.
            sys.stderr.write('%s: internal error: SMS_NUM_THREADS is less than 0\n' % BYTES_ARGV[0])
            break
        if GLOBAL_THREADS_REMAINING == 0:
            break
        time.sleep(RESOLUTION)
        GLOBAL_FINISH_TIME_1 = time.time()
        GLOBAL_DIFFERENCE = GLOBAL_FINISH_TIME_1 - GLOBAL_FINISH_TIME_0
        if GLOBAL_DIFFERENCE >= GLOBAL_MAX_THREAD_WAIT:
            if VERBOSITY >= 2:
                coordinated_output(b"%d threads are not finished, but that's OK.  Finishing up" %
                                   GLOBAL_THREADS_REMAINING, lock=OUTPUT_LOCK)
            # Threads are not finished, but they're taking too long - give up on them. They've had
            # plenty of time to finish by now unless we're on an extremely loaded system
            break
        if VERBOSITY >= 2:
            coordinated_output(b'waiting for threads to finish: %d thread(s) remain(s), %3.1f seconds remains' %
                               (GLOBAL_THREADS_REMAINING, GLOBAL_MAX_THREAD_WAIT - GLOBAL_DIFFERENCE), lock=OUTPUT_LOCK)

# At verbosity 1 and up, report how many commands exited true and how many exited false.
if VERBOSITY >= 1:
    with LOCK_OVERALL_RESULT:
        for summary_line in (
                b'true exit statuses: %d' % SMS_NUMBER_OF_TRUE_EXITS,
                b'false exit statuses: %d' % SMS_NUMBER_OF_FALSE_EXITS,
        ):
            coordinated_output(summary_line, lock=OUTPUT_LOCK)

# Result filenames look like "<digits> - <anything> - <digits>"; ResultFile reads the first number as
# the execution order and the last one as the exit status.
RESULT_FILE_REGEX = re.compile(b'^([0-9]+) - .* - ([0-9]+)$')


def regex_matches(regex, string):
    """Return True when the compiled regex matches at the start of string, False otherwise."""
    return regex.match(string) is not None


class ResultFile(object):
    """One result file, ordered with successes first and then by execution order."""

    # pylint: disable=too-few-public-methods
    def __init__(self, string):
        """Parse the execution order and exit status out of a result filename.

        string must match RESULT_FILE_REGEX; the first group becomes execution_order and the second
        becomes exit_status.  A non-matching string fails with AttributeError, so filter first.
        """
        self.string = string
        match = RESULT_FILE_REGEX.match(string)
        self.execution_order = int(match.group(1))
        self.exit_status = int(match.group(2))

    def _sort_key(self):
        """Return the ordering key: success (False) sorts before failure (True), then execution order."""
        return (self.exit_status != 0, self.execution_order)

    def __cmp__(self, other):
        """Compare the Python 2.x way: negative, zero or positive."""
        # for Python 2
        own_key = self._sort_key()
        other_key = other._sort_key()  # pylint: disable=protected-access
        return (own_key > other_key) - (own_key < other_key)

    def __lt__(self, other):
        """< the Python 3.x way."""
        # for Python 3
        # Comparing whole keys keeps a failure from ever sorting ahead of a success, which comparing
        # execution order alone would allow whenever the failure ran earlier.
        return self._sort_key() < other._sort_key()  # pylint: disable=protected-access

    def __str__(self):
        """Convert self to string."""
        return str(self.string)

    __repr__ = __str__


# Show the collected result files according to DUMP_LINES_MODE:
#   'all'     - every line of every result file
#   'nothing' - no output at all
#   'numeric' - the first DUMP_LINES_FROM_1 lines of each file, plus a count of the lines left out
# Result files are listed in ResultFile order.
if DUMP_LINES_MODE == 'all':
    FILENAMES = os.listdir(DIRECTORY)
    RESULT_FILES = [ResultFile(FILENAME) for FILENAME in FILENAMES if regex_matches(RESULT_FILE_REGEX, FILENAME)]
    RESULT_FILES.sort()
    for RESULT_FILE in RESULT_FILES:
        FULL_PATH = os.path.join(DIRECTORY, RESULT_FILE.string)
        sys.stdout.write('%s:\n' % FULL_PATH)
        with open(FULL_PATH, 'r') as FILE:
            for global_line in FILE:
                sys.stdout.write('\t%s' % global_line)
elif DUMP_LINES_MODE == 'nothing':
    pass
elif DUMP_LINES_MODE == 'numeric':
    FILENAMES = os.listdir(DIRECTORY)
    sys.stdout.write('%s filenames\n%s\n' % (len(FILENAMES), FILENAMES))
    RESULT_FILES = [ResultFile(FILENAME) for FILENAME in FILENAMES if regex_matches(RESULT_FILE_REGEX, FILENAME)]
    RESULT_FILES.sort()
    sys.stdout.write('%s result files\n%s\n' % (len(RESULT_FILES), RESULT_FILES))
    for RESULT_FILE in RESULT_FILES:
        full_path = os.path.join(DIRECTORY, RESULT_FILE.string)
        sys.stdout.write('%s:\n' % full_path)
        lines_not_shown = 0
        # the with block closes each file as soon as it has been read, instead of leaving one open
        # descriptor behind per result file
        with open(full_path, 'r') as summary_file:
            for lineno_from_0, global_line in enumerate(summary_file):
                if lineno_from_0 < DUMP_LINES_FROM_1:
                    sys.stdout.write('\t%s' % global_line)
                else:
                    lines_not_shown += 1
        if lines_not_shown == 0:
            sys.stdout.write('\t(all lines shown)\n')
        else:
            sys.stdout.write('\t(%d line(s) not shown)\n' % lines_not_shown)
else:
    raise ValueError('DUMP_LINES_MODE had an invalid value: {}'.format(DUMP_LINES_MODE))

# Flush buffered text output first, so it lands ahead of the raw os.write() messages below.
sys.stdout.flush()
if not os.listdir(DIRECTORY):
    os.write(1, b'%s is empty - removing\n' % DIRECTORY)
    os.rmdir(DIRECTORY)
elif FORCE_CLEANUP:
    os.write(1, b'%s is not empty, but cleanup forced by request of caller - removing\n' % DIRECTORY)
    shutil.rmtree(DIRECTORY)
else:
    os.write(1, b'%s is not empty - please look it over\n' % DIRECTORY)

# the combined result is passed through unchanged as our own exit status
sys.exit(SMS_OVERALL_RESULT)