Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ test-e2e:
.PHONY: test-e2e-docker-up
test-e2e-docker-up:
for dc in $(shell find e2e -name 'docker-compose.yml') ; do \
docker compose -f $$dc up -d ; \
docker compose -f $$dc up -d --wait; \
done

.PHONY: test-e2e-docker-down
Expand Down
41 changes: 19 additions & 22 deletions e2e/kafka_auth/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,18 @@
version: "2.1"

services:
zookeeper:
image: zookeeper:3.9
ports:
- "2182:2181"
volumes:
- "zookeeper_data:/data"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
init-certs:
image: docker.io/bitnamilegacy/kafka:3.6
image: docker.io/bitnamilegacy/kafka:3.9
command: /tmp/generate.sh
working_dir: /tmp/
user: 0:0
volumes:
- ./certs/:/tmp/certs/
- "./generate.sh:/tmp/generate.sh"
kafka:
image: docker.io/bitnamilegacy/kafka:3.6
image: docker.io/bitnamilegacy/kafka:3.9
container_name: kafka
depends_on:
zookeeper:
condition: service_started
init-certs:
condition: service_completed_successfully
ports:
Expand All @@ -35,13 +22,25 @@ services:
- "kafka_data:/bitnami"
- ./certs/kafka.truststore.jks:/bitnami/kafka/config/certs/kafka.truststore.jks
- ./certs/kafka.keystore.jks:/bitnami/kafka/config/certs/kafka.keystore.jks
healthcheck:
test:
[
"CMD-SHELL",
"/opt/bitnami/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9094",
]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
environment:
# Zookeeper
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_NODE_ID=1001
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1001@localhost:9096
# Listeners
- KAFKA_CFG_LISTENERS=SASL_SSL://:9093,SASL_PLAINTEXT://:9095,PLAINTEXT://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9093,SASL_PLAINTEXT://localhost:9095,PLAINTEXT://:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SSL:SSL
- KAFKA_CFG_LISTENERS=SASL_SSL://:9093,SASL_PLAINTEXT://:9095,PLAINTEXT://:9094,CONTROLLER://:9096
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://localhost:9093,SASL_PLAINTEXT://localhost:9095,PLAINTEXT://localhost:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SSL:SSL,CONTROLLER:PLAINTEXT
# Inter broker
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
Expand All @@ -57,7 +56,5 @@ services:
- KAFKA_CFG_SSL_CLIENT_AUTH=required

volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
1 change: 1 addition & 0 deletions e2e/kafka_auth/kafka_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
MaxMessageBytes_: 1000000,
SslEnabled: c.SslEnabled,
SslSkipVerify: true,
Timeout_: 10 * time.Second,
}
if tt.sasl.Enabled {
config.SaslEnabled = true
Expand Down
4 changes: 2 additions & 2 deletions e2e/kafka_file/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pipelines:
type: kafka
offset: oldest
meta:
partition: 'partition_{{ .partition }}'
topic: '{{ .topic }}'
partition: "partition_{{ .partition }}"
topic: "{{ .topic }}"
output:
type: file
34 changes: 17 additions & 17 deletions e2e/kafka_file/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
version: "2"

services:
zookeeper:
image: zookeeper:3.9
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/data"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
kafka:
image: docker.io/bitnamilegacy/kafka:3.1
image: docker.io/bitnamilegacy/kafka:3.9
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
healthcheck:
test:
[
"CMD-SHELL",
"/opt/bitnami/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092",
]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_NODE_ID=1001
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
depends_on:
- zookeeper
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_CONTROLLER_LISTENERS=CONTROLLER://:9093
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1001@localhost:9093

volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
1 change: 1 addition & 0 deletions e2e/kafka_file/kafka_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (c *Config) Send(t *testing.T) {
Brokers: c.Brokers,
MaxMessageBytes_: 512,
BatchSize_: c.Count,
Timeout_: 10 * time.Second,
}

client := kafka_out.NewClient(config,
Expand Down
4 changes: 2 additions & 2 deletions e2e/split_join/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pipelines:
field: data
- type: join
field: message
start: '/^start/'
continue: '/^continue/'
start: "/^start/"
continue: "/^continue/"
- type: debug
message: output event sample
output:
Expand Down
9 changes: 8 additions & 1 deletion e2e/split_join/split_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
adminClient := kadm.NewClient(c.client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.topic)
r.NoError(err)

// create consumer group
pollCtx, pollCancel := context.WithTimeout(context.Background(), 5*time.Second)
fetches := c.client.PollFetches(pollCtx)
pollCancel()
fetches.EachError(func(topic string, p int32, err error) {})
fetches.EachRecord(func(r *kgo.Record) {})
}

func (c *Config) Send(t *testing.T) {
Expand All @@ -89,7 +96,7 @@ func (c *Config) Send(t *testing.T) {
func (c *Config) Validate(t *testing.T) {
r := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

expectedEventsCount := messages * arrayLen
Expand Down
96 changes: 0 additions & 96 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ import (
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/e2e/file_clickhouse"
"github.com/ozontech/file.d/e2e/file_elasticsearch"
"github.com/ozontech/file.d/e2e/file_es_split"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/file_loki"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_auth"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/redis_clients"
"github.com/ozontech/file.d/e2e/split_join"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
Expand Down Expand Up @@ -92,97 +82,11 @@ type E2ETest struct {

func TestE2EStabilityWorkCase(t *testing.T) {
testsList := []E2ETest{
{
name: "kafka_auth",
e2eTest: &kafka_auth.Config{
Brokers: []string{"localhost:9093"},
SslEnabled: true,
},
cfgPath: "./kafka_auth/config.yml",
onlyConfigure: true,
},
{
name: "kafka_auth",
e2eTest: &kafka_auth.Config{
Brokers: []string{"localhost:9095"},
SslEnabled: false,
},
cfgPath: "./kafka_auth/config.yml",
onlyConfigure: true,
},
{
name: "file_file",
e2eTest: &file_file.Config{
Count: 10,
Lines: 500,
RetTime: "1s",
},
cfgPath: "./file_file/config.yml",
},
{
name: "http_file",
e2eTest: &http_file.Config{
Count: 10,
Lines: 500,
RetTime: "1s",
},
cfgPath: "./http_file/config.yml",
},
{
name: "kafka_file",
e2eTest: &kafka_file.Config{
Topics: []string{"quickstart"},
Brokers: []string{"localhost:9092"},
Count: 500,
RetTime: "1s",
Partition: 4,
},
cfgPath: "./kafka_file/config.yml",
},
{
name: "join_throttle",
e2eTest: &join_throttle.Config{
Count: 1000,
},
cfgPath: "./join_throttle/config.yml",
},
{
name: "split_join",
e2eTest: &split_join.Config{},
cfgPath: "./split_join/config.yml",
},
{
name: "file_clickhouse",
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoint: "http://localhost:19200",
Username: "elastic",
Password: "elastic",
},
cfgPath: "./file_elasticsearch/config.yml",
},
{
name: "file_es_split",
e2eTest: &file_es_split.Config{},
cfgPath: "./file_es_split/config.yml",
},
{
name: "file_loki",
e2eTest: &file_loki.Config{},
cfgPath: "./file_loki/config.yml",
},
{
name: "redis_clients",
e2eTest: &redis_clients.Config{},
cfgPath: "./redis_clients/config.yml",
onlyConfigure: true,
},
}

for num, test := range testsList {
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ require (
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgproto3/v2 v2.3.3
github.com/jackc/pgx/v4 v4.18.3
github.com/klauspost/compress v1.18.1
github.com/klauspost/compress v1.18.4
github.com/minio/minio-go v6.0.14+incompatible
github.com/ozontech/insane-json v0.1.9
github.com/pierrec/lz4/v4 v4.1.22
github.com/pierrec/lz4/v4 v4.1.25
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
Expand All @@ -42,7 +42,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/timtadh/lexmachine v0.2.3
github.com/twmb/franz-go v1.20.5
github.com/twmb/franz-go v1.20.7
github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/twmb/franz-go/plugin/kzap v1.1.2
github.com/twmb/tlscfg v1.2.1
Expand All @@ -51,7 +51,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.27.0
golang.org/x/net v0.47.0
golang.org/x/net v0.49.0
google.golang.org/protobuf v1.36.5
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -142,15 +142,15 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/mod v0.32.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/tools v0.38.0 // indirect
golang.org/x/tools v0.41.0 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
Expand Down
Loading
Loading