#!/usr/bin/env python3

"""
Find the 'next' media file to watch in a directory hierarchy and print its path.

Viewing state is read from the last line of each media file's companion
'<media file>.views' file.  The chosen path is written to stdout; diagnostics
go to stderr.
"""

import collections
import decimal
import functools
import math
import os
import pathlib
import sys
import typing


def usage(retval: int) -> typing.NoReturn:
    """
    Output a usage message and exit with the appropriate shell exit code.

    The message goes to stdout when retval is 0 and to stderr otherwise.
    """
    write = sys.stdout.write if retval == 0 else sys.stderr.write
    write(f'Usage: {sys.argv[0]} dirhier\n')
    sys.exit(retval)


class Extensions:
    """Just hold some variables with a global lifetime."""

    # Extensions (without the leading dot) that mark a file as media.
    media_extensions = {
        '3gp',
        '3gp,400x240',
        'AVI',
        'avi',
        'cbr',
        'cbz',
        'flv',
        'm4v',
        'mkv',
        'mov',
        'mp3',
        'mp4',
        'mpeg',
        'mpg',
        'ogg',
        'ogm',
        'ogv',
        'pdf',
        'qt',
        'rar',
        'video',
        'webm',
        'wmv',
        'yuv',
        'URL',
    }

    # Extensions (or whole names of suffix-less files) that are known not to be media.
    not_media_extensions = {
        'README',
        'rating-reason',
        'RERIP',
        'REVIEW',
        'delay',
        'doc',
        'docx',
        'failed',
        'idx',
        'jpg',
        'keep',
        'log',
        'movie-player',
        'nfo',
        'png',
        'rip-output',
        'sfv',
        'srt',
        'swp',
        'tar',
        'txt',
        'views',
        'xz',
    }

    # Raw endings of the full pathname that mark a file as not media.
    not_media_suffixes = {
        'URL',
    }


def is_media_file(filename: pathlib.Path) -> bool:
    """
    Classify a file with respect to its "medianess".

    If a filename is a media file return True.  If it's not a media file return False.
    If we're unsure exit with shell False (SystemExit carrying an error message).
    """
    # A file without a suffix is classified by its whole name (e.g. 'README').
    extension = filename.suffix or filename.name
    if extension.startswith('.'):
        extension = extension[1:]

    if extension in Extensions.media_extensions:
        return True
    if extension in Extensions.not_media_extensions:
        return False
    # Last resort: compare against the raw endings of the full pathname.
    if str(filename).endswith(tuple(Extensions.not_media_suffixes)):
        return False

    raise SystemExit(f'Unknown whether pathname {filename} is a media file or not')


def is_views_file(filename: pathlib.Path) -> bool:
    """Return True iff this is a .views file."""
    return filename.suffix == '.views'


def append_extension(media_file: pathlib.Path, extension: str) -> pathlib.Path:
    """Append a suffix (extension, including its leading dot) to a pathlib.Path."""
    return media_file.with_name(media_file.name + extension)


def get_last_line(path: pathlib.Path) -> typing.Optional[str]:
    """
    Return the last line of textfile at path, without its trailing newline.

    Return None if the file cannot be opened or read (OSError).  Exit via SystemExit
    if the file is not valid UTF-8 or contains no lines at all.
    """
    last_line = None
    print(f'Checking {path}', file=sys.stderr)
    try:
        # Decode explicitly as UTF-8 so the diagnostic below is accurate regardless of locale.
        with open(path, 'r', encoding='utf-8') as file_:
            for line in file_:
                last_line = line
    except OSError:
        # The file most likely doesn't exist.  Return None
        return None
    except UnicodeDecodeError:
        raise SystemExit(f'{sys.argv[0]}: File {path} contains one or more non-UTF-8 characters') from None

    if last_line is None:
        raise SystemExit(f'{sys.argv[0]}: File {path} has 0 lines in it')

    return last_line.rstrip('\n')


