#!/usr/bin/python3

"""
Play some movie or music files, updating a timestamp file for each one.

Or list the files in order.
"""

import os
import re
import sys
import time
import errno
import shlex
import socket
import typing
import subprocess
import collections

# Directory that --temp copies media into before playing it.
CACHE_DIR = "/tmp/mplay-cache"


def make_used(*variable: typing.Any) -> None:
    """Persuade pyflakes that variable is used."""
    assert True or variable


def cat(filename):
    """Copy a file's lines to stdout, dropping only each line's trailing newline."""
    with open(filename, "r") as file_:
        for line in file_:
            print(line.rstrip("\n"))


def do_URL(cwd, filename, views_filename):
    """
    Handle a URL file - EG a Youtube URL.

    Shows the file's content, waits for the user to hit enter, then records an "end" entry in views_filename.
    """
    print("Pull up:")
    cat(filename)
    print("Then hit enter when done viewing")
    input()
    touch(cwd, views_filename, "end")


def is_filename_to_ignore(filename: str) -> bool:
    """Return True if this is a filename we should ignore."""
    if filename == "core":
        return True
    if filename.endswith((".log", ".tar.xz", ".tar")):
        return True
    # Skip files named exactly "URLs", at any directory depth
    if filename == "URLs" or filename.endswith("/URLs"):
        return True
    # This gets compressed .rip-output files, and renamed rip-output too
    if ".rip-output" in filename:
        return True
    # Player-command override files (see search_for_command_override) are not media
    if "movie-player" in filename:
        return True
    return False


def is_views_filename(filename: str) -> bool:
    """Return True if this is a .views (timestamp log) filename."""
    return filename.endswith(".views")


def timestamp() -> str:
    """Return a timestamp - both machine and human readable."""
    # Read the clock once so the numeric and human-readable halves describe the same instant.
    now = time.time()
    return "%s %s" % (now, time.ctime(now))


def usage(retval: int) -> None:
    """
    Output a usage message and exit.

    The message goes to stderr when retval is nonzero, otherwise to stdout.  Always exits with retval.
    """
    if retval:
        write = sys.stderr.write
    else:
        write = sys.stdout.write
    write("%s: --dvd --timestamps --ext-count --temp --pause --help\n" % sys.argv[0])
    write(" --temp says to copy the file locally before playing\n")
    write(" --pause says to wait for user input before playing after copying to /tmp/mplay-cache\n")
    write(" --ext-count does a frequency count of directories' file extensions\n")
    sys.exit(retval)


def get_hostname() -> str:
    """Return the machine's hostname."""
    return os.uname()[1]


def quote(file_: str) -> str:
    """
    Quote a single filename for the shell.

    shlex.quote is used so that embedded double quotes, dollar signs and backticks cannot break out of the argument.
    """
    return shlex.quote(file_)


def find_file(file_: str) -> str:
    """
    Look for a file in the CWD, then .., then ../.., etcetera until we reach the top.

    Returns the joined path of the first match, or "" when even "/" does not contain it.
    Side effect: the process's current directory is left wherever the search stopped.
    """
    while True:
        cwd = os.getcwd()
        print(f"checking for {file_} in {cwd}")
        if os.path.exists(file_):
            return os.path.join(cwd, file_)
        if cwd == "/":
            # Reached the top without finding it
            return ""
        os.chdir("..")


def search_for_command_override(file: str) -> str:
    """
    Hunt for a player command override, including the chdir'ing bits.

    Starting from file (a directory) or file's directory, search upward for a "movie-player" file.
    Returns its first line without the trailing newline, or "" if there is none.
    Exits with status 1 if file is neither a directory nor a regular file.
    The current directory is restored before returning.
    """
    prior_cwd = os.getcwd()

    if os.path.isdir(file):
        os.chdir(file)
    elif os.path.isfile(file):
        dirname = os.path.dirname(file)
        if dirname != "":
            os.chdir(dirname)
    else:
        print("Neither a directory nor a file?", file=sys.stderr)
        sys.exit(1)

    try:
        found_file = find_file("movie-player")
        print(f"found_file is {found_file}")
        if not found_file:
            return ""
        with open(found_file, "r") as file_:
            return file_.readline().rstrip("\n")
    finally:
        # find_file() walks upward with chdir; always put the process back where it started
        os.chdir(prior_cwd)


