Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func main() {
ASG: autoscaling.New(sess),
EC2: ec2Client,
BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) },
SqsMsgVisibilityTimeoutSec: nthConfig.SqsMsgVisibilityTimeoutSec,
}
monitoringFns[sqsEvents] = sqsMonitor
}
Expand Down
1 change: 1 addition & 0 deletions config/helm/aws-node-termination-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ The configuration in this table applies to AWS Node Termination Handler in queue
| `topologySpreadConstraints` | [Topology Spread Constraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/) for pod scheduling. Useful with a highly available deployment to reduce the risk of running multiple replicas on the same Node | `[]` |
| `heartbeatInterval` | The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour). | `-1` |
| `heartbeatUntil` | The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours). | `-1` |
| `sqsMsgVisibilityTimeoutSec` | Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds. | `20` |

### IMDS Mode Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ spec:
value: {{ .Values.heartbeatInterval | quote }}
- name: HEARTBEAT_UNTIL
value: {{ .Values.heartbeatUntil | quote }}
- name: SQS_MSG_VISIBILITY_TIMEOUT_SEC
value: {{ .Values.sqsMsgVisibilityTimeoutSec | quote }}
{{- with .Values.extraEnv }}
{{- toYaml . | nindent 12 }}
{{- end }}
Expand Down
4 changes: 4 additions & 0 deletions config/helm/aws-node-termination-handler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ heartbeatInterval: -1
# The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours).
heartbeatUntil: -1

# Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds.
sqsMsgVisibilityTimeoutSec: 20


# ---------------------------------------------------------------------------------------------------------------------
# Testing
# ---------------------------------------------------------------------------------------------------------------------
Expand Down
14 changes: 13 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ const (
// heartbeat
heartbeatIntervalKey = "HEARTBEAT_INTERVAL"
heartbeatUntilKey = "HEARTBEAT_UNTIL"
// sqs monitor
sqsMsgVisibilityTimeoutSecConfigKey = "SQS_MSG_VISIBILITY_TIMEOUT_SEC"
SqsMsgVisibilityTimeoutSecDefault = 20
)

// Config arguments set via CLI, environment variables, or defaults
Expand Down Expand Up @@ -174,6 +177,7 @@ type Config struct {
UseAPIServerCacheToListPods bool
HeartbeatInterval int
HeartbeatUntil int
SqsMsgVisibilityTimeoutSec int
}

// ParseCliArgs parses cli arguments and uses environment variables as fallback values
Expand Down Expand Up @@ -241,6 +245,7 @@ func ParseCliArgs() (config Config, err error) {
flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.")
flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour).")
flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours).")
flag.IntVar(&config.SqsMsgVisibilityTimeoutSec, "sqs-msg-visibility-timeout-sec", getIntEnv(sqsMsgVisibilityTimeoutSecConfigKey, SqsMsgVisibilityTimeoutSecDefault), "Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds.")
flag.Parse()

