#!/usr/local/cpython-3.6/bin/python3 # pylint: disable=global-statement,import-error,wrong-import-order """Run n shell commands m at a time.""" # prereqs: # /usr/local/lib/bashquote.py - first added to tailor 2008-01-02 # Note that waitpid may decide a process has exited well before the corresponding pipe has been fully # emptied. For this reason, we may need to wait for the pipes to be drained before closing them. # When does it make sense to rename a file? After the process exits, or after the thread has terminated, # or both? It probably makes the most sense after the process exists, because that's when we have the # information, and the thread can continue writing to it via the same file descriptor even after the # rename. # thread termination occurs when copy_to_stdout_and_file returns # Licensing: # Microsoft Public License (Ms-PL) Mon, 2007-10-15 19:23 [OSI Approved License] # # This license governs use of the accompanying software. If you use the software, you accept this license. If you do not accept the # license, do not use the software. # # 1. Definitions # The terms "reproduce," "reproduction," "derivative works," and "distribution" have the same meaning here as under U.S. copyright # law. # A "contribution" is the original software, or any additions or changes to the software. # A "contributor" is any person that distributes its contribution under this license. # "Licensed patents" are a contributor's patent claims that read directly on its contribution. # # 2. Grant of Rights # (A) Copyright Grant- Subject to the terms of this license, including the license conditions and limitations in section 3, each # contributor grants you a non-exclusive, worldwide, royalty-free copyright license to reproduce its contribution, prepare # derivative works of its contribution, and distribute its contribution or any derivative works that you create. 
# (B) Patent Grant- Subject to the terms of this license, including the license conditions and limitations in section 3, each # contributor grants you a non-exclusive, worldwide, royalty-free license under its licensed patents to make, have made, use, sell, # offer for sale, import, and/or otherwise dispose of its contribution in the software or derivative works of the contribution in # the software. # # 3. Conditions and Limitations # (A) No Trademark License- This license does not grant you rights to use any contributors' name, logo, or trademarks. # (B) If you bring a patent claim against any contributor over patents that you claim are infringed by the software, your patent # license from such contributor to the software ends automatically. # (C) If you distribute any portion of the software, you must retain all copyright, patent, trademark, and attribution notices that # are present in the software. # (D) If you distribute any portion of the software in source code form, you may do so only under this license by including a # complete copy of this license with your distribution. If you distribute any portion of the software in compiled or object code # form, you may only do so under a license that complies with this license. # (E) The software is licensed "as-is." You bear the risk of using it. The contributors give no express warranties, guarantees or # conditions. You may have additional consumer rights under your local laws which this license cannot change. To the extent # permitted under your local laws, the contributors exclude the implied warranties of merchantability, fitness for a particular # purpose and non-infringement. 
import os
import re
import sys
import stat
import time
import errno
import select
import shutil
import signal

try:
    import thread as thread_mod
except ImportError:
    import _thread as thread_mod  # pylint: disable=import-error

sys.path.insert(0, '/usr/local/lib')
sys.path.insert(0, os.path.expanduser('~/lib'))

import bashquote as bashquote_mod  # noqa # pylint: disable=wrong-import-position

# In these variable names:
# "SMS" stands for "Shared Mutable State".
# "NON_SMS" stands for "NonShared Mutable State". However, these variables technically are SMS, they just don't have a global
# statement to make them mutable in a callable.
VERBOSITY = 0
DUMP_LINES_MODE = 'numeric'
DUMP_LINES_FROM_1 = 10
NON_SMS_TO_STDOUT = False
SMS_NUM_THREADS = 0
LOCK_NUM_THREADS = thread_mod.allocate_lock()
NON_SMS_THREAD_TIMEOUT = 1
STAGGER_TIME = 0.25
ABSOLUTE_T0 = time.time()
SMS_CURRENT_CONCURRENCY = 0
LOCK_CURRENT_CONCURRENCY = thread_mod.allocate_lock()
FORCE_CLEANUP = False
EOL = bytes('\n', 'ISO-8859-1')
NULL_STRING = bytes('', 'ISO-8859-1')


def make_used(variable):
    """Convince pymode/pyflakes that variable is "used"."""
    assert True or variable


