#!/usr/bin/python3 # pylint: disable=superfluous-parens # superfluous-parens: Parentheses are good for clarity and portabilty """A script(1) reimplementation in Python, with support for dated output files.""" # This version is much better abstracted than the first attempt import os import pty import sys import tty import time import errno import fcntl import random import select import signal import struct import termios VERSION = "1.02" sys.path.insert(0, "/usr/local/lib") sys.path.insert(0, os.path.expanduser("~/lib")) def make_used(*vars): """Persuade linters that vars are used.""" assert True or vars def usage(retval): """Output a usage message.""" sys.stderr.write("Usage: %s [file]\n" % sys.argv[0]) sys.stderr.write("\t-a Append the output to file or typescript, retaining the prior contents\n") sys.stderr.write("\t-c command\n") sys.stderr.write("\t-f Flush output after each write.\n") sys.stderr.write("\t-q Be quiet.\n") sys.stderr.write("\t-d Use dated files.\n") sys.stderr.write("\t-t Output timing data\n") sys.stderr.write("\t In dated mode, using a .timing file.\n") sys.stderr.write("\t In non-dated mode, use standard error.\n") sys.stderr.write("\t--verbosity levelno\n") sys.stderr.write("\t-v\n") sys.stderr.write("\t--version report the version of the program\n") sys.stderr.write("\t--help\n") sys.exit(retval) def write_regardless(function, *args): """If we have a str, encode as bytes first. Otherwise, just write it.""" args = list(args) if isinstance(args[-1], str): args[-1] = args[-1].encode("UTF-8") retry_on_eintr(function, *args) def rnd(): """Return a random string.""" # We let python worry about seeding. For this application, that should be sufficient characters = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" len_characters = len(characters) result_list = [] for count in range(12): make_used(count) result_list.append(characters[int(random.random() * len_characters)]) return "".join(result_list) def retry_on_eintr(function, *args, **kw): """ Retry a system call. Keep retrying until one of the following happens: 1) It succeeds 2) It errors with something other than EINTR """ while True: try: return function(*args, **kw) except OSError: nothing, extra, nothing2 = sys.exc_info() make_used(nothing, nothing2) if extra.errno == errno.EINTR: continue else: raise class SimpleFile(object): # pylint: disable=R0902 # R0902: We need several instance attributes """A simple file-like object with timing data and optional synchronous writes.""" def __init__(self, program_name, options): """Initialize.""" self.program_name = program_name.title() self.filename = options.base_output_filename self.append = options.append self.quiet = options.quiet self.flush_opt = options.flush_opt self.timing_data = options.timing_data self.timing_file = None self.file_ = None self.prior_time = None self.__open() def close(self, continued=False): """Close 'the file'.""" if not self.quiet: string = "%s finished, file was %s\r\n" % (self.program_name, self.filename) if continued: string = string + "...To be continued\r\n" print(string) sys.stdout.flush() self.write(string) if self.file_ is not None: retry_on_eintr(self.file_.close) if self.timing_file != sys.stderr and self.timing_file is not None: retry_on_eintr(self.timing_file.close) def flush(self): """Flush 'the file'.""" retry_on_eintr(self.file_.flush) if self.timing_data: retry_on_eintr(self.timing_file.flush) def __open(self): """Open 'the file', but only if not yet open.""" if self.file_ is None: if self.append: open_flag = "ab" else: open_flag = "wb" self.file_ = retry_on_eintr(open, self.filename, open_flag) if not self.quiet: string = "%s started, file is %s\r\n" % (self.program_name, self.filename) write_regardless(os.write, sys.stdout.fileno(), string) write_regardless(os.write, self.file_.fileno(), string) if self.timing_data: self.prior_time = time.time() self.timing_file = sys.stderr retry_on_eintr(self.timing_file.write, "0.0 1\n") def write(self, data): """Write a chunk of data to the file.""" self.__open() if self.timing_data: current_time = retry_on_eintr(time.time) entry = "%f %d\n" % (current_time - self.prior_time, len(data)) retry_on_eintr(self.timing_file.write, entry) self.prior_time = current_time write_regardless(os.write, self.file_.fileno(), data) if self.flush_opt: retry_on_eintr(self.file_.flush) if self.timing_data: retry_on_eintr(self.timing_file.flush) class DatedFiles(object): # pylint: disable=R0902 # R0902: We need several instance attributes """A file-like object that writes dated files.""" def __init__(self, program_name, options): """Initialize.""" self.program_name = program_name.title() self.base_output_filename = options.base_output_filename self.append = options.append self.quiet = options.quiet self.flush_opt = options.flush_opt self.timing_data = options.timing_data self.timing_file = None self.prior_filename = "" self.prior_session_file_part = "" self.file_ = None self.filename = "" self.session_file_part = "" self.first_time = True self.prior_time = None self.session_id = rnd() self.timing_filename = None self.__open() def close(self, continued=False): """Close 'the file'.""" if not self.quiet: string = "%s finished, file was %s\r\n" % (self.program_name, self.prior_filename) if continued: string = string + "...To be continued\r\n" print(string) sys.stdout.flush() self.write(string) if self.file_ is not None: retry_on_eintr(self.file_.close) if self.timing_file != sys.stderr and self.timing_file is not None: retry_on_eintr(self.timing_file.close) def flush(self): """Flush 'the file'.""" retry_on_eintr(self.file_.flush) if self.timing_data: retry_on_eintr(self.timing_file.flush) @staticmethod def __make_dirs(year, year_and_month, year_month_and_day): """Make prerequisite directories.""" for directory in [year, year_and_month, year_month_and_day]: if not os.path.isdir(directory): try: retry_on_eintr(os.mkdir, directory) except (OSError, IOError): dummy, extra, dummy = sys.exc_info() if extra.errno == errno.EEXIST: pass else: raise def __open(self): """Open the file, but only if necessary.""" year = time.strftime("%Y") month = time.strftime("%m") day = time.strftime("%d") year_and_month = os.path.join(year, month) year_month_and_day = os.path.join(year_and_month, day) self.session_file_part = "%s/%s-%s" % (year_month_and_day, self.base_output_filename, self.session_id) if self.session_file_part != self.prior_session_file_part: self.prior_session_file_part = self.session_file_part self.filename = "%s-%s" % (self.session_file_part, rnd()) if self.file_ is not None: retry_on_eintr(self.close, continued=True) self.__make_dirs(year, year_and_month, year_month_and_day) if self.append: open_flag = "ab" else: open_flag = "wb" # Open the file. self.file_ = retry_on_eintr(open, self.filename, open_flag) # Announce what we're doing if not self.quiet: string = "%s started, file is %s\r\n" % (self.program_name, self.filename) write_regardless(os.write, sys.stdout.fileno(), string) write_regardless(os.write, self.file_.fileno(), string) self.prior_session_file_part = self.session_file_part self.prior_filename = self.filename if self.timing_data: self.prior_time = time.time() self.timing_filename = "%s.timing" % self.filename self.timing_file = retry_on_eintr(open, self.timing_filename, open_flag) if self.first_time: self.first_time = False retry_on_eintr(self.timing_file.write, "0.0 1\n") def write(self, data): """Write data to 'the file'.""" self.__open() if self.timing_data: current_time = retry_on_eintr(time.time) retry_on_eintr(self.timing_file.write, "%f %d\n" % (current_time - self.prior_time, len(data))) self.prior_time = current_time write_regardless(os.write, self.file_.fileno(), data) if self.flush_opt: retry_on_eintr(self.file_.flush) if self.timing_data: retry_on_eintr(self.timing_file.flush) def resize(dummy1, dummy2): """Resize the window - we're a SIGWINCH handler.""" if hasattr(termios, "TIOCGWINSZ"): window_size = struct.pack("HHHH", 0, 0, 0, 0) try: window_size = retry_on_eintr(fcntl.ioctl, sys.stdin.fileno(), termios.TIOCGWINSZ, window_size) except IOError: pass else: retry_on_eintr(fcntl.ioctl, GLOBALS.filedes, termios.TIOCSWINSZ, window_size) retry_on_eintr(os.kill, GLOBALS.pid, signal.SIGWINCH) class Globals(object): """Hold some "global" variables needed by many parts of the code.""" # pylint: disable=R0903 # R0903: We don't need a lot of public methods def __init__(self): """Initialize.""" self.pid = None self.filedes = None GLOBALS = Globals() class Options(object): """Hold command line options.""" # pylint: disable=R0903,R0902 # R0903: We don't need a lot of public methods; we're a container # R0902: We need a bunch of instance attributes; we're a container def __init__(self): """Initialize.""" self.verbosity = 0 self.base_output_filename = "typescript" self.append = False self.command = "" self.flush_opt = False self.timing_data = False self.dated_files = False self.use_psyco = True self.quiet = False def parse_options(options): """Parse command line options.""" # pylint: disable=R0912 # R0912: We need lots of branches, because we're a CLI options parser while sys.argv[1:]: if sys.argv[1] in ("--help", "-h"): usage(0) elif sys.argv[1] == "--verbosity": options.verbosity = int(sys.argv[2]) del sys.argv[1] elif sys.argv[1] == "--version": print(VERSION) elif sys.argv[1] == "-v": options.verbosity += 1 elif sys.argv[1] == "-a": options.append = True elif sys.argv[1] == "-c": options.command = sys.argv[2] del sys.argv[1] elif sys.argv[1] == "-f": options.flush_opt = True elif sys.argv[1] == "-q": options.quiet = True elif sys.argv[1] == "-t": options.timing_data = True elif sys.argv[1] == "-d": options.dated_files = True elif sys.argv[1].startswith("-"): sys.stderr.write("%s: Illegal option: %s\n" % (sys.argv[0], sys.argv[1])) usage(1) else: options.base_output_filename = sys.argv[1] del sys.argv[1] if sys.argv[1:]: sys.stderr.write("%s: Too many arguments starting with %s\n" % (sys.argv[0], sys.argv[1])) sys.exit(1) break del sys.argv[1] check_options(options) def check_options(options): """Vet results of command line options.""" if options.dated_files and os.path.isabs(options.base_output_filename): sys.stderr.write( "%s: base path (%s) must be relative in dated files (-d) mode\n" % ( sys.argv[0], options.base_output_filename, ) ) sys.exit(1) def child_process(command): """Exec the child process.""" if "SHELL" in os.environ: shell = os.environ["SHELL"] else: shell = "/bin/sh" if command == "": os.execvp(shell, [shell, "-i"]) else: os.execvp(shell, [shell, "-c", command]) def restore_and_exit(stdin, file_, have_old_termios, old_termios): """Restore the pty to its original state, close, and terminate.""" if have_old_termios: termios.tcsetattr(stdin, termios.TCSADRAIN, old_termios) retry_on_eintr(file_.close) sys.exit(0) def loop(stdin, file_, filedes, have_old_termios, old_termios): """Do the parent process' loop.""" read_length = 2**16 stdout = sys.stdout.fileno() while True: try: (input_filedes, output_filedes, exception_filedes) = retry_on_eintr(select.select, [stdin, filedes], [], []) except select.error: pass else: break if exception_filedes: sys.stderr.write("%s: should never get an exception filedes from select\n" % sys.argv[0]) sys.exit(1) if output_filedes: sys.stderr.write("%s: should never get an output filedes from select\n" % sys.argv[0]) sys.exit(1) if filedes in input_filedes: try: char = retry_on_eintr(os.read, filedes, read_length) except OSError: # Unfortunately, with pty's a read past EOF (IOW, when the # child process exits) raises an OSError exception restore_and_exit(stdin, file_, have_old_termios, old_termios) else: if not char: # We used to sleep here and continue looping, but macOS wants to exit. restore_and_exit(stdin, file_, have_old_termios, old_termios) retry_on_eintr(os.write, stdout, char) retry_on_eintr(file_.write, char) if stdin in input_filedes: char = retry_on_eintr(os.read, stdin, read_length) retry_on_eintr(file_.write, b"") retry_on_eintr(os.write, filedes, char) def parent_process(options, program_name, filedes): """Do the parent process work.""" signal.signal(signal.SIGWINCH, resize) resize(None, None) stdin = sys.stdin.fileno() if options.dated_files: file_ = DatedFiles(program_name, options) else: file_ = SimpleFile(program_name, options) # pylint: disable=redefined-variable-type try: old_termios = retry_on_eintr(termios.tcgetattr, stdin) retry_on_eintr(tty.setraw, stdin) except termios.error: have_old_termios = False old_termios = None else: have_old_termios = True while True: loop(stdin, file_, filedes, have_old_termios, old_termios) def main(): """Start the ball rolling.""" options = Options() parse_options(options) program_name = os.path.basename(sys.argv[0]) if options.append and options.dated_files: sys.stderr.write("%s: You may not combine -a and -d\n" % sys.argv[0]) usage(1) GLOBALS.pid, GLOBALS.filedes = pty.fork() if GLOBALS.pid == 0: child_process(options.command) else: parent_process(options, program_name, GLOBALS.filedes) main()