if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) {
Expand Down Expand Up @@ -306,6 +311,10 @@ func ParseCliArgs() (config Config, err error) {
return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval should be less than or equal to heartbeat-until")
}

if config.EnableSQSTerminationDraining && (config.SqsMsgVisibilityTimeoutSec <= 0 || config.SqsMsgVisibilityTimeoutSec >= 120) {
return config, fmt.Errorf("invalid SqsMsgVisibilityTimeoutSec configuration: SqsMsgVisibilityTimeoutSec valid range from 1 to 119")
}

// client-go expects these to be set in env vars
os.Setenv(kubernetesServiceHostConfigKey, config.KubernetesServiceHost)
os.Setenv(kubernetesServicePortConfigKey, config.KubernetesServicePort)
Expand Down Expand Up @@ -367,6 +376,7 @@ func (c Config) PrintJsonConfigArgs() {
Bool("use_apiserver_cache", c.UseAPIServerCacheToListPods).
Int("heartbeat_interval", c.HeartbeatInterval).
Int("heartbeat_until", c.HeartbeatUntil).
Int("sqs_msg_visibility_timeout_sec", c.SqsMsgVisibilityTimeoutSec).
Msg("aws-node-termination-handler arguments")
}

Expand Down Expand Up @@ -421,7 +431,8 @@ func (c Config) PrintHumanConfigArgs() {
"\taws-endpoint: %s,\n"+
"\tuse-apiserver-cache: %t,\n"+
"\theartbeat-interval: %d,\n"+
"\theartbeat-until: %d\n",
"\theartbeat-until: %d\n"+
"\tsqs-msg-visibility-timeout-sec: %d\n",
c.DryRun,
c.NodeName,
c.PodName,
Expand Down Expand Up @@ -465,6 +476,7 @@ func (c Config) PrintHumanConfigArgs() {
c.UseAPIServerCacheToListPods,
c.HeartbeatInterval,
c.HeartbeatUntil,
c.SqsMsgVisibilityTimeoutSec,
)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) {
h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods)
h.Equals(t, 30, nthConfig.HeartbeatInterval)
h.Equals(t, 60, nthConfig.HeartbeatUntil)
h.Equals(t, 20, nthConfig.SqsMsgVisibilityTimeoutSec)

// Check that env vars were set
value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
Expand Down Expand Up @@ -153,6 +154,7 @@ func TestParseCliArgsSuccess(t *testing.T) {
h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods)
h.Equals(t, 30, nthConfig.HeartbeatInterval)
h.Equals(t, 60, nthConfig.HeartbeatUntil)
h.Equals(t, 20, nthConfig.SqsMsgVisibilityTimeoutSec)

// Check that env vars were set
value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
Expand Down Expand Up @@ -186,6 +188,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
t.Setenv("CORDON_ONLY", "true")
t.Setenv("HEARTBEAT_INTERVAL", "3601")
t.Setenv("HEARTBEAT_UNTIL", "172801")
t.Setenv("SQS_MSG_VISIBILITY_TIMEOUT_SEC", "30")

os.Args = []string{
"cmd",
Expand Down Expand Up @@ -214,6 +217,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
"--prometheus-server-port=2112",
"--heartbeat-interval=3600",
"--heartbeat-until=172800",
"--sqs-msg-visibility-timeout-sec=30",
}
nthConfig, err := config.ParseCliArgs()
h.Ok(t, err)
Expand Down Expand Up @@ -244,11 +248,15 @@ func TestParseCliArgsOverrides(t *testing.T) {
h.Equals(t, 2112, nthConfig.PrometheusPort)
h.Equals(t, 3600, nthConfig.HeartbeatInterval)
h.Equals(t, 172800, nthConfig.HeartbeatUntil)
h.Equals(t, 30, nthConfig.SqsMsgVisibilityTimeoutSec)

// Check that env vars were set
value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
h.Equals(t, true, ok)
h.Equals(t, "KUBERNETES_SERVICE_HOST", value)
value, ok = os.LookupEnv("SQS_MSG_VISIBILITY_TIMEOUT_SEC")
h.Equals(t, true, ok)
h.Equals(t, "30", value)
}

func TestParseCliArgsWithGracePeriodSuccess(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion pkg/monitor/sqsevent/sqs-monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"

"github.com/aws/aws-node-termination-handler/pkg/config"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/monitor"
"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -54,6 +55,7 @@ type SQSMonitor struct {
CheckIfManaged bool
ManagedTag string
BeforeCompleteLifecycleAction func()
SqsMsgVisibilityTimeoutSec int
}

// InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any
Expand Down Expand Up @@ -294,6 +296,11 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr

// receiveQueueMessages checks the configured SQS queue for new messages
func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
visibilityTimeout := m.SqsMsgVisibilityTimeoutSec
if visibilityTimeout <= 0 || visibilityTimeout >= 120 {
visibilityTimeout = config.SqsMsgVisibilityTimeoutSecDefault
}

result, err := m.SQS.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
Expand All @@ -303,7 +310,7 @@ func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
},
QueueUrl: &qURL,
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(20), // 20 seconds
VisibilityTimeout: aws.Int64(int64(visibilityTimeout)),
WaitTimeSeconds: aws.Int64(20), // Max long polling
})

Expand Down