def usage(retval):
    """Output a simple (albeit large) usage message and exit with retval."""
    # pylint: disable=too-many-statements,invalid-name
    w = sys.stderr.write
    w("Usage: %s\n" % sys.argv[0])
    w('\t--directory The directory to use for output files\n')
    w('\t--resolution n The frequency with which to check for subprocess\n')
    w('\t termination\n')
    w('\t--exit-early Exit immediately upon detecting a subprocess that does\n')
    w('\t not yield the expected value (depends on --or-statuses\n')
    w('\t and --and-statuses)\n')
    w('\t--or-statuses Or the exit statuses of the subprocesses when computing exit status\n')
    w('\t--and-statuses And the exit statuses of the subprocesses when computing exit\n')
    w('\t status (default)\n')
    w('\t--max-concurrency n Start <= n subprocesses concurrently (default: infinite)\n')
    w('\t--maxtime n Terminate processes that take longer than n seconds\n')
    w('\t--derive Derive a filename for output from the command run\n')
    w('\t--numbered Just use numbered filenames\n')
    w('\t--command cmd Add cmd to the list of subprocesses\n')
    w('\t--commands-file filename Read commands, one per line, from filename\n')
    w('\t--commands cmd1 cmd2 ... cmdn Add commands to list of subprocesses from command line\n')
    w('\t--labeled-command label:cmd Add cmd to list of subprocesses. Output in label\n')
    w('\t--labeled-commands-file filename Read label:cmd pairs from filename. cmd output\n')
    w('\t in filename "label"\n')
    w('\t--labeled-commands label1:cmd1 label2:cmd2 ... labeln:cmdn. cmdn output in labeln\n')
    w('\t--parameterized-command cmd \'parameter1 parameter2 ... parametern\'\n')
    w('\t EG: \'ssh %s hostname\' \'master node1 node2\'\n')
    w('\t output would be files master, node1 and node2\n')
    w('\t--ssh-command cmd \'hostname1 hostname2 ... hostnamen\'\n')
    w('\t EG: \'hostname\' \'master node1 node2\'\n')
    w('\t output would be files master, node1 and node2\n')
    w('\t--scp-command "srcfilename" "destfilename" "hostname1 hostname2 ... hostnamen"\n')
    w('\t EG: "/etc/hosts" "/etc/hosts" "node1 node2"\n')
    w('\t output filenames are always derived\n')
    w('\t--rsync1-command EG: filename "hostname1 hostname2 ... hostnamen"\n')
    w('\t--rsync-command EG: srcfn dstfn "hostname1 hostname2 ... hostnamen"\n')
    w('\t Note that --rsync2-command is an alias for --rsync-command\n')
    w('\t--rsync3-command EG: rsyncopts srcfn dstfn "hostname1 hostname2 ... hostnamen"\n')
    w('\t--number-padding n Pad output file numbering to n characters for ls sorting\n')
    w('\t--no-unlink-desired Do not unlink output files with desired exit statuses (depends\n')
    w('\t on --or-statuses and --and-statuses)\n')
    w('\t--stagger Pause for %4.2f seconds after each command start\n' % STAGGER_TIME)
    w('\t--stagger-time Specify how long to stagger process startup\n')
    w('\t--dump-lines number-of-lines (defaults to %d)\n' % DUMP_LINES_FROM_1)
    w('\t--dump-nothing\n')
    w('\t--dump-entire\n')
    w('\t--force-cleanup Cleanup output directory, even if some commands did not exit affirmatively\n')
    w('\t--to-stdout\n')
    w('\t--verbosity levelno\n')
    w('\t-v\n')
    w('\t--help\n')
    w('\t--absolute-time output timestamps relative to program start (only in --to-stdout)\n')
    w('\t--relative-time output timestamps relative to subprogram start (only in --to-stdout)\n')
    w('\n')
    w('In --or-statuses mode, the exit status of this program will be the minimum of the\n')
    w('subprocesses\' exit statuses.\n')
    w('\n')
    w('Similarly, in --and-statuses mode (the default), the exit status of this program will\n')
    w('be the maximum of the subprocesses\' exit statuses.\n')
    w('\n')
    w('The exit status of a subprocess that timed out is always 254.\n')
    w('\n')
    w('Output is saved, one file per command (host), in the specified directory.\n')
    w('Output files are deleted if the exit status is what was desired (true or\n')
    w('false depending on --and-statuses or --or-statuses) by default - see\n')
    w('--no-unlink-desired.\n')
    w('\n')
    w('Don\'t try to --scp-command and --ssh-command in the same %s unless\n' % sys.argv[0])
    w('the --ssh-command does not depend on the --scp-command; they may be\n')
    w('run in parallel, so use two %s\'s.\n' % sys.argv[0])
    w('\n')
    w('Comparing --to-stdout and --not-to-stdout:\n')
    w('\t--to-stdout is a little slower than --not-to-stdout.\n')
    w('\t--to-stdout is not fully binary-safe. --not-to-stdout is.\n')
    w('\t--to-stdout is more informative during a run. --not-to-stdout is quieter.\n')
    w('\t--not-to-stdout is the default.\n')
    sys.exit(retval)


def coordinated_output(line, lock=None):
    """Write a line of output, but deal with concurrency issues using a lock."""
    if lock is not None:
        lock.acquire()
    os.write(1, line.rstrip(b'\n') + b'\n')
    if lock is not None:
        lock.release()


class Timeout(Exception):
    """An exception to raise on timeout."""

    pass


class ForEbadfResult(object):
    """Return an EBADF result."""

    # pylint: disable=too-few-public-methods

    def __init__(self, got_ebadf, result):
        """Initialize."""
        self.got_ebadf = got_ebadf
        self.result = result


def for_ebadf(command, function):
    """Call function(), converting an EBADF error into a ForEbadfResult(True, None).

    An EBADF just means the main thread realized the subprocess died (and closed the
    pipes) before this reader thread did.
    """
    # relevant_exceptions = (Error, OSError, IOError) - OSError and IOError definite, Error less so
    relevant_exceptions = (Exception, )
    try:
        result = function()
    except relevant_exceptions as err:
        # BUGFIX: exceptions are not subscriptable in Python 3; use the errno attribute.
        # Also, "result" was unbound on this path in the original code - report None.
        if getattr(err, 'errno', None) == errno.EBADF:
            # this just means that the main thread realized the process died before we did
            command.close_pipes()
            return ForEbadfResult(True, None)
        raise
    return ForEbadfResult(False, result)


