#!/usr/bin/env python3 """Read from stdin, pipe to multiple subprocesses.""" import os import sys import errno import subprocess from typing import Callable, List def is_broken_pipe(exc): """Examine an exception. Return True iff it is a broken pipe error.""" if exc.errno == errno.EPIPE: return True else: return False def usage(retval): """Output a usage message.""" if retval == 0: write = sys.stdout.write else: write = sys.stderr.write write('%s sets up multiple pipes and sends a copy of stdin to each of them\n' % sys.argv[0]) write('\n') write('Usage: %s [--file file_of_commands1] [--help] [--] command1 command2 ... commandn\n' % sys.argv[0]) write('\n') write('--help shows this help message\n') write('-- signals the end of options, so that commands may start with a "-"\n') write('--file filename reads commands to set up pipes with, one command per line\n') write(' May be repeated.\n') write('--quiet says to operate quietly\n') write('--serial says to process the pipes serially, big inhale into virtual memory, use only for small inputs\n') write('commandx says to use commandx to set up a pipe\n') sys.exit(retval) class Pipe(object): """A class to hold a (possibly inactive) pipe.""" def __init__(self, command: str) -> None: """Initialize.""" self.command = command # Universal newlines are off by default, meaning binary I/O, but we specify it anyway self.pipe = subprocess.Popen(self.command, shell=True, stdin=subprocess.PIPE, universal_newlines=False) self.active = True self.exit_code = None def write(self, string: bytes) -> None: """Write string to the pipe.""" self.pipe.stdin.write(string) def close(self) -> None: """Close the pipe.""" try: self.pipe.stdin.flush() except BrokenPipeError: pass try: self.pipe.stdin.close() except BrokenPipeError: pass # Note that there's a subtle race condition without this wait() that sometimes # causes subprocesses to terminate prematurely, sometimes yielding short output # files. self.exit_code = self.pipe.wait() class Options(object): # pylint: disable=too-few-public-methods """Deal with command line options.""" def __init__(self) -> None: """Initialize.""" self.commands = [] self.quiet = False self.serial = False while sys.argv[1:] and sys.argv[1][0:1] == '-': if sys.argv[1] == '--': del sys.argv[1] break elif sys.argv[1] in ('-f', '--file') and sys.argv[2:]: filename = sys.argv[2] with open(filename, 'r') as file_: self.commands.extend(line.strip() for line in file_.readlines()) del sys.argv[1] elif sys.argv[1] in ('-h', '--help'): usage(0) elif sys.argv[1] in ('-q', '--quiet'): self.quiet = True elif sys.argv[1] in ('-s', '--serial'): self.serial = True else: sys.stderr.write('%s: Illegal option: %s\n' % (sys.argv[0], sys.argv[1])) usage(1) del sys.argv[1] self.commands.extend(sys.argv[1:]) def open_pipes(options: Options) -> List[Pipe]: """Open the pipes.""" pipes = [] if not options.quiet: sys.stderr.write('Creating %d pipes\n' % len(options.commands)) for index, command in enumerate(options.commands): if not options.quiet: sys.stderr.write('popening command %d: %s\n' % (index, command)) pipes.append(Pipe(command)) return pipes def retry_on_eintr(function: Callable, *args, **kw) -> bytes: """Retry function(*args, **kw) if errno comes up EINTR.""" while True: try: return function(*args, **kw) except OSError: _unused, errdata, _unused = sys.exc_info() if errdata.errno == errno.EINTR: continue else: raise def do_blocked(max_buf_len: int, options: Options, commands: List[str]) -> None: # pylint: disable=too-many-branches """Do the pipes one block at a time.""" eof = False pipes = open_pipes(options) while True: at_least_one_pipe_remains = False buf = retry_on_eintr(os.read, 0, max_buf_len) # Nice for debugging, distracting in real life # sys.stderr.write('fred: len(buf) is %d\n' % len(buf)) if not buf: eof = True break for pipeno, pipe in enumerate(pipes): if pipe.active: try: pipe.write(buf) except (OSError, IOError): _unused, extra, _unused = sys.exc_info() if is_broken_pipe(extra): if not options.quiet: sys.stderr.write('terminated: %s, errno: %d\n' % (commands[pipeno], extra.errno)) pipes[pipeno].active = False else: raise else: at_least_one_pipe_remains = True if not at_least_one_pipe_remains: break if eof: for pipe in pipes: if pipe.active: pipe.close() return [pipe.exit_code for pipe in pipes] def do_serial(max_buf_len, options, commands): """Inhale the entire input, and write to the pipes one at a time.""" list_ = [] while True: block = os.read(sys.stdin.fileno(), max_buf_len) if not block: break list_.append(block) data = b''.join(list_) del list_ sys.stdin.close() pipes = [] for index, command in enumerate(commands): if not options.quiet: sys.stderr.write('popening command %d: %s\n' % (index, command)) pipes.append(Pipe(command)) try: pipes[-1].write(data) except (OSError, IOError): _unused, extra, _unused = sys.exc_info() if is_broken_pipe(extra): pass else: raise pipes[-1].close() return [pipe.exit_code for pipe in pipes] def main() -> None: """Get things started.""" options = Options() # Bourne shell (or bash, as the case may be) doesn't care about leading and trailing whitespace anyway commands = [command.strip() for command in options.commands] max_buf_len = 1024 * 1024 if options.serial: exit_codes = do_serial(max_buf_len, options, commands) else: exit_codes = do_blocked(max_buf_len, options, commands) sys.exit(max(exit_codes)) if __name__ == '__main__': main()