#!/usr/bin/env python3 """ Write all duplicate values to stdout. stdin is assumed to be already sorted. EG, for 1, 2, 2, 3, 4, 4, 4, 5, 6, 6 we should write: 2, 2, 4, 4, 4, 6, 6 (But that'll be a record, not typically just an int.) This one is, unfortunately, O(n) in the number of duplicate records. It fortunately is not O(n) in the number of files. """ import os import sys import readline0 def usage(retval: int) -> None: """Output a usage message.""" if retval == 0: file_ = sys.stdout else: file_ = sys.stderr # keep-dups --split 1 --check-fields 0 print(f"Usage: {sys.argv[0]}", file=file_) print(file=file_) print("Eliminate unique values. Assumes the input is sorted.", file=file_) print(file=file_) print("--field-separator , split on a comma. Defaults to a single space.", file=file_) print("--split s in each row of n separators, split the first s occurences of separator, ", file=file_) print(" leaving s+1..n as a single, unsplit value", file=file_) print("--check-fields 0,1 extract fields 0 and 1 from the split row, and use them (only) in ", file=file_) print(" uniqueness tests", file=file_) print("--output-fields 1,3 extract fields 1 and 3 from the split row, and output them (only)", file=file_) print(" to stdout", file=file_) print("--output-record-separator $'\\n' Separate output records with newlines. Defaults to a single newline.", file=file_) print("--output-record-separator-null Separate output records with ASCII NUL's.", file=file_) print("--double-record-separator $'\\n' instead of outputting f1,f2,f3\\nf4,f5,f6\\n output", file=file_) print(" f1\\nf2\\nf3\\n\\nf4\\nf5\\nf6\\n\\n for better grouping", file=file_) print(" IOW, a single record separator is used as a field separator, and", file=file_) print(" a double record separator is used to mark end-of-record. The", file=file_) print(" field-separator goes unused for output", file=file_) print("--double-record-separator-null As above, but output f1\\0f2\\0f3\\0\\0f4\\0f5\\0f6\\0\\0", file=file_) print(file=file_) print("Example uses:", file=file_) print(" keep-dups --split 2 --check-fields 0,1 --output-fields 0,1,2", file=file_) print(" keep-dups --split 1 --check-fields 0 --output-fields 1 --field-separator ,", file=file_) print(file=file_) print("Note that, at least for now, the input and output field separators are the same", file=file_) print(file=file_) sys.exit(retval) def record_to_fields(*, record_separator: bytes, field_separator: bytes, record: bytes, split: int) -> list[bytes]: """Split a record into fields.""" fields = record.rstrip(record_separator).split(field_separator, split) return fields def extract_fields(*, fields: list[bytes], fields_to_extract: list[int]) -> list[bytes] | None: """Extract a specific set of fields from `fields`.""" maximum = max(fields_to_extract) if not fields[maximum:]: return None result = [fields[i] for i in fields_to_extract] return result class Options: """Parse and hold command line options.""" def __init__(self) -> None: """Initialize.""" self.split = 0 self.check_fields = [] self.output_fields = [] # We default to an ASCII nul, because bash can't pass that, and it's commonly useful. self.field_separator = b"," self.input_record_separator = b"\n" self.have_output_record_separator = False self.output_record_separator = b"\n" self.have_double_record_separator = False self.double_record_separator = b"\n" while sys.argv[1:]: match sys.argv[1]: case "--split": self.split = int(sys.argv[2]) del sys.argv[1] case "--check-fields": self.check_fields = [int(i) for i in sys.argv[2].encode("UTF-8").split(b",")] del sys.argv[1] case "--output-fields": self.output_fields = [int(i) for i in sys.argv[2].encode("UTF-8").split(b",")] del sys.argv[1] case "--input-record-separator": self.input_record_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--output-record-separator": self.output_record_separator = sys.argv[2].encode("UTF-8") self.have_output_record_separator = True del sys.argv[1] case "--output-record-separator-null": self.output_record_separator = b"\0" self.have_output_record_separator = True case "--double-record-separator": self.double_record_separator = sys.argv[2].encode("UTF-8") self.have_double_record_separator = True del sys.argv[1] case "--double-record-separator-null": self.double_record_separator = b"\0" self.have_double_record_separator = True case "--field-separator": self.field_separator = sys.argv[2].encode("UTF-8") del sys.argv[1] case "--help" | "-h": usage(0) case _: print(f"{sys.argv[0]}: unrecognized option: {sys.argv[1]}", file=sys.stderr) usage(1) del sys.argv[1] def check(self) -> None: """Check command line options.""" preflight_good = True if not self.check_fields: print(f"{sys.argv[0]}: --check-fields is a required option", file=sys.stderr) preflight_good = False if not self.output_fields: print(f"{sys.argv[0]}: --output-fields is a required option", file=sys.stderr) preflight_good = False if len(self.input_record_separator) == 0: print(f"{sys.argv[0]}: --record-separator must be nonempty", file=sys.stderr) preflight_good = False if self.have_output_record_separator + self.have_double_record_separator != 1: print(f"{sys.argv[0]}: You must specify exactly one of --output-record-separator and", file=sys.stderr) print("--double-record-separator", file=sys.stderr) preflight_good = False if self.input_record_separator is not None and len(self.input_record_separator) == 0: print(f"{sys.argv[0]}: --input-record-separator must not be zero-length", file=sys.stderr) preflight_good = False if self.field_separator is not None and len(self.field_separator) == 0: print(f"{sys.argv[0]}: --field-separator must not be zero-length", file=sys.stderr) preflight_good = False if not preflight_good: print(f"{sys.argv[0]}: one or more items in preflight check failed", file=sys.stderr) usage(1) def extract_and_write( *, fields: list[bytes], fields_to_extract: list[int], field_separator: bytes, have_output_record_separator: bool, have_double_record_separator: bool, output_record_separator: bytes, double_record_separator: bytes, ) -> None: """Extract fields to output and output them.""" fields_to_output = extract_fields(fields=fields, fields_to_extract=fields_to_extract) if fields_to_output is None: return if have_output_record_separator: os.write(1, field_separator.join(fields_to_output) + output_record_separator) elif have_double_record_separator: os.write(1, field_separator.join(fields_to_output) + double_record_separator) else: raise AssertionError("neither options.have_output_record_separator are options.have_double_record_separator are True") def read_dedup_and_write( *, input_record_separator: bytes, have_output_record_separator: bool, output_record_separator: bytes, have_double_record_separator: bool, double_record_separator: bytes, field_separator: bytes, split: int, check_fields: list[int], output_fields: list[int], ) -> None: """Read data, de-unique it, and write only the duplicates.""" prior_checked_fields = None match_list = [] for record in readline0.readline0(file_=0, separator=input_record_separator): all_fields = record_to_fields(record_separator=field_separator, field_separator=field_separator, record=record, split=split) checked_fields = extract_fields(fields=all_fields, fields_to_extract=check_fields) if checked_fields == prior_checked_fields or prior_checked_fields is None: match_list.append(all_fields) else: if len(match_list) >= 2: for one_match in match_list: extract_and_write( fields=one_match, fields_to_extract=output_fields, field_separator=field_separator, have_output_record_separator=have_output_record_separator, output_record_separator=output_record_separator, have_double_record_separator=have_double_record_separator, double_record_separator=double_record_separator, ) match_list = [all_fields] prior_checked_fields = checked_fields for one_match in match_list: extract_and_write( fields=one_match, fields_to_extract=output_fields, field_separator=field_separator, have_output_record_separator=have_output_record_separator, output_record_separator=output_record_separator, have_double_record_separator=have_double_record_separator, double_record_separator=double_record_separator, ) def main() -> None: """Start the ball rolling.""" options = Options() options.check() read_dedup_and_write( input_record_separator=options.input_record_separator, have_output_record_separator=options.have_output_record_separator, output_record_separator=options.output_record_separator, have_double_record_separator=options.have_double_record_separator, double_record_separator=options.double_record_separator, field_separator=options.field_separator, split=options.split, check_fields=options.check_fields, output_fields=options.output_fields, ) if __name__ == "__main__": main()