#!/usr/bin/env python3 # pylint: disable=import-error,unused-import,wrong-import-position """Wait for an event, and notify the user via e-mail, page and/or GUI popup.""" from __future__ import print_function import decimal import getpass import os import pwd import re import shutil import shlex import signal import smtplib import socket import ssl import subprocess import sys import time import traceback import typing sys.path.append(os.path.expanduser('~/lib')) sys.path.append('/usr/local/lib') import bashquote # noqa: ignore=E402 import drsmenu # noqa: ignore=E402 import ps_subset # noqa: ignore=E402 class NeverException(Exception): """An exception that is never raised.""" pass try: import gi gi.require_version('Notify', '0.7') # noqa: disable=E402 except (ValueError, ImportError): HAVE_NOTIFY = False GLIB_ERROR: typing.Any = NeverException else: HAVE_NOTIFY = True from gi.repository import Notify # pylint: disable=no-name-in-module from gi.repository import GLib GLIB_ERROR = GLib.Error def have_x11() -> bool: """Return True iff X11 is available.""" if os.environ.get('DISPLAY') is None: return False xdpyinfo_retval = os.system('xdpyinfo > /dev/null 2>&1') if xdpyinfo_retval is None or xdpyinfo_retval == 0: return True return False HAVE_POPUP_OPTION = False HAVE_X11 = False HAVE_OSASCRIPT = False # Sometimes Macs have both X11 and osascript. For these we prefer osascript. osascript_path = shutil.which('osascript') if osascript_path is not None: HAVE_OSASCRIPT = True HAVE_POPUP_OPTION = True elif have_x11(): try: import gi gi.require_version('Gtk', '3.0') from gi.repository import Gtk except (ImportError, RuntimeError): pass else: HAVE_X11 = True HAVE_POPUP_OPTION = True else: print('Warning: Neither X11 nor osascript found.') try: from mac_notifications import client as mn_client except ImportError: HAVE_MN_CLIENT = False else: HAVE_MN_CLIENT = True try: import deep_ssh except ImportError: HAVE_DEEP_SSH = False else: HAVE_DEEP_SSH = True INITIAL_DELTA_VALUE = None SHOULD_SEND_PAGE = False def make_used(variable: typing.Any) -> None: """Persuade pyflakes that 'variable' is used.""" assert True or variable def usage(retval: typing.Optional[int]): """Output a usage message.""" if retval == 0: write = sys.stdout.write else: write = sys.stderr.write write('Usage: %s --true-command cmd\n' % sys.argv[0]) write('\t-f|--false-command cmd Loop until cmd returns shell-false\n') write('\t-t|--true-command cmd Loop until cmd returns shell-true\n') write('\t-g|--greater-than fieldno cmd value Loop until fieldno of cmd output is > value\n') write('\t-G|--greater-than-or-equal-to fieldno cmd value Loop until fieldno of cmd output is >= value\n') write('\t-l|--less-than fieldno cmd value Loop until fieldno of cmd output is < value\n') write('\t-L|--less-than-or-equal-to fieldno cmd value Loop until fieldno of cmd output is <= value\n') write('\t-e|--equal-to fieldno cmd value Loop until fieldno of cmd output is == value\n') write('\t-p|--pid pid Loop until pid exists\n') write('\t-c|--spawn-command cmd Spawn cmd and loop until it exits\n') write("\t-S|--pid-by-process-name process-name-regex Wait for pid (id'd by regex) to exit (flat view)\n") write("\t-s|--pid-by-process-name-hierarchically process-name-regex Wait for pid (id'd by regex) to exit (tree view)\n") write('\t-n|--check-host-port host tcpport Wait for host:tcpport to listen\n') write('\t-E|--email-address email-address\n') write('\t-m|--max-seconds seconds\n') write('\t-i|--check-interval n Check every n seconds\n') write('\t-r|--remote-host hostname Check on hostname\n') write('\t-b|--block\n') write('\t-w|--no-blocking\n') write('\t-P|--send-page\n') write('\t --delta command\n') write('\t-d|--docker Prompt with a list of docker containers and use one\n') write('\t-D|--docker-container dc Use dc as docker container\n') write('\t --sudo sudo on the final hop. Does not try to authenticate\n') write('\t Change final field to "NOPASSWD:ALL" in sudoers\n') write('\t-h|--help\n') write('\n') write('Options are very order sensitive - EG, you need to specify\n') write('--remote-host before --pid, to make --pid operate on the correct\n') write('host.\n') write('\n') write('Also, you can specify more than one checking option on the same command.\n') write('For example:\n') write('\t%s --pid 12345 --spawn-command "sleep 10" --pid 54321\n' % sys.argv[0]) write('\n') write('Special (and optional) environment variables\n') write('\tNWUEMAIL The email address to send alerts to and from\n') write('\tNWUPAGE The email address to send pages to and from\n') write('\tNWUMAILACCOUNT The name of the account to which you wish to send e-mail\n') write('\n') write('Please note that if sudo on the (remote) host requires a password, this program will appear to hang.\n') sys.exit(retval) class Callback: def __init__(self): self.called_back = False def callback(self): self.called_back = True def ensure_dir(option_values: 'OptionValues') -> typing.Tuple[str, str, str]: """Create ~/.notify-when-up2.""" directory = os.path.expanduser('~/.notify-when-up2') prior_dir = os.getcwd() # create the directory if it doesn't yet exist try: os.chdir(directory) except (OSError, IOError): os.mkdir(directory, 7*64 + 5*8 + 5) os.chdir(prior_dir) # create the filename cred_filename = os.path.join(directory, '%s.cred' % option_values.mail_account) return directory, prior_dir, cred_filename def get_mail_account(option_values: 'OptionValues') -> None: """Look up the mail account data, or request it from the user and save it for next time.""" # what's our magic directory? _unused, prior_dir, cred_filename = ensure_dir(option_values) make_used(_unused) # get the username and password for the SMTP account try: os.chmod(cred_filename, 0o400) except OSError: print('Account %s' % option_values.mail_account) sys.stdout.write('SMTP Username: ') sys.stdout.flush() option_values.smtp_username = sys.stdin.readline().rstrip() # This prompts on its own option_values.smtp_password = getpass.getpass().rstrip() sys.stdout.write('SMTP Server: ') sys.stdout.flush() option_values.smtp_server = sys.stdin.readline().rstrip() sys.stdout.write('In the following question, Office365 needs 465. Gmail needs 587.\n') sys.stdout.write('SMTP Server Port (default 587) : ') sys.stdout.flush() port = sys.stdin.readline().rstrip() if port == '': option_values.smtp_port = 587 else: option_values.smtp_port = int(port) cred_file = open(cred_filename, 'w') os.chmod(cred_filename, 0o400) string = '%s\n%s\n%s\n%s\n' % ( option_values.smtp_username, option_values.smtp_password, option_values.smtp_server, option_values.smtp_port, ) cred_file.write(string) else: cred_file = open(cred_filename, 'r') option_values.smtp_username = cred_file.readline().rstrip('\n') option_values.smtp_password = cred_file.readline().rstrip('\n') option_values.smtp_server = cred_file.readline().rstrip('\n') port = cred_file.readline().rstrip('\n') if port == '': option_values.smtp_port = None else: option_values.smtp_port = int(port) cred_file.close() os.chdir(prior_dir) # This is notify-when-up version 2 # Differences from notify-when-up version 1: # 1) Structured much better internally. Version 1 is one of those programs you look at and say # "Did I write this? Bummer. This code stinks." In version 1's defense though, it started # as a very small/simple project, and grew incrementally without any real planning # 2) Long options in addition to the prior short options # 3) Remote host checks, to keep quoting simple, particularly since we don't load X on most of our # boxes # 5) Options are very order-sensitive in this version, but that means you can bunch together # some notify-when-up notification options into the same command now def last_hop_additions(sudo: bool, docker: bool, docker_container: str, command: str) -> str: """Add optional docker and sudo commands.""" temp = command # We actually don't -want- to quote these if docker: temp = 'docker exec {} {}'.format(docker_container, temp) if sudo: temp = 'sudo {}'.format(temp) return temp def possibly_remote_run(remote_host: str, command: str) -> int: """Run a command locally or remotely, as required.""" retval: typing.Union[int, os._wrap_close, subprocess.Popen] = 0 if is_local(remote_host): retval = os.system(command) if retval is None: retval = 0 assert isinstance(retval, int) else: subp = deep_ssh.handle(optional_opts='', chain=remote_host, command=command, popen=2) # subp has to be a subprocess.Popen, because we passed popen=2 above. assert isinstance(subp, subprocess.Popen) subp.communicate() retval = subp.returncode # retval has to be of type int because we did not specify a popen argument, and it defaults to 0, meaning "just run it". assert isinstance(retval, int) return retval def possibly_remote_popen(option_values: 'OptionValues', command: str) -> subprocess.Popen: """Popen or deep_ssh the command on the local or remote host.""" if is_local(option_values.remote_host): return subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, encoding='utf-8') retval = deep_ssh.handle( optional_opts='', chain=option_values.remote_host, command=command, popen=2, echo_chain=True, echo_command=True, ) # retval has to be of type subprocess.Popen because we passed popen=2 above. assert isinstance(retval, subprocess.Popen) return retval class ProcessClass(object): # pylint: disable=too-few-public-methods # too-few-public-methods: We don't need a lot of public methods """A class for describing the processes on a host.""" def __init__(self, line) -> None: """Initialize.""" fields = line.split() self.line = line self.pid = int(fields[0]) self.command = ' '.join(fields[1:]) def __cmp__(self, other: 'ProcessClass') -> int: """2.x comparison.""" if self.pid < other.pid: return -1 if self.pid > other.pid: return 1 return 0 def __str__(self) -> str: """Convert to string.""" return self.line def get_processes(option_values: 'OptionValues') -> typing.Dict[int, 'ProcessClass']: """Look up all processes on option_values.remote_host using a ps command.""" dictionary = {} # Sadly, the BSD's don't grok SysV ps options well. Everything else I've encountered does. command = ( 'case $(uname -s) in ' 'Darwin|FreeBSD) ps axo pid,command ;; ' '*) if ps --help 2>&1 | grep -qi busybox; then ps -eo pid,comm; else ps -ewwo pid,cmd; fi ;; esac' ) subprocess_popen = possibly_remote_popen(option_values, command) (stdout_data, stderr_data) = subprocess_popen.communicate() _ = stderr_data stdout_lines = stdout_data.rstrip('\n').split('\n') header_line = stdout_lines[0] if header_line.strip().startswith('PID'): pass else: raise ValueError('weird output from {}'.format(command)) for line in stdout_lines[1:]: process = ProcessClass(line) dictionary[process.pid] = process return dictionary def get_pid_by_command_flat(option_values: 'OptionValues', command_re: str): """Look up the pid of a command by a substring search an d prompting the user.""" if is_local(option_values.remote_host): my_pid = os.getpid() else: # This is a "pid" that will never match. my_pid = -1 processes = get_processes(option_values) keys = list(processes) keys.sort() match_count = 0 matches = [] for pid in keys: # skip this program, because we probably pretty much never want to wait on this program regex = re.compile(command_re) match_obj = regex.search(processes[pid].command) if match_obj and pid != my_pid: matches.append(processes[pid]) match_count += 1 if match_count == 0: sys.stderr.write('No matches found on %s\n' % command_re) # an illegal pid value used as a sentinel sys.exit(1) elif match_count == 1: return (matches[0].pid, matches[0].command) else: pids = [getattr(item, 'pid') for item in matches] pids.sort() pidno = 0 for pid in pids: sys.stderr.write('%d) %s\n' % (pidno, str(processes[pid]).strip())) pidno += 1 # may need to open /dev/tty for this input user_matchno = int(sys.stdin.readline()) return (matches[user_matchno].pid, matches[user_matchno].command) def get_pid_by_command_hierarchically(option_values: 'OptionValues', command_re: str): """Look up the pid of a command by a substring search and prompting the user - hierarchically.""" relevant_pids: typing.Set[int] = set() regex = re.compile(command_re) command = ' '.join(ps_subset.command_list) command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) subprocess_popen = possibly_remote_popen(option_values, command) if is_local(option_values.remote_host): my_pid = os.getpid() else: my_pid = -1 pid, command = ps_subset.do([regex], relevant_pids, ps_subset.prompt_and_select, subprocess_popen, my_pid) return (pid, command) def get_command_by_pid(option_values: 'OptionValues', pid: int): """Look up the pid of our command.""" processes = get_processes(option_values) if pid in processes: return processes[pid].command else: return None def is_local(host: str) -> bool: """If our remote_host is the current machine, return True, else False.""" return host in ('', 'localhost', hostname(), canonical_hostname()) def spawn_command(remote_host: str, command: str) -> int: """Spawn a subprocess.""" pid = os.fork() if pid == 0: sys.exit(possibly_remote_run(remote_host, command)) else: return pid def by_line_output_cmp_fn(option_values, descr, fieldno, command, number, comparison_function): # pylint: disable=too-many-arguments """Check a line of output for a particular threshold - if it matches, we've reached our event.""" success = False reps = 100 for _unused in range(reps): make_used(_unused) subprocess_popen = possibly_remote_popen(option_values, command) stdout_data, stderr_data = subprocess_popen.communicate() stdout_lines = stdout_data.rstrip('\n').split('\n') line = stdout_lines[0] fields = line.split() try: value = decimal.Decimal(fields[fieldno]) except IndexError: delay = 5 print('%s: failed to get field %d from command %s retrying in %d seconds' % ( sys.argv[0], fieldno, command, delay), file=sys.stderr) time.sleep(delay) continue else: success = True break if success: print('got value %f, want %s %f' % (value, descr, number), file=sys.stderr) return comparison_function(value, number) else: tuple_ = (sys.argv[0], fieldno, command, reps) sys.stderr.write('%s: Obtaining field %d from %s failed %d times in a row\n' % tuple_) sys.exit(1) class Comparator(object): # pylint: disable=too-few-public-methods # too-few-public-methods: We just hold a comparison function and ancillary data """Comparison stuff.""" def __init__(self, description, comparison_function) -> None: """Initialize.""" self.description = description self.option = '--' + description.replace(' ', '-') self.comparison_function = comparison_function def by_shell_exit_status(option_values, shell_command): """If our command exits with a particular value, we've reached our event.""" if is_local(option_values.remote_host): retval = os.system(shell_command) else: retval = deep_ssh.handle(optional_opts='', chain=option_values.remote_host, command=shell_command) # invert the logic, because shell uses negative logic while python is normal ^_^ if retval is None or retval == 0: return True return not retval def slurp(option_values, shell_command): """Read all input from our command.""" if is_local(option_values.remote_host): proc = subprocess.Popen(shlex.split(shell_command), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() if stderr: print(stderr, file=sys.stderr) return stdout.decode('utf-8') else: filedes = deep_ssh.handle(optional_opts='', chain=option_values.remote_host, command=shell_command, popen=True) value = filedes.read() # pylint: disable=no-member filedes.close() return value def by_command_output(option_values, shell_command): """If our command's output has changed, let the caller know that we've reached our event.""" if option_values.initial_delta_value is None: option_values.initial_delta_value = slurp(option_values, shell_command) sys.stdout.write('Initial value is %s\n' % option_values.initial_delta_value.rstrip('\n')) return True current_delta_value = slurp(option_values, shell_command) sys.stdout.write('Current value is %s\n' % current_delta_value.rstrip('\n')) if option_values.initial_delta_value == current_delta_value: return True return False def by_waitpid(pid): """If our subprocess exits, we've reached our event.""" pid_exited, exit_status = os.waitpid(pid, os.WNOHANG) if pid_exited == pid: # The not inverts the shell's negative logic to our positive logic return not exit_status return 0 def drop_privileges() -> None: """ Drop privileges. Based on https://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python """ if os.getuid() != 0 and os.geteuid() != 0: # We have nothing to give up, so just return without changing anything. return login_name = os.getlogin() pwent = pwd.getpwnam(login_name) login_uid = pwent.pw_uid login_gid = pwent.pw_gid # Remove all auxiliary groups os.setgroups([]) # Try setting the uid/gid if login_gid != 0: os.setregid(login_gid, login_gid) os.setreuid(login_uid, login_uid) class Apa(object): """Create a window with an "acknowledge" button that isn't prone to accidentally dismissal.""" def __init__(self) -> None: """Initialize.""" drop_privileges() def accident_proof_acknowledge(self, description: str) -> None: """Create a window with an "acknowledge" button that isn't prone to accidentally dismissal.""" self.window = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) self.window.set_title('Accident proof acknowledge') self.window.connect("delete_event", self.delete_event) self.window.show() self.vbox = Gtk.VBox() self.vbox.show() self.window.add(self.vbox) self.label = Gtk.Label(label=description) self.label.show() self.vbox.pack_start(self.label, True, True, 0) self.hbox = Gtk.HBox() self.hbox.show() self.vbox.pack_start(self.hbox, True, True, 0) self.acknowledge_button = Gtk.Button(label='Acknowledge') self.acknowledge_button.show() self.acknowledge_button.connect("clicked", self.acknowledge_callback) self.hbox.pack_start(self.acknowledge_button, True, True, 0) Gtk.main() def delete_event(self, *args, **kwargs) -> None: """Shut down GTK.""" # Thu Dec 20 11:02:20 PST 2018 # I suspect this is called with different numbers of arguments on different versions of gi. _unused = (self, args, kwargs) make_used(_unused) Gtk.main_quit() sys.exit(0) def acknowledge_callback(self, widget=None) -> None: """Process click of acknowledge button.""" _unused = self _unused = widget make_used(_unused) Gtk.main_quit() def whoami() -> str: """Return the current user's username.""" return pwd.getpwuid(os.geteuid())[0] def hostname() -> str: """Return a naive hostname for the host we're running on.""" full_uname = os.uname() result = full_uname[1] return result def canonical_hostname() -> str: """Return the canonical hostname for the machine we're running on.""" uname = hostname() try: record = socket.gethostbyname(uname) good_hostname = socket.gethostbyaddr(record) except (socket.herror, socket.gaierror): # reverse lookup failed, so use uname return uname return good_hostname[0] def from_address(address: str) -> str: """Generate a from address for our emails.""" return '"notify-when-up2: %s on %s" <%s>' % (whoami(), canonical_hostname(), address) def notify_by_email(option_values: 'OptionValues', to_address: str, subject: str, report: str) -> None: """Notify the user that our event has occurred - via an e-mail.""" if not option_values.send_email: sys.stderr.write('Skipping e-mail by request of user\n') return text_list = [] text_list.append('From: %s\n' % from_address(option_values.email_address)) text_list.append('To: %s\n' % to_address) text_list.append('Subject: %s\n' % subject) text_list.append('\n') text_list.append(report) text = ''.join(text_list) if option_values.mail_account == 'sendmail': print("Notifying via pipe to sendmail command", file=sys.stderr) try: pipe = os.popen('/usr/lib/sendmail %s' % to_address, 'w') except (OSError, IOError): try: pipe = os.popen('/usr/sbin/sendmail %s' % to_address, 'w') except (OSError, IOError): sys.stderr.write('Warning! Opening a pipe to sendmail failed!\n') return pipe.write(text) pipe.close() else: print("Notifying via SMTP to %s:%s (%s)" % ( option_values.smtp_server, option_values.smtp_port, to_address), file=sys.stderr) try: context = ssl.create_default_context() assert option_values.smtp_port is not None with smtplib.SMTP(option_values.smtp_server, option_values.smtp_port) as smtp_obj: if option_values.smtp_port != 25: smtp_obj.starttls(context=context) smtp_obj.ehlo(canonical_hostname()) if option_values.smtp_username and option_values.smtp_password: # Some of us still don't use SMTP authentication smtp_obj.login(option_values.smtp_username, option_values.smtp_password) smtp_obj.sendmail(from_address(option_values.email_address), to_address, text) except socket.error: sys.stderr.write('SMTP to %s:%s failed\n' % (option_values.smtp_server, option_values.smtp_port)) traceback.print_exc(file=sys.stderr) sys.stderr.write('Continuing...\n') def notify_by_pager(option_values: 'OptionValues', pager_address: str, subject: str, report: str) -> None: """Notify the user that our event has occurred - via a page to an e-mail address.""" paged = 0 if pager_address == '': pass else: fields = pager_address.split('@') if len(fields) == 1 or len(fields) == 2: notify_by_email(option_values, pager_address, subject, report) paged = 1 elif len(fields) == 3: notify_by_email(option_values, '%s@%s' % (fields[0], fields[1]), fields[2], report) paged = 1 else: raise ValueError("Illegal number of @'s in pager address") if paged: print('sent page', file=sys.stderr) def notify_by_logfile(report: str) -> None: """Notify the user that our event has occurred - in a logfile. Unimplemented.""" # how necessary is this, really? _unused = report make_used(_unused) def notify_by_gui(report: str) -> None: """Create a popup window using GTK+, indicating we've reached our event.""" apa = Apa() apa.accident_proof_acknowledge(report) class NotLocal(Exception): """An exception for notify_by_libnotify to raise when not on the console.""" def is_local_display(regex: typing.Pattern = re.compile(r'^:[0-9](\.[0-9]+)?$')) -> bool: """Return True iff $DISPLAY points at a local display.""" if 'DISPLAY' not in os.environ: return False match = regex.match(os.environ['DISPLAY']) return bool(match) def notify_by_libnotify(report: str) -> None: """Create a notify-osd (or similar) popup using GTK+, indicating we've reached our event.""" if not is_local_display(): # We're not local, and libnotify might notify the wrong console, so we avoid libnotify raise NotLocal # One time initialization of libnotify Notify.init("notify-when-up2") # Create the notification object summary = "notify-when-up2 state change" notification = Notify.Notification.new( summary, report, ) # This seems to be the key to getting a longer notification notification.set_urgency(2) # Highest priority # Actually show on screen notification.show() # This didn't work - and yet gprog does have the desired behavior... # while True: # # Loop forever, so that when the user clicks the notification, the virtual # # desktop comes back to this window. # time.sleep(1) def notify_via_osascript(report: str) -> None: """Do a notification on macOS via osascript.""" # Note that this probably should use bashquote to escape things, so we don't get quoting errors # from more complex shell commands. # # We probably should also use the subprocess module, not os.system(). # Based on https://stackoverflow.com/questions/17651017/python-post-osx-notification # title = 'notify-when-up2 state change' # # 315576000 is about 10 years osacommand = """tell app "System Events" with timeout of 315576000 seconds display dialog "{}" end timeout end tell""".format(report.replace('"', r'\\"')) assert osascript_path is not None command = [osascript_path, "-e", osacommand] subprocess.run(command, shell=False) def noop(*args, **kwargs): """No operation: do nothing.""" pass def notify_via_mn_client(report: str) -> None: """Do a notification on macOS via the OS' notification system.""" summary = "notify-when-up2 state change" # cb = Callback() mn_client.create_notification( title=summary, subtitle=report, icon=None, # action_button_str="Acknowledge", # action_callback=cb.callback, ) # print('fred 4') # while not cb.called_back: # print('fred 4.5', cb.called_back) # time.sleep(1) # print('fred 5') # mn_client.stop_listening_for_callbacks() def notify_on_tty(report: str) -> None: """Notify the user our event has occurred - on stderr.""" print("We're there!\n%s\n" % report, file=sys.stderr) def shorten(argv0: str) -> str: """Return the basename of a pathname.""" last = os.path.basename(argv0) return last def notify(description: str, initial_time: float, current_time: float, option_values: 'OptionValues', timed_out: bool): """Notify the user that the event has occured - in a variety of ways.""" if is_local(option_values.remote_host): host = canonical_hostname() else: host = option_values.remote_host if timed_out: report = 'Timed out, did not get to: %s\nStarted %s\nEnded %s\nRan on host %s\n' % ( description, time.ctime(initial_time), time.ctime(current_time), host) subject = 'command timed out on %s' % host else: report = '%s\nStarted %s, ended %s\nRan on host %s\n' % ( description, time.ctime(initial_time), time.ctime(current_time), host) subject = '%s done against %s' % (shorten(sys.argv[0]), host) notify_by_email(option_values, option_values.email_address, subject, report) if option_values.should_send_page: notify_by_pager(option_values, option_values.pager_address, subject, report) notify_by_logfile(report) notify_on_tty(report) libnotify_worked = False if HAVE_NOTIFY: # Note that libnotify is nonblocking. try: notify_by_libnotify(report) except (GLIB_ERROR, NotLocal): # pylint: disable=catching-non-exception libnotify_worked = False else: libnotify_worked = True # Note that we only block if we do not have libnotify now... if option_values.go_ahead_and_block: if HAVE_X11 and not libnotify_worked: notify_by_gui(report) elif HAVE_MN_CLIENT: notify_via_mn_client(report) elif HAVE_OSASCRIPT: notify_via_osascript(report) def update(description: str, initial_time: float, current_time: float, option_values: 'OptionValues'): """Output one record on the tty describing what we're waiting for, and indicating we've not yet reached it.""" if is_local(option_values.remote_host): host = canonical_hostname() else: host = option_values.remote_host sys.stderr.write("We're waiting for: '%s' on %s, but we're not there yet\n" % (description, host)) sys.stderr.write("We started waiting at %s, and now it's %s\n" % (time.ctime(initial_time), time.ctime(current_time))) sys.stderr.flush() def signal_received(signum, frame): """Terminate on signal.""" _unused = signum _unused = frame make_used(_unused) sys.stderr.write('Terminating on signal %d %s %s\n' % (signum, time.ctime(time.time()), ' '.join(sys.argv))) sys.exit(1) def enable_paging(signum, frame): """Deal with SIGUSR1 having been received; enable paging.""" # pylint: disable=global-statement _unused = signum _unused = frame make_used(_unused) global SHOULD_SEND_PAGE SHOULD_SEND_PAGE = True print('Enabled paging due to SIGUSR1') def disable_paging(signum, frame): """Deal with SIGUSR2 having been received; disable paging.""" # pylint: disable=global-statement _unused = signum _unused = frame make_used(_unused) global SHOULD_SEND_PAGE SHOULD_SEND_PAGE = False print('Disabled paging due to SIGUSR2') def wait_for_it( option_values: 'OptionValues', interrogation_function: typing.Callable[['OptionValues'], bool], description: str, short_interval: int = 0, ): """Wait for our event to occur.""" signal.signal(signal.SIGTERM, signal_received) signal.signal(signal.SIGHUP, signal_received) signal.signal(signal.SIGINT, signal_received) signal.signal(signal.SIGUSR1, enable_paging) signal.signal(signal.SIGUSR2, disable_paging) # we need global lifetime, not global scope - but we get both :( if not option_values.graphics_warned: if not HAVE_POPUP_OPTION: sys.stderr.write('Warning: No graphics available, so you will not get a popup window when the time comes.\n') option_values.graphics_warned = True initial_time = time.time() while True: # this is the main benefit internally, of version 2 - we're using function variables to # get just one main loop instead of a gazillion # # interrogation_function should return true when we're done if interrogation_function(option_values): current_time = time.time() notify(description, initial_time, current_time, option_values, timed_out=False) return current_time = time.time() if option_values.max_seconds != 0: if current_time - initial_time >= option_values.max_seconds: notify(description, initial_time, current_time, option_values, timed_out=True) return else: # when max_seconds is 0, we let things run forever pass # let the user know what's going on. users like to know what's going on :) update(description, initial_time, current_time, option_values) # Note that we're making no effort to account for how long the commands run! # # Also, if the check_interval is 10 minutes and the max duration is 1 minute, we # sleep far too long - don't worry about it - not yet at least. # # This program isn't (yet?) about precise timing though; it's more about making sure # you know what's happening and when it happened if short_interval: time.sleep(1.0) else: time.sleep(option_values.check_interval) class OptionValues(object): """Singleton to hold command line options and other tunables.""" # pylint: disable=too-few-public-methods,too-many-instance-attributes # too-few-public-methods: We're just as singleton for a lot of things that would be a pain to pass around otherwise # too-many-instance-attributes: We're a container def __init__(self) -> None: """Initialize.""" self.go_ahead_and_block = True self.should_send_page = False self.max_seconds = 0.0 self.check_interval = 60.0 self.remote_host = '' self.smtp_username = '' self.smtp_password = '' self.smtp_server = '' self.smtp_port: typing.Optional[int] = None self.initial_delta_value = None self.email_address = '' self.pager_address = '' self.mail_account = '' self.send_email = True self.graphics_warned = False self.sudo = False self.docker = False self.docker_container = '' def interpret(option_values: 'OptionValues', comparators: typing.Dict[str, 'Comparator']): # pylama:ignore=C901 # pylint: disable=too-many-statements,too-many-branches,too-many-locals # We're a rather large function, unfortunately """Interpret the command line's mini-language.""" modifiers_unused = 0 while sys.argv[1:]: if sys.argv[1] in ('--true-command', '-t'): modifiers_unused = 0 command = sys.argv[2] command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) # interrogation_function = lambda option_values: by_shell_exit_status(option_values, command) def true_cmd_interrogfn(option_values): """--true-command interrogation.""" return by_shell_exit_status(option_values, command) description = '%s returned true' % sys.argv[2] wait_for_it(option_values, true_cmd_interrogfn, description) del sys.argv[1] elif sys.argv[1] in ('--false-command', '-f'): modifiers_unused = 0 command = sys.argv[2] command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) # interrogfn = lambda option_values: not by_shell_exit_status(option_values, command) def false_cmd_interrogfn(option_values): """--false-command interrogation.""" return not by_shell_exit_status(option_values, command) description = '%s returned false' % sys.argv[2] wait_for_it(option_values, false_cmd_interrogfn, description) del sys.argv[1] elif sys.argv[1] in ('--delta', ): modifiers_unused = 0 command = sys.argv[2] command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) def delta_interrogfn(option_values): """--delta interrogation function.""" return not by_command_output(option_values, command) description = '%s returned different' % command wait_for_it(option_values, delta_interrogfn, description) del sys.argv[1] elif sys.argv[1] in comparators: modifiers_unused = 0 option = sys.argv[1] fieldno = int(sys.argv[2]) command = sys.argv[3] command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) try: value = decimal.Decimal(sys.argv[4]) except decimal.InvalidOperation: raise SystemExit(f'Error: Unable to convert {sys.argv[4]} to decimal.Decimal') # interrogfn = lambda option_values: by_line_output_cmp_fn( # option_values, comparators[option].description, fieldno, command, value, comparators[option].comparison_function) def comparator_interrogfn(option_values): """Comparison interrogation function - handles <, >, <=, >=, ==.""" tuple_ = ( option_values, comparators[option].description, fieldno, command, value, comparators[option].comparison_function, ) return by_line_output_cmp_fn(*tuple_) description = 'command %s returned %s %s' % (command, comparators[sys.argv[1]].description, value) wait_for_it(option_values, comparator_interrogfn, description) del sys.argv[1] del sys.argv[1] del sys.argv[1] elif sys.argv[1] in ('--pid', '-p'): modifiers_unused = 0 corresponding_command = get_command_by_pid(option_values, int(sys.argv[2])) command = last_hop_additions( option_values.sudo, option_values.docker, option_values.docker_container, corresponding_command, ) # interrogfn = lambda option_values: by_shell_exit_status( # option_values, '! ps -p %s > /dev/null 2>&1' % sys.argv[2]) if corresponding_command is None: print('pid {} does not exist (on the relevant system)'.format(sys.argv[2]), file=sys.stderr) del sys.argv[1] del sys.argv[1] continue def pid_interrogfn(option_values): """--pid interrogation function.""" # Note that OS/X and FreeBSD do not have /proc. This ps -p seems to work on them and Linux # at least. return by_shell_exit_status(option_values, '! ps -p %s > /dev/null 2>&1' % sys.argv[2]) # I used to use this for everything, but then I hit OS/X and FreeBSD. # return by_shell_exit_status(option_values, '! ls -ld /proc/%s > /dev/null 2>&1' % sys.argv[2]) description = 'pid %s (command %s) exited' % (sys.argv[2], corresponding_command) wait_for_it(option_values, pid_interrogfn, description) del sys.argv[1] elif sys.argv[1] in ('--spawn-command', '-c'): modifiers_unused = 0 # potentially two ssh chains - oh well command = sys.argv[2] command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) pid = spawn_command(option_values.remote_host, command) # we intentionally ignore remote_host for this one, because our child handles that # for us instead def spawn_command_interrogfn(option_values): """--spawn-command interrogation function.""" make_used(option_values) return by_waitpid(pid) description = '%s exited' % sys.argv[2] wait_for_it(option_values, spawn_command_interrogfn, description, short_interval=1) del sys.argv[1] elif sys.argv[1] in ('--pid-by-process-name-flat', '-S'): modifiers_unused = 0 # potentially two ssh chains - oh well (pid, command) = get_pid_by_command_flat(option_values, sys.argv[2]) del sys.argv[1] if pid != -1: # This may not always be a directory! # Note that OS/X does not have /proc def pid_by_process_name_interrogfn(option_values): """--pid-by-command-name interrogation function.""" # Works on Linux, FreeBSD, OS/X. Hopefully other systems as well. return by_shell_exit_status(option_values, '! ps -p %s > /dev/null 2>&1' % (pid, )) # Used to use this for everything, but it doesn't work on the BSD's # return by_shell_exit_status(option_values, '! ls -ld /proc/%d > /dev/null 2>&1' % (pid, )) description = '%s exited' % command wait_for_it(option_values, pid_by_process_name_interrogfn, description) elif sys.argv[1] in ('--pid-by-process-name-hierarchically', '-s'): modifiers_unused = 0 # potentially two ssh chains - oh well (pid, command) = get_pid_by_command_hierarchically(option_values, sys.argv[2]) del sys.argv[1] if pid != -1: # This may not always be a directory! # Note that OS/X does not have /proc def pid_by_process_name_interrogfn(option_values): """--pid-by-command-name interrogation function.""" # Works on Linux, FreeBSD, OS/X. Hopefully other systems as well. command = 'sh -c "! ps -p %s > /dev/null 2>&1"' % (pid, ) command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) return by_shell_exit_status(option_values, command) # Used to use this for everything, but it doesn't work on the BSD's # return by_shell_exit_status(option_values, '! ls -ld /proc/%d > /dev/null 2>&1' % (pid, )) description = '%s exited' % command wait_for_it(option_values, pid_by_process_name_interrogfn, description) elif sys.argv[1] in ('--check-host-port', '-n'): modifiers_unused = 0 # note that this is only going to work on V3 appliances, not V1 or V2, because V1 and V2 # are based on an old SuSE 9.3 that doesn't include netcat. But perhaps it's better to # do it this way than to ship over some python or something command = 'nc -w 5 %s %s < /dev/null' % (sys.argv[2], sys.argv[3]) command = last_hop_additions(option_values.sudo, option_values.docker, option_values.docker_container, command) def check_port_interrogfn(option_values): """--check-host-port interrogation function.""" return by_shell_exit_status(option_values, command) description = 'host %s:port %s is active' % (sys.argv[2], sys.argv[3]) wait_for_it(option_values, check_port_interrogfn, description) del sys.argv[1] del sys.argv[1] elif sys.argv[1] in ('--email-address', '-E'): modifiers_unused = 1 option_values.email_address = sys.argv[2] del sys.argv[1] elif sys.argv[1] == '--no-email': modifiers_unused = 1 option_values.send_email = False elif sys.argv[1] in ('--max-seconds', '-m'): modifiers_unused = 1 option_values.max_seconds = float(sys.argv[2]) del sys.argv[1] elif sys.argv[1] in ('--check-interval', '-i'): modifiers_unused = 1 option_values.check_interval = float(sys.argv[2]) del sys.argv[1] elif sys.argv[1] in ('--remote-host', '-r'): modifiers_unused = 1 if HAVE_DEEP_SSH: option_values.remote_host = sys.argv[2] else: sys.stderr.write('%s: No deep_ssh.py module found\n' % sys.argv[0]) sys.stderr.write('-r does not work without deep_ssh.py\n') usage(1) del sys.argv[1] elif sys.argv[1] in ('--block', '-b'): modifiers_unused = 1 option_values.go_ahead_and_block = True elif sys.argv[1] == '--mail-account' and sys.argv[2:]: option_values.mail_account = sys.argv[2] del sys.argv[1] get_mail_account(option_values) elif sys.argv[1] in ('--no-blocking', '-w'): modifiers_unused = 1 option_values.go_ahead_and_block = False elif sys.argv[1] in ('--send-page', '-P'): modifiers_unused = 1 option_values.should_send_page = True elif sys.argv[1] in ('--sudo', ): modifiers_unused = 1 option_values.sudo = True elif sys.argv[1] in ('--docker', '-d'): modifiers_unused = 1 option_values.docker = True option_values.docker_container = get_docker_container(option_values) elif sys.argv[1] in ('--docker-container', '-D'): modifiers_unused = 1 option_values.docker = True option_values.docker_container = sys.argv[2] del sys.argv[1] elif sys.argv[1] in ('--help', '-h'): usage(0) else: sys.stderr.write('%s: Illegal option: %s\n' % (sys.argv[0], sys.argv[1])) usage(1) del sys.argv[1] if modifiers_unused: sys.stderr.write('%s; Warning: trailing modifiers unused\n' % sys.argv[0]) def tabs_to_fixed_width(lines: typing.List[str]) -> typing.List[str]: """Convert a list of str containing tab-delimited fields, to a presentable list of str with fixed-width columns.""" list_of_field_lists = [string.split('\t') for string in lines] list_of_field_lists.sort() max_num_cols = max(len(fields) for fields in list_of_field_lists) col_widths = [0] * max_num_cols for colno in range(max_num_cols): col_widths[colno] = max(len(row[colno]) for row in list_of_field_lists) result = [] for row in list_of_field_lists: padding_list = [] for fieldno in range(max_num_cols): if row[fieldno:]: string = row[fieldno].ljust(col_widths[fieldno]) padding_list.append(string) result.append(' '.join(padding_list)) return result def get_docker_container(option_values: OptionValues) -> str: """Select a container name using 'docker ps' and drsmenu.""" cmd = r"docker ps --format '{{ .Names }}\t{{ .Image }}\t{{ .Status }}'" if option_values.sudo: cmd = 'sudo ' + cmd subprocess_popen = possibly_remote_popen( option_values, cmd, ) (stdout, stderr) = subprocess_popen.communicate() lines = stdout.split('\n') if lines[-1] == '': del lines[-1] list_of_strings = tabs_to_fixed_width(lines) docker_container_names = [line.split()[0] for line in list_of_strings] choice = drsmenu.get_menu_selection('Select a docker container by name', list_of_strings) return docker_container_names[choice] def quick_bashquote(string: str) -> str: """Quote string for use in bash.""" bashq = bashquote.BashquoteString() bashq.add(string) return bashq.result() def main() -> None: """Deal with options and interpret the command line.""" option_values = OptionValues() option_values.go_ahead_and_block = True option_values.should_send_page = False option_values.max_seconds = 0 option_values.remote_host = '' option_values.smtp_username = '' option_values.smtp_password = '' option_values.smtp_server = '' option_values.smtp_port = None option_values.initial_delta_value = None comparators: typing.Dict[str, 'Comparator'] = {} for comp in ( ('-l', 'less than', lambda x, y: x < y), ('-L', 'less than or equal to', lambda x, y: x <= y), ('-g', 'greater than', lambda x, y: x > y), ('-G', 'greater than or equal to', lambda x, y: x >= y), ('-e', 'equal to', lambda x, y: x == y) ): comparator = Comparator(comp[1], comp[2]) comparators[comparator.option] = comparator comparators[comp[0]] = comparator # this is just a default if 'NWUEMAIL' in os.environ: option_values.email_address = os.environ['NWUEMAIL'] else: option_values.email_address = '%s@%s' % (pwd.getpwuid(os.geteuid())[0], canonical_hostname()) if 'NWUPAGE' in os.environ: option_values.pager_address = os.environ['NWUPAGE'] else: option_values.pager_address = '' if 'NWUMAILACCOUNT' in os.environ: option_values.mail_account = os.environ['NWUMAILACCOUNT'] get_mail_account(option_values) else: option_values.mail_account = 'sendmail' directory, _unused, _unused = ensure_dir(option_values) make_used(_unused) with open(os.path.join(directory, 'log'), 'a') as log_file: # Write our arguments to ~/.notify-when-up2/log time_of_entry = time.time() command = ' '.join(quick_bashquote(argument) for argument in sys.argv) log_file.write('%s %s %s\n' % (time_of_entry, time.ctime(time_of_entry), command)) interpret(option_values, comparators) if __name__ == '__main__': main()