diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index f6879fa3..edbd1672 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -236,6 +236,7 @@ func main() { ASG: autoscaling.New(sess), EC2: ec2Client, BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) }, + SqsMsgVisibilityTimeoutSec: nthConfig.SqsMsgVisibilityTimeoutSec, } monitoringFns[sqsEvents] = sqsMonitor } diff --git a/config/helm/aws-node-termination-handler/README.md b/config/helm/aws-node-termination-handler/README.md index 081ff487..7175cfa6 100644 --- a/config/helm/aws-node-termination-handler/README.md +++ b/config/helm/aws-node-termination-handler/README.md @@ -123,6 +123,7 @@ The configuration in this table applies to AWS Node Termination Handler in queue | `topologySpreadConstraints` | [Topology Spread Constraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/) for pod scheduling. Useful with a highly available deployment to reduce the risk of running multiple replicas on the same Node | `[]` | | `heartbeatInterval` | The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour). | `-1` | | `heartbeatUntil` | The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours). | `-1` | +| `sqsMsgVisibilityTimeoutSec` | Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds. 
| `20` | ### IMDS Mode Configuration diff --git a/config/helm/aws-node-termination-handler/templates/deployment.yaml b/config/helm/aws-node-termination-handler/templates/deployment.yaml index 32a188d5..9dad5dec 100644 --- a/config/helm/aws-node-termination-handler/templates/deployment.yaml +++ b/config/helm/aws-node-termination-handler/templates/deployment.yaml @@ -174,6 +174,8 @@ spec: value: {{ .Values.heartbeatInterval | quote }} - name: HEARTBEAT_UNTIL value: {{ .Values.heartbeatUntil | quote }} + - name: SQS_MSG_VISIBILITY_TIMEOUT_SEC + value: {{ .Values.sqsMsgVisibilityTimeoutSec | quote }} {{- with .Values.extraEnv }} {{- toYaml . | nindent 12 }} {{- end }} diff --git a/config/helm/aws-node-termination-handler/values.yaml b/config/helm/aws-node-termination-handler/values.yaml index 1b145bd7..ada2864e 100644 --- a/config/helm/aws-node-termination-handler/values.yaml +++ b/config/helm/aws-node-termination-handler/values.yaml @@ -294,6 +294,10 @@ heartbeatInterval: -1 # The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours). heartbeatUntil: -1 +# Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds. 
+sqsMsgVisibilityTimeoutSec: 20 + + # --------------------------------------------------------------------------------------------------------------------- # Testing # --------------------------------------------------------------------------------------------------------------------- diff --git a/pkg/config/config.go b/pkg/config/config.go index 9368ecb9..9de5f69f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -117,6 +117,9 @@ const ( // heartbeat heartbeatIntervalKey = "HEARTBEAT_INTERVAL" heartbeatUntilKey = "HEARTBEAT_UNTIL" + // sqs monitor + sqsMsgVisibilityTimeoutSecConfigKey = "SQS_MSG_VISIBILITY_TIMEOUT_SEC" + SqsMsgVisibilityTimeoutSecDefault = 20 ) // Config arguments set via CLI, environment variables, or defaults @@ -174,6 +177,7 @@ type Config struct { UseAPIServerCacheToListPods bool HeartbeatInterval int HeartbeatUntil int + SqsMsgVisibilityTimeoutSec int } // ParseCliArgs parses cli arguments and uses environment variables as fallback values @@ -241,6 +245,7 @@ func ParseCliArgs() (config Config, err error) { flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.") flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour).") flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The duration in seconds over which heartbeat signals are sent. 
Valid range: 60-172800 seconds (1 minute to 48 hours).") + flag.IntVar(&config.SqsMsgVisibilityTimeoutSec, "sqs-msg-visibility-timeout-sec", getIntEnv(sqsMsgVisibilityTimeoutSecConfigKey, SqsMsgVisibilityTimeoutSecDefault), "Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds.") flag.Parse() if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) { @@ -306,6 +311,10 @@ func ParseCliArgs() (config Config, err error) { return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval should be less than or equal to heartbeat-until") } + if config.EnableSQSTerminationDraining && (config.SqsMsgVisibilityTimeoutSec <= 0 || config.SqsMsgVisibilityTimeoutSec >= 120) { + return config, fmt.Errorf("invalid SqsMsgVisibilityTimeoutSec configuration: SqsMsgVisibilityTimeoutSec valid range from 1 to 119") + } + // client-go expects these to be set in env vars os.Setenv(kubernetesServiceHostConfigKey, config.KubernetesServiceHost) os.Setenv(kubernetesServicePortConfigKey, config.KubernetesServicePort) @@ -367,6 +376,7 @@ func (c Config) PrintJsonConfigArgs() { Bool("use_apiserver_cache", c.UseAPIServerCacheToListPods). Int("heartbeat_interval", c.HeartbeatInterval). Int("heartbeat_until", c.HeartbeatUntil). + Int("sqs_msg_visibility_timeout_sec", c.SqsMsgVisibilityTimeoutSec). 
Msg("aws-node-termination-handler arguments") } @@ -421,7 +431,8 @@ func (c Config) PrintHumanConfigArgs() { "\taws-endpoint: %s,\n"+ "\tuse-apiserver-cache: %t,\n"+ "\theartbeat-interval: %d,\n"+ - "\theartbeat-until: %d\n", + "\theartbeat-until: %d,\n"+ + "\tsqs-msg-visibility-timeout-sec: %d\n", c.DryRun, c.NodeName, c.PodName, @@ -465,6 +476,7 @@ func (c Config) PrintHumanConfigArgs() { c.UseAPIServerCacheToListPods, c.HeartbeatInterval, c.HeartbeatUntil, + c.SqsMsgVisibilityTimeoutSec, ) } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 8b7e2399..a3a4786e 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -84,6 +84,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods) h.Equals(t, 30, nthConfig.HeartbeatInterval) h.Equals(t, 60, nthConfig.HeartbeatUntil) + h.Equals(t, 20, nthConfig.SqsMsgVisibilityTimeoutSec) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") @@ -153,6 +154,7 @@ func TestParseCliArgsSuccess(t *testing.T) { h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods) h.Equals(t, 30, nthConfig.HeartbeatInterval) h.Equals(t, 60, nthConfig.HeartbeatUntil) + h.Equals(t, 20, nthConfig.SqsMsgVisibilityTimeoutSec) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") @@ -186,6 +188,7 @@ func TestParseCliArgsOverrides(t *testing.T) { t.Setenv("CORDON_ONLY", "true") t.Setenv("HEARTBEAT_INTERVAL", "3601") t.Setenv("HEARTBEAT_UNTIL", "172801") + t.Setenv("SQS_MSG_VISIBILITY_TIMEOUT_SEC", "30") os.Args = []string{ "cmd", @@ -214,6 +217,7 @@ func TestParseCliArgsOverrides(t *testing.T) { "--prometheus-server-port=2112", "--heartbeat-interval=3600", "--heartbeat-until=172800", + "--sqs-msg-visibility-timeout-sec=30", } nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ -244,11 +248,15 @@ func TestParseCliArgsOverrides(t *testing.T) { h.Equals(t, 2112, nthConfig.PrometheusPort) h.Equals(t, 
3600, nthConfig.HeartbeatInterval) h.Equals(t, 172800, nthConfig.HeartbeatUntil) + h.Equals(t, 30, nthConfig.SqsMsgVisibilityTimeoutSec) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") h.Equals(t, true, ok) h.Equals(t, "KUBERNETES_SERVICE_HOST", value) + value, ok = os.LookupEnv("SQS_MSG_VISIBILITY_TIMEOUT_SEC") + h.Equals(t, true, ok) + h.Equals(t, "30", value) } func TestParseCliArgsWithGracePeriodSuccess(t *testing.T) { diff --git a/pkg/monitor/sqsevent/sqs-monitor.go b/pkg/monitor/sqsevent/sqs-monitor.go index 7ad0513d..84440d4c 100644 --- a/pkg/monitor/sqsevent/sqs-monitor.go +++ b/pkg/monitor/sqsevent/sqs-monitor.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" + "github.com/aws/aws-node-termination-handler/pkg/config" "github.com/aws/aws-node-termination-handler/pkg/logging" "github.com/aws/aws-node-termination-handler/pkg/monitor" "github.com/aws/aws-sdk-go/aws" @@ -54,6 +55,7 @@ type SQSMonitor struct { CheckIfManaged bool ManagedTag string BeforeCompleteLifecycleAction func() + SqsMsgVisibilityTimeoutSec int } // InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any @@ -294,6 +296,11 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr // receiveQueueMessages checks the configured SQS queue for new messages func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) { + visibilityTimeout := m.SqsMsgVisibilityTimeoutSec + if visibilityTimeout <= 0 || visibilityTimeout >= 120 { + visibilityTimeout = config.SqsMsgVisibilityTimeoutSecDefault + } + result, err := m.SQS.ReceiveMessage(&sqs.ReceiveMessageInput{ AttributeNames: []*string{ aws.String(sqs.MessageSystemAttributeNameSentTimestamp), @@ -303,7 +310,7 @@ func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) { }, QueueUrl: &qURL, MaxNumberOfMessages: aws.Int64(10), - VisibilityTimeout: aws.Int64(20), // 20 seconds + VisibilityTimeout: 
aws.Int64(int64(visibilityTimeout)), WaitTimeSeconds: aws.Int64(20), // Max long polling })