#!/usr/bin/python3

"""Count lines as they go out, optionally using file (line) sizes."""

import errno
import os
import sys
import time
import typing

sys.path.insert(0, os.path.expanduser('~/lib'))
sys.path.insert(0, os.path.expanduser('/usr/local/lib'))

# The helper modules below are all optional; each one only gates the feature
# that needs it.  They are imported after the sys.path tweaks above on purpose.
try:
    import bashquote  # noqa: E402
except ImportError:
    HAVE_BASHQUOTE = False
else:
    HAVE_BASHQUOTE = True

try:
    import modunits  # noqa: E402
except ImportError:
    HAVE_MODUNITS = False
else:
    HAVE_MODUNITS = True

try:
    import readline0  # noqa: E402
except ImportError:
    HAVE_READLINE0 = False
else:
    HAVE_READLINE0 = True


def make_used(*var: typing.Any) -> None:
    """Convince linters that var is 'used'."""
    assert True or var


def usage(retval: int) -> None:
    """
    Output a usage message and exit with status retval.

    The message goes to stdout when retval is 0 (help was asked for) and to
    stderr otherwise.  This function never returns; it raises SystemExit.
    """
    write = sys.stdout.write if retval == 0 else sys.stderr.write

    write('%s: [-c|-b|-e est|-n|-C listfile checkpointfile n|-p prog] [-d divisor]\n' % sys.argv[0])
    write('-c\t\t\t\tmeans to only count lines - do not try to give a progress report\n')
    write('-b\t\t\t\tmeans to buffer the lines, to get accurate estimates\n')
    write('-s\t\t\t\tin buffer mode, means to stat lines and compute size for progress\n')
    write('-e lines\t\t\tmeans the user will provide an estimate of the number of lines\n')
    write('-n\t\t\t\tmeans no progress report will be given (IE, be a primitive "cat")\n')
    write('-C listfile checkpointfile n\tmeans to read filenames from listfile,\n')
    write('\t\t\t\twrite checkpoint indices to checkpointfile, and to checkpoint after every n files\n')
    write('-p prog\t\t\t\tmeans to run program once per input line. ')
    # this one continues the previous line
    write('"%s" in the string is replaced with the input line\n')
    write('-0\t\t\t\tmeans to read lines null terminated, not newline terminated\n')
    write('-d divisor\t\t\tadjusts how often to report\n')
    write('-o\t\t\t\tprogress output overstrikes\n')
    write('-q\t\t\t\tmeans to operate quietly\n')
    if not HAVE_READLINE0:
        write('\n(-0 disabled - no readline0 module found)\n')
    if not HAVE_MODUNITS:
        write('\n(-s disabled - no modunits module found)\n')
    if not HAVE_BASHQUOTE:
        write('\n(-p disabled - no bashquote module found)\n')
    write('\n')
    sys.exit(retval)


def _die(message: str) -> None:
    """Write a one-line error message to stderr and exit with status 1."""
    sys.stderr.write('{}: {}\n'.format(sys.argv[0], message))
    sys.exit(1)


def _operands(flag: str, args: typing.List[str], count: int) -> typing.List[str]:
    """Remove and return the next count operands of flag from args, or exit if there are too few."""
    if len(args) < count:
        _die('{} requires {} argument(s)'.format(flag, count))
    taken = args[:count]
    del args[:count]
    return taken


def _int_operand(flag: str, text: str, minimum: int) -> int:
    """Convert an option operand to an int that is at least minimum, or exit with an error."""
    try:
        value = int(text.strip())
    except ValueError:
        _die('{} requires an integer, got {!r}'.format(flag, text))
    if value < minimum:
        _die('{} requires an integer >= {}, got {}'.format(flag, minimum, value))
    return value


