From 03dbe75cf111b7d67be5750f897ddcd7226b5f67 Mon Sep 17 00:00:00 2001 From: Benedikt Ziemons Date: Tue, 19 Apr 2022 12:34:04 +0200 Subject: [PATCH] Update imagerebuild.py Use systemctl show shortcuts via --property Ignore masked pods and images Synchronize quick process runs Add semaphore for image build --- .idea/misc.xml | 2 +- .idea/podlaunch.iml | 2 +- imagerebuild.py | 176 ++++++++++++++++++++++++++------------------ 3 files changed, 107 insertions(+), 73 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index f7fcb5a..bc77e74 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,6 +1,6 @@ - + diff --git a/.idea/podlaunch.iml b/.idea/podlaunch.iml index ed7ac0e..8437fe6 100644 --- a/.idea/podlaunch.iml +++ b/.idea/podlaunch.iml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/imagerebuild.py b/imagerebuild.py index 3d552b7..49b4da0 100755 --- a/imagerebuild.py +++ b/imagerebuild.py @@ -1,67 +1,87 @@ #!/usr/bin/env python3 import logging +import multiprocessing import pathlib -from typing import Set +import time +from typing import Set, Callable import click import sh -from sh import systemctl from sh import podman +from sh import systemctl SERVICES_BASE_PATH = "/docker/services/" def resolve_image_units(): services_path = pathlib.Path(SERVICES_BASE_PATH) - services_list = list(map(lambda p: str(p.name), services_path.iterdir())) + services_set = set(map(lambda p: str(p.name), services_path.iterdir())) + + logging.info(f"Found {len(services_set)} services: {str(services_set)}") + + systemctl("daemon-reload") + + def remove_masked_unit( + _item_set: Set[str], + item: str, + item_to_unit: Callable[[str], str] = lambda i: i, + ): + load_state = systemctl.show( + "--property=LoadState", "--value", item_to_unit(item) + ) + load_state = load_state.stdout.strip().decode( + encoding="utf-8", errors="replace" + ) + logging.debug(f"{item} load state: {repr(load_state)}") + if load_state == "masked": + logging.info(f"Removed masked entry: {item}") + _item_set.remove(item) + + with click.progressbar(list(services_set), label="Checking service units..", show_pos=True) as bar: + for service in bar: + remove_masked_unit(services_set, service, lambda srv: f"pod@{srv}.service") + + def add_wants_to_image_units(_image_units: Set[str], unit: str): + wants = systemctl.show("--property=Wants", "--value", unit) + wants_list = ( + wants.stdout.strip().decode(encoding="utf-8", errors="replace").split(" ") + ) + logging.debug(f"{unit} wants: {repr(wants_list)}") + for next_unit in wants_list: + if next_unit.startswith("image@") and next_unit.endswith(".service"): + logging.info(f"Found {unit} wants {next_unit}") + _image_units.add(next_unit) + image_units: Set[str] = set() - logging.info(f"Found {len(services_list)} services: {str(services_list)}") - - def process_pod_systemctl_show(line: str): - search_str = "Wants=" - if line.startswith(search_str): - for unit in line[len(search_str) :].split(" "): - if unit.startswith("image@") and unit.endswith(".service"): - image_units.add(unit) - with click.progressbar( - length=len(services_list) * 2, label="Collecting container image services.." + length=len(services_set) * 2, label="Collecting container image services.." ) as bar: - started_processes = [] - for service in services_list: - process = systemctl.show( - f"pod@{service}.service", - _out=process_pod_systemctl_show, - _bg=True, - _done=lambda cmd, success, exit_code: bar.update(1), - ) - started_processes.append(process) - # join processes - [p.wait() for p in started_processes] + for service in services_set: + add_wants_to_image_units(image_units, f"pod@{service}.service") + bar.update(1) - first = True new_image_units: Set[str] = set(image_units) - previous_image_units: Set[str] = set(image_units) bar.length = len(image_units) * 2 - while first or len(new_image_units) > 0: - first = False - started_processes = [] - for service in new_image_units: - process = systemctl.show( - service, - _out=process_pod_systemctl_show, - _bg=True, - _done=lambda cmd, success, exit_code: bar.update(1), - ) - started_processes.append(process) - # join processes - [p.wait() for p in started_processes] - new_image_units = image_units.difference(previous_image_units) + while len(new_image_units) > 0: + units_to_check = list(new_image_units) + new_image_units = set() # reset new image units + for image_unit in units_to_check: + add_wants_to_image_units(new_image_units, image_unit) + bar.update(1) + bar.length += len(new_image_units) - previous_image_units = set(image_units) + image_units.update( + new_image_units + ) # add new image units to all image units + + with click.progressbar( + list(image_units), label="Checking container image units..", show_pos=True + ) as bar: + for image_unit in bar: + remove_masked_unit(image_units, image_unit) logging.info(f"Found {len(image_units)} images: {str(image_units)}") return image_units @@ -79,28 +99,23 @@ def main(verbose): image_units = resolve_image_units() image_tags: Set[str] = set() - def process_image_systemctl_show(line: str): - search_str = "Environment=" - if line.startswith(search_str): - for unit in line[len(search_str) :].split(" "): - search_str = "IMAGE_TAG=" - if unit.startswith(search_str): - image_tags.add(unit[len(search_str) :]) - - started_processes = [] - with click.progressbar( - length=len(image_units), label="Collecting container images.." - ) as bar: - for image_service in image_units: - process = systemctl.show( - image_service, - _out=process_image_systemctl_show, - _bg=True, - _done=lambda cmd, success, exit_code: bar.update(1), + with click.progressbar(image_units, label="Collecting container image tags..") as bar: + for image_unit in bar: + environment = systemctl.show( + "--property=Environment", + "--value", + image_unit, ) - started_processes.append(process) - # join processes - [p.wait() for p in started_processes] + environment_list = ( + environment.stdout.strip() + .decode(encoding="utf-8", errors="replace") + .split(" ") + ) + logging.debug(f"{image_unit} environment: {repr(environment_list)}") + for envvar in environment_list: + search_str = "IMAGE_TAG=" + if envvar.startswith(search_str): + image_tags.add(envvar[len(search_str) :]) started_processes = [] with click.progressbar( @@ -110,28 +125,47 @@ def main(verbose): process = podman.untag( image_tag, _bg=True, + _err_to_out=True, _done=lambda cmd, success, exit_code: bar.update(1), ) started_processes.append(process) + time.sleep(0.02) # join processes for p in started_processes: try: p.wait() - except sh.ErrorReturnCode: - # ignore missing tags - if "image not known".encode() not in p.stderr: + except sh.ErrorReturnCode as error: + # ignore missing image tags + if "image not known".encode() in p.stdout: + pass + else: raise started_processes = [] with click.progressbar( length=len(image_units), label="Building images..", show_pos=True ) as bar: - for image_service in image_units: - process = systemctl.restart( - image_service, - _bg=True, - _done=lambda cmd, success, exit_code: bar.update(1), - ) + semaphore = multiprocessing.Semaphore(8) + for image_unit in image_units: + try: + systemctl("reset-failed", image_unit, _bg=False, _err_to_out=True) + except sh.ErrorReturnCode as error: + if f"Unit {image_unit} not loaded".encode() in error.stdout: + logging.info( + f"Not resetting failed state for {image_unit}, unit not loaded" + ) + else: + raise + + semaphore.acquire() + + def restart_done(cmd, success, exit_code): + bar.update(1) + if not success: + logging.warning(f"{cmd.cmd}{tuple(cmd.call_args)} completed with exit code {exit_code}") + semaphore.release() + + process = systemctl.restart(image_unit, _bg=True, _done=restart_done) started_processes.append(process) # join processes [p.wait() for p in started_processes]