def readline_with_timeout(command, file_, terminator=EOL, timeout=0):
    """Generate lines of input from file_, but allow for an optional timeout.

    Yields each complete line (including terminator) as bytes.  Raises Timeout when
    no data arrives within `timeout`; note that raising inside a generator ends it,
    so the caller must create a fresh generator to resume.
    NOTE(review): a partially-read line accumulated before a Timeout is lost when
    the caller recreates the generator - confirm this is acceptable for this tool.
    """
    line_so_far = []
    while True:
        fer = for_ebadf(command, lambda: select.select([file_], [], [file_], timeout))
        if fer.got_ebadf:
            return
        (input_ready, output_ready, exception_ready) = fer.result
        if exception_ready:
            line_so_far = []
            raise IOError
        if output_ready:
            sys.stderr.write('%s: Internal error: Output ready in readline_with_timeout\n' % sys.argv[0])
            sys.exit(1)
        if input_ready:
            # file_ may be a file-like object or a raw file descriptor
            if hasattr(file_, 'read'):
                fer = for_ebadf(command, lambda: file_.read(1))
            else:
                fer = for_ebadf(command, lambda: os.read(file_, 1))
            if fer.got_ebadf:
                return
            character = fer.result
            if not character:
                raise EOFError
            line_so_far.append(character)
            if character == terminator:
                result = NULL_STRING.join(line_so_far)
                line_so_far = []
                yield result
        if not (input_ready or output_ready or exception_ready):
            raise Timeout


def quick_bashquote(string):
    """Quote a string for passing to bash."""
    bashquote = bashquote_mod.BashquoteBytes()
    bashquote.add(string)
    return bashquote.result()


def minutes(seconds):
    """Convert seconds to minutes."""
    return seconds / 60.0


