#!/usr/bin/env python3 """ Write all duplicate values to stdout. stdin is assumed to be already sorted. EG, for 1, 2, 2, 3, 4, 4, 4, 5, 6, 6 we should write: 2, 2, 4, 4, 4, 6, 6 (But that'll be a record, not typically just an int.) This one is, unfortunately, O(n) in the number of duplicate records. It fortunately is not O(n) in the number of files. """ import collections import os import sys import readline0 def usage(retval: int) -> None: """Output a usage message.""" if retval == 0: file_ = sys.stdout else: file_ = sys.stderr # keep-dups --split 1 --check-fields 0 print(f"Usage: {sys.argv[0]}", file=file_) print(file=file_) print("Eliminate unique values. Assumes the input is sorted.", file=file_) print(file=file_) print("--field-separator , split on a comma. Defaults to a single space.", file=file_) print("--split s in each row of n separators, split the first s occurences of separator, ", file=file_) print(" leaving s+1..n as a single, unsplit value", file=file_) print("--check-fields 0,1 extract fields 0 and 1 from the split row, and use them (only) in ", file=file_) print(" uniqueness tests", file=file_) print("--output-fields 1,3 extract fields 1 and 3 from the split row, and output them (only)", file=file_) print(" to stdout", file=file_) print("--input-record-separator $'\\n' Separate input records with newlines. Defaults to a single newline.", file=file_) print("--input-record-separator-null Separate input records with ASCII NULL's.", file=file_) print("--output-record-separator $'\\n' Separate output records with newlines. Defaults to a single newline.", file=file_) print("--output-record-separator-null Separate output records with ASCII NUL's.", file=file_) print("--double-record-separator $'\\n' instead of outputting f1,f2,f3\\nf4,f5,f6\\n output", file=file_) print(" f1\\nf2\\nf3\\n\\nf4\\nf5\\nf6\\n\\n for better grouping", file=file_) print(" IOW, a single record separator is used as a field separator, and", file=file_) print(" a double record separator is used to mark end-of-record. The", file=file_) print(" field-separator goes unused for output", file=file_) print("--double-record-separator-null As above, but output f1\\0f2\\0f3\\0\\0f4\\0f5\\0f6\\0\\0", file=file_) print(file=file_) print("Example uses:", file=file_) print(" keep-dups --split 2 --check-fields 0,1 --output-fields 0,1,2", file=file_) print(" keep-dups --split 1 --check-fields 0 --output-fields 1 --field-separator ,", file=file_) print(file=file_) print("Note that, at least for now, the input and output field separators are the same", file=file_) print(file=file_) sys.exit(retval) def record_to_fields(*, record_separator: bytes, field_separator: bytes, record: bytes, split: int) -> list[bytes]: """Split a record into fields.""" fields = record.rstrip(record_separator).split(field_separator, split) return fields class Options: """Parse and hold command line options.""" def __init__(self) -> None: """Initialize.""" self.split = 0 self.check_field_indexes: list[int] = [] self.output_field_indexes: list[int] = [] self.field_separator = b"," self.input_record_separator = b"\n" self.output_record_separator = b"" self.double_record_separator = b"" while sys.argv[1:]: match sys.argv[1]: case "--split": self.split = int(sys.argv[2]) del sys.argv[1] case "--check-field-indexes": self.check_field_indexes = [int(i) for i in sys.argv[2].encode("UTF-8").split(b",")] del sys.argv[1] case "--output-field-indexes": self.output_field_indexes = [int(i) for i in sys.argv[2].encode("UTF-8").split(b",")] del sys.argv[1] case "--input-record-separator": self.input_record_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--input-record-separator-null": self.input_record_separator = b"\0" case "--output-record-separator": self.output_record_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--output-record-separator-null": self.output_record_separator = b"\0" case "--double-record-separator": self.double_record_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--double-record-separator-null": self.double_record_separator = b"\0" case "--field-separator": self.field_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--help" | "-h": usage(0) case _: print(f"{sys.argv[0]}: unrecognized option: {sys.argv[1]}", file=sys.stderr) usage(1) del sys.argv[1] def check(self) -> None: """Check command line options.""" preflight_good = True if not self.check_field_indexes: print(f"{sys.argv[0]}: --check-field-indexes is a required option", file=sys.stderr) preflight_good = False if not self.output_field_indexes: print(f"{sys.argv[0]}: --output-field-indexes is a required option", file=sys.stderr) preflight_good = False if len(self.input_record_separator) == 0: print(f"{sys.argv[0]}: --record-separator must be nonempty", file=sys.stderr) preflight_good = False if (self.output_record_separator != b"") + (self.double_record_separator != b"") != 1: print(f"{sys.argv[0]}: You must specify exactly one of --output-record-separator and", file=sys.stderr) print("--double-record-separator", file=sys.stderr) preflight_good = False if self.input_record_separator is not None and len(self.input_record_separator) == 0: print(f"{sys.argv[0]}: --input-record-separator must not be zero-length", file=sys.stderr) preflight_good = False if self.field_separator is not None and len(self.field_separator) == 0: print(f"{sys.argv[0]}: --field-separator must not be zero-length", file=sys.stderr) preflight_good = False if not preflight_good: print(f"{sys.argv[0]}: one or more items in preflight check failed", file=sys.stderr) usage(1) def extract_fields(*, fields: list[bytes], fields_to_extract: list[int]) -> list[bytes] | None: """Extract a specific set of fields from `fields`.""" if fields_to_extract: maximum = max(fields_to_extract) if not fields[maximum:]: print(f"{sys.argv[0]}: got a short fields value: {fields!r}", file=sys.stderr) return None else: raise AssertionError(f"Got an empty fields value: {fields_to_extract!r}") result = [fields[i] for i in fields_to_extract] return result def extract_and_write( *, values: list[list[bytes]], field_separator: bytes, output_record_separator: bytes, double_record_separator: bytes, output_field_indexes: list[int], ) -> None: """Get the output_fields using output_field_indexes and write them with appropriate separators and possible terminator.""" assert (output_record_separator == b"") + (double_record_separator == b"") == 1 revalues = [] for value in values: of = extract_fields(fields=value, fields_to_extract=output_field_indexes) if of is None: continue revalues.append(of) if output_record_separator: for all_fields in revalues: os.write(1, field_separator.join(all_fields) + output_record_separator) elif double_record_separator: for all_fields in revalues: os.write(1, field_separator.join(all_fields) + double_record_separator) os.write(1, double_record_separator) else: raise AssertionError("This shouldn't happen") def evict_from_dict( *, dict_: dict[tuple[bytes, ...], list[list[bytes]]], field_separator: bytes, output_record_separator: bytes, double_record_separator: bytes, output_field_indexes: list[int], ) -> None: """Remove our (single) key, writing the associated value to stdout. Also delete that key from the dict.""" # There should be only one key in the dict. keys = list(dict_) if len(keys) == 0: return assert len(keys) == 1 key = keys[0] # Only write anything if there are two or more elements in the list. if dict_[key][1:]: extract_and_write( values=dict_[key], field_separator=field_separator, output_record_separator=output_record_separator, double_record_separator=double_record_separator, output_field_indexes=output_field_indexes, ) del dict_[key] def read_dedup_and_write( *, input_record_separator: bytes, output_record_separator: bytes, double_record_separator: bytes, field_separator: bytes, split: int, check_field_indexes: list[int], output_field_indexes: list[int], ) -> None: """Read the data, through away uniques, and write the data. We assume the input is sorted in some arbitrary order.""" dict_: dict[tuple[bytes, ...], list[list[bytes]]] = collections.defaultdict(list) for record in readline0.readline0(file_=0, separator=input_record_separator): all_fields = record.split(field_separator, split) assert all_fields is not None f = extract_fields(fields=all_fields, fields_to_extract=check_field_indexes) if f is None: continue check_fields = tuple(f) if check_fields in dict_: dict_[check_fields].append(all_fields) else: evict_from_dict( dict_=dict_, field_separator=field_separator, output_record_separator=output_record_separator, double_record_separator=double_record_separator, output_field_indexes=output_field_indexes, ) assert len(dict_) == 0 dict_[check_fields].append(all_fields) if dict_: evict_from_dict( dict_=dict_, field_separator=field_separator, output_record_separator=output_record_separator, double_record_separator=double_record_separator, output_field_indexes=output_field_indexes, ) def main() -> None: """Start the ball rolling.""" options = Options() options.check() read_dedup_and_write( input_record_separator=options.input_record_separator, output_record_separator=options.output_record_separator, double_record_separator=options.double_record_separator, field_separator=options.field_separator, split=options.split, check_field_indexes=options.check_field_indexes, output_field_indexes=options.output_field_indexes, ) if __name__ == "__main__": main()