#!/usr/bin/env python3 """Provide a GUI progress meter for a pipeline or redirect.""" # disable=wrong-import-position import ctypes import math import multiprocessing import multiprocessing.sharedctypes import os import pwd import re import select import signal import stat import sys import time import types import typing # This is the default on Linux. On MacOS the default is "spawn", which causes tracebacks. multiprocessing.set_start_method("fork") class NeverException(BaseException): # noqa: disable=E402 """An exception that is never raised.""" pass # This environment change prevents the annoying warning: # (gprog:8173): dbind-WARNING **: 13:00:49.990: Couldn't connect to accessibility bus: Failed to connect to # socket /tmp/dbus-PFFuABEcgi: Connection refused os.environ["NO_AT_BRIDGE"] = "1" # noqa: E402 import gi gi.require_version("Gtk", "4.0") gi.require_version("Gio", "2.0") from gi.repository import Gtk, Gio # noqa: E402 # from gi.repository import GObject # noqa: E402 from gi.repository import GLib # noqa: E402 try: gi.require_version("Notify", "0.7") # noqa: disable=E402 except (ValueError, ImportError): HAVE_NOTIFY = False GLIB_ERROR: typing.Type[BaseException] = NeverException else: HAVE_NOTIFY = True from gi.repository import Notify GLIB_ERROR = GLib.Error sys.path.append("/usr/local/lib") import modunits # noqa: E402 import blocksize_optimizer # noqa: E402 import num_den_boxcar_average # noqa: E402 import block_description NAN = "Unknown" COMPLETION_MESSAGE = "gprog transfer finished" def resizeable_label(label: Gtk.Label) -> None: """ Make a label resizable. Shamelessly lifted from Google AI Summary. """ # Set the label to wrap text when it exceeds its allocated width # label.set_wrap(True) # Align the label to fill its container's space label.set_halign(Gtk.Align.FILL) label.set_valign(Gtk.Align.FILL) # Set the label to expand horizontally and vertically within the box label.set_hexpand(True) label.set_vexpand(True) # Set a maximum width for characters to encourage wrapping initially label.set_max_width_chars(1) # This helps trigger the wrapping behavior class Status(object): """Status of the transfer.""" def __init__( self, fraction: float, amount_transferred: int, rate: float, blocksize: int, current_time: float, iteration_duration: float, current_start_time: float, start_time: float, ) -> None: """Initialize.""" self.fraction = fraction self.amount_transferred = amount_transferred self.rate = rate self.blocksize = blocksize self.current_time = current_time self.iteration_duration = iteration_duration self.current_start_time = current_start_time self.start_time = start_time class DoneDialog(Gtk.Window): """Pop up a dialog indicating that the transfer is finished.""" def __init__(self) -> None: """Initialize.""" Gtk.Window.__init__(self, title=COMPLETION_MESSAGE) def show_dialog(self) -> None: """Make the dialog pop up.""" dialog = Gtk.MessageDialog( # parent=self, # flags=0, message_type=Gtk.MessageType.INFO, buttons=Gtk.ButtonsType.CLOSE, text=COMPLETION_MESSAGE, ) # dialog.format_secondary_text(Globals.TITLE) dialog.connect("response", self.quit) # These made the dialog modal - it was a headache because it meant you had to track down the popup to exit # the application; that meant File -> Exit did not work. # dialog.run() # dialog.destroy() dialog.show() def quit(self, *list_: list[typing.Any]) -> None: """Quit True; all went well.""" del list_ sys.exit(0) class NotLocal(BaseException): """An exception for notify_by_libnotify to raise when not on the console.""" pass def is_local_display(regex: typing.Pattern[str] = re.compile(r"^:[0-9](\.[0-9]+)?$")) -> bool: """Return True iff $DISPLAY points at a local display.""" if "DISPLAY" not in os.environ: return False match = regex.match(os.environ["DISPLAY"]) return bool(match) def notify_by_libnotify(report: str) -> None: """Create a notify-osd (or similar) popup using GTK+, indicating we've reached our event.""" if not is_local_display(): # We're not local, and libnotify might notify the wrong console, so we avoid libnotify raise NotLocal # One time initialization of libnotify Notify.init("notify-when-up2") # Create the notification object notification = Notify.Notification.new(report) # This seems to be the key to getting a longer notification # notification.set_urgency(2) # Highest priority notification.set_urgency(Notify.Urgency.CRITICAL) # Actually show on screen # notification.set_visible(True) notification.show() def get_next_block(read_iterator: typing.Iterator[bytes]) -> typing.Tuple[bytes, bool]: """Get a block from read_iterator, detecting EOF.""" block = b"" passed_last_block = False while len(block) == 0: try: block = next(read_iterator) except StopIteration: passed_last_block = True break if len(block) != 0: break time.sleep(0.000_001) return (block, passed_last_block) class GUI(Gtk.Application): """Present a GUI for a progress meter.""" # def __init__(self, size_estimate: typing.Union[int, float], eager_exit: bool) -> None: def __init__(self, application_id: str, flags: Gio.ApplicationFlags) -> None: """Initialize.""" super().__init__(application_id=application_id, flags=flags) self.window: typing.Optional[Gtk.ApplicationWindow] = None def do_activate(self, *args: list[typing.Any]) -> None: """Set things up.""" del args if self.window is None: self.done = False self.size_estimate = Globals.SIZE_ESTIMATE self.eager_exit = Globals.QUIT_WHEN_DONE # 3/4ths of a second self.timeout = 0.75 self.amount_transferred: typing.Optional[int] = None self.blocksize: typing.Optional[int] = None self.blocksize_optimizer: typing.Optional[blocksize_optimizer.Common] = None self.gui_update = False # self.paused = multiprocessing.Value("i", 0) self.paused = multiprocessing.sharedctypes.synchronized(multiprocessing.RawValue(ctypes.c_int, 0)) self.pause_group = None self.prior_gui_update_time = time.time() - 0.5 self.process = multiprocessing.Process(target=self.transfer) # self.queue: typing.Optional[multiprocessing.Queue[tuple[str, Status]]] = None # We'll use the Queue for interprocess communication, one process for the stdin/stdout I/O, and one process for the GUI. # We limit the number of items in the queue to 1000, so we don't overwhelm a system. self.queue: multiprocessing.Queue[tuple[str, Status]] = multiprocessing.Queue(1000) self.status: typing.Optional[Status] = None self.all_done = False self.storage_units = Globals.INITIAL_STORAGE_UNITS self.rates_units = Globals.INITIAL_RATE_UNITS # We can't set these in the parent process, because macOS' multiprocessing doesn't like it. It doesn't # support fork(). # self.infile = sys.stdin.buffer # self.outfile = sys.stdout.buffer # we make sys.stdin something the multiprocessing module can close without creating an elusive bug sys.stdin = open(os.devnull, "r") self.boxcar_average = num_den_boxcar_average.Num_den_boxcar_average( Globals.NUM_FOR_PARTIAL_ESTIMATE, numerator_attr="size", denominator_attr="duration", independent_total_attr="size" ) self.blocks_transferred = 0 self.subprocess_init() self.gui_init() def subprocess_init(self) -> None: """Set up our subprocess.""" # Using a shared integer like this, as a boolean, allows us to avoid the deadlocks of locks, # and simplify the code, at the expense of a touch of busy waiting in the child process # during pauses. This way is also quite a bit more responsive in the GUI. self.process.start() # Catch some signals so we kill our subprocess if we die for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]: signal.signal(sig, self.quit_with_error_exit_status) def iec_for_storage(self, *cruft: list[typing.Any]) -> None: """Use IEC for storage units.""" del cruft self.storage_units = "iec" def si_for_storage(self, *cruft: list[typing.Any]) -> None: """Use SI for storage units.""" del cruft self.storage_units = "si" def iec_for_rates(self, *cruft: list[typing.Any]) -> None: """Use IEC for rate units.""" del cruft self.rates_units = "iec" def si_for_rates(self, *cruft: list[typing.Any]) -> None: """Use SI for rate units.""" del cruft self.rates_units = "si" def pause(self, button: Gtk.ToggleButton) -> None: """Pause the transfer.""" if button.props.active: self.paused.value = 1 else: self.paused.value = 0 def resume(self, button: Gtk.ToggleButton) -> None: """Resume after pausing.""" if button.props.active: self.paused.value = 0 else: self.paused.value = 1 def exit_eagerly(self, button: Gtk.ToggleButton) -> None: """Exit at the end - don't wait for the user to acknowledge transfer status.""" del button if self.all_done: sys.exit(0) self.eager_exit = True def exit_reluctantly(self, button: Gtk.ToggleButton) -> None: """Don't exit at the end unless manually requested by the user in the GUI.""" del button self.eager_exit = False def gui_init(self) -> None: """Initialize the GUI.""" self.window = Gtk.ApplicationWindow(application=self, title=Globals.TITLE) assert self.window is not None self.create_menu() self.window.connect("close-request", self.quit_with_cond_exit_status) self.vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) self.vbox.set_vexpand(True) self.vbox.set_hexpand(True) self.window.set_child(self.vbox) margin = 10 self.vbox.set_margin_top(margin) self.vbox.set_margin_bottom(margin) self.vbox.set_margin_start(margin) self.vbox.set_margin_end(margin) self.create_main_body() self.window.present() def create_menu(self) -> None: """Create the menu.""" self.menu_bar = Gio.Menu() # File menu with just an Exit optin. self.file_menu = Gio.Menu() self.file_menu.append("Quit", "app.quit") self.menu_bar.append_submenu("File", self.file_menu) quit_action = Gio.SimpleAction.new("quit", None) quit_action.connect("activate", self.quit_with_success_exit_status) self.add_action(quit_action) # I've become pretty convinced that GTK 4.0 no longer allows radio buttons in menus. You can do a popover menu with # arbitrary widgets, but... that's a bit icky. So instead the units and processing buttons are all moving into the hbox. self.set_menubar(self.menu_bar) assert self.window is not None self.window.set_show_menubar(True) # end of menu stuff def create_main_body(self) -> None: self.grid = Gtk.Grid() self.grid.set_valign(Gtk.Align.FILL) self.grid.set_halign(Gtk.Align.FILL) self.grid.set_hexpand(True) self.grid.set_vexpand(True) # Apparently this is done with CSS now. Yuck. # self.grid.set_border_width(10) # self.grid.set_homogeneous(True) # FIXME: Deprecated since version 3.4: Use Gtk.Grid.set_row_homogeneous() and # Gtk.Grid.set_column_homogeneous() with Gtk.Grid. self.iec_for_storage_button = Gtk.CheckButton(label="Storage reported in IEC units (2^10)") self.si_for_storage_button = Gtk.CheckButton(label="Storage reported in SI units (10^3)") self.si_for_storage_button.set_group(self.iec_for_storage_button) self.iec_for_storage_button.connect("toggled", self.iec_for_storage) self.si_for_storage_button.connect("toggled", self.si_for_storage) self.iec_for_storage_button.set_active(self.storage_units == "iec") self.si_for_storage_button.set_active(self.storage_units == "si") self.vbox.append(self.iec_for_storage_button) self.vbox.append(self.si_for_storage_button) horiz_sep1 = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) self.vbox.append(horiz_sep1) self.iec_for_rates_button = Gtk.CheckButton(label="Rates reported in IEC units (2^10)") self.si_for_rates_button = Gtk.CheckButton(label="Rates reported in SI units (10^3)") self.si_for_rates_button.set_group(self.iec_for_rates_button) self.iec_for_rates_button.connect("toggled", self.iec_for_rates) self.si_for_rates_button.connect("toggled", self.si_for_rates) self.iec_for_rates_button.set_active(self.rates_units == "iec") self.si_for_rates_button.set_active(self.rates_units == "si") self.vbox.append(self.iec_for_rates_button) self.vbox.append(self.si_for_rates_button) horiz_sep2 = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) self.vbox.append(horiz_sep2) self.quit_when_done_button = Gtk.CheckButton(label="Exit immediately on End of File") self.quit_when_done_button.connect("toggled", self.exit_eagerly) if Globals.QUIT_WHEN_DONE: self.quit_when_done_button.set_active(True) self.wait_when_done_button = Gtk.CheckButton(group=self.quit_when_done_button, label="Get user confirmation on End of File") self.wait_when_done_button.connect("toggled", self.exit_reluctantly) if not Globals.QUIT_WHEN_DONE: self.wait_when_done_button.set_active(True) self.vbox.append(self.quit_when_done_button) self.vbox.append(self.wait_when_done_button) horiz_sep3 = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) self.vbox.append(horiz_sep3) self.pause_button = Gtk.CheckButton(label="Pause") self.pause_button.connect("toggled", self.pause) self.resume_button = Gtk.CheckButton(group=self.pause_button, label="Resume") self.resume_button.set_active(True) self.resume_button.connect("toggled", self.resume) self.vbox.append(self.pause_button) self.vbox.append(self.resume_button) horiz_sep4 = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) self.vbox.append(horiz_sep4) self.vbox.append(self.grid) # add canvas here later self.start_time = time.time() row = 0 self.start_time_label0 = Gtk.Label() self.start_time_label0.set_text("Start time") resizeable_label(self.start_time_label0) self.start_time_label1 = Gtk.Label() self.start_time_label1.set_text(time.ctime(self.start_time)) resizeable_label(self.start_time_label1) self.grid.attach(self.start_time_label0, 0, row, 1, 1) self.grid.attach(self.start_time_label1, 1, row, 1, 1) row += 1 self.size_estimate_label0 = Gtk.Label() self.size_estimate_label0.set_text("Size estimate") resizeable_label(self.size_estimate_label0) self.size_estimate_label1: Gtk.Label = Gtk.Label() resizeable_label(self.size_estimate_label1) # We don't need to special case 0 for this part if math.isnan(self.size_estimate): self.size_estimate_label1.set_text(NAN) else: assert isinstance(self.size_estimate, int) num = modunits.modunits( "computer-size-%s" % self.storage_units, self.size_estimate, fractional_part_length=1, units="unabbreviated" ) assert isinstance(num, str) self.size_estimate_label1.set_text(num) self.grid.attach(self.size_estimate_label0, 0, row, 1, 1) self.grid.attach(self.size_estimate_label1, 1, row, 1, 1) row += 1 self.unchanging_separator = Gtk.Separator.new(Gtk.Orientation.HORIZONTAL) self.grid.attach(self.unchanging_separator, 0, row, 2, 1) row += 1 self.current_time_label0 = Gtk.Label(label="Current time") resizeable_label(self.current_time_label0) # self.current_time_label0.show() self.current_time_label1: Gtk.Label = Gtk.Label() resizeable_label(self.current_time_label1) # self.current_time_label1.show() self.grid.attach(self.current_time_label0, 0, row, 1, 1) self.grid.attach(self.current_time_label1, 1, row, 1, 1) row += 1 self.elapsed_time_label0: Gtk.Label = Gtk.Label(label="Elapsed time") resizeable_label(self.elapsed_time_label0) # self.elapsed_time_label0.show() self.elapsed_time_label1: Gtk.Label = Gtk.Label() resizeable_label(self.elapsed_time_label1) # self.elapsed_time_label1.show() self.grid.attach(self.elapsed_time_label0, 0, row, 1, 1) self.grid.attach(self.elapsed_time_label1, 1, row, 1, 1) row += 1 self.amount_transferred_label0: Gtk.Label = Gtk.Label(label="Amount transferred") resizeable_label(self.amount_transferred_label0) # self.amount_transferred_label0.show() self.amount_transferred_label1: Gtk.Label = Gtk.Label() resizeable_label(self.amount_transferred_label1) # self.amount_transferred_label1.show() self.grid.attach(self.amount_transferred_label0, 0, row, 1, 1) self.grid.attach(self.amount_transferred_label1, 1, row, 1, 1) row += 1 self.blocks_transferred_label0: Gtk.Label = Gtk.Label(label="Number of blocks transferred") resizeable_label(self.blocks_transferred_label0) # self.blocks_transferred_label0.show() self.blocks_transferred_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.blocks_transferred_label1) # self.blocks_transferred_label1.show() self.grid.attach(self.blocks_transferred_label0, 0, row, 1, 1) self.grid.attach(self.blocks_transferred_label1, 1, row, 1, 1) row += 1 self.blocksize_label0 = Gtk.Label(label="Current blocksize") resizeable_label(self.blocksize_label0) # self.blocksize_label0.show() self.blocksize_label1: Gtk.Label = Gtk.Label() resizeable_label(self.blocksize_label1) self.grid.attach(self.blocksize_label0, 0, row, 1, 1) self.grid.attach(self.blocksize_label1, 1, row, 1, 1) row += 1 self.percent_label0 = Gtk.Label(label="Percent complete by size") resizeable_label(self.percent_label0) self.percent_label1: Gtk.Label = Gtk.Label() resizeable_label(self.percent_label1) self.grid.attach(self.percent_label0, 0, row, 1, 1) self.grid.attach(self.percent_label1, 1, row, 1, 1) row += 1 self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar() # self.progress_bar.show() self.grid.attach(self.progress_bar, 0, row, 2, 1) self.set_progress(0.0) row += 1 self.both_separator = Gtk.Separator.new(Gtk.Orientation.HORIZONTAL) self.grid.attach(self.both_separator, 0, row, 2, 1) row += 1 self.entire_throughput_label0 = Gtk.Label(label="Throughput over entire transfer so far") resizeable_label(self.entire_throughput_label0) # self.entire_throughput_label0.show() self.entire_throughput_label1: Gtk.Label = Gtk.Label() resizeable_label(self.entire_throughput_label1) # self.entire_throughput_label1.show() self.grid.attach(self.entire_throughput_label0, 0, row, 1, 1) self.grid.attach(self.entire_throughput_label1, 1, row, 1, 1) row += 1 self.entire_est_time_left_label0 = Gtk.Label(label="Estimated time remaining based on transfer so far") resizeable_label(self.entire_est_time_left_label0) # self.entire_est_time_left_label0.show() self.entire_est_time_left_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.entire_est_time_left_label1) # self.entire_est_time_left_label1.show() self.grid.attach(self.entire_est_time_left_label0, 0, row, 1, 1) self.grid.attach(self.entire_est_time_left_label1, 1, row, 1, 1) row += 1 self.entire_est_time_of_done_label0: Gtk.Label = Gtk.Label(label="Estimated time of completion based on transfer so far") resizeable_label(self.entire_est_time_of_done_label0) # self.entire_est_time_of_done_label0.show() self.entire_est_time_of_done_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.entire_est_time_of_done_label1) # self.entire_est_time_of_done_label1.show() self.grid.attach(self.entire_est_time_of_done_label0, 0, row, 1, 1) self.grid.attach(self.entire_est_time_of_done_label1, 1, row, 1, 1) row += 1 self.partial_throughput_label0: Gtk.Label = Gtk.Label( label="Throughput over last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) resizeable_label(self.partial_throughput_label0) # self.partial_throughput_label0.show() self.partial_throughput_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.partial_throughput_label1) # self.partial_throughput_label1.show() self.grid.attach(self.partial_throughput_label0, 0, row, 1, 1) self.grid.attach(self.partial_throughput_label1, 1, row, 1, 1) row += 1 self.partial_est_time_left_label0: Gtk.Label = Gtk.Label( label="Estimated time remaining based on last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) resizeable_label(self.partial_est_time_left_label0) # self.partial_est_time_left_label0.show() self.partial_est_time_left_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.partial_est_time_left_label1) # self.partial_est_time_left_label1.show() self.grid.attach(self.partial_est_time_left_label0, 0, row, 1, 1) self.grid.attach(self.partial_est_time_left_label1, 1, row, 1, 1) row += 1 self.partial_est_time_of_done_label0: Gtk.Label = Gtk.Label( label="Estimated time of completion based on last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) resizeable_label(self.partial_est_time_of_done_label0) # self.partial_est_time_of_done_label0.show() self.partial_est_time_of_done_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.partial_est_time_of_done_label1) # self.partial_est_time_of_done.show() self.grid.attach(self.partial_est_time_of_done_label0, 0, row, 1, 1) self.grid.attach(self.partial_est_time_of_done_label1, 1, row, 1, 1) row += 1 self.size_of_part_blocks_label0: Gtk.Label = Gtk.Label( label="Total size of last %d blocks" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) resizeable_label(self.size_of_part_blocks_label0) # self.size_of_part_blocks_label0.show() self.size_of_part_blocks_label1: Gtk.Label = Gtk.Label(label="Unknown") resizeable_label(self.size_of_part_blocks_label1) # self.size_of_part_blocks_label1.show() self.grid.attach(self.size_of_part_blocks_label0, 0, row, 1, 1) self.grid.attach(self.size_of_part_blocks_label1, 1, row, 1, 1) row += 1 self.done = True self.done_button: Gtk.Button = Gtk.Button(label="Acknowledge completion and exit") self.done_button.connect("clicked", self.quit_with_success_exit_status) self.done_button.set_visible(False) self.vbox.append(self.done_button) self.done_dialog: DoneDialog = DoneDialog() # We do the window last so we don't get a brief GUI glitch on startup. # Other shows are done immediately so they don't get missed assert self.window is not None self.window.set_visible(True) GLib.idle_add(self.get_status_and_display) def set_progress(self, fraction: float) -> None: """Update the progress display.""" if math.isnan(fraction): self.percent_label1.set_text(NAN) self.progress_bar.pulse() else: self.percent_label1.set_text("%3.1f%%" % (fraction * 100)) self.progress_bar.set_fraction(min(fraction, 1.0)) def read(self) -> typing.Iterator[bytes]: """Read a block from the input file.""" assert self.blocksize is not None while True: select_result = select.select([0], [], [], self.timeout) assert not select_result[1] assert not select_result[2] if not select_result[0]: yield b"" continue buffer_ = os.read(0, self.blocksize) if not buffer_: return yield buffer_ def write(self, buffer_: bytes) -> int: """Write a buffer to the output file.""" offset = 0 length_of_buffer = len(buffer_) while offset < length_of_buffer: select_result = select.select([], [1], [], self.timeout) assert not select_result[0] assert not select_result[2] if not select_result[1]: return offset length_written = os.write(1, buffer_[offset:]) offset += length_written return offset def closes(self) -> None: """Close the input and output files.""" try: os.close(1) except Exception: pass try: os.close(0) except Exception: pass def transfer(self) -> None: """Transfer the data :).""" initial_time = time.time() # We do this in the child now, for the benefit of macOS # self.infile = sys.stdin.buffer # self.outfile = sys.stdout.buffer # This "extra_time" variable is weird. We want to track the duration of something, # but how long is the duration of the tracking itself? # How does the watcher watch itself? The solution used here is to keep track of # how long the "watch" part of an iteration # takes, and add it to the _NEXT_ iteration - with an initial duration of 0. extra_time = 0.0 self.amount_transferred = 0 self.blocksize_optimizer = blocksize_optimizer.blocksize_optimizer( Globals.MIN_BLOCKSIZE, Globals.MAX_BLOCKSIZE, Globals.RESIZE_PROBABILITY_DECREMENT ) self.blocksize = Globals.INITIAL_BLOCKSIZE read_iterator = self.read() first_iteration = True block = b"" while True: while self.paused.value: time.sleep(0.1) current_start_time = time.time() passed_last_block = False block, passed_last_block = get_next_block(read_iterator) length_written = self.write(block) block = block[length_written:] current_end_time = time.time() watch_t0 = current_end_time this_transfer_size = length_written self.amount_transferred += this_transfer_size iteration_duration = current_end_time - current_start_time + extra_time total_duration_so_far = current_end_time - initial_time if first_iteration: first_iteration = False else: if this_transfer_size == self.blocksize: self.blocksize_optimizer.put(this_transfer_size, iteration_duration) if total_duration_so_far > 1e-6: bytes_per_second = self.amount_transferred / total_duration_so_far else: bytes_per_second = 0 if self.size_estimate == 0: frac = float("nan") else: frac = float(self.amount_transferred) / self.size_estimate self.status = Status( fraction=frac, amount_transferred=self.amount_transferred, rate=bytes_per_second, blocksize=this_transfer_size, current_time=time.time(), iteration_duration=iteration_duration, current_start_time=current_start_time, start_time=initial_time, ) tuple_ = ("status", self.status) self.queue.put(tuple_) self.blocksize = self.blocksize_optimizer.get() watch_t1 = time.time() extra_time = watch_t1 - watch_t0 if passed_last_block: break self.closes() self.status = Status( fraction=frac, amount_transferred=self.amount_transferred, rate=bytes_per_second, blocksize=this_transfer_size, current_time=time.time(), iteration_duration=iteration_duration, current_start_time=current_start_time, start_time=initial_time, ) tuple_ = ("done", self.status) self.queue.put(tuple_) # sys.exit(0) def show_throughput_numbers( self, *, avg_xfer_bytes_per_sec: float, bytes_remaining: int, current_time: float, throughput_label: Gtk.Label, time_remaining_label: Gtk.Label, time_of_completion_label: Gtk.Label, ) -> None: """Show throughput data.""" if self.gui_update: description = modunits.modunits( "computer-bits-per-second-%s" % self.rates_units, int(avg_xfer_bytes_per_sec) * 8, fractional_part_length=1, units="unabbreviated", ) assert isinstance(description, str) throughput_label.set_text(description) del description if avg_xfer_bytes_per_sec: time_remaining = bytes_remaining / avg_xfer_bytes_per_sec if self.gui_update: if math.isnan(time_remaining): time_remaining_label.set_text(NAN) else: if time_remaining >= 0: description = modunits.modunits( "time", int(time_remaining), detail="two-highest", reverse=True, units="unabbreviated" ) assert isinstance(description, str) time_remaining_label.set_text(description) del description else: description = modunits.modunits( "time", -int(time_remaining), detail="two-highest", reverse=True, units="unabbreviated" ) assert isinstance(description, str) time_remaining_label.set_text("-" + description) del description time_of_completion = current_time + time_remaining if self.gui_update and not math.isnan(time_of_completion): time_of_completion_label.set_text(time.ctime(time_of_completion)) def handle_status(self, *, status: Status) -> None: """Deal with a status update.""" current_time = time.time() if self.prior_gui_update_time < current_time - Globals.GUI_UPDATE_INTERVAL: self.prior_gui_update_time = current_time self.gui_update = True else: self.gui_update = False if self.gui_update: # This doesn't change much, but the _units_ may change... if math.isnan(self.size_estimate): self.size_estimate_label1.set_text(NAN) else: assert isinstance(self.size_estimate, int) description = modunits.modunits( "computer-size-%s" % self.storage_units, self.size_estimate, fractional_part_length=1, units="unabbreviated", ) assert isinstance(description, str) self.size_estimate_label1.set_text(description) del description if self.gui_update: self.set_progress(status.fraction) self.blocks_transferred += 1 if self.gui_update: self.blocks_transferred_label1.set_text(str(self.blocks_transferred)) elapsed_time = status.current_time - status.start_time amount_remaining = Globals.SIZE_ESTIMATE - status.amount_transferred # start of misc labels if self.gui_update: description = modunits.modunits( "computer-size-%s" % self.storage_units, status.amount_transferred, fractional_part_length=1, units="unabbreviated", ) assert isinstance(description, str) self.amount_transferred_label1.set_text(description) del description self.current_time_label1.set_text(time.ctime(status.current_time)) description = modunits.modunits("time", int(elapsed_time), detail="two-highest", reverse=True, units="unabbreviated") assert isinstance(description, str) self.elapsed_time_label1.set_text(description) del description self.blocksize_label1.set_text(str(status.blocksize)) # end of misc labels # partial throughput numbers result = self.boxcar_average.add( block_description.BlockDescription(duration=status.iteration_duration, size=status.blocksize) ) partial_throughput_average = result.average if partial_throughput_average is not None: self.show_throughput_numbers( avg_xfer_bytes_per_sec=partial_throughput_average, bytes_remaining=int(amount_remaining), current_time=status.current_time, throughput_label=self.partial_throughput_label1, time_remaining_label=self.partial_est_time_left_label1, time_of_completion_label=self.partial_est_time_of_done_label1, ) if self.gui_update: description = modunits.modunits( "computer-size-%s" % self.storage_units, result.independent_total, fractional_part_length=1, units="unabbreviated", ) assert isinstance(description, str) self.size_of_part_blocks_label1.set_text(description) del description # end of partial throughput numbers # entire throughput numbers if elapsed_time == 0: entire_throughput_average = 0.0 else: entire_throughput_average = status.amount_transferred / elapsed_time self.show_throughput_numbers( avg_xfer_bytes_per_sec=entire_throughput_average, bytes_remaining=int(amount_remaining), current_time=status.current_time, throughput_label=self.entire_throughput_label1, time_remaining_label=self.entire_est_time_left_label1, time_of_completion_label=self.entire_est_time_of_done_label1, ) # end of entire throughput numbers def handle_done(self) -> None: """Deal with EOF.""" self.closes() if self.eager_exit: self.quit_with_success_exit_status() else: self.set_progress(1) self.done_button.set_visible(True) libnotify_worked = False if HAVE_NOTIFY: try: notify_by_libnotify(COMPLETION_MESSAGE) except (GLIB_ERROR, NotLocal): libnotify_worked = False else: libnotify_worked = True if not libnotify_worked: self.done_dialog.show_dialog() self.all_done = True def get_status_and_display(self) -> bool: """Get status of transfer and display it.""" assert self.queue is not None if self.queue.empty(): time.sleep(0.01) else: tuple_ = self.queue.get() kind, status = tuple_ del tuple_ if kind == "status": self.handle_status(status=status) elif kind == "done": self.handle_done() return False return True def quit_it(self, gui_kill: bool = True) -> None: """Exit.""" if hasattr(self, "process") and hasattr(self.process, "pid"): assert isinstance(self.process.pid, int) os.kill(self.process.pid, signal.SIGTERM) else: pass if gui_kill: # Gtk.main_quit() self.quit() def quit_with_success_exit_status(self, *list_: list[typing.Any]) -> None: """Quit True; all went well.""" self.quit_it(gui_kill=True) sys.exit(0) def quit_with_cond_exit_status(self, *list_: list[typing.Any]) -> None: """Quit True if finished well, False if not.""" self.quit_it(gui_kill=True) if self.done: sys.exit(0) else: sys.exit(1) def quit_with_error_exit_status(self, signum: int, frame: typing.Optional[types.FrameType]) -> None: """Kill the GUI and exit False.""" del signum del frame self.quit_it(gui_kill=True) sys.exit(1) def usage(exit_code: int) -> None: """Give a usage message.""" if exit_code == 0: write = sys.stdout.write else: write = sys.stderr.write write("Usage: %s\n" % sys.argv[0]) write("\t--initial-blocksize defaults to %d and will be adjusted according to performance\n" % Globals.INITIAL_BLOCKSIZE) write("\t--resize-probability-decrement defaults to %f; 1 guesses minimally,\n" % Globals.RESIZE_PROBABILITY_DECREMENT) write("\t 0 guesses every time.\n") write("\t Guessing a lot means you spend more time finding a fast blocksize, but the result is\n") write("\t closer to optimal once settled upon\n") write("\t--gui-update-interval defaults to %f. Units are (fractions of) seconds.\n" % Globals.GUI_UPDATE_INTERVAL) write("\t--min-blocksize defaults to %d\n" % Globals.MIN_BLOCKSIZE) write("\t--max-blocksize defaults to %d\n" % Globals.MAX_BLOCKSIZE) write("\t--size-estimate defaults to the length from stat (if possible)\n") string = "\t--partial-estimate-blocks The number of blocks to use for the partial estimate " + ( "(defaults to %d)\n" % Globals.NUM_FOR_PARTIAL_ESTIMATE ) write(string) write("\t--initial-storage-units SI|IEC\n") write("\t--initial-rate-units SI|IEC\n") write("\t--title title\n") write("\t--quit-when-done\n") write("\t--verbose\n") write("\t--help\n") write("\n") write("All command line options' units are in bytes\n") sys.exit(exit_code) def drop_privileges() -> None: """ Drop privileges. Based on https://stackoverflow.com/questions/2699907/dropping-root-permissions-in-python """ if os.uname().sysname == "Darwin": # macOS (Darwin) does not allow setuid or setgid in about any form, apparently. return if os.getuid() != 0 and os.geteuid() != 0: # We have nothing to give up, so just return without changing anything. return login_name = os.getlogin() pwent = pwd.getpwnam(login_name) login_uid = pwent.pw_uid login_gid = pwent.pw_gid # Remove all auxiliary groups os.setgroups([]) # Try setting the uid/gid if login_gid != 0: os.setregid(login_gid, login_gid) os.setreuid(login_uid, login_uid) class Globals: """A singleton of globals.""" MIN_BLOCKSIZE = 2**10 MAX_BLOCKSIZE = 2**20 INITIAL_BLOCKSIZE = (MIN_BLOCKSIZE + MAX_BLOCKSIZE) // 2 SIZE_ESTIMATE: typing.Union[int, float] = -1 QUIT_WHEN_DONE = False RESIZE_PROBABILITY_DECREMENT = 0.001 GUI_UPDATE_INTERVAL = 0.25 TITLE = "gprog" INITIAL_STORAGE_UNITS = "iec" INITIAL_RATE_UNITS = "si" NUM_FOR_PARTIAL_ESTIMATE = 1000 VERBOSE = False class Options: """Parse commandline options.""" def __init__(self) -> None: """Parse all commandline options.""" while sys.argv[1:]: self.parse_one(sys.argv[1]) del sys.argv[1] def parse_one(self, argument: str) -> None: """Parse one commandline option.""" if self.parse_a(argument): return elif self.parse_b(argument): return elif self.parse_c(argument): return sys.stderr.write("%s: Illegal option: %s\n" % (sys.argv[0], sys.argv[1])) usage(1) def parse_a(self, argument: str) -> bool: """Parse some of the commandline options.""" match argument: case "--initial-blocksize": Globals.INITIAL_BLOCKSIZE = int(sys.argv[2]) del sys.argv[1] return True case "--initial-storage-units": ARGUMENT = sys.argv[2].lower() if ARGUMENT in ["iec", "si"]: Globals.INITIAL_STORAGE_UNITS = ARGUMENT else: sys.stderr.write("%s: storage units must be IEC or SI (case insignificant)\n" % sys.argv[0]) usage(1) del sys.argv[1] return True case "--initial-rate-units": ARGUMENT = sys.argv[2].lower() if ARGUMENT in ["iec", "si"]: Globals.INITIAL_RATE_UNITS = ARGUMENT else: sys.stderr.write("%s: rate units must be IEC or SI (case insignificant)\n" % sys.argv[0]) usage(1) del sys.argv[1] return True return False def parse_b(self, argument: str) -> bool: """Parse some of the commandline options.""" match argument: case "--title": Globals.TITLE = "gprog: %s" % sys.argv[2] del sys.argv[1] return True case "--gui-update-interval": Globals.GUI_UPDATE_INTERVAL = float(sys.argv[2]) del sys.argv[1] return True case "--resize-probability-decrement": Globals.RESIZE_PROBABILITY_DECREMENT = float(sys.argv[2]) del sys.argv[1] return True case "--max-blocksize": Globals.MAX_BLOCKSIZE = int(sys.argv[2]) del sys.argv[1] return True case "--partial-estimate-blocks": Globals.NUM_FOR_PARTIAL_ESTIMATE = int(sys.argv[2]) del sys.argv[1] return True return False def parse_c(self, argument: str) -> bool: """Parse some of the commandline options.""" match argument: case "--min-blocksize": Globals.MIN_BLOCKSIZE = int(sys.argv[2]) del sys.argv[1] return True case "--size-estimate": Globals.SIZE_ESTIMATE = int(sys.argv[2]) del sys.argv[1] return True case "--quit-when-done": Globals.QUIT_WHEN_DONE = True return True case "--help" | "-h": usage(0) return True case "--verbose" | "-v": Globals.VERBOSE = True return True return False def main() -> None: """Present GUI and perform data transfer.""" if Globals.VERBOSE: print("dropping privileges", file=sys.stderr) drop_privileges() if Globals.VERBOSE: print("parsing options", file=sys.stderr) Options() if Globals.SIZE_ESTIMATE == -1: if Globals.VERBOSE: print("No SIZE_ESTIMATE", file=sys.stderr) # This is still only an estimate, because a file can grow while we're reading it stat_buf = os.fstat(0) Globals.SIZE_ESTIMATE = stat_buf.st_size if Globals.SIZE_ESTIMATE == 0: # This could be one of two things: # 1) An empty file # 2) Some sort of special file, like /dev/zero, a disk (as opposed to a file on a disk), or a pipe if stat.S_ISFIFO(stat_buf.st_mode): sys.stderr.write("Pipe or FIFO detected\n") Globals.SIZE_ESTIMATE = 0 elif not (stat.S_IFREG & stat_buf.st_mode): sys.stderr.write("Something other than a regular file or pipe detected\n") Globals.SIZE_ESTIMATE = 0 if Globals.VERBOSE: print("Instantiating GUI", file=sys.stderr) gui = GUI(application_id="com.dnsalias.stromberg.gprog", flags=Gio.ApplicationFlags.FLAGS_NONE) gui.connect("activate", gui.do_activate) if Globals.VERBOSE: print("run'ing GUI", file=sys.stderr) try: gui.run(None) except KeyboardInterrupt: if Globals.VERBOSE: print("quit_it GUI", file=sys.stderr) gui.quit_it(gui_kill=True) if __name__ == "__main__": main()