def command() -> str:
    """
    Return the command to run.

    The result is a shell command template containing one {} placeholder for the filename.
    """
    hostname = get_hostname()
    if hostname == "old-laptop":
        # This is a very old laptop that can't keep up with video well
        # -vo sdl is black most of the time, but gives a small green box in the upper left
        # -vo vdpau gave no video at all
        # -vo xv gave no video at all
        # -vo gl_nosw plays a second, pauses a second, plays a second, pauses a second
        # -vo x11 seems to work well with -framedrop, at least for a minimally-churning video
        # -vo xover didn't try it
        # -vo gl didn't try it
        # -vo gl2 didn't try it
        # -vo dga didn't try it
        # -framedrop was giving out of synch audio and video, even in a < 30 minute video, trying these
        # mplayer-recommended optimizations instead
        extra = "-vfm ffmpeg -lavdopts lowres=1:fast:skiploopfilter=all -ao sdl"
        cache_min_percent = 12
        cache_min_meg = 64
        # old-laptop has serious audio-video synch issues - see below for totem!
    elif hostname == "dstromberg-laptop":
        extra = "-ao sdl"
        cache_min_percent = 12
        cache_min_meg = 512
    elif hostname == "benchbox":
        # -framedrop made the difference between A/V sync and slowly diverging A/V
        extra = "-vfm ffmpeg -lavdopts lowres=1:fast:skiploopfilter=all -framedrop -ao sdl"
        cache_min_percent = 50
        cache_min_meg = 512
    # elif hostname == 'zareason-strata7440':
    #     This was unnecessary - the interior of the laptop just needed to be cleaned.
    #     # Added this -autosync to deal with common A/V sync problem.
    #     extra = '-autosync 30 -framedrop -vfm ffmpeg'
    #     # Increased these to deal with that same A/V sync problem; the autosync and framedrop didn't help.
    #     cache_min_percent = 80
    #     cache_min_meg = 1024
    elif hostname == "Daniels-MBP.attlocal.net":
        extra = "-vo gl"
        cache_min_percent = 50
        cache_min_meg = 512
    else:
        extra = ""
        cache_min_percent = 50
        cache_min_meg = 512

    # http://unix.stackexchange.com/questions/1337/tell-mplayer-to-prevent-the-screensaver-from-kicking-in-while-playing
    # -ao and -vo are key
    result = " ".join(
        [
            "mplayer",
            '-font "Bitstream Vera Sans"',
            "-sid 0",
            "-osdlevel 3",
            "-zoom",
            # The shell evaluates the $((...)) arithmetic when the command is run
            "-cache $(({}*1024))".format(cache_min_meg),
            "-cache-min {}".format(cache_min_percent),
            # Left as a literal {} placeholder; play() substitutes the filename
            '"{}"',
            extra,
        ]
    )

    if hostname == "old-laptop":
        # We use totem because old-laptop doesn't work well with mplayer
        # --sm-client-disable prevents one movie from interrupting another
        # It also keeps a new movie from playing :(
        result = "totem --sm-client-disable {}"

    print("Command is {}".format(result))

    return result


def touch(cwd: str, filename: str, kind: str) -> None:
    """
    Update the .views file.

    Changes directory to cwd, then appends one line "<timestamp> <kind> <hostname>" to filename.
    If the file cannot be opened, prompts on stderr and retries after the user hits enter.
    Raises the OSError from the failed open if stdin is at end-of-file, because nobody can answer the prompt.
    """
    while True:
        try:
            os.chdir(cwd)
            file_ = open(filename, "a")
        except OSError:
            sys.stderr.write("Error opening %s - try mounting the filesystem and hitting enter\n" % filename)
            if not sys.stdin.readline():
                # EOF on stdin: retrying would spin forever
                raise
        else:
            break

    with file_:
        string = "%s %s %s" % (timestamp(), kind, socket.gethostname())
        sys.stdout.write("Updating %s with %s\n" % (filename, string))
        file_.write("%s\n" % string)


def my_mkdir(path: str) -> None:
    """Create path as a directory, ignoring it if the path already exists."""
    try:
        os.mkdir(path)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise


def copy_local(filename: str) -> None:
    """
    Copy the file(s) locally for ease of navigation in the movie player.

    The copy lands under /tmp/mplay-cache and is piped through gprog with a size estimate taken from du.
    Raises subprocess.CalledProcessError if the copy pipeline exits nonzero.
    """
    filename_string = quote(filename)
    title_string = filename_string

    du_command = "du -sk {}".format(filename_string)
    # run() waits for du and closes its pipe; a du failure just yields a smaller estimate
    completed = subprocess.run(du_command, shell=True, stdout=subprocess.PIPE, check=False)
    total_size_in_k = 0
    for line in completed.stdout.splitlines():
        fields = line.split()
        if len(fields) < 2:
            # Not a "<size> <path>" line; the total only feeds a progress estimate, so skip it
            continue
        total_size_in_k += int(fields[0])
    total_size_in_bytes = total_size_in_k * 1024

    my_mkdir(CACHE_DIR)

    unparmed_cmd = "tar cflS - {} | gprog --size-estimate {} --title {} | (cd {} && tar xfp -)"
    tar_command = unparmed_cmd.format(filename_string, total_size_in_bytes, title_string, CACHE_DIR)
    subprocess.check_call(tar_command, shell=True)


def get_line() -> None:
    """Get a line of input from the user, as a "hit enter to continue"."""
    try:
        # We try to open /dev/tty and read from it, because I/O should not be saved from earlier.
        with open("/dev/tty", "r") as file_:
            file_.readline()
    except OSError:
        # That didn't work, just read a line from stdin.
        sys.stdin.readline()


def play(temp: bool, pause: bool, filename: str, check: bool) -> None:
    """
    Play the files, updating .views files before and after.

    temp: copy the file under /tmp/mplay-cache first, play the copy, and unlink the copy afterward.
    pause: with temp, wait for the user to hit enter after copying and before playing.
    filename: what to play.  A file named "URL" is displayed for the user instead of being played.
    check: require filename to be a relative path to an existing file, and record start/end in its .views file.

    Exits the process on bad arguments, and after handling a URL file.
    """
    cwd = os.getcwd()

    if not filename:
        sys.stderr.write("%s: No file specified\n" % sys.argv[0])
        usage(1)

    # We'll update the .views file in the original hierarchy, irrespective of "temp"
    views_filename = f"{filename}.views"

    if check:
        if os.path.isabs(filename):
            sys.stderr.write("%s: %s is an absolute path, but should not be\n" % (sys.argv[0], filename))
            sys.exit(1)
        if not os.path.isfile(filename):
            sys.stderr.write("%s: %s does not exist\n" % (sys.argv[0], filename))
            sys.exit(1)
        touch(cwd, views_filename, "start")

    if filename == "URL" or filename.endswith("/URL"):
        do_URL(cwd, filename, views_filename)
        sys.exit(0)

    cmd_with_unspec_filename = command()

    override_command = search_for_command_override(filename)
    if override_command:
        cmd_with_unspec_filename = override_command

    if temp:
        copy_local(filename)
        path_to_play = os.path.join(CACHE_DIR, filename)
        if pause:
            sys.stdout.write("Hit enter to play {}\n".format(path_to_play))
            get_line()
    else:
        path_to_play = filename

    # NOTE(review): the filename is substituted into a shell command as-is; whatever quoting applies comes
    # from the command template (built in or from a movie-player override file), so filenames containing
    # shell metacharacters may not play.
    cmd = cmd_with_unspec_filename.format(path_to_play)
    sys.stdout.write("Running command %s\n" % cmd)
    subprocess.call(cmd, shell=True)

    if check:
        touch(cwd, views_filename, "end")

    # Added 2023-04-09, because I was never reusing a temp filename anyway.
    # Note that path_to_play is only in /tmp/mplay-cache when temp is truthy.
    # I'm finally removing this, because /tmp/mplay-cache was getting quite large for no good reason.
    if temp:
        os.unlink(path_to_play)


def get_mtime(filename: str, from_views: bool = False) -> typing.Tuple[bool, str, float]:
    """
    Get the modification time of a filename.

    Also return whether there was an apparent crash last time the file was watched.

    With from_views, filename is read as a .views log and its last non-blank line is used: the first field
    is the time, field 6 is the entry kind and field 7 the host (the layout touch() writes).  A last entry
    of kind "start" means playback never recorded its "end".  Without from_views, or when the log has no
    non-blank line, the file's own mtime from os.stat is returned.

    Returns (unfinished_warning_desired, unfinished_host, time).
    """
    unfinished_warning_desired = False
    unfinished_host = ""
    fields: typing.List[str] = []

    if from_views:
        with open(filename, "r") as file_:
            for line in file_:
                candidate = line.split()
                # Blank lines carry no entry; skipping them keeps a trailing empty line from hiding the last one
                if candidate:
                    fields = candidate
        if fields[6:] and fields[6] == "start":
            unfinished_warning_desired = True
        if fields[7:]:
            unfinished_host = fields[7]

    if fields:
        result = float(fields[0])
    else:
        result = os.stat(filename).st_mtime

    return (unfinished_warning_desired, unfinished_host, result)