class Command(object):
    # pylint: disable=too-many-instance-attributes
    """A class to hold one command to be executed."""

    COMMAND_NUMBER = 0

    # pylint: disable=attribute-defined-outside-init,too-many-instance-attributes
    # attribute-defined-outside-init: We hasattr a lot

    def __init__(self, cmd, maxtime, local_to_stdout, lock, numbered=False, derived=False, named=False, name=b''):
        """Initialize.

        Exactly one of numbered/derived/named must be True; it selects how the
        output filename (self.name) is constructed.
        """
        # pylint: disable=too-many-arguments
        self.cmd = cmd
        self.quoted_cmd = quick_bashquote(cmd)
        self.maxtime = maxtime
        self.to_stdout = local_to_stdout
        self.lock = lock
        self.numbered = numbered
        self.derived = derived
        self.named = named
        self.pid = None
        if self.numbered + self.named + self.derived != 1:
            sys.stderr.write('%s: Error: Command.start must have exactly 1 of numbered, name or derive\n' % sys.argv[0])
            sys.exit(1)
        if self.numbered:
            self.name = b'%*d' % (NUMBER_PADDING, Command.COMMAND_NUMBER)
        elif self.named:
            self.name = b'%*d - %s' % (NUMBER_PADDING, Command.COMMAND_NUMBER, name)
        elif self.derived:
            # derive a filesystem-safe name from the command text itself
            self.name = b'%*d - %s' % (NUMBER_PADDING, Command.COMMAND_NUMBER,
                                       re.sub(b'[^a-zA-Z0-9]+', b'-', self.cmd))
        else:
            # unreachable: the sum check above guarantees one of the three branches
            # BUGFIX: was sys.argv[1], which may not exist; the program name is argv[0]
            sys.stderr.write('%s: Eh? warble frob\n' % sys.argv[0])
            sys.exit(1)
        Command.COMMAND_NUMBER += 1

    def start(self):
        """Fork the subprocess, and in --to-stdout mode start a copier thread."""
        global SMS_NUM_THREADS

        def via(boolean):
            """Return a string describing how output is to be performed."""
            if boolean:
                return b'via thread'
            return b'via redirection'

        if VERBOSITY >= 2:
            coordinated_output(b'--> %s starting - writing to %s %s' %
                               (self.quoted_cmd, quick_bashquote(self.name), via(self.to_stdout)),
                               lock=OUTPUT_LOCK)
        if self.to_stdout:
            self.parent_receive, self.child_send = os.pipe()
            self.child_receive, self.parent_send = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # child process; new process group so the whole group can be killed on timeout
            os.setpgid(0, 0)
            if self.to_stdout:
                # Must not close these before dup2()'ing to them, otherwise you get a silent error - but flush()'ing
                # is good
                sys.stdout.flush()
                sys.stderr.flush()
                os.dup2(self.child_receive, sys.stdin.fileno())
                os.dup2(self.child_send, sys.stdout.fileno())
                os.dup2(self.child_send, sys.stderr.fileno())
                os.execlp(b'/bin/sh', b'/bin/sh', b'-c', b'eval %s 2>&1' % quick_bashquote(self.cmd))
            else:
                os.execlp(b'/bin/sh', b'/bin/sh', b'-c',
                          b'eval %s > %s/%s 2>&1' % (quick_bashquote(self.cmd),
                                                     quick_bashquote(DIRECTORY),
                                                     quick_bashquote(self.name)))
        else:
            # parent process
            self.start_time = time.time()
            if self.to_stdout:
                with LOCK_NUM_THREADS:
                    SMS_NUM_THREADS += 1
                _unused = thread_mod.start_new_thread(self.copy_to_stdout_and_file, ('nothing',))
                make_used(_unused)

    def copy_to_stdout_and_file(self, _unused):
        # pylint: disable=too-many-branches
        """Copy subprocess output to stdout and a file (runs in its own thread).

        One thread per subprocess; this simplifies treating lines of output from
        the various child processes as atomic units.
        """
        global SMS_NUM_THREADS
        got_timeout = False
        relative_t0 = time.time()  # only used in the parent process
        filename = os.path.join(DIRECTORY, self.name)
        file_output = open(filename, 'wb')  # flushed per line below, in lieu of unbuffered output
        gen = readline_with_timeout(self, self.parent_receive, timeout=NON_SMS_THREAD_TIMEOUT)
        while True:
            try:
                line = next(gen)
            except Timeout:
                if hasattr(self, 'exit_status'):
                    # we have an exit status, so the child process is dead (or perhaps we just tried
                    # unsuccessfully to kill it). So let this thread die too
                    # Be careful with this decrement... It's probably fine, but it feels a little
                    # like something that could get messed up in maintenance phase
                    if self.exit_status == 254:
                        got_timeout = True
                    break
                else:
                    # We don't have an exit status, so go back for more input!
                    gen = readline_with_timeout(self, self.parent_receive, timeout=NON_SMS_THREAD_TIMEOUT)
            except EOFError:
                break
            except StopIteration:
                break
            else:
                time1 = time.time()
                file_output.write(line)
                file_output.flush()
                string = b'%s' % self.name
                # All this +'ing is a little slow
                if ABSOLUTE_TIME:
                    string += b' (abs: %.1fm)' % minutes(time1 - ABSOLUTE_T0)
                if RELATIVE_TIME:
                    string += b' (rel: %.1fm)' % minutes(time1 - relative_t0)
                string += b': %s' % line.rstrip(b'\n')
                coordinated_output(string, lock=OUTPUT_LOCK)
        coordinated_output(b'%s: Letting thread die, renaming file and closing file/pipes' % self.name,
                           lock=OUTPUT_LOCK)
        self.ren_or_del_with_exit_status(natural=not got_timeout, timeout=got_timeout)
        self.close_pipes()
        file_output.close()
        with LOCK_NUM_THREADS:
            SMS_NUM_THREADS -= 1

    def close_pipes(self):
        """Close whichever of our pipes were created (only --to-stdout mode creates them)."""
        if hasattr(self, 'parent_receive'):
            os.close(self.parent_receive)
        if hasattr(self, 'child_receive'):
            os.close(self.child_receive)
        if hasattr(self, 'parent_send'):
            os.close(self.parent_send)
        if hasattr(self, 'child_send'):
            os.close(self.child_send)

    def ren_or_del_with_exit_status(self, natural=False, timeout=False):
        """If success, delete the file.  If failure, rename the file to append the exit status.

        Exactly one of natural/timeout must be True; it only affects the message text.
        """
        if natural + timeout != 1:
            sys.stderr.write('%s: ren_or_del_with_exit_status: natural + timeout != 1 (%d, %d)' %
                             (sys.argv[0], natural, timeout))
            sys.exit(1)
        if natural:
            description = b'naturally'
        elif timeout:
            description = b'by timeout'
        else:
            sys.stderr.write('%s: ren_or_del_with_exit_status 2: natural + timeout != 1' % sys.argv[0])
            sys.exit(1)
        # compute these in case we need them in a moment
        # we won't always need them, but it's better than duplicating code
        old_name = self.name
        new_name = b'%s - %d' % (self.name, self.exit_status)
        if (AND_STATUSES and not self.exit_status or not AND_STATUSES and self.exit_status) and \
                UNLINK_DESIRED_STATUS_FILES:
            # the exit status was the desired one: delete the file and tell the user
            os.unlink(os.path.join(DIRECTORY, old_name))
            if VERBOSITY >= 2:
                coordinated_output(b'<-- %s finished %s - deleted outfile' % (self.quoted_cmd, description),
                                   lock=OUTPUT_LOCK)
        else:
            # rename the file and tell the user
            os.rename(os.path.join(DIRECTORY, old_name), os.path.join(DIRECTORY, new_name))
            self.name = new_name
            if VERBOSITY >= 2:
                coordinated_output(b'<-- %s finished %s - written to %s' %
                                   (self.quoted_cmd, description, quick_bashquote(self.name)),
                                   lock=OUTPUT_LOCK)

    def is_done(self):
        """Return True if this command finished (reaping it and recording exit_status), else False.

        Also enforces self.maxtime: an overdue process group is SIGKILLed and given
        exit status 254.
        """
        # pylint: disable=too-many-branches
        pid, value = os.waitpid(self.pid, os.WNOHANG)
        if (pid, value) == (0, 0):
            # still running - check for timeout
            difference = time.time() - self.start_time
            if difference > self.maxtime and self.maxtime >= 1e-5:
                if VERBOSITY >= 4:
                    coordinated_output(b'process %d (%s) has been running for %f and has a max of %f' %
                                       (self.pid, self.cmd, difference, self.maxtime),
                                       lock=OUTPUT_LOCK)
                killed = False
                for _unused_index in range(10):
                    os.kill(-self.pid, signal.SIGKILL)  # pylint: disable=invalid-unary-operand-type
                    time.sleep(RESOLUTION / 4.0)
                    pid, value = os.waitpid(self.pid, os.WNOHANG)
                    if (pid, value) == (0, 0):
                        # It's probably not dead yet - or at least we have no particular reason to believe it is
                        continue
                    killed = True
                    break
                if not killed:
                    # Sheesh, the process group won't die. Give up with those deadly signals pending. The processes
                    # in the process group will probably become zombies when they die until this parent process exits.
                    # In maxtime(1), we fork an assassin process to kill the undead over and over - we're a little
                    # less aggressive (thorough) here with subprocess termination.
                    # BUGFIX: str %-interpolated into a bytes format raises TypeError in Python 3; encode argv[0]
                    coordinated_output(b'%s: process group -%d would not die' %
                                       (os.fsencode(sys.argv[0]), self.pid),
                                       lock=OUTPUT_LOCK)
                self.exit_status = 254
                if not NON_SMS_TO_STDOUT:
                    self.ren_or_del_with_exit_status(natural=False, timeout=True)
                return True
            return False
        else:
            if value is None:
                self.exit_status = 0
            else:
                # high byte of the wait() status is the exit status
                self.exit_status = value // 256
            if not NON_SMS_TO_STDOUT:
                self.ren_or_del_with_exit_status(natural=True, timeout=False)
            return True


