#!/usr/bin/env python

# prereqs:
# /usr/local/lib/bashquote.py - first added to tailor 2008-01-02

# Note that waitpid may decide a process has exited well before the corresponding pipe has been fully
# emptied. For this reason, we may need to wait for the pipes to be drained before closing them.

# When does it make sense to rename a file? After the process exits, or after the thread has terminated,
# or both? It probably makes the most sense after the process exits, because that's when we have the
# information, and the thread can continue writing to it via the same file descriptor even after the
# rename.

# thread termination occurs when copy_to_stdout_and_file returns

# Licensing:
#Microsoft Public License (Ms-PL) Mon, 2007-10-15 19:23 [OSI Approved License]
#
#This license governs use of the accompanying software. If you use the software, you accept this license. If you do not accept the license, do not use the software.
#
#1. Definitions
#The terms "reproduce," "reproduction," "derivative works," and "distribution" have the same meaning here as under U.S. copyright law.
#A "contribution" is the original software, or any additions or changes to the software.
#A "contributor" is any person that distributes its contribution under this license.
#"Licensed patents" are a contributor's patent claims that read directly on its contribution.
#
#2. Grant of Rights
#(A) Copyright Grant- Subject to the terms of this license, including the license conditions and limitations in section 3, each contributor grants you a non-exclusive, worldwide, royalty-free copyright license to reproduce its contribution, prepare derivative works of its contribution, and distribute its contribution or any derivative works that you create.
#(B) Patent Grant- Subject to the terms of this license, including the license conditions and limitations in section 3, each contributor grants you a non-exclusive, worldwide, royalty-free license under its licensed patents to make, have made, use, sell, offer for sale, import, and/or otherwise dispose of its contribution in the software or derivative works of the contribution in the software.
#
#3. Conditions and Limitations
#(A) No Trademark License- This license does not grant you rights to use any contributors' name, logo, or trademarks.
#(B) If you bring a patent claim against any contributor over patents that you claim are infringed by the software, your patent license from such contributor to the software ends automatically.
#(C) If you distribute any portion of the software, you must retain all copyright, patent, trademark, and attribution notices that are present in the software.
#(D) If you distribute any portion of the software in source code form, you may do so only under this license by including a complete copy of this license with your distribution. If you distribute any portion of the software in compiled or object code form, you may only do so under a license that complies with this license.
#(E) The software is licensed "as-is." You bear the risk of using it. The contributors give no express warranties, guarantees or conditions. You may have additional consumer rights under your local laws which this license cannot change. To the extent permitted under your local laws, the contributors exclude the implied warranties of merchantability, fitness for a particular purpose and non-infringement.

import errno
import os
import re
import select
import signal
import stat
import sys
import time
import traceback

# install-helpers goes on the path before /usr/local/lib. As of Fri Jun 6 12:47:02 PDT 2008,
# there are more appliances that have /usr/local/lib/bashquote.py, but trunk is going to put
# it in /usr/local/install-helpers for simplicity.
# Each insert goes to the front, so the final search order is
# ~/lib, /usr/local/install-helpers, /usr/local/lib.
sys.path.insert(0, '/usr/local/lib')
sys.path.insert(0, '/usr/local/install-helpers')
sys.path.insert(0, os.path.expanduser('~/lib'))

import bashquote

verbosity = 0
dump_lines_from_1 = 10
global_to_stdout = False
# Number of reader threads (one per --to-stdout subprocess) still running.
global_num_threads = 0
# Seconds a reader thread waits for pipe input before re-checking for exit.
global_thread_timeout = 1
stagger_time = 0.25
absolute_t0 = time.time()


