diff --git a/internal/runtime/kubernetes/procedure_attempts.go b/internal/runtime/kubernetes/procedure_attempts.go index 11b1abb..7c92903 100644 --- a/internal/runtime/kubernetes/procedure_attempts.go +++ b/internal/runtime/kubernetes/procedure_attempts.go @@ -80,7 +80,7 @@ func (b *Backend) resumeRestartProcedureIndex(ctx context.Context, root string, return resumeIndex, nil } -func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace string, root string, commandName string, procedureName string, baseName string, procedure *domain.Procedure, env map[string]string) (*batchv1.Job, error) { +func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace string, root string, commandName string, procedureName string, baseName string, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string) (*batchv1.Job, error) { _, pvc, err := parseRef(root) if err != nil { return nil, err @@ -134,7 +134,7 @@ func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace strin return active, nil } name := procedureAttemptName(baseName, nextAttempt) - job, err := procedureJobSpec(namespace, root, commandName, procedureName, name, nextAttempt, procedure, env, b.config.RegistrySecret) + job, err := procedureJobSpec(namespace, root, commandName, procedureName, name, nextAttempt, procedure, globalPorts, env, b.config.RegistrySecret) if err != nil { return nil, err } diff --git a/internal/runtime/kubernetes/procedures.go b/internal/runtime/kubernetes/procedures.go index 6b02c2d..b51d7f8 100644 --- a/internal/runtime/kubernetes/procedures.go +++ b/internal/runtime/kubernetes/procedures.go @@ -179,7 +179,7 @@ func (b *Backend) runJobProcedure(scrollID string, commandName string, procedure zap.Int("expected_ports", len(procedure.ExpectedPorts)), zap.Int("mounts", len(procedure.Mounts)), ) - createdJob, err := b.createOrReuseProcedureJob(ctx, namespace, root, commandName, procedureName, resourceName, procedure, env) + createdJob, err := b.createOrReuseProcedureJob(ctx, namespace, root, commandName, procedureName, resourceName, procedure, globalPorts, env) if err != nil { logger.Log().Error("Failed to create Kubernetes job procedure", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("namespace", namespace), zap.String("base_job", resourceName), zap.Error(err)) return nil, err @@ -237,7 +237,7 @@ func (b *Backend) ensurePersistentProcedure(ctx context.Context, scrollID string logger.Log().Error("Kubernetes persistent procedure root ref invalid", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("root", root), zap.Error(err)) return err } - statefulSet, err := procedureStatefulSetSpec(namespace, root, commandName, procedureName, resourceName, procedure, env, b.config.RegistrySecret) + statefulSet, err := procedureStatefulSetSpec(namespace, root, commandName, procedureName, resourceName, procedure, globalPorts, env, b.config.RegistrySecret) if err != nil { logger.Log().Error("Failed to build Kubernetes persistent procedure StatefulSet", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("namespace", namespace), zap.Error(err)) return err diff --git a/internal/runtime/kubernetes/resources.go b/internal/runtime/kubernetes/resources.go index 631f9cf..2895fc9 100644 --- a/internal/runtime/kubernetes/resources.go +++ b/internal/runtime/kubernetes/resources.go @@ -148,7 +148,7 @@ func helperJobSpec(namespace string, jobName string, pvc string, image string, c } } -func procedureJobSpec(namespace string, root string, commandName string, procedureName string, resourceName string, attempt int, procedure *domain.Procedure, env map[string]string, registrySecret string) (*batchv1.Job, error) { +func procedureJobSpec(namespace string, root string, commandName string, procedureName string, resourceName string, attempt int, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string, registrySecret string) (*batchv1.Job, error) { _, pvc, err := parseRef(root) if err != nil { return nil, err @@ -173,6 +173,7 @@ func procedureJobSpec(namespace string, root string, commandName string, procedu Env: envVars(env), VolumeMounts: volumeMounts(procedure.Mounts), } + applyProcedureReadiness(&container, procedure, globalPorts) podSpec := corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{container}, @@ -197,7 +198,7 @@ func procedureJobSpec(namespace string, root string, commandName string, procedu }, nil } -func procedureStatefulSetSpec(namespace string, root string, commandName string, procedureName string, resourceName string, procedure *domain.Procedure, env map[string]string, registrySecret string) (*appsv1.StatefulSet, error) { +func procedureStatefulSetSpec(namespace string, root string, commandName string, procedureName string, resourceName string, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string, registrySecret string) (*appsv1.StatefulSet, error) { _, pvc, err := parseRef(root) if err != nil { return nil, err @@ -221,6 +222,7 @@ func procedureStatefulSetSpec(namespace string, root string, commandName string, Env: envVars(env), VolumeMounts: volumeMounts(procedure.Mounts), } + applyProcedureReadiness(&container, procedure, globalPorts) podSpec := corev1.PodSpec{ Containers: []corev1.Container{container}, Volumes: []corev1.Volume{pvcVolume("data", pvc)}, @@ -246,6 +248,38 @@ func procedureStatefulSetSpec(namespace string, root string, commandName string, }, nil } +func applyProcedureReadiness(container *corev1.Container, procedure *domain.Procedure, globalPorts []domain.Port) { + port, ok := firstTCPExpectedPort(procedure, globalPorts) + if !ok { + return + } + container.ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(port.Port)}, + }, + PeriodSeconds: 2, + FailureThreshold: 3, + } +} + +func firstTCPExpectedPort(procedure *domain.Procedure, globalPorts []domain.Port) (domain.Port, bool) { + if procedure == nil { + return domain.Port{}, false + } + ports := portsByName(globalPorts) + for _, expected := range procedure.ExpectedPorts { + port, ok := ports[expected.Name] + if !ok { + continue + } + if normalizeProtocol(port.Protocol) == "udp" { + continue + } + return port, true + } + return domain.Port{}, false +} + func devStatefulSetSpec(namespace string, root string, pvc string, image string, action ports.RuntimeDevAction, registrySecret string) *appsv1.StatefulSet { labels := baseLabels(pvc) labels[labelProcedure] = "dev" diff --git a/internal/runtime/kubernetes/resources_test.go b/internal/runtime/kubernetes/resources_test.go index ba0d5f8..2f10714 100644 --- a/internal/runtime/kubernetes/resources_test.go +++ b/internal/runtime/kubernetes/resources_test.go @@ -95,7 +95,7 @@ func TestProcedureJobSpecBuildsDeterministicMountsAndLabels(t *testing.T) { Mounts: []domain.Mount{{Path: "/work", SubPath: "cache"}}, } - job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, procedure.Env, "registry-secret") + job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, nil, procedure.Env, "registry-secret") if err != nil { t.Fatal(err) } @@ -129,7 +129,7 @@ func TestProcedureJobSpecUsesProvidedRuntimeEnv(t *testing.T) { "PROCEDURE_ONLY": "ignored", }, } - job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, map[string]string{ + job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, nil, map[string]string{ "DRUID_PORT_HTTP": "8080", }, "registry-secret") if err != nil { @@ -141,6 +141,51 @@ func TestProcedureJobSpecUsesProvidedRuntimeEnv(t *testing.T) { } } +func TestProcedureJobSpecAddsTCPReadinessProbe(t *testing.T) { + procedure := &domain.Procedure{ + Image: "itzg/minecraft-server", + ExpectedPorts: []domain.ExpectedPort{{Name: "main"}}, + } + job, err := procedureJobSpec("druid", ref("druid", "druid-minecraft-data"), "start", "start", "minecraft-start-0", 1, procedure, []domain.Port{{Name: "main", Protocol: "tcp", Port: 25565}}, nil, "registry-secret") + if err != nil { + t.Fatal(err) + } + if probe := job.Spec.Template.Spec.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 25565 { + t.Fatalf("readiness probe = %#v, want tcp 25565", probe) + } +} + +func TestProcedureReadinessProbeSkipsUDPOnlyPorts(t *testing.T) { + procedure := &domain.Procedure{ + Image: "steam", + ExpectedPorts: []domain.ExpectedPort{{Name: "query"}}, + } + statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-game-data"), "start", "start", "game-start-0", procedure, []domain.Port{{Name: "query", Protocol: "udp", Port: 27015}}, nil, "registry-secret") + if err != nil { + t.Fatal(err) + } + if probe := statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe; probe != nil { + t.Fatalf("readiness probe = %#v, want nil for udp-only ports", probe) + } +} + +func TestProcedureReadinessProbeUsesFirstTCPExpectedPort(t *testing.T) { + procedure := &domain.Procedure{ + Image: "steam", + ExpectedPorts: []domain.ExpectedPort{{Name: "query"}, {Name: "rcon"}}, + } + statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-game-data"), "start", "start", "game-start-0", procedure, []domain.Port{ + {Name: "query", Protocol: "udp", Port: 27015}, + {Name: "rcon", Protocol: "tcp", Port: 27020}, + }, nil, "registry-secret") + if err != nil { + t.Fatal(err) + } + if probe := statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 27020 { + t.Fatalf("readiness probe = %#v, want tcp 27020", probe) + } +} + func TestProcedureStatefulSetSpecUsesProvidedRuntimeEnv(t *testing.T) { procedure := &domain.Procedure{ Image: "nginx:1.27", @@ -148,7 +193,7 @@ func TestProcedureStatefulSetSpecUsesProvidedRuntimeEnv(t *testing.T) { "PROCEDURE_ONLY": "ignored", }, } - statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, map[string]string{ + statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, nil, map[string]string{ "DRUID_PORT_HTTP": "8080", }, "registry-secret") if err != nil { @@ -168,7 +213,7 @@ func TestProcedureStatefulSetSpecBuildsPersistentWorkload(t *testing.T) { Mounts: []domain.Mount{{Path: "/usr/share/nginx/html", SubPath: "site", ReadOnly: true}}, } - statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, procedure.Env, "registry-secret") + statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, []domain.Port{{Name: "http", Protocol: "tcp", Port: 8080}}, procedure.Env, "registry-secret") if err != nil { t.Fatal(err) } @@ -189,6 +234,9 @@ func TestProcedureStatefulSetSpecBuildsPersistentWorkload(t *testing.T) { if len(pod.ImagePullSecrets) != 1 || pod.ImagePullSecrets[0].Name != "registry-secret" { t.Fatalf("image pull secrets = %#v", pod.ImagePullSecrets) } + if probe := pod.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 8080 { + t.Fatalf("readiness probe = %#v, want tcp 8080", probe) + } container := pod.Containers[0] if container.Image != "nginx:1.27" { t.Fatalf("image = %s", container.Image) @@ -338,7 +386,7 @@ func TestCreateOrReuseProcedureJobRetainsFailedBaseAndCreatesRetry(t *testing.T) }) backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) - created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil) + created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil, nil) if err != nil { t.Fatal(err) } @@ -359,7 +407,7 @@ func TestCreateOrReuseProcedureJobUsesNextRetryAttempt(t *testing.T) { ) backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) - created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil) + created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil, nil) if err != nil { t.Fatal(err) } @@ -382,7 +430,7 @@ func TestCreateOrReuseProcedureJobReusesActiveAttempt(t *testing.T) { client := fake.NewSimpleClientset(active) backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) - created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil) + created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil, nil) if err != nil { t.Fatal(err) } @@ -478,7 +526,7 @@ func TestCreateOrReuseProcedureJobDeletesSucceededAttempt(t *testing.T) { }) backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) - created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil) + created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil, nil) if err != nil { t.Fatal(err) }