#!/usr/local/cpython-3.9/bin/python3

# Pypy -was- slower for this program. Much slower. It was apparently regex-related. Pypy3 7.3.3 was optimized for
# a small number (one?) regex, not thousands like we were doing. I've modified this code to use string.startswith
# instead of regexes; we had a one-to-one correspondence between regexes and filenames anyway.

# !/usr/local/cpython-3.9/bin/python3

"""
Generate car-, phone- and computer-suitable downsampled and converted music files.

The phone and computer versions are one and the same, and are one mp3 file per track.

The car versions are one mp3 file per album, for the benefit of a car stereo that cannot handle that many files.
"""

import collections
import errno
import functools
import hashlib
import os
import pprint
import re
import shutil
import subprocess
import sys
import typing

import file_timestamps


def to_str(byte_string: bytes, errors: str = "strict") -> str:
    """
    Convert byte_string to str. Assume UTF-8.

    errors is passed straight to bytes.decode; the default ("strict") raises UnicodeDecodeError on invalid input.
    """
    return byte_string.decode("UTF-8", errors)


def ripemd160_hasher(filename: str):
    """Return the ripemd160 hex digest of the content of filename."""
    hasher = hashlib.new("ripemd160")
    with open(filename, "rb") as file_:
        # We can just read the whole file, because we know the file is small.
        hasher.update(file_.read())
    return hasher.hexdigest()


class FileComparator:
    """Memorize a couple of attributes of a file, for subsequent equality comparison to other files."""

    def __init__(self, original_filename: str) -> None:
        """Record the length and ripemd160 hash of original_filename."""
        self.filename = original_filename
        del original_filename
        self.file_length = os.path.getsize(self.filename)
        # In practice, this will be only about 6 kilobytes
        assert self.file_length < 1024 * 1024
        self.ripemd160_hash = ripemd160_hasher(self.filename)

    def is_same(self, other_filename: str) -> bool:
        """Return True iff other_filename has the same length and hash as the original file."""
        # The cheap length comparison comes first, so the hash is only computed for same-sized files.
        return os.path.getsize(other_filename) == self.file_length and ripemd160_hasher(other_filename) == self.ripemd160_hash


class DeltaFile:
    """Hold one file, its resultant name, and whether it is blocked."""

    # The formats we know how to turn into a (downsampled) mp3.
    # NOTE(review): appropriate_extension() also accepts .m4a, which is rejected here with ValueError - confirm
    # whether that loud failure is intended.
    _KNOWN_FORMATS = ("mp3", "ogg", "flac")

    def __init__(self, filename: str) -> None:
        """
        Initialize from filename, which is just the filename; no directory is included.

        Raises ValueError if filename does not end in .mp3, .ogg or .flac.
        """
        self._initial_filename = filename
        for format_ in self._KNOWN_FORMATS:
            suffix = f".{format_}"
            if filename.endswith(suffix):
                self._basename = filename[: -len(suffix)]
                # Whatever the input format was, the result is always an mp3.
                self._resultant_filename = f"{self._basename}.mp3"
                self._format = format_
                break
        else:
            raise ValueError(f"Unrecognized file type for file: {self._initial_filename}")
        self._to_be_ellided = False

    def get_initial_filename(self) -> str:
        """Return initial filename."""
        return self._initial_filename

    def get_resultant_filename(self) -> str:
        """Return resultant filename: the initial filename with its extension replaced by .mp3."""
        return self._resultant_filename

    def get_basename(self) -> str:
        """Return basename of filename: the filename sans file extension (no directory is included)."""
        return self._basename

    def mark_as_to_be_ellided(self) -> None:
        """Flag this filename as one that is to be left out of the resulting collection."""
        self._to_be_ellided = True

    def is_to_be_ellided(self) -> bool:
        """Return whether this filename is one that should be left out of the resulting collection."""
        return self._to_be_ellided

    def get_format(self) -> str:
        """Return the file's extension (format), without the leading dot."""
        return self._format

    def __str__(self) -> str:
        """Present a simple string representation."""
        return self._initial_filename

    __repr__ = __str__


class Pattern:
    """Hold a prefix string, provide the ability to search for it, and count how often it matched."""

    # pylint: disable=too-few-public-methods
    # too-few-public-methods: We're primarily a container; we don't need a lot
    # of public methods

    def __init__(self, prefix_string: str) -> None:
        """Initialize with a zero match count."""
        self._prefix_string = prefix_string
        self._match_count = 0

    def search(self, line: str) -> bool:
        """Check if line starts with prefix; a successful check bumps the match count."""
        # This used to be self._prefix_string.startswith(line), which has the operands backwards: it only
        # matched when line was a prefix of the pattern. Exact-equality entries behave the same either way.
        was_match = line.startswith(self._prefix_string)
        if was_match:
            self.increment_match_count()
        return was_match

    def __str__(self):
        """Return a string description of this pattern."""
        return f"{self._match_count} {self._prefix_string}"

    __repr__ = __str__

    def get_match_count(self) -> int:
        """Return _match_count."""
        return self._match_count

    def get_prefix_string(self) -> str:
        """Return _prefix_string."""
        return self._prefix_string

    def increment_match_count(self) -> None:
        """Add one to _match_count."""
        self._match_count += 1