def usage(retval):
    """Write the usage message to stderr and exit with status retval."""
    w = sys.stderr.write
    w("Usage: %s\n" % sys.argv[0])
    w('\t--directory The directory to use for output files\n')
    w('\t--resolution n The frequency with which to check for subprocess\n')
    w('\t termination\n')
    w('\t--exit-early Exit immediately upon detecting a subprocess that does\n')
    w('\t not yield the expected value (depends on --or-statuses\n')
    w('\t and --and-statuses)\n')
    w('\t--or-statuses Or the exit statuses of the subprocesses when computing exit status\n')
    w('\t--and-statuses And the exit statuses of the subprocesses when computing exit\n')
    w('\t status (default)\n')
    w('\t--max-concurrency n Start <= n subprocesses concurrently (default: infinite)\n')
    w('\t--maxtime n Terminate processes that take longer than n seconds\n')
    w('\t--derive Derive a filename for output from the command run\n')
    w('\t--numbered Just use numbered filenames\n')
    w('\t--command cmd Add cmd to the list of subprocesses\n')
    w('\t--commands-file filename Read commands, one per line, from filename\n')
    w('\t--commands cmd1 cmd2 ... cmdn Add commands to list of subprocesses from command line\n')
    w('\t--labeled-command label:cmd Add cmd to list of subprocesses. Output in label\n')
    w('\t--labeled-commands-file filename Read label:cmd pairs from filename. cmd output\n')
    w('\t in filename "label"\n')
    w('\t--labeled-commands label1:cmd1 label2:cmd2 ... labeln:cmdn. cmdn output in labeln\n')
    w('\t--parameterized-command cmd \'parameter1 parameter2 ... parametern\'\n')
    w('\t EG: \'ssh %s hostname\' \'master node1 node2\'\n')
    w('\t output would be files master, node1 and node2\n')
    w('\t--ssh-command cmd \'hostname1 hostname2 ... hostnamen\'\n')
    w('\t EG: \'hostname\' \'master node1 node2\'\n')
    w('\t output would be files master, node1 and node2\n')
    w('\t--scp-command "srcfilename" "destfilename" "hostname1 hostname2 ... hostnamen"\n')
    w('\t EG: "/etc/hosts" "/etc/hosts" "node1 node2"\n')
    w('\t output filenames are always derived\n')
    w('\t--rsync1-command EG: filename "hostname1 hostname2 ... hostnamen"\n')
    w('\t--rsync-command EG: srcfn dstfn "hostname1 hostname2 ... hostnamen"\n')
    w('\t Note that --rsync2-command is an alias for --rsync-command\n')
    w('\t--rsync3-command EG: rsyncopts srcfn dstfn "hostname1 hostname2 ... hostnamen"\n')
    w('\t--number-padding n Pad output file numbering to n characters for ls sorting\n')
    w('\t--no-unlink-desired Do not unlink output files with desired exit statuses (depends\n')
    w('\t on --or-statuses and --and-statuses)\n')
    w('\t--stagger Pause for %4.2f seconds after each command start\n' % stagger_time)
    # %s rather than %d: by the time --help is seen, --dump-entire or
    # --dump-nothing may already have replaced the number with a string.
    w('\t--dump-lines number-of-lines (defaults to %s)\n' % dump_lines_from_1)
    w('\t--dump-nothing\n')
    w('\t--dump-entire\n')
    w('\t--to-stdout\n')
    w('\t--verbosity levelno\n')
    w('\t-v\n')
    w('\t--help\n')
    w('\t--absolute-time output timestamps relative to program start (only in --to-stdout)\n')
    w('\t--relative-time output timestamps relative to subprogram start (only in --to-stdout)\n')
    w('\n')
    w('In --or-statuses mode, the exit status of this program will be the minimum of the\n')
    w('subprocesses\' exit statuses.\n')
    w('\n')
    w('Similarly, in --and-statuses mode (the default), the exit status of this program will\n')
    w('be the maximum of the subprocesses\' exit statuses.\n')
    w('\n')
    w('The exit status of a subprocess that timed out is always 254.\n')
    w('\n')
    w('Output is saved, one file per command (host), in the specified directory.\n')
    w('Output files are deleted if the exit status is what was desired (true or\n')
    w('false depending on --and-statuses or --or-statuses) by default - see\n')
    w('--no-unlink-desired.\n')
    w('\n')
    w('Don\'t try to --scp-command and --ssh-command in the same %s unless\n' % sys.argv[0])
    w('the --ssh-command does not depend on the --scp-command; they may be\n')
    w('run in parallel, so use two %s\'s.\n' % sys.argv[0])
    w('\n')
    w('Comparing --to-stdout and --not-to-stdout:\n')
    w('\t--to-stdout is a little slower than --not-to-stdout.\n')
    w('\t--to-stdout is not fully binary-safe. --not-to-stdout is.\n')
    w('\t--to-stdout is more informative during a run. --not-to-stdout is quieter.\n')
    w('\t--not-to-stdout is the default.\n')
    sys.exit(retval)