class Options(object):
    # pylint: disable=too-few-public-methods,too-many-instance-attributes
    # too-few-public-methods: We're a container
    # too-many-instance-attributes: We're a command line parser

    """Deal with command line options and global state."""

    def __init__(self, argv: typing.Optional[typing.List[str]] = None) -> None:
        """
        Parse the command line into attributes.

        argv is the list of arguments to parse, excluding the program name;
        when it is None, sys.argv[1:] is used.  Invalid command lines end the
        process via SystemExit with a nonzero status.
        """
        # pylint: disable=too-many-statements,too-many-branches
        # too-many-statements: command line parsers tend to need a lot of statements
        # too-many-branches: command line parsers tend to need a lot of statements
        self.prog = b''
        self.terminator = b'\n'
        self.specified_divisor = False
        self.stat_mode = False
        self.nominal_metadata_size = 100
        self.divisor = 10
        self.overstrike = False
        self.line_term = b'\n'
        self.first_line = True
        self.quiet = False
        self.mode = 'unspecified'
        self.prior_percent_done = -1
        self.time0 = 0.0
        # File descriptor (or binary file object) that lines are read from.
        self.stdin = 0  # type: typing.Union[int, typing.BinaryIO]
        # Only meaningful in estimate mode (-e).
        self.lenlines = 0
        # Only meaningful in checkpoint mode (-C).
        self.listfilename = ''
        self.checkpointfilename = ''
        self.number = 0

        args = list(sys.argv[1:]) if argv is None else list(argv)

        if not args:
            print('{}: error: no arguments'.format(sys.argv[0]), file=sys.stderr)
            usage(1)

        while args:
            flag = args.pop(0)
            if flag == '-c':
                self.mode = 'count'
            elif flag == '-o':
                self.overstrike = True
                self.line_term = b'\r'
            elif flag == '-s':
                if not HAVE_MODUNITS:
                    sys.stderr.write('Sorry, -s is disabled because you have no modunits.py on your python module path\n')
                    sys.exit(1)
                self.stat_mode = True
            elif flag == '-q':
                self.quiet = True
            elif flag == '-b':
                self.mode = 'buffered'
            elif flag == '-0':
                if not HAVE_READLINE0:
                    sys.stderr.write('Sorry, -0 is disabled because you have no readline0.py on your python module path\n')
                    sys.exit(1)
                self.terminator = b'\0'
            elif flag == '-p':
                if not HAVE_BASHQUOTE:
                    sys.stderr.write('Sorry, -p is disabled because you have no bashquote.py on your python module path\n')
                    sys.exit(1)
                (prog,) = _operands(flag, args, 1)
                # os.fsencode() is the exact inverse of how Python decoded argv,
                # so the command reaches the shell with its original bytes.
                self.prog = os.fsencode(prog)
                if b'%s' not in self.prog:
                    sys.stderr.write('{}: -p requires a %s\n'.format(sys.argv[0]))
                    sys.exit(1)
            elif flag == '-e':
                (estimate,) = _operands(flag, args, 1)
                self.mode = 'estimate'
                # Zero is allowed: an estimate for empty input is legitimate.
                self.lenlines = _int_operand(flag, estimate, 0)
            elif flag == '-n':
                self.mode = 'noprogress'
            elif flag == '-d':
                (divisor,) = _operands(flag, args, 1)
                self.specified_divisor = True
                # The divisor is used as a modulus, so it must not be zero.
                self.divisor = _int_operand(flag, divisor, 1)
            elif flag == '-C':
                listfilename, checkpointfilename, number = _operands(flag, args, 3)
                self.mode = 'checkpoint'
                self.listfilename = listfilename
                self.checkpointfilename = checkpointfilename
                # Also used as a modulus.
                self.number = _int_operand(flag, number, 1)
            else:
                print('{}: unrecognized option: {}'.format(sys.argv[0], flag), file=sys.stderr)
                usage(1)


