#!/usr/bin/python3 """Count down the seconds until a command needs to be run, announcing how much time is left periodically.""" # pylint: disable=superfluous-parens # superfluous-parens: Parentheses are good for portability and clarity import hashlib import os import random import re import subprocess import sys import time import typing sys.path.append('/usr/local/lib') # noqa: E402 import speak def get_not_music() -> typing.Set[str]: """Return a set of files that are "not music".""" result_set: typing.Set[str] = set() with subprocess.Popen('not-music', stdout=subprocess.PIPE) as subp: tuple_ = subp.communicate() full_output = tuple_[0].rstrip(b'\n') for line in full_output.split(b'\n'): result_set.add(str(line, encoding='UTF-8')) return result_set NOT_MUSIC = get_not_music() def make_used(var: typing.Any) -> None: """Convince pyflakes that var is used.""" assert True or var def usage(retval: int) -> None: """Output a usage message.""" sys.stderr.write('Usage: {}\n'.format(sys.argv[0])) sys.stderr.write(' --period 60 Number of seconds per announcement period\n') sys.stderr.write(' --count 0 Number of seconds to wait before playing music\n') sys.stderr.write(' --spawn command Command to run after countdown finishes\n') sys.stderr.write(' --mplayer *.mp3 Music files to play after count finishes\n') sys.stderr.write(' --random-music count Number of random music files to play\n') sys.stderr.write(' --nfiles count Number of random music files to play (minus --mplayer files)\n') sys.stderr.write(' --lock Lock screen after playing half the music files\n') sys.stderr.write(' --display-off Turn off screen after playing half the music files\n') sys.stderr.write(' --hibernate Hibernate the system when done playing\n') sys.stderr.write(' --poweroff power down the system when done playing\n') sys.exit(retval) def quotes(list_: typing.List[str]) -> str: """Quote a list for the shell.""" result = [] result.append("'") result.extend("' '".join(list_)) result.append("'") return ''.join(result) def spawn(command: str) -> None: """Run command as a shell command.""" # We intentionally ignore the exit code subprocess.call(command, shell=True) def period_description(num_periods: int, period: int) -> str: """Describe the period. This will usually be minutes.""" if num_periods == 1: ess = '' else: ess = 's' if period == 1: return 'second' if period == 60: return 'minute' if period == 60 * 60: return 'hour' return '{} second period{}'.format(period, ess) def is_hidden(filename: str) -> bool: """Return true iff this is a "hidden" file.""" return filename.startswith('.') def is_mp3(filename: str) -> bool: """Return true iff filename is an mp3 and isn't a "hidden file".""" return filename.endswith('.mp3') def get_dir() -> str: """Get a directory containing mp3 files.""" trial_directories = [ '~/sound/not-backed-up/Phone-Music', '~/sound-local/Phone-Music', '~/sound-local/avconv-Music', '~/sound/Phone-Music', '~/sound/avconv-Music', '~/not-backed-up/Phone-Music', '/mymount/sound/Phone-Music', '/mymount/sound/avconv-Music', ] for trial_directory in trial_directories: assert not trial_directory.endswith(os.path.sep) for trial_directory in trial_directories: directory = os.path.expanduser(trial_directory) if os.path.isdir(directory): print('Getting music from {}'.format(directory)) return directory raise ValueError('No sound files found') DIRECTORY = get_dir() def ripemd160_hasher(filename: str): """Return the ripemd160 hash for filename.""" hasher = hashlib.new('ripemd160') with open(filename, 'rb') as file_: # We can just read the whole file, because we know the file is small. hasher.update(file_.read()) return hasher.hexdigest() class FileComparator: # pylint: disable=too-few-public-methods """Memorize a couple of attributes of a file, for subsequent equality comparison to other files.""" def __init__(self, original_filename: str) -> None: """Initialize.""" self.filename = original_filename del original_filename self.file_length = os.path.getsize(self.filename) # In practice, this will be only about 6 kilobytes assert self.file_length < 1024*1024 self.ripemd160_hash = ripemd160_hasher(self.filename) def is_same(self, other_filename: str) -> bool: """Compare another file to the original file.""" return \ os.path.getsize(other_filename) == self.file_length and \ ripemd160_hasher(other_filename) == self.ripemd160_hash def gen_mp3s() -> typing.Generator[str, None, None]: """Yield all the mp3's under .""" for root, dirs, files in os.walk('.'): make_used(dirs) norm_root = os.path.normpath(root) for filename in files: # pathname = os.path.join(re.sub(r'^\./', '', root), filename) pathname = os.path.join(norm_root, filename) if is_hidden(pathname): continue if is_mp3(pathname): yield pathname def randomize(filenames: typing.List[str], num_files: int) -> typing.Generator[str, None, None]: """Return num_files elements from filenames.""" len_filenames = len(filenames) sys.stderr.write("Number of mp3's found: {}\n".format(len_filenames)) for index in range(num_files): make_used(index) randno = random.randint(0, len_filenames - 1) result_filename = filenames[randno] yield result_filename class Options(object): # pylint: disable=too-few-public-methods # too-few-public-methods: We're mostly a container """Parse, check and hold command line options.""" def __init__(self) -> None: """Initialize.""" self.period = 60 self.count = 0 self.command = '' self.mplayer_files: typing.List[str] = [] self.random_music = 0 self.nfiles = 0 self.lock = False self.display_off = False self.hibernate = False self.poweroff = False def parse_argv(self) -> None: """Parse up argv and store results in attributes.""" while sys.argv[1:]: if sys.argv[1] == '--period': self.period = max(int(sys.argv[2]), 1) del sys.argv[1] elif sys.argv[1] == '--count': self.count = int(sys.argv[2]) del sys.argv[1] elif sys.argv[1] == '--spawn': self.command = sys.argv[2] del sys.argv[1] elif sys.argv[1] == '--mplayer': self.mplayer_files = sys.argv[2:] del sys.argv[2:] elif sys.argv[1] == '--random-music': self.random_music = int(sys.argv[2]) del sys.argv[2] elif sys.argv[1] == '--nfiles': self.nfiles = int(sys.argv[2]) del sys.argv[2] elif sys.argv[1] == '--lock': self.lock = True elif sys.argv[1] == '--display-off': self.display_off = True elif sys.argv[1] == '--hibernate': self.hibernate = True elif sys.argv[1] == '--poweroff': self.poweroff = True elif sys.argv[1] in ['-h', '--help']: usage(0) else: message = '{}: Unrecognized option: {}\n' sys.stderr.write(message.format(sys.argv[0], sys.argv[1])) usage(1) del sys.argv[1] if \ self.command == '' and \ self.mplayer_files == '' and \ self.nfiles == 0 and \ self.random_music == 0: message = '{}: Warning: No action specified\n' sys.stderr.write(message.format(sys.argv[0])) all_regular_files = True for mplayer_file in self.mplayer_files: if not os.path.isfile(mplayer_file): sys.stderr.write('{}: {} does not exist or is not a regular file\n'.format(sys.argv[0], mplayer_file)) all_regular_files = False if not all_regular_files: sys.stderr.write('{}: one or more values to --mplayer not a regular file\n'.format(sys.argv[0])) sys.exit(1) if self.nfiles != 0: if self.random_music != 0: raise SystemExit('--nfiles and --random-music are mutually exclusive.\n') self.random_music = self.nfiles - len(self.mplayer_files) if self.random_music < 0: raise SystemExit('Too many --mplayer files\n') def speechize(filename: str) -> str: """Convert a filename to something a little more speech-friendly.""" result1 = filename.replace('/', '. ') result2 = re.sub(r'\.mp3$', '.', result1) result3 = re.sub('_', ' ', result2) return result3 def mplayers(*, lock, display_off, hibernate, poweroff, filenames: typing.List[str]) -> None: """Spawn one mplayer for each filename, sequentially.""" filenames_list = list(filenames) len_filenames = len(filenames_list) for filenameno, filename in enumerate(filenames_list): if filenameno >= len_filenames / 2.0: if lock: print('Locking') spawn('cinnamon-screensaver-command --lock') lock = False if display_off: print('Turning off display') spawn('xset dpms force off') display_off = False speechy_filename = speechize(filename) sys.stderr.write('Playing {} of {}: {}\n'.format(filenameno + 1, len_filenames, filename)) sys.stderr.flush() speak.speak('Starting {} of {}: {}'.format(filenameno + 1, len_filenames, speechy_filename)) spawn("mplayer '{}' > /dev/null 2>&1".format(filename)) speak.speak('That was {}'.format(speechy_filename)) if hibernate: spawn('/usr/bin/sudo /usr/bin/systemctl hibernate') if poweroff: spawn('/usr/bin/sudo /sbin/poweroff') def chop(filename: str) -> str: """Chop off the music directory, to keep pathnames more listenable.""" prefix = DIRECTORY + '/' if filename.startswith(prefix): length = len(prefix) return filename[length:] else: return filename def normalize_and_chop(in_files: typing.List[str]) -> typing.List[str]: """Make files absolute, and then chop off path to music (if any).""" abs_files = [os.path.abspath(f) for f in in_files] out_files = [chop(f) for f in abs_files] return out_files def main() -> None: """Play music.""" options = Options() options.parse_argv() prior_quotient = int(options.count // options.period) + 1 time0 = time.time() while True: time1 = time.time() if time0 + options.count < time1: if options.command: spawn(options.command) mplayer_list = [] if options.mplayer_files: result = normalize_and_chop(options.mplayer_files) mplayer_list.extend(result) # The timing of this chdir is important os.chdir(DIRECTORY) if options.random_music: random_songs = list(gen_mp3s()) random_songs_set = set(random_songs) # If this assertion fails, then we should expect a file in the not-music script that isn't in the Phone-Music # directory. Pay particular attention to the .mp3 vs .ogg and .flac extensions. assert NOT_MUSIC & random_songs_set == NOT_MUSIC, str(NOT_MUSIC) random_songs_set -= NOT_MUSIC random_songs_list = list(random_songs_set) result = list(randomize(random_songs_list, options.random_music)) mplayer_list.extend(result) if mplayer_list: mplayers( lock=options.lock, display_off=options.display_off, hibernate=options.hibernate, poweroff=options.poweroff, filenames=mplayer_list, ) sys.exit(0) difference = time1 - time0 current_quotient = int((options.count - difference) // options.period) + 1 if prior_quotient != current_quotient: prior_quotient = current_quotient speak.speak('countdown {}, {}, remaining\n'.format( current_quotient, period_description(current_quotient, options.period), )) print(round(options.count - difference)) time.sleep(1) main()