def coordinated_output(line, file=sys.stdout, lock=None):
    """Write line plus a newline to file and flush it.

    If lock is given it is held for the duration of the write, and it is
    released even when the write raises.
    """
    if lock is not None:
        lock.acquire()
    try:
        file.write('%s\n' % line)
        file.flush()
    finally:
        if lock is not None:
            lock.release()


class Timeout(Exception):
    """Raised by readline_with_timeout when no input arrives in time."""
    pass


class For_EBADF_result:
    """Outcome of for_EBADF: whether EBADF was hit, and the call's result."""

    def __init__(self, got_EBADF, result):
        self.got_EBADF = got_EBADF
        self.result = result


def for_EBADF(fn):
    """Call fn() and report an EBADF failure as data instead of raising.

    Returns For_EBADF_result(False, value) on success and
    For_EBADF_result(True, None) if fn raised an OS-level error whose errno
    is EBADF.  Every other exception propagates unchanged.
    """
    try:
        result = fn()
    except (OSError, IOError, select.error) as err:
        # Python 2's select.error carries the errno only in args[0].
        code = getattr(err, 'errno', None)
        if code is None and err.args:
            code = err.args[0]
        if code == errno.EBADF:
            # The descriptor was closed underneath us; the caller decides
            # what to do about it.
            return For_EBADF_result(True, None)
        raise
    return For_EBADF_result(False, result)


def readline_with_timeout(file, terminator='\n', timeout=0, pending=None):
    """Generate terminator-ended lines read one character at a time from file.

    file is either an object with a read() method or a raw file descriptor.
    pending, if given, is a caller-owned list holding the characters of the
    current, still unterminated line.  Because raising Timeout finishes the
    generator, passing the same list to the replacement generator keeps a
    partially read line from being lost.

    Raises Timeout when select() reports nothing within timeout seconds,
    IOError on an exceptional condition, and EOFError at end of input (after
    yielding any unterminated final line).  Returns quietly if the
    descriptor turns out to be closed (EBADF).
    """
    if pending is None:
        pending = []
    while True:
        polled = for_EBADF(lambda: select.select([file], [], [file], timeout))
        if polled.got_EBADF:
            return
        input_ready, output_ready, exception_ready = polled.result
        if exception_ready:
            del pending[:]
            raise IOError
        if output_ready:
            sys.stderr.write('%s: Internal error: Output ready in readline_with_timeout\n' % sys.argv[0])
            sys.exit(1)
        if not input_ready:
            raise Timeout
        if hasattr(file, 'read'):
            fetched = for_EBADF(lambda: file.read(1))
        else:
            fetched = for_EBADF(lambda: os.read(file, 1))
        if fetched.got_EBADF:
            return
        ch = fetched.result
        if not ch:
            if pending:
                tail = ''.join(pending)
                del pending[:]
                yield tail
            raise EOFError
        pending.append(ch)
        if ch == terminator:
            result = ''.join(pending)
            del pending[:]
            yield result


# quick bashquote
def qbq(s):
    """Return s quoted for the shell via the bashquote helper module."""
    bq = bashquote.bashquote()
    bq.add(s)
    return bq.result()


def minutes(seconds):
    """Convert seconds to (fractional) minutes."""
    return seconds / 60.0


