From 6778a939570d7008424f44872951a94dd5e383ab Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 14:07:44 +0700 Subject: [PATCH 01/18] set timeout on kafka produce --- plugin/output/kafka/README.md | 6 ++++++ plugin/output/kafka/kafka.go | 21 +++++++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md index d1ef51b20..cb618a88a 100755 --- a/plugin/output/kafka/README.md +++ b/plugin/output/kafka/README.md @@ -59,6 +59,12 @@ After this timeout the batch will be sent even if batch isn't full.
+**`produce_timeout`** *`cfg.Duration`* *`default=15s`* + +Timeout for the produce request + +
+ **`max_message_bytes`** *`cfg.Expression`* *`default=1000000`* The maximum permitted size of a message. diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 6903dda66..07789a8b3 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -36,8 +36,10 @@ type Plugin struct { avgEventSize int controller pipeline.OutputPluginController - client KafkaClient - batcher *pipeline.RetriableBatcher + client KafkaClient + batcher *pipeline.RetriableBatcher + ctx context.Context + cancelFunc context.CancelFunc // plugin metrics sendErrorMetric *metric.Counter @@ -97,6 +99,12 @@ type Config struct { BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * BatchFlushTimeout_ time.Duration + // > @3@4@5@6 + // > + // > Timeout for the produce request + Timeout cfg.Duration `json:"produce_timeout" default:"15s" parse:"duration"` // * + Timeout_ time.Duration + // > @3@4@5@6 // > // > The maximum permitted size of a message. 
@@ -238,6 +246,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.avgEventSize = params.PipelineSettings.AvgEventSize p.controller = params.Controller p.registerMetrics(params.MetricCtl) + p.ctx, p.cancelFunc = context.WithCancel(context.Background()) if p.config.Retention_ < 1 { p.logger.Fatal("'retention' can't be <1") @@ -290,7 +299,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP onError, ) - p.batcher.Start(context.TODO()) + p.batcher.Start(p.ctx) } func (p *Plugin) Out(event *pipeline.Event) { @@ -338,7 +347,10 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err i++ }) - if err := p.client.ProduceSync(context.Background(), data.messages[:i]...).FirstErr(); err != nil { + ctx, cancel := context.WithTimeout(p.ctx, p.config.Timeout_) + defer cancel() + + if err := p.client.ProduceSync(ctx, data.messages[:i]...).FirstErr(); err != nil { if errors.Is(err, kerr.LeaderNotAvailable) || errors.Is(err, kerr.NotLeaderForPartition) { p.client.ForceMetadataRefresh() } @@ -351,6 +363,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err } func (p *Plugin) Stop() { + p.cancelFunc() p.batcher.Stop() p.client.Close() } From e85982b712db071a50f93c4d3cf2053ec5b23b40 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 17:20:28 +0700 Subject: [PATCH 02/18] set timeout on kafka consume --- plugin/input/kafka/README.md | 6 ++++++ plugin/input/kafka/kafka.go | 16 +++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/plugin/input/kafka/README.md b/plugin/input/kafka/README.md index 56885602c..45edf8382 100755 --- a/plugin/input/kafka/README.md +++ b/plugin/input/kafka/README.md @@ -122,6 +122,12 @@ AutoCommitInterval sets how long to go between autocommits
+**`timeout`** *`cfg.Duration`* *`default=15s`* + +Timeout for consume + +
+ **`session_timeout`** *`cfg.Duration`* *`default=10s`* SessionTimeout sets how long a member in the group can go between heartbeats diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index f601e6259..35904af77 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -156,6 +156,12 @@ type Config struct { AutoCommitInterval cfg.Duration `json:"auto_commit_interval" default:"1s" parse:"duration"` // * AutoCommitInterval_ time.Duration + // > @3@4@5@6 + // > + // > Timeout for consume + Timeout cfg.Duration `json:"timeout" default:"15s" parse:"duration"` // * + Timeout_ time.Duration + // > @3@4@5@6 // > // > SessionTimeout sets how long a member in the group can go between heartbeats @@ -287,7 +293,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.idByTopic[topic] = i } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), p.config.Timeout_) p.cancel = cancel p.s = &splitConsume{ consumers: make(map[tp]*pconsumer), @@ -313,13 +319,17 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Stop() { p.logger.Infof("Stopping") - err := p.client.CommitMarkedOffsets(context.Background()) + p.cancel() + + ctx, cancel := context.WithTimeout(context.Background(), p.config.Timeout_) + defer cancel() + + err := p.client.CommitMarkedOffsets(ctx) if err != nil { p.commitErrorsMetric.Inc() p.logger.Errorf("can't commit marked offsets: %s", err.Error()) } p.client.Close() - p.cancel() } func (p *Plugin) Commit(event *pipeline.Event) { From 32d2cc075ba33d5169863af7e77df31d6d294ba6 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 18:12:59 +0700 Subject: [PATCH 03/18] use kafka kraft in e2e test kafka_file --- e2e/kafka_file/docker-compose.yml | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/e2e/kafka_file/docker-compose.yml b/e2e/kafka_file/docker-compose.yml 
index 82500c94a..1f6af3b8b 100644 --- a/e2e/kafka_file/docker-compose.yml +++ b/e2e/kafka_file/docker-compose.yml @@ -1,32 +1,22 @@ version: "2" services: - zookeeper: - image: zookeeper:3.9 - ports: - - "2181:2181" - volumes: - - "zookeeper_data:/data" - environment: - ZOO_MY_ID: 1 - ZOO_PORT: 2181 - ZOO_4LW_COMMANDS_WHITELIST: "*" - ZOO_SERVERS: server.1=zookeeper:2888:3888;2181 kafka: - image: docker.io/bitnamilegacy/kafka:3.1 + image: docker.io/bitnamilegacy/kafka:3.9 ports: - "9092:9092" volumes: - "kafka_data:/bitnami" environment: - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_CFG_NODE_ID=1001 + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - depends_on: - - zookeeper + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_CONTROLLER_LISTENERS=CONTROLLER://:9093 + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1001@localhost:9093 volumes: - zookeeper_data: - driver: local kafka_data: driver: local From 5718e1b15796d9604e95ef76aa85faf708cacb1f Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 18:21:28 +0700 Subject: [PATCH 04/18] set kafka timeout in e2e tests --- e2e/kafka_file/config.yml | 5 +++-- e2e/split_join/config.yml | 5 +++-- plugin/input/kafka/README.md | 2 +- plugin/input/kafka/kafka.go | 2 +- plugin/output/kafka/README.md | 2 +- plugin/output/kafka/kafka.go | 2 +- 6 files changed, 10 insertions(+), 8 deletions(-) diff --git a/e2e/kafka_file/config.yml b/e2e/kafka_file/config.yml index 97a4c75b0..f6ae3b717 100644 --- a/e2e/kafka_file/config.yml +++ b/e2e/kafka_file/config.yml @@ -3,8 +3,9 @@ pipelines: input: type: kafka offset: oldest + timeout: 60s meta: - partition: 'partition_{{ .partition }}' - topic: '{{ .topic }}' + partition: "partition_{{ .partition }}" + topic: "{{ .topic }}" output: type: file diff --git a/e2e/split_join/config.yml b/e2e/split_join/config.yml 
index 5aac2602c..90f55f9b3 100644 --- a/e2e/split_join/config.yml +++ b/e2e/split_join/config.yml @@ -14,9 +14,10 @@ pipelines: field: data - type: join field: message - start: '/^start/' - continue: '/^continue/' + start: "/^start/" + continue: "/^continue/" - type: debug message: output event sample output: type: kafka + timeout: 600s diff --git a/plugin/input/kafka/README.md b/plugin/input/kafka/README.md index 45edf8382..20fc2075e 100755 --- a/plugin/input/kafka/README.md +++ b/plugin/input/kafka/README.md @@ -124,7 +124,7 @@ AutoCommitInterval sets how long to go between autocommits **`timeout`** *`cfg.Duration`* *`default=15s`* -Timeout for consume +Timeout for fetch messages
diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index 35904af77..292fee33a 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -158,7 +158,7 @@ type Config struct { // > @3@4@5@6 // > - // > Timeout for consume + // > Timeout for fetch messages Timeout cfg.Duration `json:"timeout" default:"15s" parse:"duration"` // * Timeout_ time.Duration diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md index cb618a88a..bb4dc3c28 100755 --- a/plugin/output/kafka/README.md +++ b/plugin/output/kafka/README.md @@ -59,7 +59,7 @@ After this timeout the batch will be sent even if batch isn't full.
-**`produce_timeout`** *`cfg.Duration`* *`default=15s`* +**`timeout`** *`cfg.Duration`* *`default=15s`* Timeout for the produce request diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 07789a8b3..8e5174452 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -102,7 +102,7 @@ type Config struct { // > @3@4@5@6 // > // > Timeout for the produce request - Timeout cfg.Duration `json:"produce_timeout" default:"15s" parse:"duration"` // * + Timeout cfg.Duration `json:"timeout" default:"15s" parse:"duration"` // * Timeout_ time.Duration // > @3@4@5@6 From d247eebf80f2d75dc8186d5dcc59f58f3eba35c5 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 19:26:00 +0700 Subject: [PATCH 05/18] use kafka kraft in e2e test kafka_auth --- e2e/kafka_auth/docker-compose.yml | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/e2e/kafka_auth/docker-compose.yml b/e2e/kafka_auth/docker-compose.yml index 73371203c..e9a2d5def 100644 --- a/e2e/kafka_auth/docker-compose.yml +++ b/e2e/kafka_auth/docker-compose.yml @@ -1,19 +1,8 @@ version: "2.1" services: - zookeeper: - image: zookeeper:3.9 - ports: - - "2182:2181" - volumes: - - "zookeeper_data:/data" - environment: - ZOO_MY_ID: 1 - ZOO_PORT: 2181 - ZOO_4LW_COMMANDS_WHITELIST: "*" - ZOO_SERVERS: server.1=zookeeper:2888:3888;2181 init-certs: - image: docker.io/bitnamilegacy/kafka:3.6 + image: docker.io/bitnamilegacy/kafka:3.9 command: /tmp/generate.sh working_dir: /tmp/ user: 0:0 @@ -21,11 +10,9 @@ services: - ./certs/:/tmp/certs/ - "./generate.sh:/tmp/generate.sh" kafka: - image: docker.io/bitnamilegacy/kafka:3.6 + image: docker.io/bitnamilegacy/kafka:3.9 container_name: kafka depends_on: - zookeeper: - condition: service_started init-certs: condition: service_completed_successfully ports: @@ -36,12 +23,14 @@ services: - ./certs/kafka.truststore.jks:/bitnami/kafka/config/certs/kafka.truststore.jks - 
./certs/kafka.keystore.jks:/bitnami/kafka/config/certs/kafka.keystore.jks environment: - # Zookeeper - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_CFG_NODE_ID=1001 + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1001@localhost:9096 # Listeners - - KAFKA_CFG_LISTENERS=SASL_SSL://:9093,SASL_PLAINTEXT://:9095,PLAINTEXT://:9094 - - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9093,SASL_PLAINTEXT://localhost:9095,PLAINTEXT://:9094 - - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SSL:SSL + - KAFKA_CFG_LISTENERS=SASL_SSL://:9093,SASL_PLAINTEXT://:9095,PLAINTEXT://:9094,CONTROLLER://:9096 + - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9093,SASL_PLAINTEXT://localhost:9095,PLAINTEXT://localhost:9094 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SSL:SSL,CONTROLLER:PLAINTEXT # Inter broker - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN @@ -57,7 +46,5 @@ services: - KAFKA_CFG_SSL_CLIENT_AUTH=required volumes: - zookeeper_data: - driver: local kafka_data: driver: local From 3a58814f667334c11cba4ed71f0b81c45913e537 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 19:39:35 +0700 Subject: [PATCH 06/18] add kafka for split_join e2e test --- e2e/split_join/config.yml | 2 +- e2e/split_join/docker-compose.yml | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 e2e/split_join/docker-compose.yml diff --git a/e2e/split_join/config.yml b/e2e/split_join/config.yml index 90f55f9b3..ab59db379 100644 --- a/e2e/split_join/config.yml +++ b/e2e/split_join/config.yml @@ -20,4 +20,4 @@ pipelines: message: output event sample output: type: kafka - timeout: 600s + timeout: 60s diff --git a/e2e/split_join/docker-compose.yml 
b/e2e/split_join/docker-compose.yml new file mode 100644 index 000000000..1f6af3b8b --- /dev/null +++ b/e2e/split_join/docker-compose.yml @@ -0,0 +1,22 @@ +version: "2" + +services: + kafka: + image: docker.io/bitnamilegacy/kafka:3.9 + ports: + - "9092:9092" + volumes: + - "kafka_data:/bitnami" + environment: + - KAFKA_CFG_NODE_ID=1001 + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_CONTROLLER_LISTENERS=CONTROLLER://:9093 + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1001@localhost:9093 +
+volumes: + kafka_data: + driver: local From 3be6209c665529c556a348b21b3356866d32724b Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 12 Mar 2026 19:54:51 +0700 Subject: [PATCH 07/18] Revert "add kafka for split_join e2e test" This reverts commit 3a58814f667334c11cba4ed71f0b81c45913e537.
--- e2e/split_join/docker-compose.yml | 22 ---------------------- 1 file changed, 22 deletions(-) delete mode 100644 e2e/split_join/docker-compose.yml diff --git a/e2e/split_join/docker-compose.yml b/e2e/split_join/docker-compose.yml deleted file mode 100644 index 1f6af3b8b..000000000 --- a/e2e/split_join/docker-compose.yml +++ /dev/null @@ -1,22 +0,0 @@ -version: "2" - -services: - kafka: - image: docker.io/bitnamilegacy/kafka:3.9 - ports: - - "9092:9092" - volumes: - - "kafka_data:/bitnami" - environment: - - KAFKA_CFG_NODE_ID=1001 - - KAFKA_CFG_PROCESS_ROLES=broker,controller - - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_CONTROLLER_LISTENERS=CONTROLLER://:9093 - - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1001@localhost:9093 - -volumes: - kafka_data: - driver: local From 1c0f9d80b883da06bcc41ec2752e74de946b9efb Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 13 Mar 2026 20:47:14 +0700 Subject: [PATCH 08/18] set context in kafka record --- e2e/kafka_file/config.yml | 2 +- plugin/output/kafka/kafka.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/e2e/kafka_file/config.yml b/e2e/kafka_file/config.yml index f6ae3b717..826f3e5fe 100644 --- a/e2e/kafka_file/config.yml +++ b/e2e/kafka_file/config.yml @@ -3,7 +3,7 @@ pipelines: input: type: kafka offset: oldest - timeout: 60s + timeout: 5s meta: partition: "partition_{{ .partition }}" topic: "{{ .topic }}" diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 8e5174452..d8abc4d0a 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -327,6 +327,10 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err outBuf := data.outBuf[:0] start := 0 i := 0 + + ctx, cancel := context.WithTimeout(p.ctx, p.config.Timeout_) + defer cancel() + 
batch.ForEach(func(event *pipeline.Event) { outBuf, start = event.Encode(outBuf) @@ -344,12 +348,10 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err data.messages[i].Timestamp = time.Now() data.messages[i].Value = outBuf[start:] data.messages[i].Topic = topic + data.messages[i].Context = ctx i++ }) - ctx, cancel := context.WithTimeout(p.ctx, p.config.Timeout_) - defer cancel() - if err := p.client.ProduceSync(ctx, data.messages[:i]...).FirstErr(); err != nil { if errors.Is(err, kerr.LeaderNotAvailable) || errors.Is(err, kerr.NotLeaderForPartition) { p.client.ForceMetadataRefresh() From 7ae84ccbd0b06890f4082b60f4efb2bafd585019 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 13 Mar 2026 20:49:44 +0700 Subject: [PATCH 09/18] franz-go v1.20.7 --- go.mod | 22 +++++++++++----------- go.sum | 44 ++++++++++++++++++++++---------------------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index c6e54b5dd..b182dbc18 100644 --- a/go.mod +++ b/go.mod @@ -28,10 +28,10 @@ require ( github.com/jackc/pgconn v1.14.3 github.com/jackc/pgproto3/v2 v2.3.3 github.com/jackc/pgx/v4 v4.18.3 - github.com/klauspost/compress v1.18.1 + github.com/klauspost/compress v1.18.4 github.com/minio/minio-go v6.0.14+incompatible github.com/ozontech/insane-json v0.1.9 - github.com/pierrec/lz4/v4 v4.1.22 + github.com/pierrec/lz4/v4 v4.1.25 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.42.0 @@ -42,7 +42,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.18.0 github.com/timtadh/lexmachine v0.2.3 - github.com/twmb/franz-go v1.20.5 + github.com/twmb/franz-go v1.20.7 github.com/twmb/franz-go/pkg/kadm v1.12.0 github.com/twmb/franz-go/plugin/kzap v1.1.2 github.com/twmb/tlscfg v1.2.1 @@ -51,7 +51,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.27.0 - golang.org/x/net v0.47.0 + golang.org/x/net 
v0.49.0 google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 @@ -142,15 +142,15 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/crypto v0.45.0 // indirect - golang.org/x/mod v0.29.0 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/mod v0.32.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sync v0.18.0 // indirect - golang.org/x/sys v0.38.0 // indirect - golang.org/x/term v0.37.0 // indirect - golang.org/x/text v0.31.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.40.0 // indirect + golang.org/x/text v0.34.0 // indirect golang.org/x/time v0.12.0 // indirect - golang.org/x/tools v0.38.0 // indirect + golang.org/x/tools v0.41.0 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.62.0 // indirect diff --git a/go.sum b/go.sum index 617cba0aa..8c125d74a 100644 --- a/go.sum +++ b/go.sum @@ -213,8 +213,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences 
v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -273,8 +273,8 @@ github.com/ozontech/insane-json v0.1.9 h1:JG5cEsmuSDwmU7KTJTHfTJ40XMgvtPdsUQbXdb github.com/ozontech/insane-json v0.1.9/go.mod h1:xZLf3tVLOqaT13rn1sv4fYaZfupAXNL9naLz4QRoMfY= github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0= github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -349,8 +349,8 @@ github.com/timtadh/data-structures v0.6.1/go.mod h1:uYUnI1cQi/5yMCc7s23I+x8Mn8BC github.com/timtadh/getopt v1.0.0/go.mod h1:L3EL6YN2G0eIAhYBo9b7SB9d/kEQmdnwthIlMJfj210= github.com/timtadh/lexmachine v0.2.3 h1:ZqlfHnfMcAygtbNM5Gv7jQf8hmM8LfVzDjfCrq235NQ= github.com/timtadh/lexmachine v0.2.3/go.mod h1:oK1NW+93fQSIF6s+J6sXBFWsCPCFbNmrwKV1i0aqvW0= -github.com/twmb/franz-go v1.20.5 h1:Gj9jdkvlddf8pdrehvtDHLPult5JS8q65oITUff6dXo= -github.com/twmb/franz-go v1.20.5/go.mod h1:gZmp2nTNfKuiKKND8qAsv28VdMlr/Gf4BIcsj99Bmtk= +github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY= +github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU= github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4= github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod 
h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc= @@ -425,8 +425,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= -golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0= golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -437,8 +437,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= -golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= +golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -451,8 +451,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= -golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -462,8 +462,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= -golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= 
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -485,16 +485,16 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= -golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= +golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= +golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -505,8 +505,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text 
v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= -golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -524,8 +524,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= -golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= +golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From 298d7d940b976c1745ba19b077209e801eaab667 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 13 Mar 2026 20:55:36 +0700 Subject: [PATCH 10/18] set context nil for record --- 
e2e/kafka_file/config.yml | 2 +- plugin/output/kafka/kafka.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/kafka_file/config.yml b/e2e/kafka_file/config.yml index 826f3e5fe..a4524715d 100644 --- a/e2e/kafka_file/config.yml +++ b/e2e/kafka_file/config.yml @@ -3,7 +3,7 @@ pipelines: input: type: kafka offset: oldest - timeout: 5s + timeout: 60s # cause kafka no started yet meta: partition: "partition_{{ .partition }}" topic: "{{ .topic }}" diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index d8abc4d0a..6f6d8546a 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -348,7 +348,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err data.messages[i].Timestamp = time.Now() data.messages[i].Value = outBuf[start:] data.messages[i].Topic = topic - data.messages[i].Context = ctx + data.messages[i].Context = nil // cause we set context on produce batch i++ }) From c32f06001d199cc3b4c1b71c393335bfef4b0800 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 13 Mar 2026 21:29:56 +0700 Subject: [PATCH 11/18] wait for kafka start --- Makefile | 2 +- e2e/kafka_auth/docker-compose.yml | 10 ++++++++++ e2e/kafka_file/config.yml | 2 +- e2e/kafka_file/docker-compose.yml | 10 ++++++++++ 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index a6b3a7bf3..6013336b6 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ test-e2e: .PHONY: test-e2e-docker-up test-e2e-docker-up: for dc in $(shell find e2e -name 'docker-compose.yml') ; do \ - docker compose -f $$dc up -d ; \ + docker compose -f $$dc up -d --wait; \ done .PHONY: test-e2e-docker-down diff --git a/e2e/kafka_auth/docker-compose.yml b/e2e/kafka_auth/docker-compose.yml index e9a2d5def..7db883ceb 100644 --- a/e2e/kafka_auth/docker-compose.yml +++ b/e2e/kafka_auth/docker-compose.yml @@ -22,6 +22,16 @@ services: - "kafka_data:/bitnami" - 
./certs/kafka.truststore.jks:/bitnami/kafka/config/certs/kafka.truststore.jks - ./certs/kafka.keystore.jks:/bitnami/kafka/config/certs/kafka.keystore.jks + healthcheck: + test: + [ + "CMD-SHELL", + "/opt/bitnami/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9094", + ] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s environment: - KAFKA_CFG_NODE_ID=1001 - KAFKA_CFG_PROCESS_ROLES=broker,controller diff --git a/e2e/kafka_file/config.yml b/e2e/kafka_file/config.yml index a4524715d..826f3e5fe 100644 --- a/e2e/kafka_file/config.yml +++ b/e2e/kafka_file/config.yml @@ -3,7 +3,7 @@ pipelines: input: type: kafka offset: oldest - timeout: 60s # cause kafka no started yet + timeout: 5s meta: partition: "partition_{{ .partition }}" topic: "{{ .topic }}" diff --git a/e2e/kafka_file/docker-compose.yml b/e2e/kafka_file/docker-compose.yml index 1f6af3b8b..2a84eceba 100644 --- a/e2e/kafka_file/docker-compose.yml +++ b/e2e/kafka_file/docker-compose.yml @@ -7,6 +7,16 @@ services: - "9092:9092" volumes: - "kafka_data:/bitnami" + healthcheck: + test: + [ + "CMD-SHELL", + "/opt/bitnami/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092", + ] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s environment: - KAFKA_CFG_NODE_ID=1001 - KAFKA_CFG_PROCESS_ROLES=broker,controller From a6df1437a1aed7f2f5cfbc4b6a3322d1e4380bd9 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 16 Mar 2026 14:54:21 +0700 Subject: [PATCH 12/18] kafka consumer: use own ctx for consume --- e2e/split_join/config.yml | 2 +- e2e/split_join/split_join.go | 6 ++++-- plugin/input/kafka/consumer.go | 9 +++++++-- plugin/input/kafka/kafka.go | 6 +++--- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/e2e/split_join/config.yml b/e2e/split_join/config.yml index ab59db379..1c9bfdbf1 100644 --- a/e2e/split_join/config.yml +++ b/e2e/split_join/config.yml @@ -20,4 +20,4 @@ pipelines: message: output event sample output: type: kafka - timeout: 
60s + timeout: 5s diff --git a/e2e/split_join/split_join.go b/e2e/split_join/split_join.go index cbd119bb0..988c93762 100644 --- a/e2e/split_join/split_join.go +++ b/e2e/split_join/split_join.go @@ -89,7 +89,7 @@ func (c *Config) Send(t *testing.T) { func (c *Config) Validate(t *testing.T) { r := require.New(t) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() expectedEventsCount := messages * arrayLen @@ -99,7 +99,9 @@ func (c *Config) Validate(t *testing.T) { go func() { for { - fetches := c.client.PollFetches(ctx) + pollCtx, pollCancel := context.WithTimeout(context.Background(), 5*time.Second) + fetches := c.client.PollFetches(pollCtx) + pollCancel() fetches.EachError(func(topic string, p int32, err error) {}) fetches.EachRecord(func(r *kgo.Record) { result[string(r.Value)]++ diff --git a/plugin/input/kafka/consumer.go b/plugin/input/kafka/consumer.go index b5d595644..9bf029f79 100644 --- a/plugin/input/kafka/consumer.go +++ b/plugin/input/kafka/consumer.go @@ -3,6 +3,7 @@ package kafka import ( "context" "sync" + "time" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" @@ -74,9 +75,13 @@ func (s *splitConsume) Lost(_ context.Context, _ *kgo.Client, lost map[string][] } } -func (s *splitConsume) consume(ctx context.Context, cl *kgo.Client) { +func (s *splitConsume) consume(ctx context.Context, cl *kgo.Client, timeout time.Duration) { for { - fetches := cl.PollRecords(ctx, s.bufferSize) + pollCtx, cancel := context.WithTimeout(context.Background(), timeout) + + fetches := cl.PollRecords(pollCtx, s.bufferSize) + cancel() + if fetches.IsClientClosed() { return } diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index 292fee33a..6cb64111f 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -293,7 +293,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa 
p.idByTopic[topic] = i } - ctx, cancel := context.WithTimeout(context.Background(), p.config.Timeout_) + ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel p.s = &splitConsume{ consumers: make(map[tp]*pconsumer), @@ -309,7 +309,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.controller.UseSpread() p.controller.DisableStreams() - go p.s.consume(ctx, p.client) + go p.s.consume(ctx, p.client, p.config.Timeout_) } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { @@ -319,7 +319,6 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Stop() { p.logger.Infof("Stopping") - p.cancel() ctx, cancel := context.WithTimeout(context.Background(), p.config.Timeout_) defer cancel() @@ -330,6 +329,7 @@ func (p *Plugin) Stop() { p.logger.Errorf("can't commit marked offsets: %s", err.Error()) } p.client.Close() + p.cancel() } func (p *Plugin) Commit(event *pipeline.Event) { From e7f7cea29d556cb6807e7a6d911f6c48474699cf Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 16 Mar 2026 18:31:18 +0700 Subject: [PATCH 13/18] use parent context on kafka consume --- plugin/input/kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/input/kafka/consumer.go b/plugin/input/kafka/consumer.go index 9bf029f79..0c0ee5290 100644 --- a/plugin/input/kafka/consumer.go +++ b/plugin/input/kafka/consumer.go @@ -77,7 +77,7 @@ func (s *splitConsume) Lost(_ context.Context, _ *kgo.Client, lost map[string][] func (s *splitConsume) consume(ctx context.Context, cl *kgo.Client, timeout time.Duration) { for { - pollCtx, cancel := context.WithTimeout(context.Background(), timeout) + pollCtx, cancel := context.WithTimeout(ctx, timeout) fetches := cl.PollRecords(pollCtx, s.bufferSize) cancel() From f9c97548dd625164050ef88050a3ccf233215e32 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 19 Mar 2026 15:30:09 +0700 Subject: [PATCH 14/18] set kafka timeouts in e2e tests --- 
e2e/kafka_auth/kafka_auth.go | 2 ++ e2e/kafka_file/kafka_file.go | 1 + e2e/split_join/split_join.go | 1 + 3 files changed, 4 insertions(+) diff --git a/e2e/kafka_auth/kafka_auth.go b/e2e/kafka_auth/kafka_auth.go index 43fe5409f..5e39d6c3f 100644 --- a/e2e/kafka_auth/kafka_auth.go +++ b/e2e/kafka_auth/kafka_auth.go @@ -111,6 +111,7 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) { MaxMessageBytes_: 1000000, SslEnabled: c.SslEnabled, SslSkipVerify: true, + Timeout_: 10 * time.Second, } if tt.sasl.Enabled { config.SaslEnabled = true @@ -141,6 +142,7 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) { SslSkipVerify: true, SessionTimeout_: 10 * time.Second, AutoCommitInterval_: 1 * time.Second, + Timeout_: 10 * time.Second, } if tt.sasl.Enabled { config.SaslEnabled = true diff --git a/e2e/kafka_file/kafka_file.go b/e2e/kafka_file/kafka_file.go index 3a32e906d..c37f43848 100644 --- a/e2e/kafka_file/kafka_file.go +++ b/e2e/kafka_file/kafka_file.go @@ -50,6 +50,7 @@ func (c *Config) Send(t *testing.T) { Brokers: c.Brokers, MaxMessageBytes_: 512, BatchSize_: c.Count, + Timeout_: 10 * time.Second, } client := kafka_out.NewClient(config, diff --git a/e2e/split_join/split_join.go b/e2e/split_join/split_join.go index 988c93762..89ea1f4b3 100644 --- a/e2e/split_join/split_join.go +++ b/e2e/split_join/split_join.go @@ -61,6 +61,7 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) AutoCommitInterval_: 1 * time.Second, ConsumerMaxWaitTime_: 1 * time.Second, HeartbeatInterval_: 10 * time.Second, + Timeout_: 10 * time.Second, } c.client = kafka_in.NewClient(config, From 137bc7aae3c64a07d2e9dc6830cac5f577c0e98b Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 19 Mar 2026 15:55:22 +0700 Subject: [PATCH 15/18] e2e split_join: create consumer group --- e2e/split_join/split_join.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/e2e/split_join/split_join.go b/e2e/split_join/split_join.go index 
89ea1f4b3..cc79aab3f 100644 --- a/e2e/split_join/split_join.go +++ b/e2e/split_join/split_join.go @@ -72,6 +72,13 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) adminClient := kadm.NewClient(c.client) _, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.topic) r.NoError(err) + + // create consumer group + pollCtx, pollCancel := context.WithTimeout(context.Background(), 5*time.Second) + fetches := c.client.PollFetches(pollCtx) + pollCancel() + fetches.EachError(func(topic string, p int32, err error) {}) + fetches.EachRecord(func(r *kgo.Record) {}) } func (c *Config) Send(t *testing.T) { From 2aadc4e3a0871c4f5c37ed293daf6991397ce329 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 23 Mar 2026 17:12:39 +0700 Subject: [PATCH 16/18] kafka consumer: use only cancel context --- e2e/kafka_auth/kafka_auth.go | 1 - e2e/kafka_file/config.yml | 1 - e2e/split_join/config.yml | 1 - e2e/split_join/split_join.go | 7 ++----- plugin/input/kafka/README.md | 6 ------ plugin/input/kafka/consumer.go | 8 ++------ plugin/input/kafka/kafka.go | 10 ++-------- 7 files changed, 6 insertions(+), 28 deletions(-) diff --git a/e2e/kafka_auth/kafka_auth.go b/e2e/kafka_auth/kafka_auth.go index 5e39d6c3f..e4945840d 100644 --- a/e2e/kafka_auth/kafka_auth.go +++ b/e2e/kafka_auth/kafka_auth.go @@ -142,7 +142,6 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) { SslSkipVerify: true, SessionTimeout_: 10 * time.Second, AutoCommitInterval_: 1 * time.Second, - Timeout_: 10 * time.Second, } if tt.sasl.Enabled { config.SaslEnabled = true diff --git a/e2e/kafka_file/config.yml b/e2e/kafka_file/config.yml index 826f3e5fe..24ee781aa 100644 --- a/e2e/kafka_file/config.yml +++ b/e2e/kafka_file/config.yml @@ -3,7 +3,6 @@ pipelines: input: type: kafka offset: oldest - timeout: 5s meta: partition: "partition_{{ .partition }}" topic: "{{ .topic }}" diff --git a/e2e/split_join/config.yml b/e2e/split_join/config.yml index 1c9bfdbf1..cc55665fe 
100644 --- a/e2e/split_join/config.yml +++ b/e2e/split_join/config.yml @@ -20,4 +20,3 @@ pipelines: message: output event sample output: type: kafka - timeout: 5s diff --git a/e2e/split_join/split_join.go b/e2e/split_join/split_join.go index cc79aab3f..b4510797e 100644 --- a/e2e/split_join/split_join.go +++ b/e2e/split_join/split_join.go @@ -61,7 +61,6 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) AutoCommitInterval_: 1 * time.Second, ConsumerMaxWaitTime_: 1 * time.Second, HeartbeatInterval_: 10 * time.Second, - Timeout_: 10 * time.Second, } c.client = kafka_in.NewClient(config, @@ -97,7 +96,7 @@ func (c *Config) Send(t *testing.T) { func (c *Config) Validate(t *testing.T) { r := require.New(t) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() expectedEventsCount := messages * arrayLen @@ -107,9 +106,7 @@ func (c *Config) Validate(t *testing.T) { go func() { for { - pollCtx, pollCancel := context.WithTimeout(context.Background(), 5*time.Second) - fetches := c.client.PollFetches(pollCtx) - pollCancel() + fetches := c.client.PollFetches(ctx) fetches.EachError(func(topic string, p int32, err error) {}) fetches.EachRecord(func(r *kgo.Record) { result[string(r.Value)]++ diff --git a/plugin/input/kafka/README.md b/plugin/input/kafka/README.md index 20fc2075e..56885602c 100755 --- a/plugin/input/kafka/README.md +++ b/plugin/input/kafka/README.md @@ -122,12 +122,6 @@ AutoCommitInterval sets how long to go between autocommits
-**`timeout`** *`cfg.Duration`* *`default=15s`* - -Timeout for fetch messages - -
- **`session_timeout`** *`cfg.Duration`* *`default=10s`* SessionTimeout sets how long a member in the group can go between heartbeats diff --git a/plugin/input/kafka/consumer.go b/plugin/input/kafka/consumer.go index 0c0ee5290..8337bc4e3 100644 --- a/plugin/input/kafka/consumer.go +++ b/plugin/input/kafka/consumer.go @@ -3,7 +3,6 @@ package kafka import ( "context" "sync" - "time" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" @@ -75,12 +74,9 @@ func (s *splitConsume) Lost(_ context.Context, _ *kgo.Client, lost map[string][] } } -func (s *splitConsume) consume(ctx context.Context, cl *kgo.Client, timeout time.Duration) { +func (s *splitConsume) consume(ctx context.Context, cl *kgo.Client) { for { - pollCtx, cancel := context.WithTimeout(ctx, timeout) - - fetches := cl.PollRecords(pollCtx, s.bufferSize) - cancel() + fetches := cl.PollRecords(ctx, s.bufferSize) if fetches.IsClientClosed() { return diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index 6cb64111f..a56aa1d77 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -156,12 +156,6 @@ type Config struct { AutoCommitInterval cfg.Duration `json:"auto_commit_interval" default:"1s" parse:"duration"` // * AutoCommitInterval_ time.Duration - // > @3@4@5@6 - // > - // > Timeout for fetch messages - Timeout cfg.Duration `json:"timeout" default:"15s" parse:"duration"` // * - Timeout_ time.Duration - // > @3@4@5@6 // > // > SessionTimeout sets how long a member in the group can go between heartbeats @@ -309,7 +303,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.controller.UseSpread() p.controller.DisableStreams() - go p.s.consume(ctx, p.client, p.config.Timeout_) + go p.s.consume(ctx, p.client) } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { @@ -320,7 +314,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { func (p *Plugin) Stop() { p.logger.Infof("Stopping") - ctx, cancel := 
context.WithTimeout(context.Background(), p.config.Timeout_) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := p.client.CommitMarkedOffsets(ctx) From bcc3735c791ece268bfaa7107835941e85cc6b78 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 23 Mar 2026 18:28:15 +0700 Subject: [PATCH 17/18] no need ForceMetadataRefresh --- plugin/output/kafka/client.go | 1 - plugin/output/kafka/kafka.go | 5 ----- 2 files changed, 6 deletions(-) diff --git a/plugin/output/kafka/client.go b/plugin/output/kafka/client.go index 9f2853537..46a87b5bf 100644 --- a/plugin/output/kafka/client.go +++ b/plugin/output/kafka/client.go @@ -12,7 +12,6 @@ import ( type KafkaClient interface { ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults Close() - ForceMetadataRefresh() } func NewClient(c *Config, l *zap.Logger) *kgo.Client { diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 6f6d8546a..7f5f01319 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -2,14 +2,12 @@ package kafka import ( "context" - "errors" "time" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" - "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -353,9 +351,6 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err }) if err := p.client.ProduceSync(ctx, data.messages[:i]...).FirstErr(); err != nil { - if errors.Is(err, kerr.LeaderNotAvailable) || errors.Is(err, kerr.NotLeaderForPartition) { - p.client.ForceMetadataRefresh() - } p.logger.Errorf("can't write batch: %v", err) p.sendErrorMetric.Inc() return err From f1882eb419ee235781b6d2a27a5a073de593446e Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 23 Mar 2026 18:29:55 +0700 Subject: [PATCH 18/18] try to debug split_join --- e2e/start_work_test.go | 96
------------------------------------------ 1 file changed, 96 deletions(-) diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go index a4664a2c3..d6dd4d499 100644 --- a/e2e/start_work_test.go +++ b/e2e/start_work_test.go @@ -10,16 +10,6 @@ import ( "time" "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/e2e/file_clickhouse" - "github.com/ozontech/file.d/e2e/file_elasticsearch" - "github.com/ozontech/file.d/e2e/file_es_split" - "github.com/ozontech/file.d/e2e/file_file" - "github.com/ozontech/file.d/e2e/file_loki" - "github.com/ozontech/file.d/e2e/http_file" - "github.com/ozontech/file.d/e2e/join_throttle" - "github.com/ozontech/file.d/e2e/kafka_auth" - "github.com/ozontech/file.d/e2e/kafka_file" - "github.com/ozontech/file.d/e2e/redis_clients" "github.com/ozontech/file.d/e2e/split_join" "github.com/ozontech/file.d/fd" _ "github.com/ozontech/file.d/plugin/action/add_file_name" @@ -92,97 +82,11 @@ type E2ETest struct { func TestE2EStabilityWorkCase(t *testing.T) { testsList := []E2ETest{ - { - name: "kafka_auth", - e2eTest: &kafka_auth.Config{ - Brokers: []string{"localhost:9093"}, - SslEnabled: true, - }, - cfgPath: "./kafka_auth/config.yml", - onlyConfigure: true, - }, - { - name: "kafka_auth", - e2eTest: &kafka_auth.Config{ - Brokers: []string{"localhost:9095"}, - SslEnabled: false, - }, - cfgPath: "./kafka_auth/config.yml", - onlyConfigure: true, - }, - { - name: "file_file", - e2eTest: &file_file.Config{ - Count: 10, - Lines: 500, - RetTime: "1s", - }, - cfgPath: "./file_file/config.yml", - }, - { - name: "http_file", - e2eTest: &http_file.Config{ - Count: 10, - Lines: 500, - RetTime: "1s", - }, - cfgPath: "./http_file/config.yml", - }, - { - name: "kafka_file", - e2eTest: &kafka_file.Config{ - Topics: []string{"quickstart"}, - Brokers: []string{"localhost:9092"}, - Count: 500, - RetTime: "1s", - Partition: 4, - }, - cfgPath: "./kafka_file/config.yml", - }, - { - name: "join_throttle", - e2eTest: &join_throttle.Config{ - Count: 1000, - }, - 
cfgPath: "./join_throttle/config.yml", - }, { name: "split_join", e2eTest: &split_join.Config{}, cfgPath: "./split_join/config.yml", }, - { - name: "file_clickhouse", - e2eTest: &file_clickhouse.Config{}, - cfgPath: "./file_clickhouse/config.yml", - }, - { - name: "file_elasticsearch", - e2eTest: &file_elasticsearch.Config{ - Count: 10, - Pipeline: "test-ingest-pipeline", - Endpoint: "http://localhost:19200", - Username: "elastic", - Password: "elastic", - }, - cfgPath: "./file_elasticsearch/config.yml", - }, - { - name: "file_es_split", - e2eTest: &file_es_split.Config{}, - cfgPath: "./file_es_split/config.yml", - }, - { - name: "file_loki", - e2eTest: &file_loki.Config{}, - cfgPath: "./file_loki/config.yml", - }, - { - name: "redis_clients", - e2eTest: &redis_clients.Config{}, - cfgPath: "./redis_clients/config.yml", - onlyConfigure: true, - }, } for num, test := range testsList {