COMMANDS = []
MAXTIME = -1.0
MAX_CONCURRENCY = -1
AND_STATUSES = True
RESOLUTION = 0.1
DIRECTORY = b''
EXIT_EARLY = False
NUMBERED = False
DERIVED = True
NUMBER_PADDING = 0
LOCK_NUMBER_OF_EXITS = thread_mod.allocate_lock()
SMS_NUMBER_OF_TRUE_EXITS = 0
SMS_NUMBER_OF_FALSE_EXITS = 0
UNLINK_DESIRED_STATUS_FILES = True
STAGGER = False
ABSOLUTE_TIME = False
RELATIVE_TIME = False
OUTPUT_LOCK = None
BYTES_ARGV = [os.fsencode(argument) for argument in sys.argv]


def read_lines(file_):
    """Read lines from file_, dealing with possible comments and blank lines as we go."""
    for line in file_:
        line2 = re.sub(b'#.*$', b'', line)
        line3 = line2.strip()
        if line3:
            yield line3


if not BYTES_ARGV[1:]:
    usage(0)

while BYTES_ARGV[1:]:  # noqa
    if BYTES_ARGV[1] in [b'--help', b'-h']:
        usage(0)
    elif BYTES_ARGV[1] == b'--no-unlink-desired':
        UNLINK_DESIRED_STATUS_FILES = False
    elif BYTES_ARGV[1] == b'--directory':
        DIRECTORY = BYTES_ARGV[2]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--number-padding':
        NUMBER_PADDING = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--resolution':
        RESOLUTION = float(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--stagger':
        STAGGER = True
    elif BYTES_ARGV[1] == b'--stagger-time':
        STAGGER = True
        STAGGER_TIME = float(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--exit-early':
        EXIT_EARLY = True
    elif BYTES_ARGV[1] == b'--or-statuses':
        AND_STATUSES = False
    elif BYTES_ARGV[1] == b'--and-statuses':
        AND_STATUSES = True
    elif BYTES_ARGV[1] == b'--max-concurrency':
        MAX_CONCURRENCY = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--maxtime':
        MAXTIME = float(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--derive':
        NUMBERED = False
        DERIVED = True
    elif BYTES_ARGV[1] == b'--numbered':
        NUMBERED = True
        DERIVED = False
    elif BYTES_ARGV[1] == b'--command':
        TEMP_COMMAND = \
            Command(BYTES_ARGV[2].rstrip(b'\n'), MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                    numbered=NUMBERED, derived=DERIVED)
        COMMANDS.append(TEMP_COMMAND)
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--commands-file':
        with open(BYTES_ARGV[2], 'rb') as FILE:
            COMMANDS.extend([Command(x.rstrip(b'\n'), MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                     numbered=NUMBERED, derived=DERIVED)
                             for x in read_lines(FILE)])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--commands':
        COMMANDS.extend([Command(x, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                 numbered=NUMBERED, derived=DERIVED)
                         for x in BYTES_ARGV[2:]])
        del BYTES_ARGV[2:]
    elif BYTES_ARGV[1] == b'--labeled-command':
        PARTITIONED = BYTES_ARGV[2].partition(b':')
        COMMANDS.append(Command(PARTITIONED[2], MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                named=True, name=PARTITIONED[0]))
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--labeled-commands-file':
        with open(BYTES_ARGV[2], 'rb') as FILE:
            for LINE in read_lines(FILE):
                PARTITIONED = LINE.partition(b':')
                COMMANDS.append(Command(PARTITIONED[2].rstrip(b'\n'), MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                        named=True, name=PARTITIONED[0]))
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--labeled-commands':
        for arg in BYTES_ARGV[2:]:
            PARTITIONED = arg.partition(b':')
            COMMANDS.append(Command(PARTITIONED[2], MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                    named=True, name=PARTITIONED[0]))
        del BYTES_ARGV[2:]
    elif BYTES_ARGV[1] == b'--parameterized-command':
        COMMAND_STRING = BYTES_ARGV[2]
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            comm = Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                           named=True, name=parameter)
            COMMANDS.append(comm)
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--ssh-command':
        COMMAND_STRING_WITHIN_SSH = BYTES_ARGV[2]
        # Sun Nov 5 11:35:53 PST 2017: Tried adding a -t to help with commands left running on remote hosts, but it lead
        # to hangs.
        COMMAND_STRING = b'ssh %%s %s' % quick_bashquote(COMMAND_STRING_WITHIN_SSH)
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            comm = Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                           named=True, name=parameter)
            COMMANDS.append(comm)
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--scp1-command':
        FILENAME = BYTES_ARGV[2]
        COMMAND_STRING = b'scp %s %%s:%s' % (FILENAME, FILENAME)
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                    derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--scp-command':
        SOURCE_FILENAME = BYTES_ARGV[2]
        DESTINATION_FILENAME = BYTES_ARGV[3]
        COMMAND_STRING = b'scp %s %%s:%s' % (SOURCE_FILENAME, DESTINATION_FILENAME)
        PARAMETERS = BYTES_ARGV[4].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                    derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--rsync1-command':
        FILENAME = BYTES_ARGV[2]
        COMMAND_STRING = b'rsync -apl %s %%s:%s' % (FILENAME, FILENAME)
        PARAMETERS = BYTES_ARGV[3].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                    derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] in [b'--rsync-command', b'--rsync2-command']:
        SOURCE_FILENAME = BYTES_ARGV[2]
        DESTINATION_FILENAME = BYTES_ARGV[3]
        COMMAND_STRING = b'rsync -apl %s %%s:%s' % (SOURCE_FILENAME, DESTINATION_FILENAME)
        PARAMETERS = BYTES_ARGV[4].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                    derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--rsync3-command':
        OPTIONS = BYTES_ARGV[2]
        SOURCE_FILENAME = BYTES_ARGV[3]
        DESTINATION_FILENAME = BYTES_ARGV[4]
        COMMAND_STRING = b'rsync -apl %s %s %%s:%s' % (OPTIONS, SOURCE_FILENAME, DESTINATION_FILENAME)
        PARAMETERS = BYTES_ARGV[5].split()
        for parameter in PARAMETERS:
            COMMANDS.append(Command(COMMAND_STRING % parameter, MAXTIME, NON_SMS_TO_STDOUT, OUTPUT_LOCK,
                                    derived=True))
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--dump-lines':
        DUMP_LINES_MODE = 'numeric'
        DUMP_LINES_FROM_1 = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'--dump-entire':
        DUMP_LINES_MODE = 'all'
    elif BYTES_ARGV[1] == b'--dump-nothing':
        DUMP_LINES_MODE = 'nothing'
    elif BYTES_ARGV[1] == b'--force-cleanup':
        FORCE_CLEANUP = True
    elif BYTES_ARGV[1] == b'--to-stdout':
        NON_SMS_TO_STDOUT = True
        OUTPUT_LOCK = thread_mod.allocate_lock()
    elif BYTES_ARGV[1] == b'--not-to-stdout':
        NON_SMS_TO_STDOUT = False
        OUTPUT_LOCK = None
    elif BYTES_ARGV[1] == b'--verbosity':
        VERBOSITY = int(BYTES_ARGV[2])
        del BYTES_ARGV[1]
    elif BYTES_ARGV[1] == b'-v':
        VERBOSITY += 1
    elif BYTES_ARGV[1] == b'--absolute-time':
        ABSOLUTE_TIME = True
    elif BYTES_ARGV[1] == b'--relative-time':
        RELATIVE_TIME = True
    else:
        sys.stderr.write("%s: Illegal argument: %s\n" % (BYTES_ARGV[0], BYTES_ARGV[1]))
        usage(1)
    del BYTES_ARGV[1]