class Command:
    """One shell command: its output file name, child process and status.

    Reads the module-level settings (directory, number_padding, verbosity,
    and_statuses, unlink_desired_status_files, resolution, top_lock, ...)
    at call time, and numbers itself from the module-level counter n.
    """

    def __init__(self, cmd, maxtime, local_to_stdout, lock, numbered=False, derived=False, named=False, name=''):
        """Record the command and choose its output file name.

        Exactly one of numbered, derived or named must be true; name is
        only used with named.
        """
        global n
        self.cmd = cmd
        self.quoted_cmd = qbq(cmd)
        self.maxtime = maxtime
        self.to_stdout = local_to_stdout
        self.lock = lock
        self.numbered = numbered
        self.derived = derived
        self.named = named
        if self.numbered + self.named + self.derived != 1:
            sys.stderr.write('%s: Error: Command.start must have exactly 1 of numbered, name or derive\n' % sys.argv[0])
            sys.exit(1)
        if self.numbered:
            self.name = '%*d' % (number_padding, n)
        elif self.named:
            self.name = '%*d - %s' % (number_padding, n, name)
        elif self.derived:
            self.name = '%*d - %s' % (number_padding, n, re.sub('[^a-zA-Z0-9]+', '-', self.cmd))
        else:
            sys.stderr.write('%s: Eh? warble frob\n' % sys.argv[0])
            sys.exit(1)
        n += 1

    def start(self):
        """Fork a /bin/sh child for the command.

        In to_stdout mode the child's stdin/stdout/stderr are pipes and a
        reader thread is started in the parent; otherwise the shell
        redirects output straight into the output file.
        """
        global global_num_threads

        def via(threaded):
            if threaded:
                return 'via thread'
            return 'via redirection'

        if verbosity >= 2:
            coordinated_output('--> %s starting - writing to %s %s' %
                               (self.quoted_cmd, qbq(self.name), via(self.to_stdout)), lock=top_lock)
        if self.to_stdout:
            self.parent_receive, self.child_send = os.pipe()
            self.child_receive, self.parent_send = os.pipe()
        pid = os.fork()
        if pid == 0:
            # child process
            try:
                # Own process group, so is_done() can signal the whole tree.
                os.setpgid(0, 0)
                if self.to_stdout:
                    # Must not close these before dup2()'ing to them, otherwise you get a
                    # silent error - but flush()'ing is good
                    sys.stdout.flush()
                    sys.stderr.flush()
                    os.dup2(self.child_receive, sys.stdin.fileno())
                    os.dup2(self.child_send, sys.stdout.fileno())
                    os.dup2(self.child_send, sys.stderr.fileno())
                    shell_command = 'eval %s 2>&1' % qbq(self.cmd)
                else:
                    shell_command = 'eval %s > %s/%s 2>&1' % \
                        (qbq(self.cmd), qbq(directory), qbq(self.name))
                os.execlp('/bin/sh', '/bin/sh', '-c', shell_command)
            except Exception:
                traceback.print_exc()
            finally:
                # Only reached when exec failed: never let the child fall
                # through into the parent's main loop.
                os._exit(127)
        # parent process
        self.start_time = time.time()
        self.pid = pid
        if self.to_stdout:
            global_num_threads += 1
            thread.start_new_thread(self.copy_to_stdout_and_file, ('nothing',))

    def _record_line(self, file_output, line, relative_t0):
        """Append line to the output file and echo it, labeled, to stdout."""
        t1 = time.time()
        file_output.write(line)
        file_output.flush()
        s = 'output from %s' % self.name
        if absolute_time:
            s += ' (abs: %.1fm)' % minutes(t1 - absolute_t0)
        if relative_time:
            s += ' (rel: %.1fm)' % minutes(t1 - relative_t0)
        s += ': %s' % line.rstrip('\n')
        coordinated_output(s, lock=top_lock)

    def copy_to_stdout_and_file(self, dummy):
        """Thread body: copy the child's output to its file and to stdout.

        Runs only in the parent, one thread per subprocess, so that each
        line of child output is handled as a unit.  The parent keeps its
        own copies of the child's pipe ends open until close_pipes(), so the
        loop normally ends on a read Timeout once is_done() has stored
        exit_status, not on EOF.  dummy is unused.
        """
        global global_num_threads
        got_timeout = False
        relative_t0 = time.time()
        file_output = None
        try:
            try:
                file_output = open(os.path.join(directory, self.name), 'w')
                # Shared with every generator below so that a partial line
                # survives a Timeout.
                pending = []
                gen = readline_with_timeout(self.parent_receive, timeout=global_thread_timeout,
                                            pending=pending)
                while True:
                    try:
                        line = next(gen)
                    except Timeout:
                        if hasattr(self, 'exit_status'):
                            # we have an exit status, so the child process is dead (or perhaps
                            # we just tried unsuccessfully to kill it). So let this thread die too
                            if self.exit_status == 254:
                                got_timeout = True
                            break
                        # We don't have an exit status, so go back for more input!
                        gen = readline_with_timeout(self.parent_receive, timeout=global_thread_timeout,
                                                    pending=pending)
                    except (EOFError, StopIteration):
                        break
                    else:
                        self._record_line(file_output, line, relative_t0)
                if pending:
                    # Output that did not end in a newline.
                    tail = ''.join(pending)
                    del pending[:]
                    self._record_line(file_output, tail, relative_t0)
                coordinated_output('Letting thread die, renaming file and closing file/pipes', lock=top_lock)
                self.rename_or_delete_with_exit_status(natural=not got_timeout, timeout=got_timeout)
                self.close_pipes()
            except Exception:
                sys.stderr.write('%s: Traceback from within thread related to %s\n' % (sys.argv[0], self.name))
                traceback.print_exc()
        finally:
            if file_output is not None:
                file_output.close()
            # Decrement last: the main thread treats 0 as "all output files
            # are in their final state".
            global_num_threads -= 1

    def close_pipes(self):
        """Close whichever pipe descriptors this command still holds.

        Safe to call more than once: each descriptor is forgotten before it
        is closed, and an already-closed descriptor (EBADF) is ignored.
        """
        for attr in ('parent_receive', 'child_receive', 'parent_send', 'child_send'):
            fd = getattr(self, attr, None)
            if fd is None:
                continue
            delattr(self, attr)
            try:
                os.close(fd)
            except OSError as err:
                if err.errno != errno.EBADF:
                    raise

    def rename_or_delete_with_exit_status(self, natural=False, timeout=False):
        """Delete the output file, or rename it to '<name> - <exit status>'.

        The file is deleted when the status is the desired one (zero under
        and_statuses, nonzero otherwise) and unlink_desired_status_files is
        set.  Exactly one of natural / timeout must be true; it only
        affects the wording of the verbose message.  Requires
        self.exit_status to have been set.
        """
        if natural + timeout != 1:
            sys.stderr.write('%s: rename_or_delete_with_exit_status: natural + timeout != 1 (%d, %d)\n' %
                             (sys.argv[0], natural, timeout))
            sys.exit(1)
        if natural:
            description = 'naturally'
        else:
            description = 'by timeout'
        old_name = self.name
        new_name = '%s - %d' % (self.name, self.exit_status)
        if and_statuses:
            status_is_desired = not self.exit_status
        else:
            status_is_desired = bool(self.exit_status)
        if status_is_desired and unlink_desired_status_files:
            # delete the file and tell the user
            os.unlink(os.path.join(directory, old_name))
            if verbosity >= 2:
                coordinated_output('<-- %s finished %s - deleted outfile' % (self.quoted_cmd, description),
                                   lock=top_lock)
        else:
            # rename the file and tell the user
            os.rename(os.path.join(directory, old_name), os.path.join(directory, new_name))
            self.name = new_name
            if verbosity >= 2:
                coordinated_output('<-- %s finished %s - written to %s' %
                                   (self.quoted_cmd, description, qbq(self.name)), lock=top_lock)

    def _kill_process_group(self):
        """SIGKILL the child's process group up to 10 times; True if reaped."""
        for _ in range(10):
            os.kill(-self.pid, signal.SIGKILL)
            time.sleep(resolution / 4.0)
            if os.waitpid(self.pid, os.WNOHANG) != (0, 0):
                return True
        return False

    # return true when done
    def is_done(self):
        """Poll the child; return True once it has exited or been timed out.

        Sets self.exit_status (254 for a timeout) and, for commands without
        a reader thread, renames or deletes the output file.
        """
        p, v = os.waitpid(self.pid, os.WNOHANG)
        if (p, v) == (0, 0):
            # still running
            difference = time.time() - self.start_time
            if self.maxtime == -1 or difference <= self.maxtime:
                return False
            if verbosity >= 4:
                coordinated_output('process %d (%s) has been running for %f and has a max of %f' %
                                   (self.pid, self.cmd, difference, self.maxtime), lock=top_lock)
            if not self._kill_process_group():
                # Sheesh, the process group won't die. Give up with those deadly signals
                # pending. The processes in the process group will probably become zombies
                # when they die until this parent process exits.
                coordinated_output('%s: process group -%d would not die' % (sys.argv[0], self.pid),
                                   lock=top_lock)
            self.exit_status = 254
            # Commands with a reader thread are renamed by that thread.
            if not self.to_stdout:
                self.rename_or_delete_with_exit_status(natural=False, timeout=True)
            return True
        if os.WIFSIGNALED(v):
            # v // 256 would report a signal-killed child as status 0; use
            # the shell's 128+signal convention instead.
            self.exit_status = 128 + os.WTERMSIG(v)
        else:
            self.exit_status = os.WEXITSTATUS(v)
        if not self.to_stdout:
            self.rename_or_delete_with_exit_status(natural=True, timeout=False)
        return True


commands = []
maxtime = -1
max_concurrency = -1
and_statuses = True
resolution = 0.1
directory = ''
exit_early = False
numbered = False
derived = True
number_padding = 0
number_of_true_exits = 0
number_of_false_exits = 0
unlink_desired_status_files = True
stagger = False
absolute_time = False
relative_time = False
n = 0

if not sys.argv[1:]:
    usage(0)

top_lock = None


def option_value(option):
    """Remove and return the word that follows the option at sys.argv[1]."""
    if len(sys.argv) < 3:
        sys.stderr.write('%s: %s requires an argument\n' % (sys.argv[0], option))
        usage(1)
    return sys.argv.pop(2)


def add_command(cmd, **naming):
    """Queue cmd using the option values in effect at this point of parsing."""
    commands.append(Command(cmd, maxtime, global_to_stdout, top_lock, **naming))


# Options take effect in command-line order: a Command picks up the values
# of --maxtime, --to-stdout, --numbered/--derive seen before it.
while sys.argv[1:]:
    option = sys.argv[1]
    if option in ('--help', '-h'):
        usage(0)
    elif option == '--no-unlink-desired':
        unlink_desired_status_files = False
    elif option == '--directory':
        directory = option_value(option)
    elif option == '--number-padding':
        number_padding = int(option_value(option))
    elif option == '--resolution':
        resolution = float(option_value(option))
    elif option == '--stagger':
        stagger = True
    elif option == '--stagger-time':
        stagger = True
        stagger_time = float(option_value(option))
    elif option == '--exit-early':
        exit_early = True
    elif option == '--or-statuses':
        and_statuses = False
    elif option == '--and-statuses':
        and_statuses = True
    elif option == '--max-concurrency':
        max_concurrency = int(option_value(option))
    elif option == '--maxtime':
        maxtime = float(option_value(option))
    elif option == '--derive':
        numbered = False
        derived = True
    elif option == '--numbered':
        numbered = True
        derived = False
    elif option == '--command':
        add_command(option_value(option).rstrip('\n'), numbered=numbered, derived=derived)
    elif option == '--commands-file':
        with open(option_value(option), 'r') as commands_file:
            for text in commands_file.readlines():
                add_command(text.rstrip('\n'), numbered=numbered, derived=derived)
    elif option == '--commands':
        # consumes every remaining argument
        for text in sys.argv[2:]:
            add_command(text, numbered=numbered, derived=derived)
        del sys.argv[2:]
    elif option == '--labeled-command':
        label, _, text = option_value(option).partition(':')
        add_command(text, named=True, name=label)
    elif option == '--labeled-commands-file':
        with open(option_value(option), 'r') as commands_file:
            for entry in commands_file:
                label, _, text = entry.partition(':')
                add_command(text.rstrip('\n'), named=True, name=label)
    elif option == '--labeled-commands':
        # consumes every remaining argument
        for entry in sys.argv[2:]:
            label, _, text = entry.partition(':')
            add_command(text, named=True, name=label)
        del sys.argv[2:]
    elif option == '--parameterized-command':
        template = option_value(option)
        for parameter in option_value(option).split():
            add_command(template % parameter, named=True, name=parameter)
    elif option == '--ssh-command':
        # Formatted per host in a single step, so a '%' in the remote
        # command (e.g. date +%s) is not mistaken for a format directive.
        remote_command = qbq(option_value(option))
        for host in option_value(option).split():
            add_command('ssh %s %s' % (host, remote_command), named=True, name=host)
    elif option == '--scp1-command':
        filename = option_value(option)
        for host in option_value(option).split():
            add_command('scp %s %s:%s' % (filename, host, filename), derived=True)
    elif option == '--scp-command':
        source_filename = option_value(option)
        destination_filename = option_value(option)
        for host in option_value(option).split():
            add_command('scp %s %s:%s' % (source_filename, host, destination_filename), derived=True)
    elif option == '--rsync1-command':
        filename = option_value(option)
        for host in option_value(option).split():
            add_command('rsync -apl %s %s:%s' % (filename, host, filename), derived=True)
    elif option in ('--rsync-command', '--rsync2-command'):
        source_filename = option_value(option)
        destination_filename = option_value(option)
        for host in option_value(option).split():
            add_command('rsync -apl %s %s:%s' % (source_filename, host, destination_filename),
                        derived=True)
    elif option == '--rsync3-command':
        rsync_options = option_value(option)
        source_filename = option_value(option)
        destination_filename = option_value(option)
        for host in option_value(option).split():
            add_command('rsync -apl %s %s %s:%s' %
                        (rsync_options, source_filename, host, destination_filename), derived=True)
    elif option == '--dump-lines':
        dump_lines_from_1 = int(option_value(option))
    elif option == '--dump-entire':
        dump_lines_from_1 = 'all'
    elif option == '--dump-nothing':
        dump_lines_from_1 = 'nothing'
    elif option == '--to-stdout':
        global_to_stdout = True
        # Imported here, at module level, only when threads are needed;
        # Command.start() relies on the resulting global name.
        import thread
        import socket
        top_lock = thread.allocate_lock()
    elif option == '--not-to-stdout':
        global_to_stdout = False
        top_lock = None
    elif option == '--verbosity':
        verbosity = int(option_value(option))
    elif option == '-v':
        verbosity += 1
    elif option == '--absolute-time':
        absolute_time = True
    elif option == '--relative-time':
        relative_time = True
    else:
        sys.stderr.write("%s: Illegal argument: %s\n" % (sys.argv[0], option))
        usage(1)
    del sys.argv[1]

if directory == '':
    directory = '/tmp/looper-%d-%d' % (time.time(), os.getpid())
    coordinated_output('using directory %s' % directory, lock=top_lock)

if (absolute_time or relative_time) and not global_to_stdout:
    sys.stderr.write('%s: Warning: --absolute-time and/or --relative-time specified, but not --to-stdout mode\n' %
                     sys.argv[0])


# Lots of globals. Sigh. This started off just part of the main body
# of the program, and then I realized I needed to do this from two
# places.
def check_for_termination():
    """Sleep for resolution seconds, then reap every finished command.

    Updates the module-level result, exit counters, active_commands and
    current_concurrency, and exits the program early when --exit-early
    applies.
    """
    time.sleep(resolution)
    global result
    global current_concurrency
    global number_of_true_exits
    global number_of_false_exits
    if verbosity >= 3:
        coordinated_output('checking for subprocess termination', lock=top_lock)
    # Walk the indices backwards so deleting an entry does not shift the
    # entries still to be visited.
    for active_command_no in range(len(active_commands) - 1, -1, -1):
        active_command = active_commands[active_command_no]
        if not active_command.is_done():
            continue
        # flip the logic: exit status 0 is "true"
        if not active_command.exit_status:
            number_of_true_exits += 1
        else:
            number_of_false_exits += 1
        if and_statuses:
            result = max(active_command.exit_status, result)
            if exit_early and result:
                coordinated_output('%s: Terminating early because command %s exited false' %
                                   (sys.argv[0], active_command.cmd), file=sys.stderr, lock=top_lock)
                sys.exit(result)
        else:
            result = min(active_command.exit_status, result)
            if exit_early and not result:
                coordinated_output('%s: Terminating early because command %s exited true' %
                                   (sys.argv[0], active_command.cmd), file=sys.stderr, lock=top_lock)
                sys.exit(result)
        del active_commands[active_command_no]
        current_concurrency -= 1
        if current_concurrency < 0:
            sys.stderr.write('%s: Internal error: current_concurrency < 0\n' % sys.argv[0])
            sys.exit(1)


try:
    statbuf = os.stat(directory)
except OSError:
    os.makedirs(directory, 0o755)
else:
    if stat.S_ISDIR(statbuf.st_mode):
        sys.stderr.write('%s: %s already exists and is a directory. Continuing.\n' % (sys.argv[0], directory))
    else:
        sys.stderr.write('%s: %s already exists and is not a directory. Terminating.\n' % (sys.argv[0], directory))
        sys.exit(1)

current_concurrency = 0

if and_statuses:
    # we and the values, so start out assuming True
    result = 0
else:
    # we or the values, so start out assuming False
    result = 255

# lst.pop() is much faster than v=lst[0]; del lst[0]
commands.reverse()

# we're pulling commands out of the "commands" list, and then putting
# them into the active_commands list.
active_commands = []
while commands:
    while commands and (max_concurrency == -1 or current_concurrency < max_concurrency):
        inactive_command = commands.pop()
        inactive_command.start()
        if stagger:
            time.sleep(stagger_time)
        current_concurrency += 1
        active_commands.append(inactive_command)
    check_for_termination()  # beware - globals modified

while active_commands:
    check_for_termination()  # beware - globals modified

# give the threads time to finish up - otherwise we may lose the end of some of the output streams
global_finish_time_0 = time.time()
global_max_thread_wait = global_thread_timeout + 0.5
if global_to_stdout:
    while global_num_threads != 0:
        if global_num_threads < 0:
            sys.stderr.write('%s: internal error: global_num_threads is less than 0\n' % sys.argv[0])
            # Nothing left to wait for; looping again would never end.
            break
        time.sleep(resolution)
        global_difference = time.time() - global_finish_time_0
        if global_difference >= global_max_thread_wait:
            # Threads are not finished, but they're taking too long - give up on them.
            # They've had plenty of time to finish by now unless we're on an extremely
            # loaded system
            if verbosity >= 2:
                coordinated_output("%d threads are not finished, but that's OK. Finishing up" %
                                   global_num_threads, lock=top_lock)
            break
        if verbosity >= 2:
            coordinated_output('waiting for threads to finish: %d thread(s) remain(s), %3.1f seconds remains' %
                               (global_num_threads, global_max_thread_wait - global_difference), lock=top_lock)