def get_eliminate_prefixes() -> typing.List[Pattern]:
    """Read the blocklist prefixes from ~/lib/blocklist-music-prefixes.txt, one Pattern per line."""
    with open(os.path.expanduser("~/lib/blocklist-music-prefixes.txt"), "r") as file_:
        return [Pattern(line.rstrip("\n")) for line in file_]


def appropriate_extension(filename: str) -> bool:
    """Return True iff this is a sound file with a "relevant" extension."""
    return filename.endswith((".flac", ".ogg", ".m4a", ".mp3"))


def hidden(filename: str) -> bool:
    """Return True iff this is a hidden file."""
    return filename.startswith(".")


def get_input_directories(
    depth: int,
    base_directory: str,
) -> typing.DefaultDict[str, typing.List[DeltaFile]]:
    """
    Build a dict with keys that are directories, and values that are lists of filenames.

    Keys are relative to base_directory and start with "./". Only directories exactly depth levels below
    base_directory are included, and they must not contain subdirectories.

    Does not include blocking uninteresting tracks.
    """
    result: typing.DefaultDict[str, typing.List[DeltaFile]] = collections.defaultdict(list)

    # Remember where we started, so we return there even if base_directory is a symlink or has several
    # path components; chdir("..") would be wrong in both of those cases.
    original_directory = os.getcwd()
    os.chdir(base_directory)
    try:
        for root, directories, filenames in os.walk("."):
            if root.count("/") != depth:
                # We only want to deal with /band/album (for EG, with depth==2) in root
                continue
            assert not directories, f"root is {root}, directories is {directories}"
            for filename in filenames:
                if appropriate_extension(filename) and not hidden(filename):
                    result[root].append(DeltaFile(filename))
    finally:
        os.chdir(original_directory)

    return result


def match(line: str, patterns: typing.List[Pattern]) -> bool:
    """Return True iff one of patterns matches line. Stops at (and counts only) the first matching pattern."""
    return any(pattern.search(line) for pattern in patterns)


def mark_blocklisted_files(directories: typing.DefaultDict[str, typing.List[DeltaFile]], patterns: typing.List[Pattern]) -> None:
    """
    Mark uninteresting tracks. These will be excluded from phone music and car music.

    Every pattern is expected to match exactly one file; if any pattern does not, the offenders are reported
    on stderr and the program exits with status 1.
    """
    for directory, delta_filenames in directories.items():
        for delta_filename in delta_filenames:
            full_filename = os.path.join(directory, delta_filename.get_initial_filename())
            if match(full_filename, patterns):
                delta_filename.mark_as_to_be_ellided()

    num_bad_matches = 0
    for pattern in patterns:
        if pattern.get_match_count() != 1:
            sys.stderr.write(f"Bad match count ({pattern.get_match_count()}) for {pattern.get_prefix_string()}\n")
            num_bad_matches += 1

    if num_bad_matches != 0:
        sys.stderr.write(f"{num_bad_matches} bad matches found\n")
        sys.exit(1)


def attempt_mkdir(directory: str) -> None:
    """Try to create a directory (and any missing parents). If it preexists, don't worry about it."""
    try:
        os.makedirs(directory)
    except FileExistsError:
        # FileExistsError is exactly the errno.EEXIST case; everything else propagates.
        pass


def deltafile_newer(
    archive_dirname: str,
    archival_deltafile: DeltaFile,
    result_dirname: str,
    result_deltafile: DeltaFile,
):
    """Return True iff archive file is newer than result file (or result file does not yet exist)."""
    archive_file = os.path.join(archive_dirname, archival_deltafile.get_initial_filename())
    result_file = os.path.join(result_dirname, result_deltafile.get_resultant_filename())
    # There's a tiny, likely unimportant buglet here: What if result_file is a directory or named pipe or whatever?
    if not os.path.isfile(result_file):
        return True
    return file_newer(archive_file, result_file)


def file_newer(left: str, right: str) -> bool:
    """Return True iff left file is newer (by modification time) than right file."""
    return os.path.getmtime(left) > os.path.getmtime(right)


class ConversionException(Exception):
    """Raised when converting a file has a problem."""


