#!/usr/bin/python3 # pylint: disable=superfluous-parens # superfluous-parens: Parentheses are good for clarity and portability """ Merge .views entries from one hierarchy (the source) to another (the destination). For example, merge Ben 10 .views filenames from my laptop to the main movie disk """ import os import sys import shutil import typing import filecmp import collections def make_used(variable) -> None: """Persuade pyflakes that variable is used.""" assert True or variable def usage(retval: int) -> None: """Output a usage message.""" if retval == 0: write = sys.stdout.write else: write = sys.stderr.write write('Usage: {} source_dir dest_dir\n'.format(sys.argv[0])) sys.exit(retval) def get_views_files(directory: str) -> typing.Iterable: """Get the views files from the source directory.""" for rootdir, subdirs, filenames in os.walk(directory): make_used(subdirs) for filename in filenames: if filename.endswith('.views'): full_source_filename = os.path.normpath(os.path.join(rootdir, filename)) yield full_source_filename def merge(dictionary: typing.Dict[str, typing.Set[str]], filename: str) -> None: """ Merge entries from filename into dictionary. For empty/nonexistent .views filenames, the dictionary always gets an empty list (it's a defaultdict really) """ try: with open(filename, 'r') as file_: for line in file_: dictionary[filename].add(line.rstrip()) except FileNotFoundError: return def write_result_file(full_temp_filename: str, content: typing.Set[str]) -> None: """Write the resulting .views file.""" # content is a set content_list = list(content) content_list.sort() with open(full_temp_filename, 'w') as file_: for line in content_list: file_.write('{}\n'.format(line)) def cat(filename: str) -> None: """Output the content of a file to the tty.""" try: with open(filename, 'r') as file_: while True: block = file_.read(2**20) if not block: break sys.stdout.write(block) except FileNotFoundError: print('(File not found)') def files_equal(filename1: str, filename2: str): """Return True iff filename1 is identical to filename2.""" if os.path.isfile(filename1) + os.path.isfile(filename2) == 1: # print('One file of {} and {} nonexistent: {}'.format(filename1, filename2, result)) result = False else: # filecmp.cmp returns True if files are equal result = filecmp.cmp(filename1, filename2) # print('Files {} and {} equal: {}'.format(filename1, filename2, result)) # if not result: # print('files are different') # print('filename1:') # cat(filename1) # print('filename2') # cat(filename2) # print('') return result def copy(source_filename: str, dest_filename: str) -> None: """Copy source_filename to dest_filename, creating directories as needed.""" directory = os.path.dirname(dest_filename) os.makedirs(directory, exist_ok=True) shutil.copyfile(source_filename, dest_filename) def main() -> None: """Merge views files.""" if len(sys.argv) != 3: sys.stderr.write('You must give me two directories in argv\n') usage(1) source_dir = sys.argv[1] dest_dir = sys.argv[2] if not os.path.isabs(source_dir): sys.stderr.write('{}: source directory (argv[1]) must be an absolute path'.format(sys.argv[0])) sys.exit(1) if not os.path.isabs(dest_dir): sys.stderr.write('{}: dest directory (argv[1]) must be an absolute path'.format(sys.argv[0])) sys.exit(1) # these two are just simple error checks os.chdir(source_dir) os.chdir(dest_dir) os.chdir(source_dir) filename_map = collections.defaultdict(set) # type: typing.Dict[str, typing.Set] for source_filename in sorted(get_views_files('.')): merge(filename_map, source_filename) if not filename_map: sys.stderr.write('{}: Error, no .views files found in source hierarchy {}\n'.format(sys.argv[0], source_dir)) sys.exit(1) # Note that we do not inhale all of dest - that would take too long. # We only look for .views files that existed in the source hierarchy. for possible_dest_filename in sorted(filename_map): merge(filename_map, possible_dest_filename) filenames = list(filename_map) filenames.sort() tempdir = '/tmp' full_temp_filename = os.path.normpath(os.path.join(tempdir, 'msync.temp.{}'.format(os.getpid()))) for filename in filenames: write_result_file(full_temp_filename, filename_map[filename]) dest_filename = os.path.normpath(os.path.join(dest_dir, filename)) if not files_equal(full_temp_filename, dest_filename): print('Updating {}'.format(dest_filename)) copy(full_temp_filename, dest_filename) try: os.unlink(full_temp_filename) except FileNotFoundError: pass main()