@functools.total_ordering
class MediaFileEvent:
    """
    Contain a bit of data related to a media_file event.

    Instances compare (and hash) on (timestamp, media_file); hostname is ignored.
    """

    def __init__(self, timestamp: decimal.Decimal, hostname: str, media_file: pathlib.Path):
        """Initialize."""
        self.timestamp = timestamp
        self.hostname = hostname
        self.media_file = media_file

    def _key(self) -> typing.Tuple[decimal.Decimal, pathlib.Path]:
        """Return the tuple that defines equality, ordering and hashing."""
        return (self.timestamp, self.media_file)

    def __lt__(self, other):
        """Return True if self < other: by timestamp first, then by media_file on ties."""
        if not isinstance(other, MediaFileEvent):
            return NotImplemented
        return self._key() < other._key()

    def __eq__(self, other):
        """Return True if self == other."""
        if not isinstance(other, MediaFileEvent):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self):
        """Hash consistently with __eq__."""
        return hash(self._key())

    def __str__(self):
        """Return a str equivalent of this instance."""
        return f'MediaFileEvent({self.timestamp}, {self.hostname}, {self.media_file})'

    __repr__ = __str__


def get_ends_starts_and_nevers(media_file_list: typing.List[pathlib.Path]) \
        -> typing.Tuple[typing.List[MediaFileEvent], typing.List[MediaFileEvent], typing.List[MediaFileEvent]]:
    """
    Categorize the media files into three categories.

    1) Files that were started and suitably ended
    2) Files that were started but never finished
    3) Files that have never been started.

    For each of these, return a list of MediaFileEvent instances, in the order (ends, starts, nevers).
    The category comes from the 7th whitespace-separated field of the last line of the file's .views
    file; the 1st field is the timestamp and the optional 8th field is the hostname.

    Exit via SystemExit if that last line is malformed or names neither 'start' nor 'end'.
    """
    ends: typing.List[MediaFileEvent] = []
    starts: typing.List[MediaFileEvent] = []
    nevers: typing.List[MediaFileEvent] = []

    for media_file in media_file_list:
        views_filename = append_extension(media_file, '.views')
        last_line_of_views = get_last_line(views_filename)

        if last_line_of_views is None:
            # We make this look like a file that was last viewed at an unreachable point in the future
            nevers.append(MediaFileEvent(timestamp=decimal.Decimal(math.inf), hostname='', media_file=media_file))
            continue

        fields = last_line_of_views.split()
        if len(fields) < 7:
            raise SystemExit(f'{sys.argv[0]}: malformed last line in file {views_filename}')

        try:
            timestamp = decimal.Decimal(fields[0])
        except decimal.InvalidOperation:
            raise SystemExit(f'{sys.argv[0]}: bad timestamp {fields[0]!r} in file {views_filename}') from None

        # A very old version of mplay didn't add a hostname to the .views files.
        hostname = fields[7] if len(fields) > 7 else ''
        media_file_event = MediaFileEvent(timestamp=timestamp, hostname=hostname, media_file=media_file)

        # We could use a dict of 3 elements for this, but then the type annotations get ugly.
        start_or_end = fields[6]
        if start_or_end == 'end':
            ends.append(media_file_event)
        elif start_or_end == 'start':
            starts.append(media_file_event)
        else:
            raise SystemExit(f'{sys.argv[0]}: no start or end in file {views_filename}')

    return (ends, starts, nevers)


def get_by_time(events: typing.List[MediaFileEvent]) \
        -> typing.DefaultDict[decimal.Decimal, typing.List[MediaFileEvent]]:
    """Group events by timestamp, preserving the input order within each group."""
    dict_: typing.DefaultDict[decimal.Decimal, typing.List[MediaFileEvent]] = collections.defaultdict(list)
    for media_file_event in events:
        dict_[media_file_event.timestamp].append(media_file_event)
    return dict_


