podlaunch/main.py

189 lines
7.3 KiB
Python

import json
import logging
import os
import pathlib
import sys
import threading
import traceback
from datetime import datetime, timedelta
from queue import SimpleQueue
from signal import signal, SIGHUP, SIGINT, SIGTERM, setitimer, SIGALRM, ITIMER_REAL, SIGUSR1, SIGUSR2, strsignal
import click
import sh
# Base directory under which every service keeps its own sub-directory; the
# pod identifier given on the command line is resolved relative to this path.
SERVICES_BASE_PATH = "/infra/services/"
# `sh` entry point pre-configured so that launched commands write their
# stdout/stderr straight to this process' stdout/stderr.
shlog = sh.bake(_out=sys.stdout, _err=sys.stderr)
# Callable wrapper around the `systemd-notify` executable.
sdnotify = sh.Command("systemd-notify")
class PodKeeper:
    """Start a podman pod from a kube YAML file and supervise it until stopped.

    The public methods ``destroy``, ``reload``, ``check`` and ``passthrough``
    have the ``(signum, stackframe)`` signature of signal handlers. They only
    record what happened in ``threading.Event`` flags (and a queue for
    forwarded signals) and wake the loop in :meth:`run`, which does the work.
    """

    def __init__(self, network, log_driver, log_level, replace, remove, identifier):
        """Validate the pod identifier and prepare the supervision state.

        Args:
            network: value for ``--network``; omitted from the call when falsy.
            log_driver: value for ``--log-driver``; omitted when falsy.
            log_level: value for ``--log-level``; omitted when falsy.
            replace: whether an already existing pod of the same name is
                stopped and removed before starting.
            remove: whether the pod is removed after it has been stopped.
            identifier: name of the service directory below
                ``SERVICES_BASE_PATH``; must be a single plain path component.

        Raises:
            ValueError: if ``identifier`` is empty, contains several path
                parts, is ``..`` or is absolute.
            NotADirectoryError: if the pod home does not exist.
            FileNotFoundError: if ``pod-<identifier>.yaml`` is missing there.
        """
        # Extra arguments handed to `podman play kube`; empty options are skipped.
        self.podnet_args = ()
        self.podnet_args += ("--network", network) if network else ()
        self.podnet_args += ("--log-driver", log_driver) if log_driver else ()
        self.podnet_args += ("--log-level", log_level) if log_level else ()
        self.replace = replace
        self.remove = remove

        identifier_path = pathlib.PurePath(identifier)
        if len(identifier_path.parts) != 1:
            raise ValueError(f"identifier has path parts: {identifier_path}")
        # ".." and "/" also consist of exactly one part but would make the pod
        # home point outside of the services base directory.
        if identifier_path.parts[0] == ".." or identifier_path.is_absolute():
            raise ValueError(f"identifier is not a plain directory name: {identifier_path}")

        self.podhome = pathlib.Path(SERVICES_BASE_PATH) / identifier_path
        if not self.podhome.exists():
            raise NotADirectoryError(f"pod home does not exist: {self.podhome}")
        self.podname = f"{identifier}_pod"
        self.podyaml = f"pod-{identifier}.yaml"
        podyaml_complete = self.podhome / self.podyaml
        if not podyaml_complete.exists():
            raise FileNotFoundError(f"pod definition does not exist: {podyaml_complete}")

        # Flags set by the signal handlers and consumed by run().
        self.stopping = threading.Event()
        self.reloading = threading.Event()
        self.checking = threading.Event()
        # Wake-up event for the run() loop; set together with any flag above.
        self.waiter = threading.Event()
        # NOTE(review): naive UTC timestamp; it is later formatted with
        # isoformat() for `podman logs --since` - confirm podman treats
        # zone-less timestamps as UTC.
        self.last_check = datetime.utcnow()
        self.passing_signal = threading.Event()
        # Signal numbers waiting to be forwarded to the pod.
        self.pass_signal_nums = SimpleQueue()

    def destroy(self, signum, stackframe):
        """Signal handler: request shutdown of the supervision loop."""
        print("Destroy signal", signum, file=sys.stderr, flush=True)
        self.stopping.set()
        self.waiter.set()

    def reload(self, signum, stackframe):
        """Signal handler: request that SIGHUP is sent to the pod."""
        print("Reload signal", signum, file=sys.stderr, flush=True)
        self.reloading.set()
        self.waiter.set()

    def check(self, signum, stackframe):
        """Signal handler: request a health check of the pod's containers."""
        self.checking.set()
        self.waiter.set()

    def passthrough(self, signum, stackframe):
        """Signal handler: queue ``signum`` to be forwarded to the pod."""
        # SimpleQueue.put() never blocks, so no block/timeout arguments are needed.
        self.pass_signal_nums.put(signum)
        self.passing_signal.set()
        self.waiter.set()

    def run(self):
        """Start the pod, serve handler requests until stopping, then stop the pod.

        Changes the working directory to the pod home. The pod is always
        stopped through :meth:`stop_pod` once it has been started, also when
        the supervision loop fails with an exception.
        """
        os.chdir(self.podhome)
        # Exit code 0 of `podman pod exists` means the pod is present; 1 is
        # accepted as well so that a missing pod does not raise.
        if self.replace and sh.podman.pod.exists(self.podname, _ok_code=[0, 1], _return_cmd=True).exit_code == 0:
            print(f"Replacing existing pod {self.podname}", file=sys.stderr, flush=True)
            shlog.podman.pod.stop(self.podname)
            shlog.podman.pod.rm("-f", self.podname)
        print(f"Starting pod {self.podname} at {self.last_check}", file=sys.stderr, flush=True)
        shlog.podman.play.kube(self.podyaml, *self.podnet_args)

        # Silence the "sh.command" logger below ERROR while supervising.
        shlogger = logging.getLogger("sh.command")
        oldlevel = shlogger.level
        try:
            shlogger.setLevel(logging.ERROR)
            if 'NOTIFY_SOCKET' in os.environ:
                sdnotify("--ready", f"--pid={os.getpid()}", "--status=Monitoring pod...")
            while not self.stopping.is_set():
                self.waiter.wait()
                self.waiter.clear()
                if self.passing_signal.is_set():
                    self.passing_signal.clear()
                    while not self.pass_signal_nums.empty():
                        signum = self.pass_signal_nums.get(block=True, timeout=2)
                        self.signal_pod(signum)
                if self.checking.is_set():
                    self.checking.clear()
                    self.check_pod()
                if self.reloading.is_set():
                    self.reloading.clear()
                    self.signal_pod(SIGHUP)
            if 'NOTIFY_SOCKET' in os.environ:
                sdnotify("--status=Stopping pod")
        finally:
            # Restore the logger level on every exit path, not only the clean one.
            shlogger.setLevel(oldlevel)
            self.stop_pod()

    def signal_pod(self, signum):
        """Send ``signum`` to the pod via ``podman pod kill``; failures are only logged.

        Args:
            signum: signal number, either a plain int or a ``signal.Signals`` member.
        """
        print(f"Sending signal '{strsignal(signum)}' to pod {self.podname}", file=sys.stderr, flush=True)
        try:
            # int() first: before Python 3.11 str() of a Signals member yields
            # "Signals.SIGHUP" instead of the number.
            shlog.podman.pod.kill("--signal", str(int(signum)), self.podname)
        except sh.ErrorReturnCode:
            print("Error signaling pod", file=sys.stderr, flush=True)
            traceback.print_exc()

    @staticmethod
    def _containers_from_inspect(inspect_output):
        """Return the container list from ``podman pod inspect`` JSON output.

        Accepts both a single JSON object and a JSON array of pod objects, in
        which case the first element is used.

        Raises:
            ValueError: if the output is an empty JSON array.
            KeyError: if the pod description has no ``Containers`` entry.
        """
        description = json.loads(inspect_output)
        if isinstance(description, list):
            if not description:
                raise ValueError("podman pod inspect returned no pod description")
            description = description[0]
        return description["Containers"]

    def check_pod(self):
        """Inspect the pod and request shutdown if any container is not running.

        For every container whose state is not ``running`` the logs since the
        previous check (minus ten seconds) are written to stderr.
        """
        new_timestamp = datetime.utcnow()
        inspect_command = sh.podman.pod.inspect(self.podname, _return_cmd=True)
        for container in self._containers_from_inspect(inspect_command.stdout):
            if container["State"] != "running":
                print(f"Container {container['Name']} exited", file=sys.stderr, flush=True)
                logs_since = self.last_check - timedelta(seconds=10)
                print("Log since last check (-10s):\n", file=sys.stderr, flush=True)
                shlog.podman.logs('--since', logs_since.isoformat(), container['Name'], _out=sys.stderr)
                self.stopping.set()
        self.last_check = new_timestamp

    def stop_pod(self):
        """Stop the pod (two attempts) and remove it when ``remove`` is set.

        Errors of the podman calls are reported on stderr and never raised.
        """
        print("Stopping pod", self.podname, file=sys.stderr, flush=True)
        try:
            shlog.podman.pod.stop("-t", "19", self.podname)
            successful_stopped = True
        except sh.ErrorReturnCode:
            print(f"First stop of {self.podname} was not successful!", file=sys.stderr, flush=True)
            successful_stopped = False
        # The second stop is always issued; its failure is only reported when
        # the first attempt failed as well.
        try:
            shlog.podman.pod.stop("-t", "5", self.podname)
        except sh.ErrorReturnCode:
            if not successful_stopped:
                print(f"Second stop of {self.podname} was not successful!", file=sys.stderr, flush=True)
        if self.remove:
            try:
                shlog.podman.pod.rm(self.podname)
            except sh.ErrorReturnCode:
                print(f"Removal of {self.podname} was not successful!", file=sys.stderr, flush=True)
