Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/runtime/kubernetes/procedure_attempts.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (b *Backend) resumeRestartProcedureIndex(ctx context.Context, root string,
return resumeIndex, nil
}

func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace string, root string, commandName string, procedureName string, baseName string, procedure *domain.Procedure, env map[string]string) (*batchv1.Job, error) {
func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace string, root string, commandName string, procedureName string, baseName string, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string) (*batchv1.Job, error) {
_, pvc, err := parseRef(root)
if err != nil {
return nil, err
Expand Down Expand Up @@ -134,7 +134,7 @@ func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace strin
return active, nil
}
name := procedureAttemptName(baseName, nextAttempt)
job, err := procedureJobSpec(namespace, root, commandName, procedureName, name, nextAttempt, procedure, env, b.config.RegistrySecret)
job, err := procedureJobSpec(namespace, root, commandName, procedureName, name, nextAttempt, procedure, globalPorts, env, b.config.RegistrySecret)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/runtime/kubernetes/procedures.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (b *Backend) runJobProcedure(scrollID string, commandName string, procedure
zap.Int("expected_ports", len(procedure.ExpectedPorts)),
zap.Int("mounts", len(procedure.Mounts)),
)
createdJob, err := b.createOrReuseProcedureJob(ctx, namespace, root, commandName, procedureName, resourceName, procedure, env)
createdJob, err := b.createOrReuseProcedureJob(ctx, namespace, root, commandName, procedureName, resourceName, procedure, globalPorts, env)
if err != nil {
logger.Log().Error("Failed to create Kubernetes job procedure", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("namespace", namespace), zap.String("base_job", resourceName), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -237,7 +237,7 @@ func (b *Backend) ensurePersistentProcedure(ctx context.Context, scrollID string
logger.Log().Error("Kubernetes persistent procedure root ref invalid", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("root", root), zap.Error(err))
return err
}
statefulSet, err := procedureStatefulSetSpec(namespace, root, commandName, procedureName, resourceName, procedure, env, b.config.RegistrySecret)
statefulSet, err := procedureStatefulSetSpec(namespace, root, commandName, procedureName, resourceName, procedure, globalPorts, env, b.config.RegistrySecret)
if err != nil {
logger.Log().Error("Failed to build Kubernetes persistent procedure StatefulSet", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("namespace", namespace), zap.Error(err))
return err
Expand Down
38 changes: 36 additions & 2 deletions internal/runtime/kubernetes/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func helperJobSpec(namespace string, jobName string, pvc string, image string, c
}
}

func procedureJobSpec(namespace string, root string, commandName string, procedureName string, resourceName string, attempt int, procedure *domain.Procedure, env map[string]string, registrySecret string) (*batchv1.Job, error) {
func procedureJobSpec(namespace string, root string, commandName string, procedureName string, resourceName string, attempt int, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string, registrySecret string) (*batchv1.Job, error) {
_, pvc, err := parseRef(root)
if err != nil {
return nil, err
Expand All @@ -173,6 +173,7 @@ func procedureJobSpec(namespace string, root string, commandName string, procedu
Env: envVars(env),
VolumeMounts: volumeMounts(procedure.Mounts),
}
applyProcedureReadiness(&container, procedure, globalPorts)
podSpec := corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{container},
Expand All @@ -197,7 +198,7 @@ func procedureJobSpec(namespace string, root string, commandName string, procedu
}, nil
}

func procedureStatefulSetSpec(namespace string, root string, commandName string, procedureName string, resourceName string, procedure *domain.Procedure, env map[string]string, registrySecret string) (*appsv1.StatefulSet, error) {
func procedureStatefulSetSpec(namespace string, root string, commandName string, procedureName string, resourceName string, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string, registrySecret string) (*appsv1.StatefulSet, error) {
_, pvc, err := parseRef(root)
if err != nil {
return nil, err
Expand All @@ -221,6 +222,7 @@ func procedureStatefulSetSpec(namespace string, root string, commandName string,
Env: envVars(env),
VolumeMounts: volumeMounts(procedure.Mounts),
}
applyProcedureReadiness(&container, procedure, globalPorts)
podSpec := corev1.PodSpec{
Containers: []corev1.Container{container},
Volumes: []corev1.Volume{pvcVolume("data", pvc)},
Expand All @@ -246,6 +248,38 @@ func procedureStatefulSetSpec(namespace string, root string, commandName string,
}, nil
}

func applyProcedureReadiness(container *corev1.Container, procedure *domain.Procedure, globalPorts []domain.Port) {
port, ok := firstTCPExpectedPort(procedure, globalPorts)
if !ok {
return
}
container.ReadinessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(port.Port)},
},
PeriodSeconds: 2,
FailureThreshold: 3,
}
}

func firstTCPExpectedPort(procedure *domain.Procedure, globalPorts []domain.Port) (domain.Port, bool) {
if procedure == nil {
return domain.Port{}, false
}
ports := portsByName(globalPorts)
for _, expected := range procedure.ExpectedPorts {
port, ok := ports[expected.Name]
if !ok {
continue
}
if normalizeProtocol(port.Protocol) == "udp" {
continue
}
return port, true
}
return domain.Port{}, false
}

func devStatefulSetSpec(namespace string, root string, pvc string, image string, action ports.RuntimeDevAction, registrySecret string) *appsv1.StatefulSet {
labels := baseLabels(pvc)
labels[labelProcedure] = "dev"
Expand Down
64 changes: 56 additions & 8 deletions internal/runtime/kubernetes/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestProcedureJobSpecBuildsDeterministicMountsAndLabels(t *testing.T) {
Mounts: []domain.Mount{{Path: "/work", SubPath: "cache"}},
}

job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, procedure.Env, "registry-secret")
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, nil, procedure.Env, "registry-secret")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestProcedureJobSpecUsesProvidedRuntimeEnv(t *testing.T) {
"PROCEDURE_ONLY": "ignored",
},
}
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, map[string]string{
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, nil, map[string]string{
"DRUID_PORT_HTTP": "8080",
}, "registry-secret")
if err != nil {
Expand All @@ -141,14 +141,59 @@ func TestProcedureJobSpecUsesProvidedRuntimeEnv(t *testing.T) {
}
}

func TestProcedureJobSpecAddsTCPReadinessProbe(t *testing.T) {
procedure := &domain.Procedure{
Image: "itzg/minecraft-server",
ExpectedPorts: []domain.ExpectedPort{{Name: "main"}},
}
job, err := procedureJobSpec("druid", ref("druid", "druid-minecraft-data"), "start", "start", "minecraft-start-0", 1, procedure, []domain.Port{{Name: "main", Protocol: "tcp", Port: 25565}}, nil, "registry-secret")
if err != nil {
t.Fatal(err)
}
if probe := job.Spec.Template.Spec.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 25565 {
t.Fatalf("readiness probe = %#v, want tcp 25565", probe)
}
}

func TestProcedureReadinessProbeSkipsUDPOnlyPorts(t *testing.T) {
procedure := &domain.Procedure{
Image: "steam",
ExpectedPorts: []domain.ExpectedPort{{Name: "query"}},
}
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-game-data"), "start", "start", "game-start-0", procedure, []domain.Port{{Name: "query", Protocol: "udp", Port: 27015}}, nil, "registry-secret")
if err != nil {
t.Fatal(err)
}
if probe := statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe; probe != nil {
t.Fatalf("readiness probe = %#v, want nil for udp-only ports", probe)
}
}

func TestProcedureReadinessProbeUsesFirstTCPExpectedPort(t *testing.T) {
procedure := &domain.Procedure{
Image: "steam",
ExpectedPorts: []domain.ExpectedPort{{Name: "query"}, {Name: "rcon"}},
}
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-game-data"), "start", "start", "game-start-0", procedure, []domain.Port{
{Name: "query", Protocol: "udp", Port: 27015},
{Name: "rcon", Protocol: "tcp", Port: 27020},
}, nil, "registry-secret")
if err != nil {
t.Fatal(err)
}
if probe := statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 27020 {
t.Fatalf("readiness probe = %#v, want tcp 27020", probe)
}
}

func TestProcedureStatefulSetSpecUsesProvidedRuntimeEnv(t *testing.T) {
procedure := &domain.Procedure{
Image: "nginx:1.27",
Env: map[string]string{
"PROCEDURE_ONLY": "ignored",
},
}
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, map[string]string{
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, nil, map[string]string{
"DRUID_PORT_HTTP": "8080",
}, "registry-secret")
if err != nil {
Expand All @@ -168,7 +213,7 @@ func TestProcedureStatefulSetSpecBuildsPersistentWorkload(t *testing.T) {
Mounts: []domain.Mount{{Path: "/usr/share/nginx/html", SubPath: "site", ReadOnly: true}},
}

statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, procedure.Env, "registry-secret")
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, []domain.Port{{Name: "http", Protocol: "tcp", Port: 8080}}, procedure.Env, "registry-secret")
if err != nil {
t.Fatal(err)
}
Expand All @@ -189,6 +234,9 @@ func TestProcedureStatefulSetSpecBuildsPersistentWorkload(t *testing.T) {
if len(pod.ImagePullSecrets) != 1 || pod.ImagePullSecrets[0].Name != "registry-secret" {
t.Fatalf("image pull secrets = %#v", pod.ImagePullSecrets)
}
if probe := pod.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 8080 {
t.Fatalf("readiness probe = %#v, want tcp 8080", probe)
}
container := pod.Containers[0]
if container.Image != "nginx:1.27" {
t.Fatalf("image = %s", container.Image)
Expand Down Expand Up @@ -338,7 +386,7 @@ func TestCreateOrReuseProcedureJobRetainsFailedBaseAndCreatesRetry(t *testing.T)
})
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)

created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil)
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -359,7 +407,7 @@ func TestCreateOrReuseProcedureJobUsesNextRetryAttempt(t *testing.T) {
)
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)

created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil)
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -382,7 +430,7 @@ func TestCreateOrReuseProcedureJobReusesActiveAttempt(t *testing.T) {
client := fake.NewSimpleClientset(active)
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)

created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil)
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -478,7 +526,7 @@ func TestCreateOrReuseProcedureJobDeletesSucceededAttempt(t *testing.T) {
})
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)

created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil)
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading