#!/usr/bin/python3

'''Read from stdin, pipe to multiple subprocesses'''

import os
import sys
import errno
import subprocess


def is_broken_pipe(exc):
    '''Examine an exception.  Return True iff it is a broken pipe (EPIPE) error.'''
    # getattr: tolerate exceptions that carry no errno at all.
    return getattr(exc, 'errno', None) == errno.EPIPE


def usage(retval):
    '''Write a usage message to stderr, then exit with status retval.'''
    prog = sys.argv[0]
    sys.stderr.write('%s sets up multiple pipes and sends a copy of stdin to each of them\n' % prog)
    sys.stderr.write('\n')
    # -q and -s are accepted by Options, so list them in the synopsis too.
    sys.stderr.write('Usage: %s [-f file_of_commands1] [-h] [-q] [-s] [--] command1 command2 ... commandn\n'
                     % prog)
    sys.stderr.write('\n')
    sys.stderr.write('-h shows this help message\n')
    sys.stderr.write('-- signals the end of options, so that commands may start with a "-"\n')
    sys.stderr.write('-f filename reads commands to set up pipes with, one command per line\n')
    sys.stderr.write('   May be repeated.\n')
    sys.stderr.write('-q says to operate quietly\n')
    sys.stderr.write('-s says to process the pipes serially, big inhale - use only for small inputs\n')
    sys.stderr.write('commandx says to use commandx to set up a pipe\n')
    sys.exit(retval)


class Pipe(object):
    '''A (possibly inactive) shell subprocess that we feed through its stdin.

    Attributes:
        command: the shell command line the subprocess runs.
        pipe: the subprocess.Popen object; its stdin is a binary pipe.
        active: True until a caller marks this pipe dead (e.g. after EPIPE).
    '''

    def __init__(self, command):
        self.command = command
        # shell=True is deliberate: each command is a user-supplied shell
        # pipeline.  universal_newlines=False (the default anyway) keeps
        # stdin in binary mode so arbitrary bytes pass through untouched.
        self.pipe = subprocess.Popen(self.command, shell=True, stdin=subprocess.PIPE,
                                     universal_newlines=False)
        self.active = True

    def write(self, string):
        '''Write string (bytes) to the subprocess's stdin; may raise EPIPE.'''
        self.pipe.stdin.write(string)

    def close(self):
        '''Flush and close the subprocess's stdin, then wait for it to exit.

        If the subprocess has already gone away, bytes still sitting in our
        write buffer cannot be delivered and flush/close raise EPIPE; that is
        expected and ignored.  Any other OSError propagates.
        '''
        for step in (self.pipe.stdin.flush, self.pipe.stdin.close):
            try:
                step()
            except OSError as exc:
                if not is_broken_pipe(exc):
                    raise
        # Note that there's a subtle race condition without this wait() that sometimes
        # causes subprocesses to terminate prematurely, sometimes yielding short output
        # files.
        self.pipe.wait()


class Options(object):  # pylint: disable=too-few-public-methods
    '''Parse the command line (sys.argv) into option settings.

    Leading options are consumed from sys.argv in place; whatever remains
    is taken as commands.

    Attributes:
        commands: raw command strings - lines read from -f files (still
            carrying their line endings) followed by positional arguments.
        quiet: True if -q was given.
        serial: True if -s was given.
    '''

    def __init__(self):
        self.commands = []
        self.quiet = False
        self.serial = False
        while sys.argv[1:] and sys.argv[1][0:1] == '-':
            if sys.argv[1] == '--':
                del sys.argv[1]
                break
            elif sys.argv[1] == '-f' and sys.argv[2:]:
                filename = sys.argv[2]
                # Context manager so the command file is closed promptly.
                with open(filename, 'r') as command_file:
                    self.commands += command_file.readlines()
                # Drop '-f' here; the shared del below drops the filename.
                del sys.argv[1]
            elif sys.argv[1] == '-h':
                usage(0)
            elif sys.argv[1] == '-q':
                self.quiet = True
            elif sys.argv[1] == '-s':
                self.serial = True
            else:
                sys.stderr.write('%s: Illegal option: %s\n' % (sys.argv[0], sys.argv[1]))
                usage(1)
            del sys.argv[1]
        self.commands += sys.argv[1:]


def open_pipes(options, commands=None):
    '''Start one Pipe per command and return them as a list.

    commands defaults to options.commands; pass the cleaned-up command list
    so that the spawned commands match what gets reported.  Progress is
    written to stderr unless options.quiet is set.
    '''
    if commands is None:
        commands = options.commands
    pipes = []
    if not options.quiet:
        sys.stderr.write('Creating %d pipes\n' % len(commands))
    for index, command in enumerate(commands):
        if not options.quiet:
            sys.stderr.write('popening command %d: %s\n' % (index, command))
        pipes.append(Pipe(command))
    return pipes


def retry_on_eintr(function, *args, **kw):
    '''Return function(*args, **kw), retrying for as long as it fails with EINTR.

    Python 3.5+ already retries most interrupted system calls (PEP 475); this
    is kept as a safety net.  Any other OSError propagates.
    '''
    while True:
        try:
            return function(*args, **kw)
        except OSError as exc:
            if exc.errno != errno.EINTR:
                raise


def _write_tolerating_epipe(pipe, data, command, options):
    '''Write data to pipe.  Return True on success, False if the reader is gone.

    A broken pipe is reported on stderr (unless options.quiet); any other
    OSError propagates.
    '''
    try:
        pipe.write(data)
    except OSError as exc:
        if not is_broken_pipe(exc):
            raise
        if not options.quiet:
            sys.stderr.write('terminated: %s, errno: %d\n' % (command, exc.errno))
        return False
    return True


def do_blocked(max_buf_len, options, commands):
    '''Copy stdin to all pipes concurrently, one block at a time.

    Reads up to max_buf_len bytes at a time from file descriptor 0 and writes
    each block to every still-active pipe.  A pipe whose reader has gone away
    is marked inactive and skipped from then on.  Stops at end of input, or
    as soon as no pipe is active.  Every pipe - active or not - is then
    closed and its subprocess waited for, so no child is left unreaped.
    '''
    pipes = open_pipes(options, commands)
    while any(pipe.active for pipe in pipes):
        buf = retry_on_eintr(os.read, 0, max_buf_len)
        # Nice for debugging, distracting in real life
        #sys.stderr.write('fred: len(buf) is %d\n' % len(buf))
        if not buf:
            break
        for pipeno, pipe in enumerate(pipes):
            if pipe.active and not _write_tolerating_epipe(pipe, buf, commands[pipeno], options):
                pipe.active = False
    for pipe in pipes:
        pipe.close()


def do_serial(max_buf_len, options, commands):
    '''Inhale the entire input, then feed it to the commands one at a time.

    Each command is started, given all of stdin, and waited for before the
    next one starts.  A command that stops reading early is reported (unless
    options.quiet) and otherwise ignored.
    '''
    stdin_fd = sys.stdin.fileno()
    # iter(callable, b'') keeps reading blocks until os.read signals EOF.
    data = b''.join(iter(lambda: retry_on_eintr(os.read, stdin_fd, max_buf_len), b''))
    sys.stdin.close()
    for index, command in enumerate(commands):
        if not options.quiet:
            sys.stderr.write('popening command %d: %s\n' % (index, command))
        pipe = Pipe(command)
        _write_tolerating_epipe(pipe, data, command, options)
        pipe.close()


def main():
    '''Parse the command line and fan stdin out to the requested commands.'''
    options = Options()
    # Bourne shell (or bash, as the case may be) doesn't care about leading and
    # trailing whitespace anyway.  Blank entries (e.g. empty lines in a -f file)
    # would only spawn a shell that exits at once, so they are dropped.
    commands = [command.strip() for command in options.commands]
    commands = [command for command in commands if command]
    max_buf_len = 1024 * 1024
    if options.serial:
        do_serial(max_buf_len, options, commands)
    else:
        do_blocked(max_buf_len, options, commands)


if __name__ == '__main__':
    main()