def compute_divisor(lenlines: int, options: Options) -> None:
    """Set options.divisor to one thousandth of lenlines, but never less than 10."""
    options.divisor = max(10, lenlines // 1000)


def progress(lineno: int, numlines: int, line_term: bytes, options: Options) -> None:
    """
    Write line-count progress info to stderr.

    The first call only starts the clock.  After that, a report is written
    whenever lineno is a nonzero multiple of options.divisor.  Nothing is
    written when no time has elapsed yet or numlines is not positive, because
    no rate or percentage can be computed then.
    """
    if options.first_line:
        options.first_line = False
        options.time0 = time.time()
        sys.stderr.write('Count starting\n')
        return

    if lineno == 0 or lineno % options.divisor != 0:
        return

    elapsed = time.time() - options.time0
    if elapsed <= 0 or numlines <= 0:
        return

    lines_per_second = lineno / elapsed
    # Clamp at zero: with -e the real input may be longer than the estimate.
    remaining = max(0.0, (numlines - lineno) / lines_per_second)
    percent = lineno * 100.0 / numlines

    os.write(2, b'Line %d of %d, %f%% done, %f lines/second, %b elapsed, %b remaining %b' % (
        lineno,
        numlines,
        percent,
        lines_per_second,
        human_readable_time(elapsed),
        human_readable_time(remaining),
        line_term,
    ))


def file_progress(amount_so_far: int, total_amount: int, line_term: bytes, options: Options) -> None:
    """
    Output progress data (when appropriate) - assuming lines are files of a given size.

    The first call only starts the clock.  After that, a report is written to
    stderr each time the integer percentage changes.  Requires modunits.
    """
    if options.first_line:
        options.first_line = False
        options.prior_percent_done = -1
        options.time0 = time.time()
        sys.stderr.write('Count starting \n')
        return

    elapsed = time.time() - options.time0
    percent_done = float(amount_so_far) * 100.0 / float(total_amount)
    if options.prior_percent_done == int(percent_done) or elapsed == 0:
        return

    options.prior_percent_done = int(percent_done)
    bytes_per_second = amount_so_far / elapsed
    remaining = (total_amount - amount_so_far) / bytes_per_second

    bps = modunits.modunits(
        'computer-bit-seconds',
        int((amount_so_far * 8) / elapsed),
        fractional_part_length=2,
        units='abbreviated',
    )
    so_far = modunits.modunits(
        'computer-size-iec',
        amount_so_far,
        fractional_part_length=2,
        units='abbreviated',
    )
    total = modunits.modunits(
        'computer-size-iec',
        total_amount,
        fractional_part_length=2,
        units='abbreviated',
    )

    os.write(2, b'%b of %b, %.1f%% done, %s, %s elapsed, %s remaining %b' % (
        bytes(so_far, 'UTF-8'),
        bytes(total, 'UTF-8'),
        percent_done,
        bytes(bps, 'UTF-8'),
        human_readable_time(elapsed),
        human_readable_time(remaining),
        line_term,
    ))


def doline(line: bytes, prog: bytes, terminator: bytes) -> None:
    """Handle one line - output it to stdout, or os.system prog with the quoted line substituted for %s."""
    if prog:
        quoter = bashquote.BashquoteBytes()
        quoter.add(line)
        os.system(prog.replace(b'%s', quoter.result()))
    else:
        os.write(1, line + terminator)


def _plain_time(seconds: int) -> str:
    """Format seconds using its two most significant units, e.g. '1h 2m'; used when modunits is absent."""
    parts = []  # type: typing.List[str]
    for label, span in (('d', 86400), ('h', 3600), ('m', 60), ('s', 1)):
        count, seconds = divmod(seconds, span)
        # Skip leading zero units, but always emit at least the seconds.
        if count or parts or span == 1:
            parts.append('%d%s' % (count, label))
    return ' '.join(parts[:2])


def human_readable_time(seconds: float) -> bytes:
    """Get time in a human-readable format, as bytes; negative durations are treated as zero."""
    whole_seconds = max(0, int(seconds))
    if HAVE_MODUNITS:
        text = modunits.modunits(
            'time',
            whole_seconds,
            reverse=True,
            comma=False,
            detail='two-highest',
            units='abbreviated',
        )
    else:
        text = _plain_time(whole_seconds)
    return bytes(text, 'UTF-8')


def get_size(filename: bytes) -> int:
    """Get the size of filename, as reported by os.stat."""
    return os.stat(filename).st_size


class FileClass(object):
    # pylint: disable=too-few-public-methods
    # too-few-public-methods: We're a container

    """Hold data related to a file (line)."""

    __slots__ = ('filename', 'size')

    def __init__(self, filename: bytes) -> None:
        """Record filename and its current size; raises OSError if it cannot be stat'd."""
        self.filename = filename
        self.size = get_size(filename)


def _read_records(
        stream: typing.Any,
        terminator: bytes,
        chunk_size: int = 1 << 16,
) -> typing.Iterator[typing.Tuple[bytes, int]]:
    """
    Split a binary stream into terminator-delimited records.

    Yields (record, consumed) pairs: record has the terminator removed, and
    consumed is the number of stream bytes the record occupied, terminator
    included.  A final record without a terminator is yielded as well.
    """
    # Prefer read1() so data from a pipe is handed on as soon as it arrives,
    # rather than waiting for a full chunk.
    read = getattr(stream, 'read1', stream.read)
    term_len = len(terminator)
    pending = b''
    while True:
        chunk = read(chunk_size)
        if not chunk:
            break
        pending += chunk
        start = 0
        while True:
            end = pending.find(terminator, start)
            if end == -1:
                break
            yield pending[start:end], end - start + term_len
            start = end + term_len
        pending = pending[start:]
    if pending:
        yield pending, len(pending)


def my_readline(options: Options) -> typing.Iterator[bytes]:
    """
    Yield input lines, without their terminator, from options.stdin.

    With readline0 available, it does the reading and honors
    options.terminator.  Otherwise a built-in reader is used; options.stdin
    may then be a file descriptor or a binary file object.
    """
    if HAVE_READLINE0:
        for line in readline0.readline0(options.stdin, options.terminator):
            yield line
        return

    if isinstance(options.stdin, int):
        # closefd=False: wrapping the descriptor must not close the real stdin.
        stream = os.fdopen(options.stdin, 'rb', closefd=False)  # type: typing.Any
    else:
        stream = options.stdin
    for line, _ in _read_records(stream, options.terminator):
        yield line


def do_count_mode(options: Options) -> None:
    """Do count mode: pass lines through, showing a running count on stderr."""
    every = 1000
    # We set lineno to 0 here, in case 0 lines of input are found.
    lineno = 0
    for lineno, line in enumerate(my_readline(options), start=1):
        doline(line, options.prog, options.terminator)
        if lineno % every == 0:
            sys.stderr.write('%d \r' % lineno)
    sys.stderr.write('%d\n' % lineno)


def do_buffered_mode(options: Options) -> None:
    """Do buffered mode: read all input first, so progress is measured against a known total."""
    if options.stat_mode:
        files = []
        for lineno, line in enumerate(my_readline(options)):
            if lineno % 1000 == 0:
                sys.stderr.write('Buffered %d filenames \r' % lineno)
            try:
                file_ = FileClass(line)
            except FileNotFoundError:
                sys.stderr.write('{}: {!r} not found - dangling symlink? Removing from list\n'.format(sys.argv[0], line))
            else:
                files.append(file_)
        sys.stderr.write('\n')

        # Each file is charged a nominal amount for its metadata, so that
        # empty files still count as progress.
        total_amount = sum(file_.size + options.nominal_metadata_size for file_ in files)
        amount_processed = 0
        for file_ in files:
            doline(file_.filename, options.prog, options.terminator)
            amount_processed += file_.size + options.nominal_metadata_size
            file_progress(amount_processed, total_amount, options.line_term, options)
    else:
        lines = []
        for lineno, line in enumerate(my_readline(options), start=1):
            lines.append(line)
            if lineno % 1000 == 0:
                sys.stderr.write('Buffered %d lines \r' % lineno)
        sys.stderr.write('\n')

        num_lines = len(lines)
        if not options.specified_divisor:
            compute_divisor(num_lines, options)
        # Line numbers are 1-based, matching estimate mode, so the last line
        # can be reported as "Line N of N".
        for lineno, line in enumerate(lines, start=1):
            doline(line, options.prog, options.terminator)
            progress(lineno, num_lines, options.line_term, options)


def do_estimate_mode(options: Options) -> None:
    """Do estimate mode: report progress against the user-supplied line count."""
    if not options.specified_divisor:
        compute_divisor(options.lenlines, options)
    for lineno, line in enumerate(my_readline(options), start=1):
        doline(line, options.prog, options.terminator)
        progress(lineno, options.lenlines, options.line_term, options)


def do_noprogress_mode(options: Options) -> None:
    """Do no-progress mode: pass every line through with no reporting."""
    for line in my_readline(options):
        doline(line, options.prog, options.terminator)


def do_checkpoint_mode(options: Options) -> None:
    """
    Do checkpoint mode.

    Lines are read from options.listfilename.  After every options.number
    lines, the current line number and byte offset into the list file are
    written to options.checkpointfilename.  If that file already exists,
    processing resumes from the position it records.
    """
    try:
        with open(options.checkpointfilename, 'r') as checkpointfile:
            lineno = int(checkpointfile.readline().strip())
            offset = int(checkpointfile.readline().strip())
    except FileNotFoundError:
        # No checkpoint yet: start from the top of the list.
        lineno = 0
        offset = 0

    with open(options.listfilename, 'rb') as listfile:
        listfile.seek(offset)
        for line, consumed in _read_records(listfile, options.terminator):
            lineno += 1
            offset += consumed
            doline(line, options.prog, options.terminator)
            if lineno % options.number == 0:
                with open(options.checkpointfilename, 'w') as checkpointfile:
                    checkpointfile.write('%d\n%d\n' % (lineno, offset))
                sys.stderr.write('Checkpointed at line number %d \r' % lineno)


def main() -> None:
    """Count lines of data in various ways, giving progress info."""
    options = Options()

    handlers = {
        'count': do_count_mode,
        'buffered': do_buffered_mode,
        'estimate': do_estimate_mode,
        'noprogress': do_noprogress_mode,
        'checkpoint': do_checkpoint_mode,
    }

    try:
        if options.mode == 'unspecified':
            print(f'{sys.argv[0]}: you must specify one of -c, -b, -e, -n or -C\n', file=sys.stderr)
            usage(1)
        if options.mode not in handlers:
            print(f'{sys.argv[0]}: internal error: options.mode has a strange value: {options.mode}\n', file=sys.stderr)
            usage(1)
        handlers[options.mode](options)
    except OSError as exc:
        # EPIPE is common (the reader of our output went away), and should
        # not be regarded as an error condition.
        if exc.errno != errno.EPIPE:
            raise

    if options.overstrike:
        sys.stderr.write('\n')
    if not options.quiet:
        sys.stderr.write("Count done\n")


if __name__ == '__main__':
    main()