LOCK_OVERALL_RESULT = thread_mod.allocate_lock()
with LOCK_OVERALL_RESULT:
    if AND_STATUSES:
        # we and the values, so start out assuming True
        SMS_OVERALL_RESULT = 0
    else:
        # we or the values, so start out assuming False
        SMS_OVERALL_RESULT = 255

if DIRECTORY == b'':
    DIRECTORY = b'/tmp/looper-%d-%d' % (time.time(), os.getpid())
    coordinated_output(b'using directory %s' % DIRECTORY, lock=OUTPUT_LOCK)

if (ABSOLUTE_TIME or RELATIVE_TIME) and not NON_SMS_TO_STDOUT:
    sys.stderr.write('%s: Warning: --absolute-time and/or --relative-time specified, but not --to-stdout mode\n' %
                     BYTES_ARGV[0])


# Lots of globals. Sigh. This started off just part of the main body
# of the program, and then I realized I needed to do this from two
# places.
def check_for_termination():
    """Check for completion of all commands, reaping and accounting for any that finished."""
    time.sleep(RESOLUTION)
    global SMS_CURRENT_CONCURRENCY
    global SMS_NUMBER_OF_TRUE_EXITS
    global SMS_NUMBER_OF_FALSE_EXITS
    global SMS_OVERALL_RESULT
    if VERBOSITY >= 3:
        coordinated_output(b'checking for subprocess termination', lock=OUTPUT_LOCK)
    # Not the most pythonic loop, but it works pretty well and is a
    # common way of doing things in other languages. It's this way
    # because we need to modify what we're iterating over, as we're
    # iterating over it - hence the reversed index loop.
    for active_command_no in range(len(ACTIVE_COMMANDS) - 1, -1, -1):
        active_command = ACTIVE_COMMANDS[active_command_no]
        if active_command.is_done():
            with LOCK_NUMBER_OF_EXITS:
                # shell convention: exit status 0 is "true"
                if not active_command.exit_status:
                    SMS_NUMBER_OF_TRUE_EXITS += 1
                else:
                    SMS_NUMBER_OF_FALSE_EXITS += 1
            if AND_STATUSES:
                with LOCK_OVERALL_RESULT:
                    SMS_OVERALL_RESULT = max(active_command.exit_status, SMS_OVERALL_RESULT)
                    if EXIT_EARLY and SMS_OVERALL_RESULT:
                        # BUGFIX: the format string has two placeholders and needs bytes arguments;
                        # the original interpolated a single Command object
                        coordinated_output(
                            b'%s: Terminating early because command %s exited false' %
                            (BYTES_ARGV[0], active_command.quoted_cmd),
                            lock=OUTPUT_LOCK,
                        )
                        sys.exit(SMS_OVERALL_RESULT)
            else:
                with LOCK_OVERALL_RESULT:
                    SMS_OVERALL_RESULT = min(active_command.exit_status, SMS_OVERALL_RESULT)
                    if EXIT_EARLY and not SMS_OVERALL_RESULT:
                        # BUGFIX: same format-string repair as above
                        coordinated_output(
                            b'%s: Terminating early because command %s exited true' %
                            (BYTES_ARGV[0], active_command.quoted_cmd),
                            lock=OUTPUT_LOCK,
                        )
                        sys.exit(SMS_OVERALL_RESULT)
            del ACTIVE_COMMANDS[active_command_no]
            with LOCK_CURRENT_CONCURRENCY:
                SMS_CURRENT_CONCURRENCY -= 1
                if SMS_CURRENT_CONCURRENCY < 0:
                    sys.stderr.write('%s: Internal error: current_concurrency < 0\n' % BYTES_ARGV[0])
                    sys.exit(1)


