Compare commits
2 commits
d0d23416d6
...
828478916d
Author | SHA1 | Date | |
---|---|---|---|
Ben | 828478916d | | |
Ben | 3cd0e7adfb | | |
@@ -3,7 +3,7 @@
|
||||||
<component name="Black">
|
<component name="Black">
|
||||||
<option name="sdkName" value="Python 3.11" />
|
<option name="sdkName" value="Python 3.11" />
|
||||||
</component>
|
</component>
|
||||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
|
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12" project-jdk-type="Python SDK" />
|
||||||
<component name="PythonCompatibilityInspectionAdvertiser">
|
<component name="PythonCompatibilityInspectionAdvertiser">
|
||||||
<option name="version" value="3" />
|
<option name="version" value="3" />
|
||||||
</component>
|
</component>
|
||||||
|
|
|
@@ -2,7 +2,7 @@
|
||||||
<module type="PYTHON_MODULE" version="4">
|
<module type="PYTHON_MODULE" version="4">
|
||||||
<component name="NewModuleRootManager">
|
<component name="NewModuleRootManager">
|
||||||
<content url="file://$MODULE_DIR$" />
|
<content url="file://$MODULE_DIR$" />
|
||||||
<orderEntry type="jdk" jdkName="Python 3.11" jdkType="Python SDK" />
|
<orderEntry type="jdk" jdkName="Python 3.12" jdkType="Python SDK" />
|
||||||
<orderEntry type="sourceFolder" forTests="false" />
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
</component>
|
</component>
|
||||||
</module>
|
</module>
|
|
@@ -8,11 +8,17 @@ from typing import Set, Callable
|
||||||
|
|
||||||
import click
|
import click
|
||||||
import sh
|
import sh
|
||||||
from sh import podman
|
|
||||||
from sh import systemctl
|
|
||||||
|
|
||||||
SERVICES_BASE_PATH = "/infra/services/"
|
SERVICES_BASE_PATH = "/infra/services/"
|
||||||
|
|
||||||
|
progressbar = click.progressbar
|
||||||
|
|
||||||
|
|
||||||
|
def hidden_progressbar(*args, **kwargs):
|
||||||
|
bar = click.progressbar(*args, **kwargs)
|
||||||
|
bar.is_hidden = True
|
||||||
|
return bar
|
||||||
|
|
||||||
|
|
||||||
def resolve_image_units():
|
def resolve_image_units():
|
||||||
services_path = pathlib.Path(SERVICES_BASE_PATH)
|
services_path = pathlib.Path(SERVICES_BASE_PATH)
|
||||||
|
@@ -20,33 +26,29 @@ def resolve_image_units():
|
||||||
|
|
||||||
logging.info(f"Found {len(services_set)} services: {str(services_set)}")
|
logging.info(f"Found {len(services_set)} services: {str(services_set)}")
|
||||||
|
|
||||||
systemctl("daemon-reload")
|
sh.systemctl("daemon-reload")
|
||||||
|
|
||||||
def remove_masked_unit(
|
def remove_masked_unit(
|
||||||
_item_set: Set[str],
|
_item_set: Set[str],
|
||||||
item: str,
|
item: str,
|
||||||
item_to_unit: Callable[[str], str] = lambda i: i,
|
item_to_unit: Callable[[str], str] = lambda i: i,
|
||||||
):
|
):
|
||||||
load_state = systemctl.show(
|
load_state_output = sh.systemctl.show(
|
||||||
"--property=LoadState", "--value", item_to_unit(item)
|
"--property=LoadState", "--value", item_to_unit(item)
|
||||||
)
|
)
|
||||||
load_state = load_state.stdout.strip().decode(
|
load_state = load_state_output.strip()
|
||||||
encoding="utf-8", errors="replace"
|
|
||||||
)
|
|
||||||
logging.debug(f"{item} load state: {repr(load_state)}")
|
logging.debug(f"{item} load state: {repr(load_state)}")
|
||||||
if load_state == "masked":
|
if load_state == "masked":
|
||||||
logging.info(f"Removed masked entry: {item}")
|
logging.info(f"Removed masked entry: {item}")
|
||||||
_item_set.remove(item)
|
_item_set.remove(item)
|
||||||
|
|
||||||
with click.progressbar(list(services_set), label="Checking service units..", show_pos=True) as bar:
|
with progressbar(list(services_set), label="Checking service units..", show_pos=True) as bar:
|
||||||
for service in bar:
|
for service in bar:
|
||||||
remove_masked_unit(services_set, service, lambda srv: f"pod@{srv}.service")
|
remove_masked_unit(services_set, service, lambda srv: f"pod@{srv}.service")
|
||||||
|
|
||||||
def add_wants_to_image_units(_image_units: Set[str], unit: str):
|
def add_wants_to_image_units(_image_units: Set[str], unit: str):
|
||||||
wants = systemctl.show("--property=Wants", "--value", unit)
|
wants_output = sh.systemctl.show("--property=Wants", "--value", unit)
|
||||||
wants_list = (
|
wants_list = wants_output.strip().split(" ")
|
||||||
wants.stdout.strip().decode(encoding="utf-8", errors="replace").split(" ")
|
|
||||||
)
|
|
||||||
logging.debug(f"{unit} wants: {repr(wants_list)}")
|
logging.debug(f"{unit} wants: {repr(wants_list)}")
|
||||||
for next_unit in wants_list:
|
for next_unit in wants_list:
|
||||||
if next_unit.startswith("image@") and next_unit.endswith(".service"):
|
if next_unit.startswith("image@") and next_unit.endswith(".service"):
|
||||||
|
@@ -55,7 +57,7 @@ def resolve_image_units():
|
||||||
|
|
||||||
image_units: Set[str] = set()
|
image_units: Set[str] = set()
|
||||||
|
|
||||||
with click.progressbar(
|
with progressbar(
|
||||||
length=len(services_set) * 2, label="Collecting container image services.."
|
length=len(services_set) * 2, label="Collecting container image services.."
|
||||||
) as bar:
|
) as bar:
|
||||||
for service in services_set:
|
for service in services_set:
|
||||||
|
@@ -77,7 +79,7 @@ def resolve_image_units():
|
||||||
new_image_units
|
new_image_units
|
||||||
) # add new image units to all image units
|
) # add new image units to all image units
|
||||||
|
|
||||||
with click.progressbar(
|
with progressbar(
|
||||||
list(image_units), label="Checking container image units..", show_pos=True
|
list(image_units), label="Checking container image units..", show_pos=True
|
||||||
) as bar:
|
) as bar:
|
||||||
for image_unit in bar:
|
for image_unit in bar:
|
||||||
|
@@ -88,39 +90,47 @@ def resolve_image_units():
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
@click.command()
|
||||||
@click.option("--verbose", is_flag=True, default=False, help="Enable INFO logging")
|
@click.option("-v", "--verbose", count=True, help="Enable INFO logging, use twice for DEBUG")
|
||||||
def main(verbose):
|
def main(verbose: int):
|
||||||
if verbose:
|
if verbose > 0:
|
||||||
logging.root.setLevel(logging.INFO)
|
if verbose > 2:
|
||||||
|
logging.root.setLevel(logging.DEBUG)
|
||||||
|
elif verbose == 2:
|
||||||
|
logging.root.setLevel(logging.DEBUG)
|
||||||
|
shlogger = logging.getLogger("sh")
|
||||||
|
shlogger.setLevel(logging.INFO)
|
||||||
|
else:
|
||||||
|
logging.root.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
global progressbar
|
||||||
|
progressbar = hidden_progressbar
|
||||||
|
|
||||||
image_units = resolve_image_units()
|
image_units = resolve_image_units()
|
||||||
image_tags: Set[str] = set()
|
image_tags: Set[str] = set()
|
||||||
|
|
||||||
with click.progressbar(image_units, label="Collecting container image tags..") as bar:
|
with progressbar(image_units, label="Collecting container image tags..") as bar:
|
||||||
for image_unit in bar:
|
for image_unit in bar:
|
||||||
environment = systemctl.show(
|
environment_output = sh.systemctl.show(
|
||||||
"--property=Environment",
|
"--property=Environment",
|
||||||
"--value",
|
"--value",
|
||||||
image_unit,
|
image_unit,
|
||||||
)
|
)
|
||||||
environment_list = (
|
environment_list = environment_output.strip().split(" ")
|
||||||
environment.stdout.strip()
|
|
||||||
.decode(encoding="utf-8", errors="replace")
|
|
||||||
.split(" ")
|
|
||||||
)
|
|
||||||
logging.debug(f"{image_unit} environment: {repr(environment_list)}")
|
logging.debug(f"{image_unit} environment: {repr(environment_list)}")
|
||||||
for envvar in environment_list:
|
for envvar in environment_list:
|
||||||
search_str = "IMAGE_TAG="
|
search_str = "IMAGE_TAG="
|
||||||
if envvar.startswith(search_str):
|
if envvar.startswith(search_str):
|
||||||
image_tags.add(envvar[len(search_str) :])
|
keylen = len(search_str)
|
||||||
|
image_tags.add(envvar[keylen:])
|
||||||
|
|
||||||
started_processes = []
|
started_processes = []
|
||||||
with click.progressbar(
|
with progressbar(
|
||||||
length=len(image_tags), label="Untagging container images..", show_pos=True
|
length=len(image_tags), label="Untagging container images..", show_pos=True
|
||||||
) as bar:
|
) as bar:
|
||||||
for image_tag in image_tags:
|
for image_tag in image_tags:
|
||||||
process = podman.untag(
|
process = sh.podman.untag(
|
||||||
image_tag,
|
image_tag,
|
||||||
|
_return_cmd=True,
|
||||||
_bg=True,
|
_bg=True,
|
||||||
_err_to_out=True,
|
_err_to_out=True,
|
||||||
_done=lambda cmd, success, exit_code: bar.update(1),
|
_done=lambda cmd, success, exit_code: bar.update(1),
|
||||||
|
@@ -131,7 +141,7 @@ def main(verbose):
|
||||||
for p in started_processes:
|
for p in started_processes:
|
||||||
try:
|
try:
|
||||||
p.wait()
|
p.wait()
|
||||||
except sh.ErrorReturnCode as error:
|
except sh.ErrorReturnCode:
|
||||||
# ignore missing image tags
|
# ignore missing image tags
|
||||||
if "image not known".encode() in p.stdout:
|
if "image not known".encode() in p.stdout:
|
||||||
pass
|
pass
|
||||||
|
@@ -139,18 +149,16 @@ def main(verbose):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
started_processes = []
|
started_processes = []
|
||||||
with click.progressbar(
|
with progressbar(
|
||||||
length=len(image_units), label="Building images..", show_pos=True
|
length=len(image_units), label="Building images..", show_pos=True
|
||||||
) as bar:
|
) as bar:
|
||||||
semaphore = multiprocessing.Semaphore(8)
|
semaphore = multiprocessing.Semaphore(8)
|
||||||
for image_unit in image_units:
|
for image_unit in image_units:
|
||||||
try:
|
try:
|
||||||
systemctl("reset-failed", image_unit, _bg=False, _err_to_out=True)
|
sh.systemctl("reset-failed", image_unit, _bg=False, _err_to_out=True)
|
||||||
except sh.ErrorReturnCode as error:
|
except sh.ErrorReturnCode as error:
|
||||||
if f"Unit {image_unit} not loaded".encode() in error.stdout:
|
if f"Unit {image_unit} not loaded".encode() in error.stdout:
|
||||||
logging.info(
|
logging.info(f"Not resetting failed state for {image_unit}, unit not loaded")
|
||||||
f"Not resetting failed state for {image_unit}, unit not loaded"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@@ -162,7 +170,9 @@ def main(verbose):
|
||||||
logging.warning(f"{cmd.cmd}{tuple(cmd.call_args)} completed with exit code {exit_code}")
|
logging.warning(f"{cmd.cmd}{tuple(cmd.call_args)} completed with exit code {exit_code}")
|
||||||
semaphore.release()
|
semaphore.release()
|
||||||
|
|
||||||
process = systemctl.restart(image_unit, _bg=True, _done=restart_done)
|
process = sh.systemctl.restart(
|
||||||
|
image_unit, _return_cmd=True, _bg=True, _done=restart_done
|
||||||
|
)
|
||||||
started_processes.append(process)
|
started_processes.append(process)
|
||||||
# join processes
|
# join processes
|
||||||
[p.wait() for p in started_processes]
|
[p.wait() for p in started_processes]
|
||||||
|
|
24
main.py
24
main.py
|
@@ -1,11 +1,12 @@
|
||||||
|
import datetime
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
import random
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from queue import SimpleQueue
|
from queue import SimpleQueue
|
||||||
from signal import signal, SIGHUP, SIGINT, SIGTERM, setitimer, SIGALRM, ITIMER_REAL, SIGUSR1, SIGUSR2, strsignal
|
from signal import signal, SIGHUP, SIGINT, SIGTERM, setitimer, SIGALRM, ITIMER_REAL, SIGUSR1, SIGUSR2, strsignal
|
||||||
|
|
||||||
|
@@ -13,6 +14,8 @@ import click
|
||||||
import sh
|
import sh
|
||||||
|
|
||||||
SERVICES_BASE_PATH = "/infra/services/"
|
SERVICES_BASE_PATH = "/infra/services/"
|
||||||
|
POD_CHECK_TIME = 120.0
|
||||||
|
RAND_OFFSET_MAX_SECONDS = 10.0
|
||||||
|
|
||||||
shlog = sh.bake(_out=sys.stdout, _err=sys.stderr)
|
shlog = sh.bake(_out=sys.stdout, _err=sys.stderr)
|
||||||
sdnotify = sh.Command("systemd-notify")
|
sdnotify = sh.Command("systemd-notify")
|
||||||
|
@@ -41,7 +44,7 @@ class PodKeeper:
|
||||||
self.reloading = threading.Event()
|
self.reloading = threading.Event()
|
||||||
self.checking = threading.Event()
|
self.checking = threading.Event()
|
||||||
self.waiter = threading.Event()
|
self.waiter = threading.Event()
|
||||||
self.last_check = datetime.utcnow()
|
self.last_check = datetime.datetime.now(datetime.UTC)
|
||||||
self.passing_signal = threading.Event()
|
self.passing_signal = threading.Event()
|
||||||
self.pass_signal_nums = SimpleQueue()
|
self.pass_signal_nums = SimpleQueue()
|
||||||
|
|
||||||
|
@@ -115,14 +118,20 @@ class PodKeeper:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
||||||
def check_pod(self):
|
def check_pod(self):
|
||||||
new_timestamp = datetime.utcnow()
|
new_timestamp = datetime.datetime.now(datetime.UTC)
|
||||||
inspect_command = sh.podman.pod.inspect(self.podname, _return_cmd=True)
|
inspect_command = sh.podman.pod.inspect(self.podname, _return_cmd=True)
|
||||||
pod_description = json.loads(inspect_command.stdout)
|
multiple_descriptions = json.loads(inspect_command.stdout)
|
||||||
|
if not multiple_descriptions:
|
||||||
|
print(f"No pod descriptions found for {self.podname}", file=sys.stderr, flush=True)
|
||||||
|
self.stopping.set()
|
||||||
|
return
|
||||||
|
assert len(multiple_descriptions) == 1, f"Single pod description expected for {self.podname}"
|
||||||
|
pod_description = multiple_descriptions[0]
|
||||||
for container in pod_description["Containers"]:
|
for container in pod_description["Containers"]:
|
||||||
if container["State"] != "running":
|
if container["State"] != "running":
|
||||||
print(f"Container {container['Name']} exited", file=sys.stderr, flush=True)
|
print(f"Container {container['Name']} exited", file=sys.stderr, flush=True)
|
||||||
logs_since = self.last_check - timedelta(seconds=10)
|
logs_since = self.last_check - datetime.timedelta(seconds=POD_CHECK_TIME)
|
||||||
print(f"Log since last check (-10s):\n", file=sys.stderr, flush=True)
|
print(f"Log since last check (-{POD_CHECK_TIME}s):\n", file=sys.stderr, flush=True)
|
||||||
shlog.podman.logs('--since', logs_since.isoformat(), container['Name'], _out=sys.stderr)
|
shlog.podman.logs('--since', logs_since.isoformat(), container['Name'], _out=sys.stderr)
|
||||||
self.stopping.set()
|
self.stopping.set()
|
||||||
self.last_check = new_timestamp
|
self.last_check = new_timestamp
|
||||||
|
@@ -178,7 +187,8 @@ def main(network, log_driver, log_level, replace, remove, verbose, identifier):
|
||||||
signal(SIGALRM, keeper.check)
|
signal(SIGALRM, keeper.check)
|
||||||
signal(SIGUSR1, keeper.passthrough)
|
signal(SIGUSR1, keeper.passthrough)
|
||||||
signal(SIGUSR2, keeper.passthrough)
|
signal(SIGUSR2, keeper.passthrough)
|
||||||
setitimer(ITIMER_REAL, 4.0, 120.0)
|
random_offset = random.random() * RAND_OFFSET_MAX_SECONDS
|
||||||
|
setitimer(ITIMER_REAL, 1.0 + random_offset, POD_CHECK_TIME + random_offset)
|
||||||
|
|
||||||
keeper.run()
|
keeper.run()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue