#!/usr/local/cpython-3.6/bin/python3

"""Search for strings in files, with the option of not messing up the buffer cache."""

# pylint: disable=wrong-import-position

import os
import re
import sys
import typing

# The helper modules imported below are looked up in these directories first.
sys.path.insert(0, '/usr/local/lib')
sys.path.insert(0, os.path.expanduser('~/lib'))

import readline0  # noqa: ignore=E402
import bufsock  # noqa: ignore=E402
try:
    import odirect
except ImportError as import_message:
    # odirect is optional; remember why it failed so main() can report it
    # if the user asks for --use-odirect.
    HAVE_ODIRECT = False
    NO_ODIRECT_MESSAGE = import_message
else:
    HAVE_ODIRECT = True

# Label printed by --files-with-matches when the input is stdin and there is
# therefore no filename to report (same text GNU grep uses).
STDIN_LABEL = b'(standard input)'


def usage(retval: int) -> None:
    """Write a usage message to stderr and exit the process with status retval."""
    sys.stderr.write('Usage: {0}\n'.format(sys.argv[0]))
    sys.stderr.write('\t-0 Read and write null separated lines (but not filenames)\n')
    sys.stderr.write('\t--filenames file1 file2 ... filen Search in specified filenames\n')
    sys.stderr.write('\t--pattern regex Search for regex\n')
    sys.stderr.write('\t--use-odirect Do not pollute the buffer cache - experimental\n')
    sys.stderr.write('\t--invert-match|-v List non-matches\n')
    sys.stderr.write('\t--files-with-matches|-l List filenames containing one or more matches - only\n')
    sys.stderr.write('\t--help Output this help message\n')
    sys.exit(retval)


def lines_gen(
        null_separated: bool,
        file_: typing.Union[typing.BinaryIO, int, bufsock.bufsock],
) -> typing.Iterable[bytes]:
    """
    Yield the records of file_, one at a time.

    Records are split on NUL bytes when null_separated is true, otherwise on
    newlines.  The actual reading is delegated to readline0.readline0(), so
    file_ may be anything that function accepts (this script passes an open
    binary file, file descriptor 0, or a bufsock).
    """
    separator = b'\0' if null_separated else b'\n'
    yield from readline0.readline0(file_, separator=separator)


def _write_record(filename: typing.Optional[bytes], payload: bytes, output_separator: bytes) -> None:
    """Write one result to stdout (fd 1), prefixed with 'filename: ' when a filename is known."""
    if filename is not None:
        os.write(1, b'%s: %s%s' % (filename, payload, output_separator))
    else:
        os.write(1, payload + output_separator)


def process(
        null_separated: bool,
        pattern: typing.Pattern[bytes],
        invert_match: bool,
        files_with_matches: bool,
        file_: typing.Union[typing.BinaryIO, int, bufsock.bufsock],
        filename: typing.Optional[bytes] = None,
) -> None:
    # pylint: disable=too-many-arguments
    """
    Search one input for pattern and write the results to stdout.

    null_separated: records are NUL-terminated on input and output (else newline).
    pattern: compiled bytes regex, applied to each record with search().
    invert_match: select the records that do NOT match.
    files_with_matches: write only the filename, once, at the first selected
        record, then stop reading this input.
    file_: the input to read; see lines_gen().
    filename: name used to prefix output; None (the stdin case in main) means
        no prefix, and STDIN_LABEL is reported in files_with_matches mode.

    For a selected matching record, if the pattern has capture groups the
    groups are written space-joined instead of the whole record.
    """
    output_separator = b'\0' if null_separated else b'\n'
    # Formatting None with %s in a bytes template raises TypeError, so stdin
    # needs a real label for the filename-only output mode.
    label = filename if filename is not None else STDIN_LABEL

    for line in lines_gen(null_separated, file_):
        match_obj = pattern.search(line)
        matched = match_obj is not None
        if matched == invert_match:
            # Matching record while inverting, or non-matching record while
            # not inverting: this record is not selected.
            continue

        if files_with_matches:
            os.write(1, label + output_separator)
            break

        if match_obj is not None and match_obj.groups():
            # groups(b'') substitutes empty bytes for groups that did not
            # participate in the match; the default of None would make
            # bytes.join() raise TypeError.
            payload = b' '.join(match_obj.groups(b''))
        else:
            payload = line
        _write_record(filename, payload, output_separator)


def main() -> None:
    # pylint: disable=too-many-branches,too-many-statements
    """Parse the command line, then search stdin or each named file in turn."""
    pattern_string = None
    filenames = None
    invert_match = False
    use_odirect = False
    files_with_matches = False
    null_separated = False

    # Walk a private copy of the arguments so sys.argv itself is left intact.
    args = sys.argv[1:]
    position = 0
    while position < len(args):
        option = args[position]
        if option in ('--filenames', '--'):
            # Everything after this option is a filename, even if it looks like an option.
            filenames = [os.fsencode(name) for name in args[position + 1:]]
            break
        elif option == '-0':
            null_separated = True
        elif option == '--use-odirect':
            use_odirect = True
        elif option in ('-v', '--invert-match'):
            invert_match = True
        elif option in ('-l', '--files-with-matches'):
            files_with_matches = True
        elif option in ('-h', '--help'):
            usage(0)
        elif option == '--pattern':
            if position + 1 >= len(args):
                # Without this check a trailing --pattern dies with IndexError.
                sys.stderr.write('{0}: --pattern requires an argument\n'.format(sys.argv[0]))
                usage(1)
            pattern_string = os.fsencode(args[position + 1])
            position += 1
        else:
            sys.stderr.write('{0}: Illegal option: {1}\n'.format(sys.argv[0], option))
            usage(1)
        position += 1

    if not pattern_string:
        sys.stderr.write('{0}: --pattern is a required option\n'.format(sys.argv[0]))
        usage(1)
    # usage() exits, so this only serves to narrow the type for checkers.
    assert pattern_string is not None

    if use_odirect and not HAVE_ODIRECT:
        sys.stderr.write('{0}: Warning: --use-odirect given but odirect module is unavailable\n'.format(sys.argv[0]))
        sys.stderr.write('{0}\n'.format(NO_ODIRECT_MESSAGE))
        use_odirect = False

    pattern = re.compile(pattern_string)

    if filenames is None:
        # No --filenames given: read file descriptor 0 (stdin).
        process(null_separated, pattern, invert_match, files_with_matches, file_=0)
        return

    chunk_len = 2**18
    for filename in filenames:
        if use_odirect:
            try:
                odirect_file = odirect.odirect(filename, b'rbd', chunk_len)
            except OSError as exception:
                sys.stderr.write('%s\n' % exception)
                continue
            bufsock_file = bufsock.bufsock(odirect_file, chunk_len=chunk_len)
            process(null_separated, pattern, invert_match, files_with_matches, bufsock_file, filename=filename)
            del bufsock_file
            del odirect_file
        else:
            # Report an unopenable file and move on, exactly as the odirect
            # branch does, instead of aborting the remaining files.
            try:
                file_ = open(filename, 'rb')
            except OSError as exception:
                sys.stderr.write('%s\n' % exception)
                continue
            with file_:
                process(null_separated, pattern, invert_match, files_with_matches, file_, filename=filename)


if __name__ == '__main__':
    main()