#!/usr/bin/env python3 """Eliminate duplicate values from a file containing many lines.""" import itertools import os import sys import typing import readline0 def usage(retval: int) -> None: """Output a usage message.""" if retval == 0: file_ = sys.stdout else: file_ = sys.stderr print(f"Usage: {sys.argv[0]}", file=file_) print(file=file_) print("Eliminate duplicates. Assumes the input is sorted.", file=file_) print(file=file_) print("--elim-all-but-one-dup for n duplicates, output 1, suppressing n-1", file=file_) print("--elim-all-dups for n duplicates, output 0, suppressing n", file=file_) print("--field-separator , split on a comma. Defaults to a single space.", file=file_) print("--record-separator '\\0' split on a comma. Defaults to a single space.", file=file_) print("--split s in each row of n separators, split the first s occurences of separator, ", file=file_) print(" leaving s+1..n as a single, unsplit value", file=file_) print("--check-fields 0,1 extract fields 0 and 1 from the split row, and use them (only) in ", file=file_) print(" uniqueness tests", file=file_) print("--output-fields 1,3 extract fields 1 and 3 from the split row, and output them (only) to stdout", file=file_) print(file=file_) print("Example uses:", file=file_) print(" keep-uniques --elim-all-but-one-dup --split 2 --check-fields 0,1 --output-fields 0,1,2", file=file_) print(" keep-uniques --elim-all-dups --split 1 --check-fields 0 --output-fields 1 --field-separator ,", file=file_) print(file=file_) print("Note that, at least for now, the input and output record separators, and input and output field", file=file_) print("separators, are the same.", file=file_) sys.exit(retval) def sliding_triples(iterable: typing.Iterable[bytes]) -> typing.Iterator[tuple[bytes, bytes, bytes]]: """Return an iterator that produces a sliding window of 3 values.""" a, b, c = itertools.tee(iterable, 3) return zip(a, itertools.islice(b, 1, None), itertools.islice(c, 2, None)) def record_to_fields(*, record_separator: bytes, field_separator: bytes, record: bytes, split: int) -> list[bytes]: """Split a record into fields.""" fields = record.rstrip(record_separator).split(field_separator, split) return fields def do_elim_all_but_one_dup( *, check_fields: list[int], record_separator: bytes, field_separator: bytes, split: int, output_fields: list[int] ) -> None: """Eliminate duplicates, leaving one of each duplicate.""" prior_fields_to_compare = None for line in readline0.readline0(file_=0, separator=record_separator): fields = record_to_fields( record=line, record_separator=record_separator, field_separator=field_separator, split=split, ) fields_to_compare = extract_fields(fields=fields, fields_to_extract=check_fields) if fields_to_compare != prior_fields_to_compare: fields_to_output = extract_fields(fields=fields, fields_to_extract=output_fields) os.write(1, field_separator.join(fields_to_output) + record_separator) prior_fields_to_compare = fields_to_compare def do_elim_all_dups( *, record_separator: bytes, field_separator: bytes, check_fields: list[int], output_fields: list[int], split: int ) -> None: """Eliminate all duplicates, leaving zero of any that are the same.""" first_time = True # In case we fall through the loop without actually reading anything. list_of_fields_to_check = [] for records in sliding_triples(readline0.readline0(file_=0, separator=record_separator)): assert len(records) == 3 list_of_rows = [ record_to_fields( record=record, record_separator=record_separator, field_separator=field_separator, split=split, ) for record in records ] list_of_fields_to_check = [extract_fields(fields=row, fields_to_extract=check_fields) for row in list_of_rows] if first_time: # Edge case: deal with first row if list_of_fields_to_check[0] != list_of_fields_to_check[1]: fields_to_output = extract_fields(fields=list_of_rows[0], fields_to_extract=output_fields) os.write(1, field_separator.join(fields_to_output) + record_separator) first_time = False if list_of_fields_to_check[0] != list_of_fields_to_check[1] != list_of_fields_to_check[2]: # Deal with the main case: not the first line, not the last line - something in between. fields_to_output = extract_fields(fields=list_of_rows[1], fields_to_extract=output_fields) os.write(1, field_separator.join(fields_to_output) + record_separator) len_list_of_fields_to_check = len(list_of_fields_to_check) match len_list_of_fields_to_check: case 0: # This means stdin is empty; do nothing pass case 1: # This means stdin has only one line; it has to be unique, so output it. fields_to_output = extract_fields(fields=list_of_rows[0], fields_to_extract=output_fields) os.write(1, field_separator.join(fields_to_output) + record_separator) case 2: # This means stdin has two lines. If they are equal, output nothing; they are duplicates. # If they are different output them both. if list_of_fields_to_check[0] != list_of_fields_to_check[1]: for i in range(2): fields_to_output = extract_fields(fields=list_of_rows[i], fields_to_extract=output_fields) os.write(1, field_separator.join(fields_to_output) + record_separator) case 3: # This could be an stdin with three lines. It also could be the end of an stdin that was > 3 lines. if list_of_fields_to_check[1] != list_of_fields_to_check[2]: # Edge case: deal with last row fields_to_output = extract_fields(fields=list_of_rows[2], fields_to_extract=output_fields) os.write(1, field_separator.join(fields_to_output) + record_separator) case _: raise AssertionError( f"{sys.argv[0]}: internal error: len_list_of_fields_to_check has a strange value: ${len_list_of_fields_to_check}" ) def extract_fields(*, fields: list[bytes], fields_to_extract: list[int]) -> list[bytes]: """Extract a specific set of fields from `fields`.""" result = [fields[i] for i in fields_to_extract if fields[i:]] return result class Options: """Parse and hold command line options.""" def __init__(self) -> None: """Initialize.""" self.elim_all_but_one_dup = False self.elim_all_dups = False self.split = 0 self.check_fields = [] self.output_fields = [] # We default to an ASCII nul, because bash can't pass that, and it's commonly useful. self.record_separator = b"\0" self.field_separator = b" " while sys.argv[1:]: match sys.argv[1]: case "--elim-all-but-one-dup": self.elim_all_but_one_dup = True case "--elim-all-dups": self.elim_all_dups = True case "--split": self.split = int(sys.argv[2]) del sys.argv[1] case "--check-fields": self.check_fields = [int(i) for i in sys.argv[2].encode("UTF-8").split(b",")] del sys.argv[1] case "--output-fields": self.output_fields = [int(i) for i in sys.argv[2].encode("UTF-8").split(b",")] del sys.argv[1] case "--record-separator": self.record_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--field-separator": self.field_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--help" | "-h": usage(0) case _: print(f"{sys.argv[0]}: unrecognized option: {sys.argv[1]}", file=sys.stderr) usage(1) del sys.argv[1] def check(self) -> None: """Check command line options.""" preflight_good = True if self.elim_all_but_one_dup + self.elim_all_dups != 1: print(f"{sys.argv[0]}: you must specify exactly one of --elim-all-but-one-dup and --elim-all-dups", file=sys.stderr) preflight_good = False if not self.check_fields: print(f"{sys.argv[0]}: --check-fields is a required option", file=sys.stderr) preflight_good = False if not self.output_fields: print(f"{sys.argv[0]}: --output-fields is a required option", file=sys.stderr) preflight_good = False if len(self.record_separator) == 0: print(f"{sys.argv[0]}: --record-separator must be nonempty", file=sys.stderr) preflight_good = False if not preflight_good: print(f"{sys.argv[0]}: one or more items in preflight check failed", file=sys.stderr) usage(1) def main() -> None: """Start the ball rolling.""" options = Options() options.check() if options.elim_all_but_one_dup: do_elim_all_but_one_dup( check_fields=options.check_fields, record_separator=options.record_separator, field_separator=options.field_separator, split=options.split, output_fields=options.output_fields, ) if options.elim_all_dups: # This isn't even close to correct do_elim_all_dups( check_fields=options.check_fields, record_separator=options.record_separator, field_separator=options.field_separator, split=options.split, output_fields=options.output_fields, ) if __name__ == "__main__": main()