#!/usr/bin/python3

"""A fallback-reboot client in Python."""

import binascii
import getpass
import os
import shutil
import socket
import subprocess
import sys
from typing import List, NoReturn, Tuple

# The bufsock module is looked up in these directories first, so the search
# path has to be extended before the import below.
sys.path.insert(0, os.path.expanduser("~/lib"))
sys.path.insert(0, "/usr/local/lib")

import bufsock as bufsock_mod  # noqa: E402  pylint: disable=wrong-import-position

# Port used when none is given on the command line (also quoted in usage()).
DEFAULT_PORT = 3002


def usage(retval: int) -> NoReturn:
    """Output a usage message and exit with status retval.

    The message is written to stdout when retval is 0 and to stderr otherwise.
    """
    file_ = sys.stdout if retval == 0 else sys.stderr
    file_.write(sys.argv[0] + " hostname [portnumber]\n")
    file_.write("\n")
    file_.write("portnumber defaults to 3002\n")
    sys.exit(retval)


class Fake_it:  # pylint: disable=invalid-name
    """Pretend we have a native RIPEMD-160 hash using the openssl executable."""

    def __init__(self) -> None:
        """Locate openssl on PATH and start an ``openssl rmd160`` child process.

        Exits with status 1 (after a message on stderr) if no openssl
        executable can be found.
        """
        # shutil.which copes with an unset PATH and checks that the file is
        # actually executable, unlike a hand-rolled open() probe.
        openssl_path = shutil.which("openssl")
        if openssl_path is None:
            sys.stderr.write("Please install openssl (the executable, not the library) and retry.\n")
            sys.exit(1)
        sys.stderr.write("Good! Found the openssl binary.\n")

        # An argument list with no shell avoids word-splitting and quoting
        # problems when the openssl path contains spaces or metacharacters.
        process = subprocess.Popen(
            [openssl_path, "rmd160"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=False,
        )
        assert process.stdin is not None
        assert process.stdout is not None
        # stderr is merged into stdout, hence the name "stdouterr".
        self.stdin = bufsock_mod.bufsock(process.stdin)
        self.stdouterr = bufsock_mod.bufsock(process.stdout)

    def update(self, text: bytes) -> None:
        """Feed text to the hash by writing it to the child's stdin."""
        self.stdin.write(text)
        self.stdin.flush()

    def digest(self) -> bytes:
        """Close the child's stdin and return the raw (binary) digest.

        Raises:
            binascii.Error: if the first line of openssl's output is not a
                hex digest (for example because openssl printed an error).
        """
        self.stdin.close()
        stripped_line = self.stdouterr.readline().strip()
        # Drop the label that openssl puts in front of the hex digest.
        for bad_string in (b"(stdin)= ", b"RIPEMD-160(stdin)= "):
            if stripped_line.startswith(bad_string):
                stripped_line = stripped_line[len(bad_string):]
        try:
            return binascii.a2b_hex(stripped_line)
        except binascii.Error as err:
            # Same exception type as before, but show what openssl really said.
            raise binascii.Error(
                "unexpected output from openssl rmd160: {!r}".format(stripped_line)
            ) from err


def _parse_args(argv: List[str]) -> Tuple[str, int]:
    """Return (hostname, port) from argv, exiting with status 1 on bad usage."""
    if not argv[1:]:
        sys.stderr.write("{}: Must specify a hostname\n".format(argv[0]))
        sys.exit(1)

    hostname = argv[1]
    port = DEFAULT_PORT
    if argv[2:]:
        try:
            port = int(argv[2])
        except ValueError:
            usage(1)

    if argv[3:]:
        print(f"{argv[0]}: argv[3] must not exist")
        usage(1)

    return hostname, port


def main() -> None:
    """Connect, authenticate and (implicitly) initiate."""
    hostname, port = _parse_args(sys.argv)

    # Only start the openssl helper once the command line is known to be
    # valid, so a usage error does not leave a child process behind.
    digest_obj = Fake_it()

    sock = socket.create_connection((hostname, port))
    bufsock = bufsock_mod.bufsock(sock)

    version = bufsock.readto(b"\n")
    if b"RIPEMD-160 version" not in version:
        print("Garbled input 1, exiting...")
        sys.exit(1)
    sys.stderr.write("Connected to RIPEMD-160 (cryptographic, supported) version of fallback-reboot.\n")

    hashline = bufsock.readto(b"\n")
    fields = hashline.split()
    # The challenge is the third whitespace-separated field; a line that
    # lacks it is treated as garbled rather than raising IndexError.
    if b"Challenge is " not in hashline or len(fields) < 3:
        print("Garbled input 2, exiting...")
        sys.exit(1)
    challenge_hex = fields[2]
    print("received challenge {}".format(challenge_hex.decode("utf-8")))

    enter = bufsock.readto(b"\n")
    if b"Enter" not in enter:
        print("Garbled input 3, exiting...")
        sys.exit(1)

    if os.isatty(0):
        # If we're on a tty, then prompt, and don't echo
        passwd_hex = getpass.getpass("Enter password: ").encode("UTF-8")
    else:
        # if we're not on a tty, then don't prompt, and let the
        # nontty sort out whether to echo :)  But it probably won't,
        # and these passwords are too long for shoulder-surfing
        # anyway :)
        passwd_hex = sys.stdin.readline().strip().encode("UTF-8")

    # The response is the hex RIPEMD-160 of password followed by challenge.
    digest_obj.update(passwd_hex)
    digest_obj.update(challenge_hex)
    result = binascii.b2a_hex(digest_obj.digest())

    bufsock.send(result + b"\n")
    bufsock.flush()

    response = bufsock.readto(b"\n")
    print(response)


if __name__ == "__main__":
    main()