diff --git a/vmupdate/agent/entrypoint.py b/vmupdate/agent/entrypoint.py index 256e1d2..3f5a0df 100755 --- a/vmupdate/agent/entrypoint.py +++ b/vmupdate/agent/entrypoint.py @@ -66,24 +66,41 @@ def get_package_manager(os_data, log, log_handler, log_level, no_progress): try: from source.apt.apt_api import APT as PackageManager except ImportError: - log.warning("Failed to load apt with progress bar. Use apt cli.") + log.warning("Failed to load apt with progress bar. Using apt cli.") # no progress reporting no_progress = True + print(f"Progress reporting not supported.", flush=True) if no_progress: from source.apt.apt_cli import APTCLI as PackageManager elif os_data["os_family"] == "RedHat": try: - from source.dnf.dnf_api import DNF as PackageManager - except ImportError: - log.warning("Failed to load dnf with progress bar. Use dnf cli.") - # no progress reporting - no_progress = True - - if no_progress: + version = int(os_data["release"].split(".")[0]) + except ValueError: + version = 99 # fedora changed its version + + loaded = False + if version >= 41: + try: + from source.dnf.dnf5_api import DNF as PackageManager + loaded = True + except ImportError: + log.warning("Failed to load dnf5.") + + if not loaded: + try: + from source.dnf.dnf_api import DNF as PackageManager + loaded = True + except ImportError: + log.warning( + "Failed to load dnf with progress bar. Using dnf cli.") + print(f"Progress reporting not supported.", flush=True) + + if no_progress or not loaded: from source.dnf.dnf_cli import DNFCLI as PackageManager elif os_data["os_family"] == "ArchLinux": from source.pacman.pacman_cli import PACMANCLI as PackageManager + print(f"Progress reporting not supported.", flush=True) else: raise NotImplementedError( "Only Debian, RedHat and ArchLinux based OS is supported.") diff --git a/vmupdate/agent/source/apt/apt_api.py b/vmupdate/agent/source/apt/apt_api.py index 59b6ac3..543ad31 100644 --- a/vmupdate/agent/source/apt/apt_api.py +++ b/vmupdate/agent/source/apt/apt_api.py @@ -108,6 +108,7 @@ class FetchProgress(apt.progress.base.AcquireProgress, Progress): def __init__(self, weight: int, log, refresh: bool = False): Progress.__init__(self, weight, log) self.action = "refresh" if refresh else "fetch" + self.fetching_notified = False def fail(self, item): """ @@ -126,13 +127,19 @@ def pulse(self, _owner): This function returns a boolean value indicating whether the acquisition should be continued (True) or cancelled (False). """ + if self.action == "fetch" and not self.fetching_notified: + print(f"Fetching {self.total_items} packages " + f"[{self._format_bytes(self.total_bytes)}]", + flush=True) + self.fetching_notified = True self.notify_callback(self.current_bytes / self.total_bytes * 100) return True def start(self): """Invoked when the Acquire process starts running.""" self.log.info(f"{self.action.capitalize()} started.") - print(f"{self.action.capitalize()}ing packages.", flush=True) + if self.action == "refresh": + print("Refreshing available packages.", flush=True) super().start() self.notify_callback(0) diff --git a/vmupdate/agent/source/common/progress_reporter.py b/vmupdate/agent/source/common/progress_reporter.py index b38feac..1f1c0d0 100644 --- a/vmupdate/agent/source/common/progress_reporter.py +++ b/vmupdate/agent/source/common/progress_reporter.py @@ -64,6 +64,16 @@ def notify_callback(self, percent): self._callback(_percent) self._last_percent = _percent + @staticmethod + def _format_bytes(size): + units = ["B", "KB", "MB", "GB", "TB", "PB"] + factor = 1000 + for unit in units: + if size < factor: + return f"{size:.2f} {unit}" + size /= factor + return f"{size:.2f} {units[-1]}" + class ProgressReporter: """ diff --git a/vmupdate/agent/source/dnf/dnf5_api.py b/vmupdate/agent/source/dnf/dnf5_api.py new file mode 100644 index 0000000..d04040e --- /dev/null +++ b/vmupdate/agent/source/dnf/dnf5_api.py @@ -0,0 +1,294 @@ +# coding=utf-8 +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2025 Piotr Bartman-Szwarc +# +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +import subprocess + +import libdnf5 +from libdnf5.repo import DownloadCallbacks +from libdnf5.rpm import TransactionCallbacks +from libdnf5.base import Base, Goal + +from source.common.process_result import ProcessResult +from source.common.exit_codes import EXIT +from source.common.progress_reporter import ProgressReporter, Progress + +from .dnf_cli import DNFCLI + + +class TransactionError(RuntimeError): + pass + + +class DNF(DNFCLI): + def __init__(self, log_handler, log_level): + super().__init__(log_handler, log_level) + self.base = Base() + self.base.load_config() + self.base.setup() + self.config = self.base.get_config() + update = FetchProgress(weight=0, log=self.log) # % of total time + fetch = FetchProgress(weight=50, log=self.log) # % of total time + upgrade = UpgradeProgress(weight=50, log=self.log) # % of total time + self.progress = ProgressReporter(update, fetch, upgrade) + + def refresh(self, hard_fail: bool) -> ProcessResult: + """ + Use package manager to refresh available packages. + + :param hard_fail: raise error if some repo is unavailable + :return: (exit_code, stdout, stderr) + """ + self.config.skip_if_unavailable = not hard_fail + + result = ProcessResult() + try: + self.log.debug("Refreshing available packages...") + + result += self.expire_cache() + + repo_sack = self.base.get_repo_sack() + repo_sack.create_repos_from_system_configuration() + repo_sack.load_repos() + self.log.debug("Cache refresh successful.") + except Exception as exc: + self.log.error( + "An error occurred while refreshing packages: %s", str(exc)) + result += ProcessResult(EXIT.ERR_VM_REFRESH, out="", err=str(exc)) + + return result + + def upgrade_internal(self, remove_obsolete: bool) -> ProcessResult: + """ + Use `libdnf5` package to upgrade and track progress. + """ + self.config.obsoletes = remove_obsolete + result = ProcessResult() + try: + self.log.debug("Performing package upgrade...") + goal = Goal(self.base) + goal.add_upgrade("*") + transaction = goal.resolve() + # fill empty `Command line` column in dnf history + transaction.set_description("qubes-vm-update") + + if transaction.get_transaction_packages_count() == 0: + self.log.info("No packages to upgrade, quitting.") + return ProcessResult( + EXIT.OK, out="", + err="\n".join(transaction.get_resolve_logs_as_strings())) + + self.base.set_download_callbacks( + libdnf5.repo.DownloadCallbacksUniquePtr( + self.progress.fetch_progress)) + transaction.download() + + if not transaction.check_gpg_signatures(): + problems = transaction.get_gpg_signature_problems() + raise TransactionError( + f"GPG signatures check failed: {problems}") + + if result.code == EXIT.OK: + print("Updating packages.", flush=True) + self.log.debug("Committing upgrade...") + transaction.set_callbacks( + libdnf5.rpm.TransactionCallbacksUniquePtr( + self.progress.upgrade_progress)) + tnx_result = transaction.run() + if tnx_result != transaction.TransactionRunResult_SUCCESS: + raise TransactionError( + transaction.transaction_result_to_string(tnx_result)) + self.log.debug("Package upgrade successful.") + self.log.info("Notifying dom0 about installed applications") + subprocess.call(['/etc/qubes-rpc/qubes.PostInstall']) + print("Updated", flush=True) + except Exception as exc: + self.log.error( + "An error occurred while upgrading packages: %s", str(exc)) + result += ProcessResult(EXIT.ERR_VM_UPDATE, out="", err=str(exc)) + return result + + +class FetchProgress(DownloadCallbacks, Progress): + def __init__(self, weight: int, log): + DownloadCallbacks.__init__(self) + Progress.__init__(self, weight, log) + self.bytes_to_fetch = 0 + self.bytes_fetched = 0 + self.package_bytes = {} + self.package_names = {} + self.count = 0 + self.fetching_notified = False + + def add_new_download( + self, _user_data, description: str, total_to_download: float + ) -> int: + """ + Notify the client that a new download has been created. + + :param _user_data: User data entered together with url/package to download. + :param description: The message describing new download (url/packagename). + :param total_to_download: Total number of bytes to download. + :return: Associated user data for new download. + """ + self.count += 1 + self.bytes_to_fetch += total_to_download + self.package_bytes[self.count] = 0 + self.package_names[self.count] = description + # downloading is not started yet + self.notify_callback(0) + return self.count + + def progress( + self, user_cb_data: int, total_to_download: float, downloaded: float + ) -> int: + """ + Download progress callback. + + :param user_cb_data: Associated user data obtained from add_new_download. + :param total_to_download: Total number of bytes to download. + :param downloaded: Number of bytes downloaded. + """ + if not self.fetching_notified: + print(f"Fetching {self.count} packages " + f"[{self._format_bytes(self.bytes_to_fetch)}]", + flush=True) + self.fetching_notified = True + self.bytes_fetched += downloaded - self.package_bytes[user_cb_data] + if downloaded > self.package_bytes[user_cb_data]: + if self.package_bytes[user_cb_data] == 0: + print(f"Fetching {self.package_names[user_cb_data]} [{self._format_bytes(total_to_download)}]", + flush=True) + self.package_bytes[user_cb_data] = downloaded + percent = self.bytes_fetched / self.bytes_to_fetch * 100 + self.notify_callback(percent) + # Should return 0 on success, + # in case anything in dnf5 changed we return their default value + return DownloadCallbacks.progress( + self, user_cb_data, total_to_download, downloaded) + + def end(self, user_cb_data: int, status: int, msg: str) -> int: + """ + End of download callback. + + :param user_cb_data: Associated user data obtained from add_new_download. + :param status: The transfer status. + :param msg: The error message in case of error. + """ + if status != 0: + print(msg, flush=True, file=self._stdout) + return DownloadCallbacks.end(self, user_cb_data, status, msg) + + def mirror_failure( + self, user_cb_data: int, msg: str, url: str, metadata: str + ) -> int: + """ + Mirror failure callback. + + :param user_cb_data: Associated user data obtained from add_new_download. + :param msg: Error message. + :param url: Failed mirror URL. + :param metadata: the type of metadata that is being downloaded + """ + print(f"Fetching {metadata} failure " + f"({self.package_names[user_cb_data]}) {msg}", + flush=True, file=self._stdout) + return DownloadCallbacks.mirror_failure( + self, user_cb_data, msg, url, metadata) + + +class UpgradeProgress(TransactionCallbacks, Progress): + def __init__(self, weight: int, log): + TransactionCallbacks.__init__(self) + Progress.__init__(self, weight, log) + self.pgks = None + self.pgks_done = None + self.processed_packages = set() + + def install_progress( + self, item: libdnf5.base.TransactionPackage, amount: int, total: int + ): + r""" + Report the package installation progress periodically. + + :param item: The TransactionPackage class instance for the package currently being installed + :param amount: The portion of the package already installed + :param total: The disk space used by the package after installation + """ + package = item.get_package().get_full_nevra() + if package not in self.processed_packages: + print(f"Installing {package}", flush=True) + self.processed_packages.add(package) + pkg_progress = amount / total + percent = (self.pgks_done + pkg_progress) / self.pgks * 100 + self.notify_callback(percent) + + def transaction_start(self, total: int): + r""" + Preparation phase has started. + + :param total: The total number of packages in the transaction + """ + self.pgks_done = 0 + self.pgks = total + + def uninstall_progress( + self, item: libdnf5.base.TransactionPackage, amount: int, total: int + ): + """ + Report the package removal progress periodically. + + :param item: The TransactionPackage class instance for the package currently being removed + :param amount: The portion of the package already uninstalled + :param total: The disk space freed by the package after removal + """ + package = item.get_package().get_full_nevra() + if package not in self.processed_packages: + print(f"Uninstalling {package}", flush=True) + self.processed_packages.add(package) + pkg_progress = amount / total + percent = (self.pgks_done + pkg_progress) / self.pgks * 100 + self.notify_callback(percent) + + def elem_progress(self, item, amount: int, total: int): + r""" + The installation/removal process for the item has started + + :param item: The TransactionPackage class instance for the package currently being (un)installed + :param amount: Index of the package currently being processed. Items are indexed starting from 0. + :param total: The total number of packages in the transaction + """ + self.pgks_done = amount + percent = amount / total * 100 + self.notify_callback(percent) + + def script_start(self, item: libdnf5.base.TransactionPackage, nevra, type: int): + r""" + Execution of the rpm scriptlet has started + + :param item: The TransactionPackage class instance for the package that owns the executed or triggered + scriptlet. It can be `nullptr` if the scriptlet owner is not part of the transaction + (e.g., a package installation triggered an update of the man database, owned by man-db package). + :param nevra: Nevra of the package that owns the executed or triggered scriptlet. + :param type: Type of the scriptlet + """ + print(f"Running rpm scriptlet for {nevra.get_name()}-{nevra.get_epoch()}:{nevra.get_version()}" + f"-{nevra.get_release()}.{nevra.get_arch()}", flush=True) diff --git a/vmupdate/agent/source/dnf/dnf_api.py b/vmupdate/agent/source/dnf/dnf_api.py index b9ce150..506090b 100644 --- a/vmupdate/agent/source/dnf/dnf_api.py +++ b/vmupdate/agent/source/dnf/dnf_api.py @@ -182,8 +182,10 @@ def start(self, total_files, total_size, total_drpms=0): """ self.log.info("Fetch started.") - print("Fetching packages:", flush=True) self.bytes_to_fetch = total_size + print(f"Fetching {total_files} packages " + f"[{self._format_bytes(self.bytes_to_fetch)}]", + flush=True) self.package_bytes = {} self.notify_callback(0) diff --git a/vmupdate/agent/source/dnf/dnf_cli.py b/vmupdate/agent/source/dnf/dnf_cli.py index 2664e0b..80d530e 100644 --- a/vmupdate/agent/source/dnf/dnf_cli.py +++ b/vmupdate/agent/source/dnf/dnf_cli.py @@ -48,11 +48,7 @@ def refresh(self, hard_fail: bool) -> ProcessResult: :param hard_fail: raise error if some repo is unavailable :return: (exit_code, stdout, stderr) """ - cmd = [self.package_manager, - "-q", - "clean", - "expire-cache"] - result = self.run_cmd(cmd) + result = self.expire_cache() cmd = [self.package_manager, "-q", @@ -67,6 +63,17 @@ def refresh(self, hard_fail: bool) -> ProcessResult: return result + def expire_cache(self) -> ProcessResult: + """ + Use package manager to expire cache. + """ + cmd = [self.package_manager, + "-q", + "clean", + "expire-cache"] + result = self.run_cmd(cmd) + return result + def get_packages(self): """ Use rpm to return the installed packages and their versions.