def adjust(filename: typing.Optional[str]) -> str:
    """
    Convert a filename to something printable.

    None becomes "None".  Otherwise characters outside iso-8859-1 become "?", non-ASCII iso-8859-1
    characters are dropped, and each space is escaped with a backslash.
    """
    if filename is None:
        return "None"
    bytes_filename = filename.encode("iso-8859-1", "replace")
    str_filename = "".join(chr(byte) for byte in bytes_filename if byte <= 127)
    return str_filename.replace(" ", r"\ ")


class FilePair(object):
    """
    Maintain a matching pair of files.

    One will be a movie (like a .mkv, .avi, .mpeg, etc), one will be a .views - at most.
    A pair may hold just one of the two.
    """

    def __init__(self, full_path: str) -> None:
        """Initialize from the first file seen for this pair."""
        self.views_filename: typing.Union[str, None] = None
        self.nonviews_filename: typing.Union[str, None] = None
        # Placeholders until gen_timestamp() is called
        self.mtime = 0.0
        self.from_views = True
        self.unfinished_warning_desired = False
        self.unfinished_host = ""
        self.add_file(full_path)

    def add_file(self, full_path: str) -> None:
        """Add a file to a preexisting file pair object."""
        if full_path.endswith(".views"):
            self.views_filename = full_path
        else:
            self.nonviews_filename = full_path

    def gen_timestamp(self) -> None:
        """Generate a timestamp as best we can: from the .views file if there is one, else from the media file's mtime."""
        if self.views_filename is not None:
            self.from_views = True
            source = self.views_filename
        else:
            self.from_views = False
            if self.nonviews_filename is None:
                raise SystemExit("Internal error: self.nonviews_filename is None")
            source = self.nonviews_filename
        (self.unfinished_warning_desired, self.unfinished_host, self.mtime) = get_mtime(
            source, from_views=self.from_views
        )

    def fulfillment(self) -> str:
        """Return a string indicating whether we get the timestamp from the views file, or from the mtime."""
        if self.from_views:
            return "views"
        return "mtime"

    def __lt__(self, other: "FilePair") -> bool:
        """Compare by timestamp, so a list of pairs sorts oldest first."""
        return self.mtime < other.mtime

    def __str__(self) -> str:
        """Convert to string."""
        return "FilePair(%s, %s, %s, %s)" % (
            self.nonviews_filename,
            self.views_filename,
            self.mtime,
            self.unfinished_warning_desired,
        )

    def __repr__(self) -> str:
        """Convert to string."""
        return str(self)


def timestamps(dirs: typing.Optional[typing.List[str]] = None) -> None:
    """
    Display a list of files under dirs, sorted by timestamp.

    Also, if there are any 'unfinished' files, report those at the end (on stderr).
    dirs defaults to the current directory when None or empty.
    """
    if not dirs:
        # An empty list is what main() passes when no directory was named
        dirs = ["."]

    # Fail fast on a bad directory: os.walk() ignores listing errors by default
    for directory in dirs:
        os.stat(directory)

    files_dict: typing.Dict[str, FilePair] = {}
    for directory in dirs:
        for internal_root_dir, _list_of_dirs, list_of_files in os.walk(directory, topdown=True):
            for filename in list_of_files:
                if is_filename_to_ignore(filename):
                    continue
                # A media file and its .views file share one key: the path without any .views suffix
                base_filename = re.sub(r"\.views$", "", filename)
                full_path_to_filename = os.path.join(internal_root_dir, filename)
                full_path_to_base_filename = os.path.join(internal_root_dir, base_filename)
                existing_pair = files_dict.get(full_path_to_base_filename)
                if existing_pair is None:
                    files_dict[full_path_to_base_filename] = FilePair(full_path_to_filename)
                else:
                    existing_pair.add_file(full_path_to_filename)

    files_list = list(files_dict.values())
    for file_pair in files_list:
        file_pair.gen_timestamp()
    files_list.sort()

    unfinished_list = []
    for file_pair in files_list:
        sys.stdout.write(
            "%d (%5s) %s %s\n"
            % (
                file_pair.mtime,
                file_pair.fulfillment(),
                time.ctime(file_pair.mtime).replace(" ", "-"),
                adjust(file_pair.nonviews_filename),
            )
        )
        sys.stdout.flush()
        if file_pair.unfinished_warning_desired:
            unfinished_list.append(
                "File {} unfinished on host {}!".format(file_pair.nonviews_filename, file_pair.unfinished_host)
            )

    sys.stdout.flush()

    for unfinished_file in unfinished_list:
        print(unfinished_file, file=sys.stderr)

    sys.stderr.flush()