if verbosity >= 1:
    coordinated_output('true exit statuses: %d' % number_of_true_exits, lock=top_lock)
    coordinated_output('false exit statuses: %d' % number_of_false_exits, lock=top_lock)

# "<number> - <exit status>" (--numbered) or "<number> - <label> - <exit status>",
# optionally preceded by the spaces that --number-padding adds.
result_file_regex = re.compile(r'^\s*([0-9]+) - (?:.* - )?([0-9]+)$')


def regex_matches(regex, s):
    """Return True if regex matches at the start of s."""
    return regex.match(s) is not None


class Result_file:
    """A finished command's output file name, parsed for sorting."""

    def __init__(self, s):
        self.string = s
        match = result_file_regex.match(s)
        self.execution_order = int(match.group(1))
        self.exit_status = int(match.group(2))

    def sort_key(self):
        """Sort key: successes before failures, then execution order."""
        return (self.exit_status != 0, self.execution_order)

    def __lt__(left, right):
        return left.sort_key() < right.sort_key()

    def __cmp__(left, right):
        # success < failure. Aside from that we list in execution order
        a, b = left.sort_key(), right.sort_key()
        return (a > b) - (a < b)

    def __str__(self):
        return self.string


def dump_result_files(max_lines):
    """Print each result file in directory, indented by a tab.

    With max_lines None every line is printed; otherwise only the first
    max_lines lines, followed by a note saying how many were left out.
    """
    result_files = [Result_file(filename)
                    for filename in os.listdir(directory)
                    if regex_matches(result_file_regex, filename)]
    result_files.sort(key=lambda entry: entry.sort_key())
    for result_file in result_files:
        full_path = os.path.join(directory, result_file.string)
        sys.stdout.write('%s:\n' % full_path)
        lines_not_shown = 0
        with open(full_path, 'r') as summary_file:
            for lineno_from_0, text in enumerate(summary_file):
                if max_lines is None or lineno_from_0 < max_lines:
                    sys.stdout.write('\t%s' % text)
                else:
                    lines_not_shown += 1
        if max_lines is None:
            continue
        if lines_not_shown == 0:
            sys.stdout.write('\t(all lines shown)\n')
        else:
            sys.stdout.write('\t(%d line(s) not shown)\n' % lines_not_shown)


if dump_lines_from_1 == 'all':
    dump_result_files(None)
elif dump_lines_from_1 == 'nothing':
    pass
else:
    dump_result_files(dump_lines_from_1)

if not os.listdir(directory):
    sys.stdout.write('%s is empty - removing\n' % directory)
    os.rmdir(directory)
else:
    sys.stdout.write('%s is not empty - please look it over\n' % directory)

# result is the max (--and-statuses) or min (--or-statuses) of the
# subprocess exit statuses, so it is passed on unchanged.
sys.exit(result)