try:
    STATBUF = os.stat(DIRECTORY)
except OSError:
    os.makedirs(DIRECTORY, 7 * 64 + 5 * 8 + 5)  # 0o755
else:
    if stat.S_ISDIR(STATBUF.st_mode):
        sys.stderr.write('%s: %s already exists and is a directory. Continuing.\n' % (BYTES_ARGV[0], DIRECTORY))
    else:
        sys.stderr.write('%s: %s already exists and is not a directory. Terminating.\n' % (BYTES_ARGV[0], DIRECTORY))
        sys.exit(1)

# lst.pop() is much faster than v=lst[0]; del lst[0]
COMMANDS.reverse()

ACTIVE_COMMANDS = []


def signal_handler(signum, frame):
    """Clean up processes and exit."""
    make_used(signum)
    make_used(frame)
    for active_command in ACTIVE_COMMANDS:
        if active_command.pid is not None:
            sys.stderr.write('Killing pid {}\n'.format(active_command.pid))
            os.kill(-active_command.pid, signal.SIGKILL)
    sys.exit(1)


for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
    signal.signal(sig, signal_handler)

# we're pulling commands out of the "commands" list, and then putting
# them into the ACTIVE_COMMANDS list.
while COMMANDS:
    with LOCK_CURRENT_CONCURRENCY:
        while COMMANDS and (MAX_CONCURRENCY == -1 or SMS_CURRENT_CONCURRENCY < MAX_CONCURRENCY):
            INACTIVE_COMMAND = COMMANDS.pop()
            INACTIVE_COMMAND.start()
            if STAGGER:
                time.sleep(STAGGER_TIME)
            SMS_CURRENT_CONCURRENCY += 1
            ACTIVE_COMMANDS.append(INACTIVE_COMMAND)
    check_for_termination()  # beware - globals modified

while ACTIVE_COMMANDS:
    check_for_termination()  # beware - globals modified

# give the threads time to finish up - otherwise we may lose the end of some of the output streams
GLOBAL_FINISH_TIME_0 = time.time()
GLOBAL_MAX_THREAD_WAIT = NON_SMS_THREAD_TIMEOUT + 0.5
if NON_SMS_TO_STDOUT:
    while True:
        with LOCK_NUM_THREADS:
            if SMS_NUM_THREADS < 0:
                sys.stderr.write('%s: internal error: SMS_NUM_THREADS is less than 0\n' % BYTES_ARGV[0])
            elif SMS_NUM_THREADS == 0:
                break
            else:
                time.sleep(RESOLUTION)
        GLOBAL_FINISH_TIME_1 = time.time()
        GLOBAL_DIFFERENCE = GLOBAL_FINISH_TIME_1 - GLOBAL_FINISH_TIME_0
        if GLOBAL_DIFFERENCE >= GLOBAL_MAX_THREAD_WAIT:
            if VERBOSITY >= 2:
                coordinated_output(b"%d threads are not finished, but that's OK. Finishing up" % SMS_NUM_THREADS,
                                   lock=OUTPUT_LOCK)
            # Threads are not finished, but they're taking too long - give up on them. They've had
            # plenty of time to finish by now unless we're on an extremely loaded system
            break
        if VERBOSITY >= 2:
            coordinated_output(b'waiting for threads to finish: %d thread(s) remain(s), %3.1f seconds remains' %
                               (SMS_NUM_THREADS, GLOBAL_MAX_THREAD_WAIT - GLOBAL_DIFFERENCE),
                               lock=OUTPUT_LOCK)