def handle_youngest_never(nevers: typing.List[MediaFileEvent]) -> typing.NoReturn:
    """
    Output the youngest 'never', unless we don't have one.

    Print the media file of nevers[0] to stdout and exit 0; if nevers is empty,
    print a message to stderr and exit 1.  This function never returns.
    """
    if nevers:
        print(nevers[0].media_file)
        sys.exit(0)

    print('You have reached the end of this show. mplay the first if you wish to start over.', file=sys.stderr)
    sys.exit(1)


def split_on_seps_mfe(media_file_event: MediaFileEvent) -> typing.List[str]:
    """Split up an event's filename path into a list of strings, to facilitate sorting."""
    return split_on_seps_path(media_file_event.media_file)


def split_on_seps_path(media_file: pathlib.Path) -> typing.List[str]:
    """Split up a filename path into a list of strings, to facilitate sorting."""
    return str(media_file).split(os.path.sep)


def _collect_media_files(directory: pathlib.Path) -> typing.List[pathlib.Path]:
    """Return every media file below directory, sorted component-wise by path."""
    media_files = [
        pathname
        for pathname in directory.rglob('*')
        if not pathname.is_dir() and is_media_file(pathname)
    ]
    media_files.sort(key=split_on_seps_path)
    return media_files


def main() -> None:
    """
    Start the ball rolling.

    Expects exactly one command line argument: the directory hierarchy to scan.
    Prints the next media file to stdout, or explains on stderr (with a failing
    exit status) why there is none.
    """
    if len(sys.argv) != 2:
        usage(1)

    directory = pathlib.Path(sys.argv[1])
    if not directory.is_dir():
        raise SystemExit(f'{directory} does not exist or is not a directory')

    all_media_files_list = _collect_media_files(directory)
    if not all_media_files_list:
        raise SystemExit(f'{sys.argv[0]}: no media files found')

    (ends, starts, nevers) = get_ends_starts_and_nevers(all_media_files_list)

    # It's normal to have a bunch of ends and no starts.  For a very-new TV series, there'll be zero ends and zero starts.

    # These are probably already sorted, but oh well :)
    ends.sort(key=split_on_seps_mfe)
    starts.sort(key=split_on_seps_mfe)
    nevers.sort(key=split_on_seps_mfe)

    if starts:
        # There is at least one start.  Report the one with the earliest timestamp (ties: lowest-sorting path)
        # and exit False.
        starts_by_time = get_by_time(starts)
        unfinished = starts_by_time[min(starts_by_time)][0]
        print(f'Unfinished media file detected: {unfinished.media_file}', file=sys.stderr)
        sys.exit(1)

    if not ends:
        # There are no ends either.  This means we haven't started this show yet.  Output the lowest-sorting
        # media_file and exit.
        handle_youngest_never(nevers)

    # There is at least one end.  If we are at the very last media file, exit False with an error message.
    # Otherwise, output the media_file of the "next one".
    ends_by_time = get_by_time(ends)
    # Note that there could be ties.  In that case, we get the lowest-sorting path among the tied media events.
    latest_end = ends_by_time[max(ends_by_time)][0]

    try:
        # This could be made O(logn) instead of O(n) using binary search, but it probably won't help much in this
        # case, because n is going to be pretty small.
        latest_index = all_media_files_list.index(latest_end.media_file)
    except ValueError:
        handle_youngest_never(nevers)

    if latest_index == len(ends) - 1:
        # We are at the end of the "ends".  Output the lowest of the nevers
        handle_youngest_never(nevers)
    elif latest_index == len(all_media_files_list) - 1:
        print('This show is over.  To start again:', file=sys.stderr)
        first = all_media_files_list[0]
        print('mplay --temp "{}"'.format(first), file=sys.stderr)
        # Nothing was written to stdout, so signal failure as the other "nothing to play" paths do.
        sys.exit(1)
    else:
        # We are somewhere in the middle (or very beginning) of the ends.  Output the next one.
        print(all_media_files_list[latest_index + 1])


if __name__ == '__main__':
    main()