def ext_count(dirs: typing.Optional[typing.List[str]] = None) -> None:
    """
    Display a frequency count of file extensions found under dirs.

    Output is most frequent first, ties broken by extension.  dirs defaults to the current directory
    when None or empty.
    """
    if not dirs:
        # An empty list is what main() passes when no directory was named
        dirs = ["."]

    exts: typing.Dict[str, int] = collections.defaultdict(int)
    for directory in dirs:
        for _internal_root_dir, _list_of_dirs, list_of_files in os.walk(directory, topdown=True):
            for filename in list_of_files:
                extension = os.path.splitext(filename)[1]
                exts[extension] += 1

    for extension, count in sorted(exts.items(), key=lambda item: (-item[1], item[0])):
        print("{:10} {}".format(count, extension))


class Options(object):
    # pylint: disable=too-few-public-methods
    """Command line option parser and container."""

    def __init__(self) -> None:
        """Initialize."""
        self.check = True
        self.mode = "play"
        self.file: str = ""
        self.dirs: typing.List[str] = []
        self.temp = False
        self.pause = False

    def parse_argv(self) -> None:
        """
        Parse argv, changing program settings as needed.

        Consumes sys.argv[1:] as it goes, leaving sys.argv[0] in place.  Exits via usage() on --help,
        an unknown option, an inappropriate filename, or anything following the filename.
        """
        while sys.argv[1:]:
            arg = sys.argv[1]
            if arg == "--dvd":
                self.check = False
            elif arg == "--temp":
                self.temp = True
            elif arg == "--pause":
                self.pause = True
            elif arg in ("--timestamps", "--ext-count"):
                self.mode = arg[2:]
                # Everything after these options is a directory to scan
                if sys.argv[2:]:
                    self.dirs = sys.argv[2:]
                    del sys.argv[2:]
            elif arg in ("--help", "-h"):
                usage(0)
            elif arg.startswith("-"):
                sys.stderr.write("%s: Illegal option: %s\n" % (sys.argv[0], arg))
                usage(1)
            else:
                # Only one file is ever played, so anything after it would be silently dropped
                if len(sys.argv[1:]) > 1:
                    sys.stderr.write(f"{sys.argv[0]}: Too many files specified\n")
                    usage(1)
                if is_filename_to_ignore(arg) or is_views_filename(arg):
                    sys.stderr.write(f"{sys.argv[0]}: {arg} is inappropriate\n")
                    usage(1)
                del sys.argv[1]
                self.file = arg
                break
            del sys.argv[1]

    def check_args(self) -> None:
        """Check command line arguments for reasonableness."""
        if self.temp and self.mode == "timestamps":
            sys.stderr.write("--temp does not do anything in --timestamps mode\n")
            sys.exit(1)

        if self.pause and not self.temp:
            sys.stderr.write("--pause only makes sense with --temp\n")
            sys.exit(1)


def main() -> None:
    """Play movies or music, or produce one of the listings, as the command line directs."""
    options = Options()
    options.parse_argv()
    options.check_args()

    if options.mode == "play":
        play(options.temp, options.pause, options.file, options.check)
    elif options.mode == "timestamps":
        if options.file:
            sys.stderr.write("%s: --timestamps accepts no argument\n" % sys.argv[0])
            usage(1)
        timestamps(options.dirs)
    elif options.mode == "ext-count":
        if options.file:
            sys.stderr.write("%s: --ext-count accepts no argument\n" % sys.argv[0])
            usage(1)
        ext_count(options.dirs)
    else:
        raise ValueError("mode not play, timestamps or ext-count")


if __name__ == "__main__":
    main()