if VERBOSITY >= 1:
    with LOCK_OVERALL_RESULT:
        coordinated_output(b'true exit statuses: %d' % SMS_NUMBER_OF_TRUE_EXITS, lock=OUTPUT_LOCK)
        coordinated_output(b'false exit statuses: %d' % SMS_NUMBER_OF_FALSE_EXITS, lock=OUTPUT_LOCK)

RESULT_FILE_REGEX = re.compile(b'^([0-9]+) - .* - ([0-9]+)$')


def regex_matches(regex, string):
    """If the regex matches string, return True."""
    match_obj = regex.match(string)
    if match_obj is None:
        return False
    return True


class ResultFile(object):
    """A class describing one result file."""

    # pylint: disable=too-few-public-methods

    def __init__(self, string):
        """Initialize from a filename already known to match RESULT_FILE_REGEX."""
        self.string = string
        match = RESULT_FILE_REGEX.match(string)
        self.execution_order = int(match.group(1))
        self.exit_status = int(match.group(2))

    def __cmp__(self, other):
        """Compare the Python 2.x way."""
        # for Python 2
        # success < failure. Aside from that we list in execution order
        if self.exit_status == 0 and other.exit_status != 0:
            return -1
        elif self.exit_status != 0 and other.exit_status == 0:
            return 1
        if self.execution_order < other.execution_order:
            return -1
        elif self.execution_order > other.execution_order:
            return 1
        return 0

    def __lt__(self, other):
        """< the Python 3.x way."""
        # for Python 3
        # success < failure. Aside from that we list in execution order
        if self.exit_status == 0 and other.exit_status != 0:
            return True
        # BUGFIX: a failure must never sort before a success; the original fell
        # through to execution order here, disagreeing with __cmp__
        if self.exit_status != 0 and other.exit_status == 0:
            return False
        return self.execution_order < other.execution_order

    def __str__(self):
        """Convert self to string."""
        return str(self.string)

    __repr__ = __str__


if DUMP_LINES_MODE == 'all':
    FILENAMES = os.listdir(DIRECTORY)
    RESULT_FILES = [ResultFile(FILENAME) for FILENAME in FILENAMES if regex_matches(RESULT_FILE_REGEX, FILENAME)]
    RESULT_FILES.sort()
    for RESULT_FILE in RESULT_FILES:
        FULL_PATH = os.path.join(DIRECTORY, RESULT_FILE.string)
        sys.stdout.write('%s:\n' % FULL_PATH)
        with open(FULL_PATH, 'r') as FILE:
            for global_line in FILE:
                sys.stdout.write('\t%s' % global_line)
elif DUMP_LINES_MODE == 'nothing':
    pass
elif DUMP_LINES_MODE == 'numeric':
    FILENAMES = os.listdir(DIRECTORY)
    sys.stdout.write('%s filenames\n%s\n' % (len(FILENAMES), FILENAMES))
    RESULT_FILES = [ResultFile(FILENAME) for FILENAME in FILENAMES if regex_matches(RESULT_FILE_REGEX, FILENAME)]
    RESULT_FILES.sort()
    sys.stdout.write('%s result files\n%s\n' % (len(RESULT_FILES), RESULT_FILES))
    for RESULT_FILE in RESULT_FILES:
        full_path = os.path.join(DIRECTORY, RESULT_FILE.string)
        sys.stdout.write('%s:\n' % full_path)
        lines_not_shown = 0
        lineno_from_0 = 0
        # BUGFIX: the summary file was never closed; use a context manager
        with open(full_path, 'r') as summary_file:
            while True:
                global_line = summary_file.readline()
                if not global_line:
                    break
                if lineno_from_0 < DUMP_LINES_FROM_1:
                    sys.stdout.write('\t%s' % global_line)
                else:
                    lines_not_shown += 1
                lineno_from_0 += 1
        if lines_not_shown == 0:
            sys.stdout.write('\t(all lines shown)\n')
        else:
            sys.stdout.write('\t(%d line(s) not shown)\n' % lines_not_shown)
else:
    raise ValueError('DUMP_LINES_MODE had an invalid value: {}'.format(DUMP_LINES_MODE))

sys.stdout.flush()

if not os.listdir(DIRECTORY):
    os.write(1, b'%s is empty - removing\n' % DIRECTORY)
    os.rmdir(DIRECTORY)
else:
    if FORCE_CLEANUP:
        os.write(1, b'%s is not empty, but cleanup forced by request of caller - removing\n' % DIRECTORY)
        shutil.rmtree(DIRECTORY)
    else:
        os.write(1, b'%s is not empty - please look it over\n' % DIRECTORY)

# invert the python result to get a shell (wait(), actually) result
sys.exit(SMS_OVERALL_RESULT)