@click.command()
@click.option("--network", default="brodge", help="Network for the created pod")
@click.option("--log-driver", default="journald", help="Logging driver for the created pod")
@click.option("--log-level", default="", help="Controls log-level on podman call")
@click.option("--replace/--no-replace", default=True, help="Controls replacement of previously running pod with the "
                                                           "same name")
@click.option("--remove/--keep", default=True, help="Controls removal of pod after stopping")
@click.option("--verbose", is_flag=True, default=False, help="Enable DEBUG logging")
@click.argument("identifier")
def main(network, log_driver, log_level, replace, remove, verbose, identifier):
    # Command line entry point: build the PodKeeper, wire process signals to
    # its handlers and run the supervision loop. Documented with comments
    # rather than a docstring because click would show a docstring as help text.
    logging.root.setLevel(logging.DEBUG if verbose else logging.INFO)

    keeper = PodKeeper(
        network=network,
        log_driver=log_driver,
        log_level=log_level,
        replace=replace,
        remove=remove,
        identifier=identifier,
    )

    # Which keeper method reacts to which signal.
    signal_handlers = (
        (SIGINT, keeper.destroy),
        (SIGTERM, keeper.destroy),
        (SIGHUP, keeper.reload),
        (SIGALRM, keeper.check),
        (SIGUSR1, keeper.passthrough),
        (SIGUSR2, keeper.passthrough),
    )
    for signum, handler in signal_handlers:
        signal(signum, handler)

    # SIGALRM fires first after 4 seconds and then every 120 seconds.
    setitimer(ITIMER_REAL, 4.0, 120.0)
    keeper.run()
if __name__ == "__main__":
    # Run as a script: set up logging with its default configuration, then
    # hand control to the click command.
    logging.basicConfig()
    main()