#!/usr/bin/env python3 """Provide a GUI progress meter for a pipeline or redirect.""" # disable=wrong-import-position # pylint: disable=not-callable,superfluous-parens,wrong-import-position # not-callable: GObject gives lots of not callable errors in pylint # superfluous-parens: Parentheses are good for clarity and portability # wrong-import-position: pylint painted me into a corner import math import multiprocessing import os import pwd import re import select import signal import stat import sys import time import typing class NeverException(BaseException): # noqa: disable=E402 """An exception that is never raised.""" pass # This environment change prevents the annoying warning: # (gprog:8173): dbind-WARNING **: 13:00:49.990: Couldn't connect to accessibility bus: Failed to connect to # socket /tmp/dbus-PFFuABEcgi: Connection refused os.environ["NO_AT_BRIDGE"] = "1" # noqa: E402 import gi gi.require_version("Gtk", "3.0") from gi.repository import Gtk # noqa: E402 # from gi.repository import GObject # noqa: E402 from gi.repository import GLib # noqa: E402 try: gi.require_version("Notify", "0.7") # noqa: disable=E402 except (ValueError, ImportError): HAVE_NOTIFY = False GLIB_ERROR: typing.Type[BaseException] = NeverException else: HAVE_NOTIFY = True from gi.repository import Notify # pylint: disable=no-name-in-module GLIB_ERROR = GLib.Error sys.path.append("/usr/local/lib") import modunits # noqa: E402 import blocksize_optimizer # noqa: E402 import num_den_boxcar_average # noqa: E402 NAN = "Unknown" COMPLETION_MESSAGE = "gprog transfer finished" def make_used(variable): """Convince pyflakes and pylint that variable is used.""" assert True or variable class Status(object): # pylint: disable=too-few-public-methods,too-many-instance-attributes """Status of the transfer.""" def __init__( self, fraction, amount_transferred, rate, blocksize, current_time, iteration_duration, current_start_time, start_time, ): """Initialize.""" # pylint: disable=too-many-arguments self.fraction = fraction self.amount_transferred = amount_transferred self.rate = rate self.blocksize = blocksize self.current_time = current_time self.iteration_duration = iteration_duration self.current_start_time = current_start_time self.start_time = start_time class Block_description(object): # pylint: disable=too-few-public-methods """Hold performance characteristics of a block transfer.""" def __init__(self, duration, size) -> None: """Initialize.""" self.duration = duration self.size = size class DoneDialog(Gtk.Window): # pylint: disable=too-few-public-methods """Pop up a dialog indicating that the transfer is finished.""" def __init__(self) -> None: """Initialize.""" Gtk.Window.__init__(self, title=COMPLETION_MESSAGE) def show_dialog(self) -> None: """Make the dialog pop up.""" dialog = Gtk.MessageDialog( parent=self, flags=0, message_type=Gtk.MessageType.INFO, buttons=Gtk.ButtonsType.CLOSE, text=COMPLETION_MESSAGE, ) dialog.format_secondary_text(Globals.TITLE) dialog.connect("response", self.quit) # These made the dialog modal - it was a headache because it meant you had to track down the popup to exit # the application; that meant File -> Exit did not work. # dialog.run() # dialog.destroy() dialog.show() @staticmethod def quit(*list_) -> None: """Quit True; all went well.""" # pylint: disable=unused-argument Gtk.main_quit() sys.exit(0) class NotLocal(BaseException): """An exception for notify_by_libnotify to raise when not on the console.""" pass def is_local_display(regex: typing.Pattern[str] = re.compile(r"^:[0-9](\.[0-9]+)?$")): """Return True iff $DISPLAY points at a local display.""" if "DISPLAY" not in os.environ: return False match = regex.match(os.environ["DISPLAY"]) return bool(match) def notify_by_libnotify(report: str) -> None: """Create a notify-osd (or similar) popup using GTK+, indicating we've reached our event.""" if not is_local_display(): # We're not local, and libnotify might notify the wrong console, so we avoid libnotify raise NotLocal # One time initialization of libnotify Notify.init("notify-when-up2") # Create the notification object notification = Notify.Notification.new(report) # This seems to be the key to getting a longer notification notification.set_urgency(2) # Highest priority # Actually show on screen notification.show() def get_next_block(read_iterator) -> typing.Tuple[bytes, bool]: """Get a block from read_iterator, detecting EOF.""" block = b"" passed_last_block = False while len(block) == 0: try: block = next(read_iterator) except StopIteration: passed_last_block = True break if len(block) != 0: break time.sleep(0.000001) return (block, passed_last_block) class GUI: # pylint: disable=too-many-public-methods,too-many-statements,too-many-instance-attributes """Present a GUI for a progress meter.""" def __init__(self, size_estimate: typing.Union[int, float], eager_exit: bool) -> None: """Initialize.""" self.done = False self.size_estimate = size_estimate self.eager_exit = eager_exit # 3/4ths of a second self.timeout = 0.75 # Otherwise these are flagged by pylint as "defined outside __init__" self.amount_transferred = None self.blocksize = None self.blocksize_optimizer = None self.gui_update = False self.paused = multiprocessing.Value("i", 0) self.pause_group = None self.prior_gui_update_time = time.time() - 0.5 self.process = multiprocessing.Process(target=self.transfer) self.queue: typing.Optional[multiprocessing.Queue] = None self.status = None self.all_done = False self.storage_units = Globals.INITIAL_STORAGE_UNITS self.rates_units = Globals.INITIAL_RATE_UNITS # We can't set these in the parent process, because macOS' multiprocessing doesn't like it. It doesn't # support fork(). # self.infile = sys.stdin.buffer # self.outfile = sys.stdout.buffer # we make sys.stdin something the multiprocessing module can close without creating an elusive bug sys.stdin = open(os.devnull, "r") self.boxcar_average = num_den_boxcar_average.Num_den_boxcar_average( Globals.NUM_FOR_PARTIAL_ESTIMATE, numerator_attr="size", denominator_attr="duration", independent_total_attr="size" ) self.blocks_transferred = 0 self.subprocess_init() self.gui_init() def subprocess_init(self) -> None: """Set up our subprocess.""" # We'll use the Queue for interprocess communication, one process for the stdin/stdout I/O, and one process for the GUI. # We limit the number of items in the queue to 1000, so we don't overwhelm a system. self.queue = multiprocessing.Queue(1000) # Using a shared integer like this, as a boolean, allows us to avoid the deadlocks of locks, # and simplify the code, at the expense of a touch of busy waiting in the child process # during pauses. This way is also quite a bit more responsive in the GUI. self.process.start() # Catch some signals so we kill our subprocess if we die for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]: signal.signal(sig, self.quit_with_error_exit_status) def iec_for_storage(self, widget): """Use IEC for storage units.""" make_used(widget) self.storage_units = "iec" def si_for_storage(self, widget): """Use SI for storage units.""" make_used(widget) self.storage_units = "si" def iec_for_rates(self, widget): """Use IEC for rate units.""" make_used(widget) self.rates_units = "iec" def si_for_rates(self, widget): """Use SI for rate units.""" make_used(widget) self.rates_units = "si" def pause(self, widget): """Pause the transfer.""" if widget.get_active(): self.paused.value = 1 else: self.paused.value = 0 def resume(self, widget): """Resume after pausing.""" if widget.get_active(): self.paused.value = 0 else: self.paused.value = 1 def exit_eagerly(self, widget): """Exit at the end - don't wait for the user to acknowledge transfer status.""" make_used(widget) if self.all_done: sys.exit(0) self.eager_exit = True def exit_reluctantly(self, widget): """Don't exit at the end unless manually requested by the user in the GUI.""" make_used(widget) self.eager_exit = False def gui_init(self) -> None: """Initialize the GUI.""" # pylint: disable=too-many-statements self.window = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) self.window.connect("delete_event", self.quit_with_cond_exit_status) self.window.set_title(Globals.TITLE) self.vbox = Gtk.VBox() self.vbox.show() self.window.add(self.vbox) self.vbox.set_border_width(10) # Start of menu stuff self.menu_bar = Gtk.MenuBar() self.menu_bar.show() # File menu with exit option self.file_menu = Gtk.Menu() self.file_menu.show() file_menu_item = Gtk.MenuItem(label="File") file_menu_item.show() file_menu_item.set_submenu(self.file_menu) exit_menu_option = Gtk.MenuItem(label="Exit") exit_menu_option.show() exit_menu_option.connect("activate", self.quit_with_cond_exit_status) self.file_menu.append(exit_menu_option) self.menu_bar.append(file_menu_item) # view menu with options for changing size and rate units self.view_menu = Gtk.Menu() self.view_menu.show() view_menu_item = Gtk.MenuItem(label="View") view_menu_item.show() view_menu_item.set_submenu(self.view_menu) iec_storage_menu_option = Gtk.RadioMenuItem(label="Storage reported in IEC units (2^10)") if self.storage_units == "iec": iec_storage_menu_option.set_active(True) iec_storage_menu_option.show() iec_storage_menu_option.connect("activate", self.iec_for_storage) self.view_menu.append(iec_storage_menu_option) si_storage_menu_option = Gtk.RadioMenuItem( group=iec_storage_menu_option, label="Storage reported in SI units (10^3)", ) if self.storage_units == "si": si_storage_menu_option.set_active(True) si_storage_menu_option.show() si_storage_menu_option.connect("activate", self.si_for_storage) self.view_menu.append(si_storage_menu_option) sep_storage_rates_menu_option = Gtk.SeparatorMenuItem() sep_storage_rates_menu_option.show() self.view_menu.append(sep_storage_rates_menu_option) iec_rates_menu_option = Gtk.RadioMenuItem(label="Rates reported in IEC units (2^10)") if self.rates_units == "iec": iec_rates_menu_option.set_active(True) iec_rates_menu_option.show() iec_rates_menu_option.connect("activate", self.iec_for_rates) self.view_menu.append(iec_rates_menu_option) si_rates_menu_option = Gtk.RadioMenuItem(group=iec_rates_menu_option, label="Rates reported in SI units (10^3)") if self.rates_units == "si": si_rates_menu_option.set_active(True) si_rates_menu_option.show() si_rates_menu_option.connect("activate", self.si_for_rates) self.view_menu.append(si_rates_menu_option) self.menu_bar.append(view_menu_item) # processing menu with pause and resume options self.processing_menu = Gtk.Menu() self.processing_menu.show() processing_menu_item = Gtk.MenuItem(label="Processing") processing_menu_item.show() processing_menu_item.set_submenu(self.processing_menu) self.pause_menu_option = Gtk.RadioMenuItem(label="Pause") self.pause_group = self.pause_menu_option self.pause_menu_option.show() self.pause_menu_option.connect("activate", self.pause) self.processing_menu.append(self.pause_menu_option) self.resume_menu_option = Gtk.RadioMenuItem(group=self.pause_group, label="Resume") self.resume_menu_option.set_active(True) self.resume_menu_option.show() self.resume_menu_option.connect("activate", self.resume) self.processing_menu.append(self.resume_menu_option) sep_processing_menu_option = Gtk.SeparatorMenuItem() sep_processing_menu_option.show() self.processing_menu.append(sep_processing_menu_option) self.quit_when_done_menu_option = Gtk.RadioMenuItem(label="Exit immediately on End of File") self.quit_when_done_menu_option.show() self.quit_when_done_menu_option.connect("activate", self.exit_eagerly) if Globals.QUIT_WHEN_DONE: self.quit_when_done_menu_option.set_active(True) self.wait_when_done_menu_option = Gtk.RadioMenuItem( group=self.quit_when_done_menu_option, label="Get user confirmation on End of File" ) self.wait_when_done_menu_option.show() self.wait_when_done_menu_option.connect("activate", self.exit_reluctantly) if not Globals.QUIT_WHEN_DONE: self.wait_when_done_menu_option.set_active(True) self.processing_menu.append(self.quit_when_done_menu_option) self.processing_menu.append(self.wait_when_done_menu_option) self.menu_bar.append(processing_menu_item) self.vbox.pack_start(self.menu_bar, False, False, 0) # end of menu stuff self.table = Gtk.Table() self.table.show() self.table.set_border_width(10) # self.table.set_homogeneous(True) # FIXME: Deprecated since version 3.4: Use Gtk.Grid.set_row_homogeneous() and # Gtk.Grid.set_column_homogeneous() with Gtk.Grid. self.vbox.pack_start(self.table, True, True, 0) # add canvas here later self.start_time = time.time() row = 0 self.start_time_label0 = Gtk.Label() self.start_time_label0.show() self.start_time_label0.set_text("Start time") self.start_time_label1 = Gtk.Label() self.start_time_label1.show() self.start_time_label1.set_text(time.ctime(self.start_time)) self.table.attach(self.start_time_label0, 0, 1, row, row + 1) self.table.attach(self.start_time_label1, 1, 2, row, row + 1) row += 1 self.size_estimate_label0 = Gtk.Label() self.size_estimate_label0.show() self.size_estimate_label0.set_text("Size estimate") self.size_estimate_label1 = Gtk.Label() self.size_estimate_label1.show() # We don't need to special case 0 for this part if math.isnan(self.size_estimate): self.size_estimate_label1.set_text(NAN) else: assert isinstance(self.size_estimate, int) self.size_estimate_label1.set_text( modunits.modunits( "computer-size-%s" % self.storage_units, self.size_estimate, fractional_part_length=1, units="unabbreviated" ) ) self.table.attach(self.size_estimate_label0, 0, 1, row, row + 1) self.table.attach(self.size_estimate_label1, 1, 2, row, row + 1) row += 1 self.unchanging_separator = Gtk.HSeparator() self.unchanging_separator.show() self.table.attach(self.unchanging_separator, 0, 2, row, row + 1) row += 1 self.current_time_label0 = Gtk.Label(label="Current time") self.current_time_label0.show() self.current_time_label1 = Gtk.Label() self.current_time_label1.show() self.table.attach(self.current_time_label0, 0, 1, row, row + 1) self.table.attach(self.current_time_label1, 1, 2, row, row + 1) row += 1 self.elapsed_time_label0 = Gtk.Label(label="Elapsed time") self.elapsed_time_label0.show() self.elapsed_time_label1 = Gtk.Label() self.elapsed_time_label1.show() self.table.attach(self.elapsed_time_label0, 0, 1, row, row + 1) self.table.attach(self.elapsed_time_label1, 1, 2, row, row + 1) row += 1 self.amount_transferred_label0 = Gtk.Label(label="Amount transferred") self.amount_transferred_label0.show() self.amount_transferred_label1 = Gtk.Label() self.amount_transferred_label1.show() self.table.attach(self.amount_transferred_label0, 0, 1, row, row + 1) self.table.attach(self.amount_transferred_label1, 1, 2, row, row + 1) row += 1 self.blocks_transferred_label0 = Gtk.Label(label="Number of blocks transferred") self.blocks_transferred_label0.show() self.blocks_transferred_label1 = Gtk.Label(label="Unknown") self.blocks_transferred_label1.show() self.table.attach(self.blocks_transferred_label0, 0, 1, row, row + 1) self.table.attach(self.blocks_transferred_label1, 1, 2, row, row + 1) row += 1 self.blocksize_label0 = Gtk.Label(label="Current blocksize") self.blocksize_label0.show() self.blocksize_label1 = Gtk.Label() self.blocksize_label1.show() self.table.attach(self.blocksize_label0, 0, 1, row, row + 1) self.table.attach(self.blocksize_label1, 1, 2, row, row + 1) row += 1 self.percent_label0 = Gtk.Label(label="Percent complete by size") self.percent_label0.show() self.percent_label1 = Gtk.Label() self.percent_label1.show() self.table.attach(self.percent_label0, 0, 1, row, row + 1) self.table.attach(self.percent_label1, 1, 2, row, row + 1) row += 1 self.progress_bar = Gtk.ProgressBar() self.progress_bar.show() self.table.attach(self.progress_bar, 0, 2, row, row + 1) self.set_progress(0.0) row += 1 self.both_separator = Gtk.HSeparator() self.both_separator.show() self.table.attach(self.both_separator, 0, 2, row, row + 1) row += 1 self.entire_throughput_label0 = Gtk.Label(label="Throughput over entire transfer so far") self.entire_throughput_label0.show() self.entire_throughput_label1 = Gtk.Label() self.entire_throughput_label1.show() self.table.attach(self.entire_throughput_label0, 0, 1, row, row + 1) self.table.attach(self.entire_throughput_label1, 1, 2, row, row + 1) row += 1 self.entire_est_time_left_label0 = Gtk.Label(label="Estimated time remaining based on transfer so far") self.entire_est_time_left_label0.show() self.entire_est_time_left_label1 = Gtk.Label(label="Unknown") self.entire_est_time_left_label1.show() self.table.attach(self.entire_est_time_left_label0, 0, 1, row, row + 1) self.table.attach(self.entire_est_time_left_label1, 1, 2, row, row + 1) row += 1 self.entire_est_time_of_done_label0 = Gtk.Label(label="Estimated time of completion based on transfer so far") self.entire_est_time_of_done_label0.show() self.entire_est_time_of_done_label1 = Gtk.Label(label="Unknown") self.entire_est_time_of_done_label1.show() self.table.attach(self.entire_est_time_of_done_label0, 0, 1, row, row + 1) self.table.attach(self.entire_est_time_of_done_label1, 1, 2, row, row + 1) row += 1 self.partial_separator = Gtk.HSeparator() self.partial_separator.show() self.table.attach(self.partial_separator, 0, 2, row, row + 1) row += 1 self.partial_throughput_label0 = Gtk.Label(label="Throughput over last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE) self.partial_throughput_label0.show() self.partial_throughput_label1 = Gtk.Label(label="Unknown") self.partial_throughput_label1.show() self.table.attach(self.partial_throughput_label0, 0, 1, row, row + 1) self.table.attach(self.partial_throughput_label1, 1, 2, row, row + 1) row += 1 self.partial_est_time_left_label0 = Gtk.Label( label="Estimated time remaining based on last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) self.partial_est_time_left_label0.show() self.partial_est_time_left_label1 = Gtk.Label(label="Unknown") self.partial_est_time_left_label1.show() self.table.attach(self.partial_est_time_left_label0, 0, 1, row, row + 1) self.table.attach(self.partial_est_time_left_label1, 1, 2, row, row + 1) row += 1 self.partial_est_time_of_done_label0 = Gtk.Label( label="Estimated time of completion based on last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) self.partial_est_time_of_done_label0.show() self.partial_est_time_of_done = Gtk.Label(label="Unknown") self.partial_est_time_of_done.show() self.table.attach(self.partial_est_time_of_done_label0, 0, 1, row, row + 1) self.table.attach(self.partial_est_time_of_done, 1, 2, row, row + 1) row += 1 self.size_of_part_blocks_label0 = Gtk.Label(label="Total size of last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE) self.size_of_part_blocks_label0.show() self.size_of_part_blocks_label1 = Gtk.Label(label="Unknown") self.size_of_part_blocks_label1.show() self.table.attach(self.size_of_part_blocks_label0, 0, 1, row, row + 1) self.table.attach(self.size_of_part_blocks_label1, 1, 2, row, row + 1) row += 1 self.done = True self.done_button = Gtk.Button(label="Acknowledge completion and exit") self.done_button.connect("pressed", self.quit_with_success_exit_status) self.vbox.pack_start(self.done_button, True, True, 0) self.done_dialog = DoneDialog() # We do the window last so we don't get a brief GUI glitch on startup. # Other shows are done immediately so they don't get missed self.window.show() GLib.idle_add(self.get_status_and_display) def set_progress(self, fraction: float) -> None: """Update the progress display.""" if math.isnan(fraction): self.percent_label1.set_text(NAN) self.progress_bar.pulse() else: self.percent_label1.set_text("%3.1f%%" % (fraction * 100)) self.progress_bar.set_fraction(min(fraction, 1.0)) def read(self): """Read a block from the input file.""" while True: select_result = select.select([0], [], [], self.timeout) assert not select_result[1] assert not select_result[2] if not select_result[0]: yield b"" continue buffer_ = os.read(0, self.blocksize) if not buffer_: return yield buffer_ def write(self, buffer_): """Write a buffer to the output file.""" offset = 0 length_of_buffer = len(buffer_) while offset < length_of_buffer: select_result = select.select([], [1], [], self.timeout) assert not select_result[0] assert not select_result[2] if not select_result[1]: return offset length_written = os.write(1, buffer_[offset:]) offset += length_written return offset def closes(self): """Close the input and output files.""" os.close(1) os.close(0) def transfer(self): """Transfer the data :).""" # pylint: disable=too-many-locals,too-many-branches initial_time = time.time() # We do this in the child now, for the benefit of macOS # self.infile = sys.stdin.buffer # self.outfile = sys.stdout.buffer # This "extra_time" variable is weird. We want to track the duration of something, # but how long is the duration of the tracking itself? # How does the watcher watch itself? The solution used here is to keep track of # how long the "watch" part of an iteration # takes, and add it to the _NEXT_ iteration - with an initial duration of 0. extra_time = 0 self.amount_transferred = 0 self.blocksize_optimizer = blocksize_optimizer.blocksize_optimizer( Globals.MIN_BLOCKSIZE, Globals.MAX_BLOCKSIZE, Globals.RESIZE_PROBABILITY_DECREMENT ) self.blocksize = Globals.INITIAL_BLOCKSIZE read_iterator = self.read() first_iteration = True block = b"" while True: while self.paused.value: time.sleep(0.1) current_start_time = time.time() passed_last_block = False block, passed_last_block = get_next_block(read_iterator) if passed_last_block: break length_written = self.write(block) block = block[length_written:] current_end_time = time.time() watch_t0 = current_end_time this_transfer_size = length_written self.amount_transferred += this_transfer_size iteration_duration = current_end_time - current_start_time + extra_time total_duration_so_far = current_end_time - initial_time if first_iteration: first_iteration = False else: if this_transfer_size == self.blocksize: self.blocksize_optimizer.put(this_transfer_size, iteration_duration) if total_duration_so_far != 0: bytes_per_second = self.amount_transferred / total_duration_so_far else: bytes_per_second = 0 if self.size_estimate == 0: frac = float("nan") else: frac = float(self.amount_transferred) / self.size_estimate self.status = Status( fraction=frac, amount_transferred=self.amount_transferred, rate=bytes_per_second, blocksize=this_transfer_size, current_time=time.time(), iteration_duration=iteration_duration, current_start_time=current_start_time, start_time=initial_time, ) self.queue.put(("status", self.status)) self.blocksize = self.blocksize_optimizer.get() watch_t1 = time.time() extra_time = watch_t1 - watch_t0 self.closes() self.queue.put(("done", 100)) # sys.exit(0) def show_throughput_numbers( self, *, avg_xfer_bytes_per_sec: float, bytes_remaining: int, current_time: float, throughput_label: Gtk.Label, time_remaining_label: Gtk.Label, time_of_completion_label: Gtk.Label, ): """Show throughput data.""" # pylint: disable=too-many-arguments if self.gui_update: throughput_label.set_text( modunits.modunits( "computer-bits-per-second-%s" % self.rates_units, int(avg_xfer_bytes_per_sec) * 8, fractional_part_length=1, units="unabbreviated", ) ) if avg_xfer_bytes_per_sec: time_remaining = bytes_remaining / avg_xfer_bytes_per_sec if self.gui_update: if math.isnan(time_remaining): time_remaining_label.set_text(NAN) else: if time_remaining >= 0: time_remaining_label.set_text( modunits.modunits( "time", int(time_remaining), detail="two-highest", reverse=True, units="unabbreviated" ) ) else: time_remaining_label.set_text( "-" + modunits.modunits( "time", -int(time_remaining), detail="two-highest", reverse=True, units="unabbreviated" ) ) time_of_completion = current_time + time_remaining if self.gui_update and not math.isnan(time_of_completion): time_of_completion_label.set_text(time.ctime(time_of_completion)) def handle_status(self, *, extra) -> None: """Deal with a status update.""" current_time = time.time() if self.prior_gui_update_time < current_time - Globals.GUI_UPDATE_INTERVAL: self.prior_gui_update_time = current_time self.gui_update = True else: self.gui_update = False status = extra if self.gui_update: # This doesn't change much, but the _units_ may change... if math.isnan(self.size_estimate): self.size_estimate_label1.set_text(NAN) else: assert isinstance(self.size_estimate, int) self.size_estimate_label1.set_text( modunits.modunits( "computer-size-%s" % self.storage_units, self.size_estimate, fractional_part_length=1, units="unabbreviated", ) ) if self.gui_update: self.set_progress(status.fraction) self.blocks_transferred += 1 if self.gui_update: self.blocks_transferred_label1.set_text(str(self.blocks_transferred)) elapsed_time = status.current_time - status.start_time amount_remaining = Globals.SIZE_ESTIMATE - status.amount_transferred # start of misc labels if self.gui_update: self.amount_transferred_label1.set_text( modunits.modunits( "computer-size-%s" % self.storage_units, status.amount_transferred, fractional_part_length=1, units="unabbreviated", ) ) self.current_time_label1.set_text(time.ctime(status.current_time)) self.elapsed_time_label1.set_text( modunits.modunits("time", elapsed_time, detail="two-highest", reverse=True, units="unabbreviated") ) self.blocksize_label1.set_text(str(status.blocksize)) # end of misc labels # partial throughput numbers result = self.boxcar_average.add(Block_description(status.iteration_duration, status.blocksize)) partial_throughput_average = result.average if partial_throughput_average is not None: self.show_throughput_numbers( avg_xfer_bytes_per_sec=partial_throughput_average, bytes_remaining=amount_remaining, current_time=status.current_time, throughput_label=self.partial_throughput_label1, time_remaining_label=self.partial_est_time_left_label1, time_of_completion_label=self.partial_est_time_of_done, ) if self.gui_update: text = modunits.modunits( "computer-size-%s" % self.storage_units, result.independent_total, fractional_part_length=1, units="unabbreviated", ) self.size_of_part_blocks_label1.set_text(text) # end of partial throughput numbers # entire throughput numbers if elapsed_time == 0: entire_throughput_average = 0 else: entire_throughput_average = status.amount_transferred / elapsed_time self.show_throughput_numbers( avg_xfer_bytes_per_sec=entire_throughput_average, bytes_remaining=amount_remaining, current_time=status.current_time, throughput_label=self.entire_throughput_label1, time_remaining_label=self.entire_est_time_left_label1, time_of_completion_label=self.entire_est_time_of_done_label1, ) # end of entire throughput numbers def handle_done(self) -> None: """Deal with EOF.""" if self.eager_exit: self.quit_with_success_exit_status() else: self.set_progress(1) self.done_button.show() libnotify_worked = False if HAVE_NOTIFY: try: notify_by_libnotify(COMPLETION_MESSAGE) except (GLIB_ERROR, NotLocal): # pylint: disable=catching-non-exception libnotify_worked = False else: libnotify_worked = True if not libnotify_worked: self.done_dialog.show_dialog() self.all_done = True def get_status_and_display(self) -> bool: """Get status of transfer and display it.""" # pylint: disable=too-many-branches assert self.queue is not None if self.queue.empty(): time.sleep(0.01) else: tuple_ = self.queue.get() kind, extra = tuple_ del tuple_ if kind == "status": self.handle_status(extra=extra) elif kind == "done": self.handle_done() return False return True def quit_it(self, gui_kill=True) -> None: """Exit.""" if hasattr(self, "process") and hasattr(self.process, "pid"): assert isinstance(self.process.pid, int) os.kill(self.process.pid, signal.SIGTERM) else: pass if gui_kill: Gtk.main_quit() def quit_with_success_exit_status(self, *list_: typing.List[typing.Any]) -> None: """Quit True; all went well.""" # pylint: disable=unused-argument self.quit_it(gui_kill=True) sys.exit(0) def quit_with_cond_exit_status(self, *list_): """Quit True if finished well, False if not.""" # pylint: disable=unused-argument self.quit_it(gui_kill=True) if self.done: sys.exit(0) else: sys.exit(1) def quit_with_error_exit_status(self, signum, frame, gui_kill=True): """Kill the GUI and exit False.""" # pylint: disable=unused-argument make_used(signum) make_used(frame) self.quit_it(gui_kill=True) sys.exit(1) def usage(exit_code: int) -> None: """Give a usage message.""" if exit_code == 0: write = sys.stdout.write else: write = sys.stderr.write write("Usage: %s\n" % sys.argv[0]) write("\t--initial-blocksize defaults to %d and will be adjusted according to performance\n" % Globals.INITIAL_BLOCKSIZE) write("\t--resize-probability-decrement defaults to %f; 1 guesses minimally,\n" % Globals.RESIZE_PROBABILITY_DECREMENT) write("\t 0 guesses every time.\n") write("\t Guessing a lot means you spend more time finding a fast blocksize, but the result is\n") write("\t closer to optimal once settled upon\n") write("\t--gui-update-interval defaults to %f. Units are (fractions of) seconds.\n" % Globals.GUI_UPDATE_INTERVAL) write("\t--min-blocksize defaults to %d\n" % Globals.MIN_BLOCKSIZE) write("\t--max-blocksize defaults to %d\n" % Globals.MAX_BLOCKSIZE) write("\t--size-estimate defaults to the length from stat (if possible)\n") string = "\t--partial-estimate-blocks The number of blocks to use for the partial estimate " + ( "(defaults to %d)\n" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) write(string) write("\t--initial-storage-units SI|IEC\n") write("\t--initial-rate-units SI|IEC\n") write("\t--title title\n") write("\t--quit-when-done\n") write("\t--help\n") write("\n") write("All command line options' units are in bytes\n") sys.exit(exit_code) def drop_privileges() -> None: """ Drop privileges. Based on https://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python """ if os.uname().sysname == "Darwin": # macOS (Darwin) does not allow setuid or setgid in about any form, apparently. return if os.getuid() != 0 and os.geteuid() != 0: # We have nothing to give up, so just return without changing anything. return login_name = os.getlogin() pwent = pwd.getpwnam(login_name) login_uid = pwent.pw_uid login_gid = pwent.pw_gid # Remove all auxiliary groups os.setgroups([]) # Try setting the uid/gid if login_gid != 0: os.setregid(login_gid, login_gid) os.setreuid(login_uid, login_uid) class Globals: """A singleton of globals.""" MIN_BLOCKSIZE = 2**10 MAX_BLOCKSIZE = 2**20 INITIAL_BLOCKSIZE = (MIN_BLOCKSIZE + MAX_BLOCKSIZE) // 2 SIZE_ESTIMATE: typing.Union[int, float] = -1 QUIT_WHEN_DONE = False RESIZE_PROBABILITY_DECREMENT = 0.001 GUI_UPDATE_INTERVAL = 0.25 TITLE = "gprog" INITIAL_STORAGE_UNITS = "iec" INITIAL_RATE_UNITS = "si" NUM_FOR_PARTIAL_ESTIMATE = 1000 class Options: """Parse commandline options.""" def __init__(self) -> None: """Parse all commandline options.""" while sys.argv[1:]: self.parse_one(sys.argv[1]) del sys.argv[1] def parse_one(self, argument: str) -> None: """Parse one commandline option.""" if self.parse_a(argument): return elif self.parse_b(argument): return elif self.parse_c(argument): return sys.stderr.write("%s: Illegal option: %s\n" % (sys.argv[0], sys.argv[1])) usage(1) def parse_a(self, argument: str) -> bool: """Parse some of the commandline options.""" match argument: case "--initial-blocksize": Globals.INITIAL_BLOCKSIZE = int(sys.argv[2]) del sys.argv[1] return True case "--initial-storage-units": ARGUMENT = sys.argv[2].lower() if ARGUMENT in ["iec", "si"]: Globals.INITIAL_STORAGE_UNITS = ARGUMENT else: sys.stderr.write("%s: storage units must be IEC or SI (case insignificant)\n" % sys.argv[0]) usage(1) del sys.argv[1] return True case "--initial-rate-units": ARGUMENT = sys.argv[2].lower() if ARGUMENT in ["iec", "si"]: Globals.INITIAL_RATE_UNITS = ARGUMENT else: sys.stderr.write("%s: rate units must be IEC or SI (case insignificant)\n" % sys.argv[0]) usage(1) del sys.argv[1] return True return False def parse_b(self, argument: str) -> bool: """Parse some of the commandline options.""" match argument: case "--title": Globals.TITLE = "gprog: %s" % sys.argv[2] del sys.argv[1] return True case "--gui-update-interval": Globals.GUI_UPDATE_INTERVAL = float(sys.argv[2]) del sys.argv[1] return True case "--resize-probability-decrement": Globals.RESIZE_PROBABILITY_DECREMENT = float(sys.argv[2]) del sys.argv[1] return True case "--max-blocksize": Globals.MAX_BLOCKSIZE = int(sys.argv[2]) del sys.argv[1] return True case "--partial-estimate-blocks": Globals.NUM_FOR_PARTIAL_ESTIMATE = int(sys.argv[2]) del sys.argv[1] return True return False def parse_c(self, argument: str) -> bool: """Parse some of the commandline options.""" match argument: case "--min-blocksize": Globals.MIN_BLOCKSIZE = int(sys.argv[2]) del sys.argv[1] return True case "--size-estimate": Globals.SIZE_ESTIMATE = int(sys.argv[2]) del sys.argv[1] return True case "--quit-when-done": Globals.QUIT_WHEN_DONE = True return True case "--help" | "-h": usage(0) return True return False def main() -> None: """Present GUI and perform data transfer.""" # pylint: disable=no-member drop_privileges() Options() if Globals.SIZE_ESTIMATE == -1: # This is still only an estimate, because a file can grow while we're reading it STAT_BUF = os.fstat(0) Globals.SIZE_ESTIMATE = STAT_BUF.st_size if Globals.SIZE_ESTIMATE == 0: # This could be one of two things: # 1) An empty file # 2) Some sort of special file, like /dev/zero, a disk (as opposed to a file on a disk), or a pipe if stat.S_ISFIFO(STAT_BUF.st_mode): sys.stderr.write("Pipe or FIFO detected\n") Globals.SIZE_ESTIMATE = float("nan") elif not (stat.S_IFREG & STAT_BUF.st_mode): sys.stderr.write("Something other than a regular file or pipe detected\n") Globals.SIZE_ESTIMATE = float("nan") gui = GUI(Globals.SIZE_ESTIMATE, Globals.QUIT_WHEN_DONE) try: Gtk.main() except KeyboardInterrupt: gui.quit_it(gui_kill=True) if __name__ == "__main__": main()