Update imagerebuild.py

Use systemctl show shortcuts via --property
Ignore masked pods and images
Synchronize quick process runs
Add semaphore for image build
This commit is contained in:
Ben 2022-04-19 12:34:04 +02:00
parent 816ddb12b0
commit 03dbe75cf1
Signed by: ben
GPG key ID: 0F54A7ED232D3319
3 changed files with 107 additions and 73 deletions

View file

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (podlaunch)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>

View file

@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.10 (podlaunch)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.10" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View file

@ -1,67 +1,87 @@
#!/usr/bin/env python3
import logging
import multiprocessing
import pathlib
from typing import Set
import time
from typing import Set, Callable
import click
import sh
from sh import systemctl
from sh import podman
from sh import systemctl
SERVICES_BASE_PATH = "/docker/services/"
def resolve_image_units():
services_path = pathlib.Path(SERVICES_BASE_PATH)
services_list = list(map(lambda p: str(p.name), services_path.iterdir()))
services_set = set(map(lambda p: str(p.name), services_path.iterdir()))
logging.info(f"Found {len(services_set)} services: {str(services_set)}")
systemctl("daemon-reload")
def remove_masked_unit(
_item_set: Set[str],
item: str,
item_to_unit: Callable[[str], str] = lambda i: i,
):
load_state = systemctl.show(
"--property=LoadState", "--value", item_to_unit(item)
)
load_state = load_state.stdout.strip().decode(
encoding="utf-8", errors="replace"
)
logging.debug(f"{item} load state: {repr(load_state)}")
if load_state == "masked":
logging.info(f"Removed masked entry: {item}")
_item_set.remove(item)
with click.progressbar(list(services_set), label="Checking service units..", show_pos=True) as bar:
for service in bar:
remove_masked_unit(services_set, service, lambda srv: f"pod@{srv}.service")
def add_wants_to_image_units(_image_units: Set[str], unit: str):
wants = systemctl.show("--property=Wants", "--value", unit)
wants_list = (
wants.stdout.strip().decode(encoding="utf-8", errors="replace").split(" ")
)
logging.debug(f"{unit} wants: {repr(wants_list)}")
for next_unit in wants_list:
if next_unit.startswith("image@") and next_unit.endswith(".service"):
logging.info(f"Found {unit} wants {next_unit}")
_image_units.add(next_unit)
image_units: Set[str] = set()
logging.info(f"Found {len(services_list)} services: {str(services_list)}")
def process_pod_systemctl_show(line: str):
search_str = "Wants="
if line.startswith(search_str):
for unit in line[len(search_str) :].split(" "):
if unit.startswith("image@") and unit.endswith(".service"):
image_units.add(unit)
with click.progressbar(
length=len(services_list) * 2, label="Collecting container image services.."
length=len(services_set) * 2, label="Collecting container image services.."
) as bar:
started_processes = []
for service in services_list:
process = systemctl.show(
f"pod@{service}.service",
_out=process_pod_systemctl_show,
_bg=True,
_done=lambda cmd, success, exit_code: bar.update(1),
)
started_processes.append(process)
# join processes
[p.wait() for p in started_processes]
for service in services_set:
add_wants_to_image_units(image_units, f"pod@{service}.service")
bar.update(1)
first = True
new_image_units: Set[str] = set(image_units)
previous_image_units: Set[str] = set(image_units)
bar.length = len(image_units) * 2
while first or len(new_image_units) > 0:
first = False
started_processes = []
for service in new_image_units:
process = systemctl.show(
service,
_out=process_pod_systemctl_show,
_bg=True,
_done=lambda cmd, success, exit_code: bar.update(1),
)
started_processes.append(process)
# join processes
[p.wait() for p in started_processes]
new_image_units = image_units.difference(previous_image_units)
while len(new_image_units) > 0:
units_to_check = list(new_image_units)
new_image_units = set() # reset new image units
for image_unit in units_to_check:
add_wants_to_image_units(new_image_units, image_unit)
bar.update(1)
bar.length += len(new_image_units)
previous_image_units = set(image_units)
image_units.update(
new_image_units
) # add new image units to all image units
with click.progressbar(
list(image_units), label="Checking container image units..", show_pos=True
) as bar:
for image_unit in bar:
remove_masked_unit(image_units, image_unit)
logging.info(f"Found {len(image_units)} images: {str(image_units)}")
return image_units
@ -79,28 +99,23 @@ def main(verbose):
image_units = resolve_image_units()
image_tags: Set[str] = set()
def process_image_systemctl_show(line: str):
search_str = "Environment="
if line.startswith(search_str):
for unit in line[len(search_str) :].split(" "):
search_str = "IMAGE_TAG="
if unit.startswith(search_str):
image_tags.add(unit[len(search_str) :])
started_processes = []
with click.progressbar(
length=len(image_units), label="Collecting container images.."
) as bar:
for image_service in image_units:
process = systemctl.show(
image_service,
_out=process_image_systemctl_show,
_bg=True,
_done=lambda cmd, success, exit_code: bar.update(1),
with click.progressbar(image_units, label="Collecting container image tags..") as bar:
for image_unit in bar:
environment = systemctl.show(
"--property=Environment",
"--value",
image_unit,
)
started_processes.append(process)
# join processes
[p.wait() for p in started_processes]
environment_list = (
environment.stdout.strip()
.decode(encoding="utf-8", errors="replace")
.split(" ")
)
logging.debug(f"{image_unit} environment: {repr(environment_list)}")
for envvar in environment_list:
search_str = "IMAGE_TAG="
if envvar.startswith(search_str):
image_tags.add(envvar[len(search_str) :])
started_processes = []
with click.progressbar(
@ -110,28 +125,47 @@ def main(verbose):
process = podman.untag(
image_tag,
_bg=True,
_err_to_out=True,
_done=lambda cmd, success, exit_code: bar.update(1),
)
started_processes.append(process)
time.sleep(0.02)
# join processes
for p in started_processes:
try:
p.wait()
except sh.ErrorReturnCode:
# ignore missing tags
if "image not known".encode() not in p.stderr:
except sh.ErrorReturnCode as error:
# ignore missing image tags
if "image not known".encode() in p.stdout:
pass
else:
raise
started_processes = []
with click.progressbar(
length=len(image_units), label="Building images..", show_pos=True
) as bar:
for image_service in image_units:
process = systemctl.restart(
image_service,
_bg=True,
_done=lambda cmd, success, exit_code: bar.update(1),
)
semaphore = multiprocessing.Semaphore(8)
for image_unit in image_units:
try:
systemctl("reset-failed", image_unit, _bg=False, _err_to_out=True)
except sh.ErrorReturnCode as error:
if f"Unit {image_unit} not loaded".encode() in error.stdout:
logging.info(
f"Not resetting failed state for {image_unit}, unit not loaded"
)
else:
raise
semaphore.acquire()
def restart_done(cmd, success, exit_code):
bar.update(1)
if not success:
logging.warning(f"{cmd.cmd}{tuple(cmd.call_args)} completed with exit code {exit_code}")
semaphore.release()
process = systemctl.restart(image_unit, _bg=True, _done=restart_done)
started_processes.append(process)
# join processes
[p.wait() for p in started_processes]