def run_ffmpeg(
    *,
    archive_dirname: str,
    archive_deltafile: DeltaFile,
    result_filename: str,
):
    """Downsample mp3 or convert ogg or flac to mp3, writing the result to result_filename."""
    archive_filename = os.path.join(archive_dirname, archive_deltafile.get_initial_filename())
    # If ffmpeg starts exiting false with no error messages, make sure '-v', 'quiet' is not being passed.
    ffmpeg_list = [
        "ffmpeg",
        # '-v', 'quiet',
        "-y",
        "-i",
        archive_filename,
        "-vol",
        "100",
    ]
    if archive_deltafile.get_format() == "ogg":
        # -map_metadata 0:s:0 helps ogg metadata get moved over to the
        # resulting mp3's. flac and mp3 were fine without it. However,
        # when I converted everything with this option, I started coming up
        # with mp3's and flac's missing from rocket player on Android.
        ffmpeg_list.extend(["-map_metadata", "0:s:0"])
    ffmpeg_list.extend(["-ar", "22050", "-b:a", "64k", result_filename])

    # If ffmpeg fails, the (possibly partial) output file is removed.
    run_command_from_list(ffmpeg_list, os.path.normpath(result_filename))


def xx_run_normalization(filename: str):
    """
    Run replaygain on filename. Currently unused.

    This isn't what I was hoping for. It doesn't average out the volume level, and it doesn't increase the
    volume level.
    """
    normalize_list = [
        "replaygain",
        "--no-album",
        "--reference-loudness=120",
        "--force",
        filename,
    ]
    run_command_from_list(normalize_list, filename)


def run_volume_change(filename: str):
    """Scale the volume of filename by a factor of 3 with lame, replacing filename with the louder version."""
    temp_filename = filename + ".temp"
    volume_command = [
        "lame",
        "--scale",
        "3",
        filename,
        temp_filename,
    ]
    run_command_from_list(volume_command, filename)
    shutil.move(temp_filename, filename)


def run_command_from_list(list_: typing.List[str], clean_file: str = "") -> None:
    """
    Run a command (given as an argument list; no shell is involved), capturing its output.

    If the command is interrupted or exits nonzero, clean_file (when given) is removed and the program
    exits with status 1; on a nonzero exit the command, its exit code and its output are printed first.
    """
    try:
        result = subprocess.run(
            list_,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except KeyboardInterrupt:
        if clean_file:
            attempt_unlink(clean_file)
            print(f"Cleaned up: Removed {clean_file}")
        sys.exit(1)

    if result.returncode == 0:
        return

    if clean_file:
        print(f"attempting to unlink {clean_file}")
        attempt_unlink(clean_file)
    # Tool output is not guaranteed to be valid UTF-8 (think odd tags or filenames); don't let that mask the real error.
    stdout_text = to_str(result.stdout, errors="replace")
    stderr_text = to_str(result.stderr, errors="replace")
    print(f"Command:\n{list_!r}\nexited with exit code {result.returncode!r}\nand output:\n{stdout_text}\n")
    print(f"and error:\n{stderr_text}\n")
    sys.exit(1)


def attempt_unlink(pathname: str) -> None:
    """Try to unlink a file. A missing file is reported but otherwise ignored; any other failure is raised."""
    try:
        os.unlink(pathname)
    except OSError as exception_extra:
        error_number = exception_extra.errno
        print(f"unlinking {pathname} failed, errno {error_number}")
        if error_number != errno.ENOENT:
            raise


def indent(depth: int) -> str:
    """Return some spaces for output indentation."""
    return " " * depth


def convert_dir_for_phone(
    archive_dirname: str,
    result_dirname: str,
    archival_delta_files: typing.List[DeltaFile],
    result_delta_files: typing.List[DeltaFile],
):
    """
    Convert archive files to (lower bitrate) mp3 phone/computer files.

    One directory is handled per invocation of this function. Result files with no archival counterpart are
    removed; result files that are missing or older than their archival counterpart are (re)generated.
    """
    ################################################################################
    # First, we group files into sets and/or dicts, to make it easier to do The Right Things for them.
    # Note that ellided files have been pruned/removed in our caller.
    ################################################################################
    archival_basenames_dict = {adf.get_basename(): adf for adf in archival_delta_files}
    result_basenames_dict = {rdf.get_basename(): rdf for rdf in result_delta_files}

    archival_basenames_set = set(archival_basenames_dict)
    result_basenames_set = set(result_basenames_dict)

    in_archival_only = archival_basenames_set - result_basenames_set
    in_result_only = result_basenames_set - archival_basenames_set
    in_both = result_basenames_set & archival_basenames_set

    in_both_and_updated = {
        ib for ib in in_both if deltafile_newer(archive_dirname, archival_basenames_dict[ib], result_dirname, result_basenames_dict[ib])
    }

    indentation = indent(2)

    ################################################################################
    # Remove straggling files that have nothing corresponding in archival
    ################################################################################
    for basename in sorted(in_result_only):
        delta_file = result_basenames_dict[basename]
        pathname = os.path.normpath(os.path.join(result_dirname, delta_file.get_resultant_filename()))
        print(f"{indentation}Removing {pathname}")
        os.unlink(pathname)

    ################################################################################
    # Deal with files that need to be generated from archival into result
    ################################################################################
    # Conversion happens in a scratch file, so a failed run never leaves a partial file in the result directory.
    temp_filename = f"/tmp/{os.path.basename(sys.argv[0])}-{os.getpid()}.temp.mp3"
    for basename in sorted(in_archival_only | in_both_and_updated):
        # This is a weird one. We get the result filename from the archive_deltafile data, because sometimes
        # the result file does not yet exist.
        archive_deltafile = archival_basenames_dict[basename]
        result_filename = os.path.join(result_dirname, archive_deltafile.get_resultant_filename())

        # This is for files that need to be:
        # 1) generated because result file doesn't exist at all
        # 2) updated because archive is newer
        run_ffmpeg(
            archive_dirname=archive_dirname,
            archive_deltafile=archive_deltafile,
            result_filename=temp_filename,
        )
        # run_normalization(temp_filename)
        run_volume_change(temp_filename)
        print(f"{indentation}Creating {result_filename}")
        shutil.move(temp_filename, result_filename)


def add_file_timestamps_to_phone_music(phone_dirname_prefix: str) -> None:
    """Set up .file-timestamp files in "result" (AKA Phone-Music) hierarchy."""
    phone_directories = get_input_directories(2, phone_dirname_prefix)
    for directory in phone_directories:
        greater_directory = os.path.join(phone_dirname_prefix, directory)
        file_timestamps.generate(greater_directory)


def gen_phone_music(archival_dirname_prefix: str, result_dirname_prefix: str) -> None:
    """
    Derive Phone-Music from Music.

    Operates at the level of directories; convert_dir_for_phone gets down to the level of filenames for us.
    """
    attempt_mkdir(result_dirname_prefix)

    print(" Getting prefixes to eliminate...")
    patterns = get_eliminate_prefixes()

    print(" Getting archival directories...")
    archival_directories = get_input_directories(2, archival_dirname_prefix)

    print(" Blocklisting files...")
    mark_blocklisted_files(archival_directories, patterns)

    print(" Getting phone music directories...")
    result_directories = get_input_directories(2, result_dirname_prefix)

    print(" Removing blocklisted files and empty in-memory directories...")
    # sorted() hands us a copy of the keys, so deleting from the dict inside the loop is safe.
    for ad in sorted(archival_directories):
        kept_files = [df for df in archival_directories[ad] if not df.is_to_be_ellided()]
        if kept_files:
            archival_directories[ad] = kept_files
        else:
            del archival_directories[ad]

    archival_directories_set = set(archival_directories)
    result_directories_set = set(result_directories)

    directories_only_in_archival = archival_directories_set - result_directories_set
    directories_only_in_result = result_directories_set - archival_directories_set
    directories_in_both = archival_directories_set & result_directories_set

    print(f" len(directories_only_in_archival) is {len(directories_only_in_archival)}")
    print(f" len(directories_only_in_result) is {len(directories_only_in_result)}")
    print(f" len(directories_in_both) is {len(directories_in_both)}")

    print(" Processing only in archival...")
    for doia in sorted(directories_only_in_archival):
        archive_dirname = os.path.join(archival_dirname_prefix, doia)
        result_dirname = os.path.join(result_dirname_prefix, doia)
        print(f" Create directory {result_dirname}")
        attempt_mkdir(result_dirname)
        convert_dir_for_phone(
            archive_dirname,
            result_dirname,
            archival_directories[doia],
            # result_directories is a defaultdict, so this is an empty list for a not-yet-existing directory.
            result_directories[doia],
        )

    print(" Processing only in phone music...")
    for doir in sorted(directories_only_in_result):
        result_dirname = os.path.join(result_dirname_prefix, doir)
        print(f" Removing directory {result_dirname}")
        shutil.rmtree(result_dirname)

    print(" Processing in both...")
    for dib in sorted(directories_in_both):
        archive_dirname = os.path.join(archival_dirname_prefix, dib)
        result_dirname = os.path.join(result_dirname_prefix, dib)
        convert_dir_for_phone(
            archive_dirname,
            result_dirname,
            archival_directories[dib],
            result_directories[dib],
        )


def catenate_album(from_directory: str, to_file: str, artist: str, song_title: str):
    """Catenate the non-hidden files of from_directory, in sorted filename order, into the single file to_file."""
    # catenate-mp3s "$album.mp3" "$album"/*
    list_ = ["catenate-mp3s", "--artist", artist, "--song-title", song_title, "--to-file", to_file]
    # os.listdir returns entries in arbitrary order; sort so that tracks are catenated in order, like the
    # shell glob above would.
    for file_ in sorted(os.listdir(from_directory)):
        if file_.startswith("."):
            continue
        list_.append(os.path.join(from_directory, file_))
    run_command_from_list(list_, to_file)


@functools.total_ordering
class CarMusic:
    """
    Hold information related to a single car music file.

    We accept either a "phone path" or a "car path", and create a teased-apart representation of the car path
    and related data.

    We are hashable, so we can be used in dictionaries and sets. Equality, ordering and hashing are all based
    on the full car path.
    """

    def __init__(
        self,
        deltafiles: typing.List[DeltaFile],
        full_phone_path: typing.Optional[str] = None,
        full_car_path: typing.Optional[str] = None,
    ) -> None:
        """
        Initialize from exactly one of full_phone_path (band/album) and full_car_path (band/album.mp3).

        deltafiles is only retained when constructing from a phone path.
        """
        # Exactly _one_ of full_phone_path and full_car_path must be None and one must be a path.
        # Checking for one None is enough, in part because mypy enforces the str constraint.
        assert (full_phone_path is None) + (full_car_path is None) == 1

        if full_phone_path is not None:
            # Derive the car path from the phone path
            # We strip off the leading ./ for simplicity
            self._full_phone_path: typing.Optional[str] = re.sub(r"^\./", "", full_phone_path)
            self._deltafiles: typing.Optional[typing.List[DeltaFile]] = deltafiles
            self._full_band_name: typing.Optional[str] = os.path.dirname(self._full_phone_path)
            self._full_album_name = os.path.basename(self._full_phone_path)
        elif full_car_path is not None:
            # Derive what we can from the car path, filling in the rest with None's
            self._full_phone_path = None
            self._deltafiles = None
            fields = re.sub(r"^\./", "", full_car_path).split("/")
            # aardvark/album.mp3
            assert len(fields) == 2, "fields is {}".format(str(fields))
            self._full_band_name = fields[0]
            self._full_album_name = re.sub(r"\.mp3$", "", fields[1])
        else:
            raise AssertionError("Both None's")

        if self._deltafiles:
            # list is not empty
            # DeltaFile's must be of form song.mp3 or album.mp3
            # Unfortunately, it is difficult to tell the difference automatically.
            assert self._deltafiles[0].get_initial_filename().endswith(".mp3")

        # aardvark
        self._full_car_dir = "%s" % (self._full_band_name,)
        # aardvark/album.mp3
        self._full_car_path = "%s/%s.mp3" % (self._full_car_dir, self._full_album_name)

    def __hash__(self):
        """Return a hash of this object."""
        return hash(self._full_car_path)

    def __lt__(self, other):
        """Return True iff self < other."""
        if not isinstance(other, CarMusic):
            return NotImplemented
        return self._full_car_path < other._full_car_path

    def __eq__(self, other):
        """Return True iff self == other."""
        if not isinstance(other, CarMusic):
            return NotImplemented
        return self._full_car_path == other._full_car_path

    def get_full_phone_path(self) -> str:
        """Return the phone path; only valid when constructed from a phone path."""
        assert self._full_phone_path is not None
        return self._full_phone_path

    def get_deltafiles(self) -> typing.List[DeltaFile]:
        """Return the DeltaFile's; only valid when constructed from a phone path."""
        assert self._deltafiles is not None
        return self._deltafiles

    def get_full_band_name(self) -> str:
        """Return the band name."""
        assert self._full_band_name is not None
        return self._full_band_name

    def get_full_album_name(self) -> str:
        """Return the album name (without the .mp3 extension)."""
        return self._full_album_name

    def get_full_car_dir(self) -> str:
        """Return the car directory (the band name)."""
        return self._full_car_dir

    def get_full_car_path(self) -> str:
        """Return the car path: band/album.mp3."""
        return self._full_car_path


# typing.Union rather than "X | Y": annotations are evaluated when the def runs, and "|" between typing
# generics raises TypeError on the Python 3.9 named in the shebang.
def display(string: str, set_or_dict: typing.Union[typing.Set[typing.Any], typing.Dict[typing.Any, typing.Any]]) -> None:
    """Output the "beginning" of set_or_dict - for debugging. For dicts, we only do the keys."""
    print(f"{string}:")
    list_ = sorted(set_or_dict)
    print(f"number of elements is {len(list_)}")
    pprint.pprint(list_[:5])
    print()


def get_artist_and_song_title(from_directory: str) -> typing.Tuple[str, str]:
    """
    Extract the artist and album (which we treat as a song title) from a directory path.

    The path must look like <phone music prefix>/<artist>/<album>; the prefix may be any directory (it is
    whatever --phone-dir was), and may have more than one component. Raises ValueError for shorter paths.
    """
    fields = os.path.normpath(from_directory).split(os.path.sep)
    if len(fields) < 3:
        raise ValueError(f"Expected prefix/artist/album, got: {from_directory}")
    artist = fields[-2]
    # Technically, this "song title" is an album - so that my Boss car stereo won't get cranky.
    song_title = fields[-1]
    return (artist, song_title)


def gen_car_music(phone_dirname_prefix: str, car_dirname_prefix: str):
    """
    Derive Car-Music from Phone-Music using previously-created timestamp files.

    This operates at the level of files, not directories.
    """
    # This is almost always unnecessary, but it doesn't hurt.
    attempt_mkdir(car_dirname_prefix)

    # Get current phone music directories as a dictionary of paths to DeltaFile's
    # The directory paths should be of the form aardvark/album and the DeltaFile's should be songs.
    print(" Getting phone music directories...")
    phone_music_dirs_n_dfs = get_input_directories(2, phone_dirname_prefix)
    print(" Converting to future_car_music_set...")
    future_car_music_dict = {}
    for phone_music_dir, deltafiles in sorted(phone_music_dirs_n_dfs.items()):
        cm = CarMusic(deltafiles, full_phone_path=phone_music_dir)
        future_car_music_dict[cm.get_full_car_path()] = cm

    # Get current car music directories as a dictionary of paths to DeltaFile's
    # The directory paths should be of the form aardvark and the DeltaFile's should be albums.
    print(" Getting current car music directories...")
    current_car_music_dirs_n_dfs = get_input_directories(1, car_dirname_prefix)
    current_car_music_dict = {}
    for car_music_dir, deltafiles in sorted(current_car_music_dirs_n_dfs.items()):
        for df in deltafiles:
            full_car_path = os.path.join(car_music_dir, df.get_initial_filename())
            cm = CarMusic(deltafiles, full_car_path=full_car_path)
            current_car_music_dict[cm.get_full_car_path()] = cm

    future_keys = set(future_car_music_dict)
    current_keys = set(current_car_music_dict)
    cm_in_future_only_keys = future_keys - current_keys
    cm_in_current_only_keys = current_keys - future_keys
    cm_in_both_c_n_f_keys = future_keys & current_keys
    # display('cm_in_future_only_keys', cm_in_future_only_keys)
    # display('cm_in_current_only_keys', cm_in_current_only_keys)
    # display('cm_in_both_c_n_f_keys', cm_in_both_c_n_f_keys)

    print(f" len(cm_in_future_only_keys) is {len(cm_in_future_only_keys)}")
    print(f" len(cm_in_current_only_keys) is {len(cm_in_current_only_keys)}")
    print(f" len(cm_in_both_c_n_f_keys) is {len(cm_in_both_c_n_f_keys)}")

    print(" Generating car-music-future-only files (not in car-music-current)")
    for car_music_key in sorted(cm_in_future_only_keys):
        # Generate unconditionally; there is no to_file yet to compare a timestamp against.
        car_music_value = future_car_music_dict[car_music_key]
        from_directory = os.path.join(phone_dirname_prefix, car_music_value.get_full_phone_path())
        to_directory = os.path.join(car_dirname_prefix, car_music_value.get_full_car_dir())
        to_file = os.path.join(to_directory, car_music_value.get_full_album_name()) + ".mp3"
        attempt_mkdir(to_directory)
        (artist, song_title) = get_artist_and_song_title(from_directory)
        print(f" Catenating {from_directory}/* into {to_file}")
        catenate_album(from_directory, to_file=to_file, artist=artist, song_title=song_title)

    ################################################################################

    print(" Processing car-music-current-only deletions")
    for car_music_key in sorted(cm_in_current_only_keys):
        car_music_value = current_car_music_dict[car_music_key]
        filename_to_remove = os.path.join(car_dirname_prefix, car_music_value.get_full_car_path())
        print(f" Removing {filename_to_remove}")
        os.unlink(filename_to_remove)

    ################################################################################

    print(" Generating car-music-future-and-current files")
    for car_music_key in sorted(cm_in_both_c_n_f_keys):
        car_music_value = future_car_music_dict[car_music_key]
        from_directory = os.path.join(phone_dirname_prefix, car_music_value.get_full_phone_path())
        from_timestamp_file = os.path.join(from_directory, ".file-timestamps")
        to_directory = os.path.join(car_dirname_prefix, car_music_value.get_full_car_dir())
        to_file = os.path.join(to_directory, car_music_value.get_full_album_name()) + ".mp3"
        # If newer, then generate. Otherwise do nothing.
        if file_newer(from_timestamp_file, to_file):
            print(f" Catenating {from_directory}/* to {to_file}")
            (artist, song_title) = get_artist_and_song_title(from_directory)
            catenate_album(from_directory, to_file=to_file, artist=artist, song_title=song_title)


def all_dot_files(filenames: typing.List[str]) -> bool:
    """Return True iff all filenames in filenames start with a dot. An empty list counts as all-dot."""
    return all(filename.startswith(".") for filename in filenames)


def remove_empty_dirs(dirname: str) -> None:
    """
    Remove empty directories from a directory hierarchy.

    A directory counts as empty when it contains nothing but dot-files. dirname itself is never removed.

    Based on https://gist.github.com/jacobtomlinson/9031697 . It's modified substantially, for such a small function.
    """
    # Bottom-up, so a parent that only held empty directories is itself seen as empty afterwards.
    for root, dirs, _files in os.walk(dirname, topdown=False):
        for directory in dirs:
            subdir = os.path.join(root, directory)
            if all_dot_files(os.listdir(subdir)):
                print(f"rmtree'ing {subdir}")
                shutil.rmtree(subdir)

        # The below version feels cleaner, but does not work. Specifically, if we have Phone-Music/a/b, and b is
        # empty and a contains only b, it'll remove b on the first run, but leave a for the next run. The code
        # above gets them both in a single run.
        # if dirs or not all_dot_files(files):
        #     # This directory contains directories or files (or both), so we skip it.
        #     continue
        # print(" rmtree'ing {}".format(root))
        # shutil.rmtree(root)


# We map these to underscores, for the sake of my Android phone which is probably using a FAT filesystem
SIMPLE_BAD_CHARACTERS = b"!?\\*|$"
SET_SIMPLE_BAD_CHARACTERS = set(SIMPLE_BAD_CHARACTERS)

INVOLVED_BAD_CHARACTERS = b"&\":()'"
SET_INVOLVED_BAD_CHARACTERS = set(INVOLVED_BAD_CHARACTERS)

SET_ALL_BAD_CHARACTERS = SET_SIMPLE_BAD_CHARACTERS | SET_INVOLVED_BAD_CHARACTERS

# Multi-character rewrites, applied in this order; longer forms come first so that they win.
_INVOLVED_REPLACEMENTS = (
    (b" & ", b" and "),
    (b"&", b" and "),
    (b"'", b""),
    (b'"', b""),
    (b" : ", b" - "),
    (b": ", b" - "),
    (b":", b" - "),
    (b" (", b" - "),
    (b"(", b" - "),
    (b") ", b" "),
    (b")", b""),
)

# Maps every "simple" bad character to an underscore, in a single pass.
_SIMPLE_BAD_TRANSLATION = bytes.maketrans(SIMPLE_BAD_CHARACTERS, b"_" * len(SIMPLE_BAD_CHARACTERS))

# Album directories of the form 1999-Title or 1999a-Title.
_YEAR_PREFIX_REGEX = re.compile(r"[0-9][0-9][0-9][0-9][a-zA-Z]?-")


def contains_bad_char(filename: bytes) -> bool:
    """Return True iff filename contains a bad character."""
    # Iterating bytes yields ints, and so do the sets built from the byte strings above.
    return bool(set(filename) & SET_ALL_BAD_CHARACTERS)


def replace_chars(old_filename: bytes) -> bytes:
    """Return a "fixed" filename: involved characters are rewritten, simple ones become underscores."""
    new_filename = old_filename

    # Replace a few strings, that aren't simple one-char to underscore transformations.
    for from_, to in _INVOLVED_REPLACEMENTS:
        new_filename = new_filename.replace(from_, to)

    # Replace some "bad characters" with underscore.
    return new_filename.translate(_SIMPLE_BAD_TRANSLATION)


def bad_filename_fix(top: str) -> None:
    """If we encounter any filenames or directories with "bad" characters in them, rename them."""
    # We convert top to bytes, so we get back bytes filenames and directories.
    bytes_top = top.encode("UTF-8")

    for root, directories, filenames in os.walk(bytes_top, topdown=True):
        # Rename filenames, if bad.
        for filename in filenames:
            if contains_bad_char(filename):
                os.rename(os.path.join(root, filename), os.path.join(root, replace_chars(filename)))

        # Rename directories, if bad, replacing in list to get single-pass functionality.
        for dirno, directory in enumerate(directories):
            if contains_bad_char(directory):
                # Only the leaf name is fixed, and only the leaf name goes back into the list: os.walk joins
                # each entry onto root itself, so storing a full path here would make it skip the renamed
                # directory's contents.
                replaced_directory = replace_chars(directory)
                os.rename(os.path.join(root, directory), os.path.join(root, replaced_directory))
                directories[dirno] = replaced_directory


def is_prefixed_directory(directory: str) -> bool:
    """Return True iff directory is "prefixed": Live-, Compilation-, Alternate- or a year like 1999- or 1999a-."""
    if directory.startswith(("Live-", "Compilation-", "Alternate-")):
        return True
    return _YEAR_PREFIX_REGEX.match(directory) is not None


def make_used(variable: object) -> None:
    """Convince pylint and pyflakes that "variable" is used."""
    assert True or variable


def find_year_exceptions(archival_dirname_prefix: str) -> None:
    """
    Search for bands that have directory prefixes on their album names.

    If one or more year prefixes are found, and one or more non-prefixed albums are also found, sys.exit(1).
    """
    bad_found = False
    for root, directories, files in os.walk(archival_dirname_prefix):
        # We're not interested in the files, only the directories
        make_used(files)
        unprefixed = [directory for directory in directories if not is_prefixed_directory(directory)]
        # Some but not all directories are prefixed - report which ones are not prefixed
        if unprefixed and len(unprefixed) < len(directories):
            for directory in unprefixed:
                print("{} not prefixed".format(os.path.join(root, directory)))
            bad_found = True

    if bad_found:
        print("One or more year prefixes missing. Terminating early.")
        sys.exit(1)


def usage(retval: int) -> None:
    """Output a usage message (to stderr if retval is nonzero, else stdout) and terminate with retval."""
    write = sys.stderr.write if retval else sys.stdout.write

    write("Usage: {}\n".format(sys.argv[0]))
    write(" --base-dir /digital-assets/sound\n")
    write(" --archival-dir Music\n")
    write(" --phone-dir Phone-Music\n")
    write(" --car-dir Car-Music\n")
    write("\n")
    write("--archival-dir, --phone-dir and --car-dir must be relative to --base-dir\n")

    sys.exit(retval)


def _option_value(option: str) -> str:
    """Return the value following option (which is sys.argv[1]); give a usage error if there is none."""
    if len(sys.argv) < 3:
        sys.stderr.write(f"{sys.argv[0]}: {option} requires an argument\n")
        usage(1)
    return sys.argv[2]


def main() -> None:
    """Generate phone and computer-suitable downsampled and converted music files."""
    base_dir = None
    # The three directory options that must be given relative to --base-dir.
    relative_dirs: typing.Dict[str, typing.Optional[str]] = {
        "--archival-dir": None,
        "--phone-dir": None,
        "--car-dir": None,
    }

    while sys.argv[1:]:
        option = sys.argv[1]
        if option == "--base-dir":
            # I've only tested this with an absolute path, but a relative path might work too.
            base_dir = _option_value(option)
            del sys.argv[1]
        elif option in relative_dirs:
            value = _option_value(option)
            if os.path.isabs(value):
                sys.stderr.write(f"{option} must be relative\n")
                usage(1)
            relative_dirs[option] = value
            del sys.argv[1]
        elif option in ("--help", "-h"):
            usage(0)
        else:
            sys.stderr.write("{}: Unrecognized option: {}\n".format(sys.argv[0], option))
            usage(1)
        del sys.argv[1]

    if base_dir is None:
        sys.stderr.write("{}: --base-dir is a required option\n".format(sys.argv[0]))
        usage(1)

    for option, value in relative_dirs.items():
        if value is None:
            sys.stderr.write("{}: {} is a required option\n".format(sys.argv[0], option))
            usage(1)

    archival_dirname_prefix = relative_dirs["--archival-dir"]
    phone_dirname_prefix = relative_dirs["--phone-dir"]
    car_dirname_prefix = relative_dirs["--car-dir"]

    # We've already checked these, but these assertions make mypy happy.
    assert base_dir is not None
    assert archival_dirname_prefix is not None
    assert phone_dirname_prefix is not None
    assert car_dirname_prefix is not None

    os.chdir(os.path.expanduser(base_dir))

    print("Fixing bad filenames...")
    bad_filename_fix(archival_dirname_prefix)

    print("Looking for year exceptions...")
    find_year_exceptions(archival_dirname_prefix)

    print("Generating phone music from archival music...")
    gen_phone_music(archival_dirname_prefix, phone_dirname_prefix)

    print("Adding timestamp files to phone music...")
    add_file_timestamps_to_phone_music(phone_dirname_prefix)

    print("Generating car music from phone music...")
    gen_car_music(phone_dirname_prefix, car_dirname_prefix)

    print("Cleaning up empty directories...")
    remove_empty_dirs(archival_dirname_prefix)
    remove_empty_dirs(phone_dirname_prefix)
    remove_empty_dirs(car_dirname_prefix)


if __name__ == "__main__":
    main()