From 2e4453071537e3a756b79dc668cc3ad920ca80e1 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Sat, 16 May 2026 14:42:46 +0500 Subject: [PATCH 1/4] Add Kafka Streams runner skeleton module and portable entry points --- CHANGES.md | 1 + build.gradle.kts | 1 + runners/kafka-streams/build.gradle | 62 ++++++++++ .../kafka/streams/KafkaStreamsJobInvoker.java | 99 ++++++++++++++++ .../streams/KafkaStreamsJobServerDriver.java | 109 ++++++++++++++++++ .../streams/KafkaStreamsPipelineOptions.java | 76 ++++++++++++ .../streams/KafkaStreamsPipelineResult.java | 69 +++++++++++ .../streams/KafkaStreamsPipelineRunner.java | 48 ++++++++ .../kafka/streams/KafkaStreamsRunner.java | 101 ++++++++++++++++ .../streams/KafkaStreamsRunnerRegistrar.java | 48 ++++++++ .../runners/kafka/streams/package-info.java | 20 ++++ .../KafkaStreamsPipelineTranslator.java | 70 +++++++++++ .../KafkaStreamsTranslationContext.java | 47 ++++++++ .../streams/translation/package-info.java | 20 ++++ settings.gradle.kts | 1 + 15 files changed, 772 insertions(+) create mode 100644 runners/kafka-streams/build.gradle create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerRegistrar.java create mode 100644 
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/package-info.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/package-info.java diff --git a/CHANGES.md b/CHANGES.md index 52475a99d8e1..18d2698a2875 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,6 +71,7 @@ ## New Features / Improvements +* (Java) Added a `runners/kafka-streams` Gradle module with portable job server and runner entry points; translation fails fast with an explicit unsupported-URN message until transforms are implemented ([#38465](https://github.com/apache/beam/issues/38465)). * Capability introduces an indicator for aggregations and timers firing during a pipeline drain, allowing users and sinks to recognize and appropriately handle potentially incomplete or partial data ([#36884](https://github.com/apache/beam/issues/36884)). * Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go/Python) ([#38349](https://github.com/apache/beam/issues/38349)). 
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to diff --git a/build.gradle.kts b/build.gradle.kts index 4af8fa3f1ab4..b3b9fdd7fdf0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -278,6 +278,7 @@ tasks.register("javaPreCommit") { dependsOn(":runners:java-fn-execution:build") dependsOn(":runners:java-job-service:build") dependsOn(":runners:jet:build") + dependsOn(":runners:kafka-streams:build") dependsOn(":runners:local-java:build") dependsOn(":runners:portability:java:build") dependsOn(":runners:prism:java:build") diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle new file mode 100644 index 000000000000..54474502ad7b --- /dev/null +++ b/runners/kafka-streams/build.gradle @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +plugins { id 'org.apache.beam.module' } + +def kafka_version = '3.9.0' + +applyJavaNature( + automaticModuleName: 'org.apache.beam.runners.kafka.streams', +) + +description = "Apache Beam :: Runners :: Kafka Streams" + +evaluationDependsOn(":sdks:java:core") +evaluationDependsOn(":runners:core-java") + +configurations.configureEach { + resolutionStrategy.eachDependency { details -> + if (details.requested.group == "org.apache.kafka") { + details.useVersion(kafka_version) + details.because("Kafka Streams runner is developed against Kafka ${kafka_version}.") + } + } +} + +dependencies { + compileOnly project(":sdks:java:build-tools") + permitUnusedDeclared project(":sdks:java:build-tools") + + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") + implementation project(":runners:core-java") + permitUnusedDeclared project(":runners:core-java") + implementation project(":runners:java-fn-execution") + implementation project(":runners:java-job-service") + implementation project(":runners:portability:java") + implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") + implementation library.java.args4j + implementation library.java.joda_time + implementation library.java.slf4j_api + implementation library.java.vendored_grpc_1_69_0 + implementation library.java.vendored_guava_32_1_2_jre + implementation "org.apache.kafka:kafka-clients:$kafka_version" + implementation "org.apache.kafka:kafka-streams:$kafka_version" + permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version" + permitUnusedDeclared "org.apache.kafka:kafka-streams:$kafka_version" +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java new file mode 100644 index 000000000000..bd1ed4c1bcf6 --- /dev/null +++ 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.util.UUID; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.jobsubmission.JobInvocation; +import org.apache.beam.runners.jobsubmission.JobInvoker; +import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Job invoker for the Kafka Streams portable runner. 
*/ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KafkaStreamsJobInvoker extends JobInvoker { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsJobInvoker.class); + + public static KafkaStreamsJobInvoker create( + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration serverConfig) { + return new KafkaStreamsJobInvoker(serverConfig); + } + + private final KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration serverConfig; + + protected KafkaStreamsJobInvoker( + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration serverConfig) { + super("kafka-streams-runner-job-invoker-%d"); + this.serverConfig = serverConfig; + } + + @Override + protected JobInvocation invokeWithExecutor( + RunnerApi.Pipeline pipeline, + Struct options, + @Nullable String retrievalToken, + ListeningExecutorService executorService) { + + LOG.trace( + "Parsing pipeline options (job server {}:{})", + serverConfig.getHost(), + serverConfig.getPort()); + KafkaStreamsPipelineOptions kafkaStreamsOptions = + PipelineOptionsTranslation.fromProto(options).as(KafkaStreamsPipelineOptions.class); + + String invocationId = + String.format("%s_%s", kafkaStreamsOptions.getJobName(), UUID.randomUUID().toString()); + + PortablePipelineRunner pipelineRunner = new KafkaStreamsPipelineRunner(kafkaStreamsOptions); + kafkaStreamsOptions.setRunner(null); + + LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); + return createJobInvocation( + invocationId, + retrievalToken, + executorService, + pipeline, + kafkaStreamsOptions, + pipelineRunner); + } + + protected JobInvocation createJobInvocation( + String invocationId, + String retrievalToken, + ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, + KafkaStreamsPipelineOptions kafkaStreamsOptions, + PortablePipelineRunner pipelineRunner) { + JobInfo jobInfo = + JobInfo.create( + invocationId, + 
kafkaStreamsOptions.getJobName(), + retrievalToken, + PipelineOptionsTranslation.toProto(kafkaStreamsOptions)); + return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java new file mode 100644 index 000000000000..ddeac8b7c959 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams; + +import org.apache.beam.runners.jobsubmission.JobServerDriver; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.fn.server.ServerFactory; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Driver that starts a Beam job server for the Kafka Streams portable runner. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KafkaStreamsJobServerDriver extends JobServerDriver { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsJobServerDriver.class); + + /** Runner-specific configuration for the job server process. */ + public static class KafkaStreamsServerConfiguration extends ServerConfiguration {} + + public static void main(String[] args) throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + options.as(GcsOptions.class).setGcsUploadBufferSizeBytes(1024 * 1024); + FileSystems.setDefaultPipelineOptions(options); + fromParams(args).run(); + } + + private static void printUsage(CmdLineParser parser) { + System.err.println( + String.format( + "Usage: java %s arguments...", KafkaStreamsJobServerDriver.class.getSimpleName())); + parser.printUsage(System.err); + System.err.println(); + } + + public static KafkaStreamsServerConfiguration parseArgs(String[] args) { + KafkaStreamsServerConfiguration configuration = new KafkaStreamsServerConfiguration(); + CmdLineParser parser = new CmdLineParser(configuration); + try { + parser.parseArgument(args); + } catch (CmdLineException e) { + LOG.error("Unable to parse command line arguments.", e); + printUsage(parser); + throw new IllegalArgumentException("Unable to parse command line 
arguments.", e); + } + return configuration; + } + + /** Used by tests and tooling to construct a driver from command-line parameters. */ + public static KafkaStreamsJobServerDriver fromParams(String[] args) { + return fromConfig(parseArgs(args)); + } + + public static KafkaStreamsJobServerDriver fromConfig( + KafkaStreamsServerConfiguration configuration) { + return create( + configuration, + createJobServerFactory(configuration), + createArtifactServerFactory(configuration), + () -> KafkaStreamsJobInvoker.create(configuration)); + } + + public static KafkaStreamsJobServerDriver fromConfig( + KafkaStreamsServerConfiguration configuration, JobInvokerFactory jobInvokerFactory) { + return create( + configuration, + createJobServerFactory(configuration), + createArtifactServerFactory(configuration), + jobInvokerFactory); + } + + private static KafkaStreamsJobServerDriver create( + KafkaStreamsServerConfiguration configuration, + ServerFactory jobServerFactory, + ServerFactory artifactServerFactory, + JobInvokerFactory jobInvokerFactory) { + return new KafkaStreamsJobServerDriver( + configuration, jobServerFactory, artifactServerFactory, jobInvokerFactory); + } + + private KafkaStreamsJobServerDriver( + KafkaStreamsServerConfiguration configuration, + ServerFactory jobServerFactory, + ServerFactory artifactServerFactory, + JobInvokerFactory jobInvokerFactory) { + super(configuration, jobServerFactory, artifactServerFactory, jobInvokerFactory); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java new file mode 100644 index 000000000000..019b37cba770 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.nio.file.Paths; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; + +/** Pipeline options for the Kafka Streams runner. 
*/ +public interface KafkaStreamsPipelineOptions extends PortablePipelineOptions { + + @Description("Comma-separated list of host:port Kafka brokers used by the Kafka Streams client.") + @Default.String("localhost:9092") + String getBootstrapServers(); + + void setBootstrapServers(String bootstrapServers); + + @Description( + "Kafka Streams application.id (must be unique for each distinct topology using the same " + + "input topics in a Kafka cluster).") + @Default.String("beam-kafka-streams-runner") + String getApplicationId(); + + void setApplicationId(String applicationId); + + @Description( + "Kafka Streams processing.guarantee setting, for example at_least_once or exactly_once_v2.") + @Default.String("exactly_once_v2") + String getProcessingGuarantee(); + + void setProcessingGuarantee(String processingGuarantee); + + @Description("Soft cap on the number of elements per bundle.") + @Default.Integer(1000) + int getMaxBundleSize(); + + void setMaxBundleSize(int maxBundleSize); + + @Description("Soft cap on bundle wall-clock duration in milliseconds.") + @Default.Integer(1000) + int getMaxBundleTimeMs(); + + void setMaxBundleTimeMs(int maxBundleTimeMs); + + @Description("Directory where Kafka Streams stores local state.") + @Default.InstanceFactory(StateDirDefaultFactory.class) + String getStateDir(); + + void setStateDir(String stateDir); + + /** Default {@link #getStateDir()} under the JVM temp directory. 
*/ + class StateDirDefaultFactory implements DefaultValueFactory<String> { + @Override + public String create(PipelineOptions options) { + return Paths.get(System.getProperty("java.io.tmpdir"), "beam-kafka-streams-state").toString(); + } + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java new file mode 100644 index 000000000000..776eaa6629f5 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.io.IOException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.joda.time.Duration; + +/** + * Forwards {@link PipelineResult} calls to a delegate and stops an embedded job server when the + * pipeline reaches a terminal state. 
+ */ +class KafkaStreamsPipelineResult implements PipelineResult { + + private final PipelineResult delegate; + private final Runnable stopJobServer; + + KafkaStreamsPipelineResult(PipelineResult delegate, Runnable stopJobServer) { + this.delegate = delegate; + this.stopJobServer = stopJobServer; + } + + @Override + public State getState() { + return delegate.getState(); + } + + @Override + public State cancel() throws IOException { + State state = delegate.cancel(); + stopJobServer.run(); + return state; + } + + @Override + public State waitUntilFinish(Duration duration) { + State state = delegate.waitUntilFinish(duration); + stopJobServer.run(); + return state; + } + + @Override + public State waitUntilFinish() { + State state = delegate.waitUntilFinish(); + stopJobServer.run(); + return state; + } + + @Override + public MetricResults metrics() { + return delegate.metrics(); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java new file mode 100644 index 000000000000..cc7464e22786 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; +import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; + +/** Executes a portable pipeline by translating it to Kafka Streams. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KafkaStreamsPipelineRunner implements PortablePipelineRunner { + + private final KafkaStreamsPipelineOptions pipelineOptions; + + public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + @Override + public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception { + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + KafkaStreamsTranslationContext context = + translator.createTranslationContext(jobInfo, pipelineOptions); + RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipeline); + translator.translate(context, prepared); + throw new IllegalStateException("Translation unexpectedly completed without an executor"); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java new file mode 100644 index 000000000000..ce8f0544b681 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.portability.PortableRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PipelineRunner} that submits portable jobs to an in-process or external Beam job service + * backed by the Kafka Streams translation path. 
+ */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KafkaStreamsRunner extends PipelineRunner<PipelineResult> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsRunner.class); + + private final KafkaStreamsPipelineOptions pipelineOptions; + + public static KafkaStreamsRunner fromOptions(PipelineOptions options) { + return new KafkaStreamsRunner(options.as(KafkaStreamsPipelineOptions.class)); + } + + protected KafkaStreamsRunner(KafkaStreamsPipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + @Override + public PipelineResult run(Pipeline pipeline) { + assignPortableDefaults(pipelineOptions); + KafkaStreamsJobServerDriver jobServerDriver = null; + try { + if (Strings.isNullOrEmpty(pipelineOptions.getJobEndpoint())) { + LOG.info("No job endpoint configured; starting an embedded Kafka Streams job server."); + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration configuration = + new KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration(); + configuration.setPort(0); + jobServerDriver = KafkaStreamsJobServerDriver.fromConfig(configuration); + pipelineOptions.setJobEndpoint(jobServerDriver.start()); + } + PortableRunner portableRunner = PortableRunner.fromOptions(pipelineOptions); + PipelineResult result = portableRunner.run(pipeline); + if (jobServerDriver != null) { + return new KafkaStreamsPipelineResult(result, jobServerDriver::stop); + } + return result; + } catch (IOException e) { + if (jobServerDriver != null) { + jobServerDriver.stop(); + } + throw new RuntimeException(e); + } + } + + private static void assignPortableDefaults(KafkaStreamsPipelineOptions pipelineOptions) { + if (Strings.isNullOrEmpty(pipelineOptions.getDefaultEnvironmentType())) { + pipelineOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK); + } + ExperimentalOptions experimentalOptions = pipelineOptions.as(ExperimentalOptions.class); + List<String> experiments = + 
experimentalOptions.getExperiments() == null + ? new ArrayList<>() + : new ArrayList<>(experimentalOptions.getExperiments()); + if (!experiments.contains("beam_fn_api")) { + experiments.add("beam_fn_api"); + experimentalOptions.setExperiments(experiments); + } + } + + @Override + public String toString() { + return "KafkaStreamsRunner#" + hashCode(); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerRegistrar.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerRegistrar.java new file mode 100644 index 000000000000..ac3c64b97bb0 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerRegistrar.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** {@link com.google.auto.service.AutoService} registrations for the Kafka Streams runner. */ +public class KafkaStreamsRunnerRegistrar { + private KafkaStreamsRunnerRegistrar() {} + + /** Registers {@link KafkaStreamsRunner}. */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.of(KafkaStreamsRunner.class); + } + } + + /** Registers {@link KafkaStreamsPipelineOptions}. */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.of(KafkaStreamsPipelineOptions.class); + } + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/package-info.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/package-info.java new file mode 100644 index 000000000000..c9def4d1a4d7 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Kafka Streams runner: portable pipeline execution backed by Apache Kafka Streams. */ +package org.apache.beam.runners.kafka.streams; diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java new file mode 100644 index 000000000000..cc915d604b68 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.util.Map; +import java.util.TreeMap; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; + +/** + * Translates a portable Beam pipeline into a Kafka Streams {@code Topology}. + * + *

<p>The initial implementation only validates the graph and fails fast with an explicit message + * for transforms that are not yet supported. + */ +public class KafkaStreamsPipelineTranslator { + + public KafkaStreamsTranslationContext createTranslationContext( + JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) { + return KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions); + } + + /** Returns the pipeline to translate (placeholder for future fusion / expansion steps). */ + public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) { + return pipeline; + } + + /** + * Translates the pipeline. Throws {@link UnsupportedOperationException} with a clear URN message + * for the first unsupported primitive encountered. + */ + public void translate(KafkaStreamsTranslationContext context, RunnerApi.Pipeline pipeline) { + Map<String, RunnerApi.PTransform> transforms = pipeline.getComponents().getTransformsMap(); + TreeMap<String, RunnerApi.PTransform> ordered = new TreeMap<>(transforms); + for (Map.Entry<String, RunnerApi.PTransform> entry : ordered.entrySet()) { + RunnerApi.PTransform transform = entry.getValue(); + if (!transform.hasSpec()) { + continue; + } + String urn = transform.getSpec().getUrn(); + if (urn.isEmpty()) { + continue; + } + throw new UnsupportedOperationException( + "No translator registered for URN " + + urn + + " (jobId=" + + context.getJobInfo().jobId() + + ")"); + } + throw new UnsupportedOperationException( + "No translator registered for pipeline (no transform URNs found)"); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java new file mode 100644 index 000000000000..7c6d3d079159 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; + +/** Mutable state shared while translating a portable pipeline into a Kafka Streams topology. 
*/ +public class KafkaStreamsTranslationContext { + + private final JobInfo jobInfo; + private final KafkaStreamsPipelineOptions pipelineOptions; + + public static KafkaStreamsTranslationContext create( + JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) { + return new KafkaStreamsTranslationContext(jobInfo, pipelineOptions); + } + + private KafkaStreamsTranslationContext( + JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) { + this.jobInfo = jobInfo; + this.pipelineOptions = pipelineOptions; + } + + public JobInfo getJobInfo() { + return jobInfo; + } + + public KafkaStreamsPipelineOptions getPipelineOptions() { + return pipelineOptions; + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/package-info.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/package-info.java new file mode 100644 index 000000000000..9c09b9ceeb0b --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Portable pipeline translation to Kafka Streams topologies. 
*/ +package org.apache.beam.runners.kafka.streams.translation; diff --git a/settings.gradle.kts b/settings.gradle.kts index fc5f40c23d17..b92b254981fe 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -143,6 +143,7 @@ include(":runners:google-cloud-dataflow-java:examples-streaming") include(":runners:java-fn-execution") include(":runners:java-job-service") include(":runners:jet") +include(":runners:kafka-streams") include(":runners:local-java") include(":runners:portability:java") include(":runners:prism") From 61c891a69cad50e1fd4b2e4a336843efe4d5e6bb Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Tue, 19 May 2026 16:10:28 +0500 Subject: [PATCH 2/4] Address review notes on KafkaStreamsPipelineResult and state dir - cancel(): wrap delegate.cancel() in try/finally so the embedded job server is always stopped, even if cancellation throws IOException. - waitUntilFinish(Duration): only stop the job server when the returned state is terminal, so a timed-out wait does not prematurely kill the job server while the pipeline is still running. - waitUntilFinish(): wrap in try/finally for the same defensive cleanup reason as cancel(). - KafkaStreamsPipelineOptions.StateDirDefaultFactory: include the job name in the default Kafka Streams state directory so that multiple pipelines on the same host (e.g. parallel tests) do not collide and hit a LockException. 
--- .../streams/KafkaStreamsPipelineOptions.java | 14 ++++++++++-- .../streams/KafkaStreamsPipelineResult.java | 22 +++++++++++++------ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java index 019b37cba770..911ffce2eea3 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java @@ -66,11 +66,21 @@ public interface KafkaStreamsPipelineOptions extends PortablePipelineOptions { void setStateDir(String stateDir); - /** Default {@link #getStateDir()} under the JVM temp directory. */ + /** + * Default {@link #getStateDir()} under the JVM temp directory. + * + *

<p>The job name is included in the path so that multiple pipelines running on the same host + * (e.g. parallel tests) do not collide on the same Kafka Streams state directory and trigger a + * {@code LockException}. + */ class StateDirDefaultFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { - return Paths.get(System.getProperty("java.io.tmpdir"), "beam-kafka-streams-state").toString(); + return Paths.get( + System.getProperty("java.io.tmpdir"), + "beam-kafka-streams-state", + options.getJobName()) + .toString(); } } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java index 776eaa6629f5..65ff8b77b180 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineResult.java @@ -43,23 +43,31 @@ public State getState() { @Override public State cancel() throws IOException { - State state = delegate.cancel(); - stopJobServer.run(); - return state; + try { + return delegate.cancel(); + } finally { + stopJobServer.run(); + } } @Override public State waitUntilFinish(Duration duration) { State state = delegate.waitUntilFinish(duration); - stopJobServer.run(); + // A null/non-terminal state means the wait timed out and the pipeline is still running; + // keep the job server alive so the caller can continue to interact with the job.
+ if (state != null && state.isTerminal()) { + stopJobServer.run(); + } return state; } @Override public State waitUntilFinish() { - State state = delegate.waitUntilFinish(); - stopJobServer.run(); - return state; + try { + return delegate.waitUntilFinish(); + } finally { + stopJobServer.run(); + } } @Override From cef6544e7920219e7c02e85f9d9902f6cc33ea7e Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Thu, 21 May 2026 20:33:28 +0500 Subject: [PATCH 3/4] Address review feedback on Kafka Streams Runner skeleton --- ...m_PreCommit_Java_Kafka_Streams_Runner.json | 4 + .github/workflows/README.md | 1 + ...am_PreCommit_Java_Kafka_Streams_Runner.yml | 118 ++++++++++++++++++ runners/kafka-streams/build.gradle | 5 + .../kafka/streams/KafkaStreamsJobInvoker.java | 22 +++- .../streams/KafkaStreamsJobServerDriver.java | 3 - .../streams/KafkaStreamsPipelineOptions.java | 7 -- .../streams/KafkaStreamsPipelineRunner.java | 3 - .../kafka/streams/KafkaStreamsRunner.java | 14 +-- .../KafkaStreamsJobServerDriverTest.java | 71 +++++++++++ .../KafkaStreamsPipelineOptionsTest.java | 74 +++++++++++ .../KafkaStreamsPipelineTranslatorTest.java | 113 +++++++++++++++++ 12 files changed, 408 insertions(+), 27 deletions(-) create mode 100644 .github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json create mode 100644 .github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriverTest.java create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java diff --git a/.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json b/.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json new file mode 100644 index 000000000000..5abe02fc09c7 
--- /dev/null +++ b/.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +} diff --git a/.github/workflows/README.md b/.github/workflows/README.md index c6a95b29b4c0..e70d5e17d7a7 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -249,6 +249,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Java HBase IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HBase_IO_Direct.yml) | N/A |`Run Java_HBase_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_HBase_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HBase_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HBase_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java HCatalog IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HCatalog_IO_Direct.yml) | N/A |`Run Java_HCatalog_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_HCatalog_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HCatalog_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HCatalog_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Kafka IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_IO_Direct.yml) | N/A |`Run Java_Kafka_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Kafka_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_IO_Direct.yml?query=event%3Aschedule) | +| [ PreCommit Java Kafka Streams Runner 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml) | N/A |`Run Java_Kafka_Streams_Runner PreCommit`| [![.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml?query=event%3Aschedule) | | [ PreCommit Java InfluxDb IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_InfluxDb_IO_Direct.yml) | N/A |`Run Java_InfluxDb_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_InfluxDb_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_InfluxDb_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_InfluxDb_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java IOs Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_IOs_Direct.yml) | N/A |`Run Java_IOs_Direct PreCommit`| N/A | | [ PreCommit Java JDBC IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml) | N/A |`Run Java_JDBC_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml?query=event%3Aschedule) | diff --git a/.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml b/.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml new file mode 100644 index 000000000000..564b2bbc4bc8 --- /dev/null +++ b/.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PreCommit Java Kafka Streams Runner + +on: + push: + tags: ['v*'] + branches: ['master', 'release-*'] + paths: + - "runners/kafka-streams/**" + - ".github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml" + pull_request_target: + branches: ['master', 'release-*'] + paths: + - "runners/kafka-streams/**" + - 'release/trigger_all_tests.json' + - '.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json' + issue_comment: + types: [created] + schedule: + - cron: '15 2/6 * * *' + workflow_dispatch: + +# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.pull_request.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + 
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PreCommit_Java_Kafka_Streams_Runner: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["beam_PreCommit_Java_Kafka_Streams_Runner"] + job_phrase: ["Run Java_Kafka_Streams_Runner PreCommit"] + timeout-minutes: 60 + if: | + github.event_name == 'push' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event_name == 'workflow_dispatch' || + github.event.comment.body == 'Run Java_Kafka_Streams_Runner PreCommit' + runs-on: [self-hosted, ubuntu-24.04, main] + steps: + - uses: actions/checkout@v6 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + - name: run Kafka Streams runner build script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :runners:kafka-streams:build + max-workers: 4 + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v7 + if: ${{ !success() }} + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' + large_files: true + - name: Archive SpotBugs Results + uses: actions/upload-artifact@v7 + if: always() + with: + name: SpotBugs Results + path: '**/build/reports/spotbugs/*.html' + - name: Publish SpotBugs Results + uses: 
jwgmeligmeyling/spotbugs-github-action@v1.2 + if: always() + with: + name: Publish SpotBugs + path: '**/build/reports/spotbugs/*.html' diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle index 54474502ad7b..9204fef4e768 100644 --- a/runners/kafka-streams/build.gradle +++ b/runners/kafka-streams/build.gradle @@ -59,4 +59,9 @@ dependencies { implementation "org.apache.kafka:kafka-streams:$kafka_version" permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version" permitUnusedDeclared "org.apache.kafka:kafka-streams:$kafka_version" + + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.hamcrest + testImplementation library.java.junit + testImplementation library.java.mockito_core } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java index bd1ed4c1bcf6..ad12e17da5af 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java @@ -23,17 +23,16 @@ import org.apache.beam.runners.jobsubmission.JobInvocation; import org.apache.beam.runners.jobsubmission.JobInvoker; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Job invoker for 
the Kafka Streams portable runner. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class KafkaStreamsJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsJobInvoker.class); @@ -69,7 +68,7 @@ protected JobInvocation invokeWithExecutor( String.format("%s_%s", kafkaStreamsOptions.getJobName(), UUID.randomUUID().toString()); PortablePipelineRunner pipelineRunner = new KafkaStreamsPipelineRunner(kafkaStreamsOptions); - kafkaStreamsOptions.setRunner(null); + clearRunner(kafkaStreamsOptions); LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); return createJobInvocation( @@ -83,7 +82,7 @@ protected JobInvocation invokeWithExecutor( protected JobInvocation createJobInvocation( String invocationId, - String retrievalToken, + @Nullable String retrievalToken, ListeningExecutorService executorService, RunnerApi.Pipeline pipeline, KafkaStreamsPipelineOptions kafkaStreamsOptions, @@ -92,8 +91,19 @@ protected JobInvocation createJobInvocation( JobInfo.create( invocationId, kafkaStreamsOptions.getJobName(), - retrievalToken, + Strings.nullToEmpty(retrievalToken), PipelineOptionsTranslation.toProto(kafkaStreamsOptions)); return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); } + + /** + * Clears the runner class on the pipeline options before serialization. The Beam {@link + * PipelineOptions#setRunner} API accepts {@code null} to clear the runner — this mirrors the same + * pattern used by Flink/Spark portable runners so the runner class is not required on the SDK + * harness classpath. 
+ */ + @SuppressWarnings("nullness") + private static void clearRunner(PipelineOptions options) { + options.setRunner(null); + } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java index ddeac8b7c959..4f2134689253 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java @@ -29,9 +29,6 @@ import org.slf4j.LoggerFactory; /** Driver that starts a Beam job server for the Kafka Streams portable runner. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class KafkaStreamsJobServerDriver extends JobServerDriver { private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsJobServerDriver.class); diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java index 911ffce2eea3..f46e0a024ed0 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java @@ -41,13 +41,6 @@ public interface KafkaStreamsPipelineOptions extends PortablePipelineOptions { void setApplicationId(String applicationId); - @Description( - "Kafka Streams processing.guarantee setting, for example at_least_once or exactly_once_v2.") - @Default.String("exactly_once_v2") - String getProcessingGuarantee(); - - void setProcessingGuarantee(String processingGuarantee); - @Description("Soft cap on the number of elements per bundle.") @Default.Integer(1000) int 
getMaxBundleSize(); diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java index cc7464e22786..7c78d3a751b4 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java @@ -25,9 +25,6 @@ import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; /** Executes a portable pipeline by translating it to Kafka Streams. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class KafkaStreamsPipelineRunner implements PortablePipelineRunner { private final KafkaStreamsPipelineOptions pipelineOptions; diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java index ce8f0544b681..a6f3621d1ffd 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunner.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.construction.Environments; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +36,6 @@ * A {@link PipelineRunner} that submits portable jobs to an in-process or external Beam job service * backed by the Kafka Streams translation path. 
*/ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class KafkaStreamsRunner extends PipelineRunner<PipelineResult> { private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsRunner.class); @@ -55,7 +53,7 @@ protected KafkaStreamsRunner(KafkaStreamsPipelineOptions pipelineOptions) { @Override public PipelineResult run(Pipeline pipeline) { assignPortableDefaults(pipelineOptions); - KafkaStreamsJobServerDriver jobServerDriver = null; + @Nullable KafkaStreamsJobServerDriver jobServerDriver = null; try { if (Strings.isNullOrEmpty(pipelineOptions.getJobEndpoint())) { LOG.info("No job endpoint configured; starting an embedded Kafka Streams job server."); @@ -68,7 +66,8 @@ public PipelineResult run(Pipeline pipeline) { PortableRunner portableRunner = PortableRunner.fromOptions(pipelineOptions); PipelineResult result = portableRunner.run(pipeline); if (jobServerDriver != null) { - return new KafkaStreamsPipelineResult(result, jobServerDriver::stop); + KafkaStreamsJobServerDriver driverForStop = jobServerDriver; + return new KafkaStreamsPipelineResult(result, driverForStop::stop); } return result; } catch (IOException e) { @@ -84,10 +83,9 @@ private static void assignPortableDefaults(KafkaStreamsPipelineOptions pipelineO pipelineOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK); } ExperimentalOptions experimentalOptions = pipelineOptions.as(ExperimentalOptions.class); + @Nullable List<String> existingExperiments = experimentalOptions.getExperiments(); List<String> experiments = - experimentalOptions.getExperiments() == null - ? new ArrayList<>() - : new ArrayList<>(experimentalOptions.getExperiments()); + existingExperiments == null ?
new ArrayList<>() : new ArrayList<>(existingExperiments); if (!experiments.contains("beam_fn_api")) { experiments.add("beam_fn_api"); experimentalOptions.setExperiments(experiments); diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriverTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriverTest.java new file mode 100644 index 000000000000..8eecc708904d --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriverTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.Test; + +/** Tests for {@link KafkaStreamsJobServerDriver}. 
*/ +public class KafkaStreamsJobServerDriverTest { + + @Test + public void testConfigurationDefaults() { + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration config = + new KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration(); + + assertThat(config.getHost(), is("localhost")); + assertThat(config.getPort(), is(8099)); + assertThat(config.getArtifactPort(), is(8098)); + assertThat(config.getExpansionPort(), is(8097)); + assertThat(config.isCleanArtifactsPerJob(), is(true)); + + KafkaStreamsJobServerDriver driver = KafkaStreamsJobServerDriver.fromConfig(config); + assertThat(driver, is(not(nullValue()))); + } + + @Test + public void testConfigurationFromArgs() { + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration config = + KafkaStreamsJobServerDriver.parseArgs( + new String[] { + "--job-host=test-host", + "--job-port", + "42", + "--artifact-port", + "43", + "--expansion-port", + "44", + "--clean-artifacts-per-job=false", + }); + + assertThat(config.getHost(), is("test-host")); + assertThat(config.getPort(), is(42)); + assertThat(config.getArtifactPort(), is(43)); + assertThat(config.getExpansionPort(), is(44)); + assertThat(config.isCleanArtifactsPerJob(), is(false)); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidArgsRejected() { + KafkaStreamsJobServerDriver.parseArgs(new String[] {"--unknown-flag=value"}); + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java new file mode 100644 index 000000000000..45f96637acfe --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +/** Tests for {@link KafkaStreamsPipelineOptions}. 
*/ +public class KafkaStreamsPipelineOptionsTest { + + @Test + public void testDefaultValues() { + KafkaStreamsPipelineOptions options = + PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + + assertThat(options.getBootstrapServers(), is("localhost:9092")); + assertThat(options.getApplicationId(), is("beam-kafka-streams-runner")); + assertThat(options.getMaxBundleSize(), is(1000)); + assertThat(options.getMaxBundleTimeMs(), is(1000)); + assertThat(options.getStateDir(), is(notNullValue())); + assertThat(options.getStateDir(), containsString("beam-kafka-streams-state")); + assertThat(options.getStateDir(), containsString(options.getJobName())); + } + + @Test + public void testOverrides() { + KafkaStreamsPipelineOptions options = + PipelineOptionsFactory.fromArgs( + "--bootstrapServers=broker-1:9092,broker-2:9092", + "--applicationId=custom-app", + "--maxBundleSize=500", + "--maxBundleTimeMs=250", + "--stateDir=/var/data/beam-ks") + .as(KafkaStreamsPipelineOptions.class); + + assertThat(options.getBootstrapServers(), is("broker-1:9092,broker-2:9092")); + assertThat(options.getApplicationId(), is("custom-app")); + assertThat(options.getMaxBundleSize(), is(500)); + assertThat(options.getMaxBundleTimeMs(), is(250)); + assertThat(options.getStateDir(), is("/var/data/beam-ks")); + } + + @Test + public void testStateDirIsolatesByJobName() { + KafkaStreamsPipelineOptions optionsA = + PipelineOptionsFactory.fromArgs("--jobName=job-a").as(KafkaStreamsPipelineOptions.class); + KafkaStreamsPipelineOptions optionsB = + PipelineOptionsFactory.fromArgs("--jobName=job-b").as(KafkaStreamsPipelineOptions.class); + + assertThat(optionsA.getStateDir(), containsString("job-a")); + assertThat(optionsB.getStateDir(), containsString("job-b")); + assertThat(optionsA.getStateDir().equals(optionsB.getStateDir()), is(false)); + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java 
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java new file mode 100644 index 000000000000..86e9cf878497 --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.junit.Test; + +/** + * Tests for {@link KafkaStreamsPipelineTranslator}. + * + *
<p>
The skeleton translator does not yet handle any transforms; these tests pin the current + * "fail-fast with a clear URN" contract so that follow-up sub-issues can replace the assertions as + * real translators are added. + */ +public class KafkaStreamsPipelineTranslatorTest { + + private static final String JOB_ID = "kafka-streams-test-job"; + + @Test + public void translateRejectsUnknownTransformWithUrnInMessage() { + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + KafkaStreamsTranslationContext context = newContext(); + + RunnerApi.Pipeline pipeline = + RunnerApi.Pipeline.newBuilder() + .setComponents( + RunnerApi.Components.newBuilder() + .putTransforms( + "impulse", + RunnerApi.PTransform.newBuilder() + .setUniqueName("Impulse") + .setSpec( + RunnerApi.FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)) + .build())) + .build(); + + UnsupportedOperationException ex = + assertThrows( + UnsupportedOperationException.class, + () -> translator.translate(context, translator.prepareForTranslation(pipeline))); + + assertThat(ex.getMessage(), containsString("No translator registered for URN")); + assertThat(ex.getMessage(), containsString(PTransformTranslation.IMPULSE_TRANSFORM_URN)); + assertThat(ex.getMessage(), containsString(JOB_ID)); + } + + @Test + public void translateRejectsEmptyPipeline() { + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + KafkaStreamsTranslationContext context = newContext(); + + RunnerApi.Pipeline pipeline = + RunnerApi.Pipeline.newBuilder() + .setComponents(RunnerApi.Components.newBuilder().build()) + .build(); + + UnsupportedOperationException ex = + assertThrows( + UnsupportedOperationException.class, + () -> translator.translate(context, translator.prepareForTranslation(pipeline))); + + assertThat(ex.getMessage(), containsString("No translator registered")); + } + + @Test + public void createTranslationContextExposesJobInfoAndOptions() { 
+ KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + KafkaStreamsPipelineOptions options = + PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + JobInfo jobInfo = + JobInfo.create( + JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); + + KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options); + + assertThat(context.getJobInfo().jobId(), containsString(JOB_ID)); + assertThat(context.getPipelineOptions().getBootstrapServers(), containsString("localhost")); + } + + private static KafkaStreamsTranslationContext newContext() { + KafkaStreamsPipelineOptions options = + PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + JobInfo jobInfo = + JobInfo.create( + JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); + return KafkaStreamsTranslationContext.create(jobInfo, options); + } +} From 0d445f79e0c61d8f2c69c6611abe6078ac608fd8 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Fri, 22 May 2026 19:09:38 +0500 Subject: [PATCH 4/4] Drop setRunner(null) suppression; make applicationId required --- .../kafka/streams/KafkaStreamsJobInvoker.java | 13 ---------- .../streams/KafkaStreamsPipelineOptions.java | 6 +++-- .../KafkaStreamsPipelineOptionsTest.java | 24 +++++++++++++++---- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java index ad12e17da5af..a32c7c487737 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java @@ -23,7 +23,6 @@ import org.apache.beam.runners.jobsubmission.JobInvocation; import 
org.apache.beam.runners.jobsubmission.JobInvoker; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -68,7 +67,6 @@ protected JobInvocation invokeWithExecutor( String.format("%s_%s", kafkaStreamsOptions.getJobName(), UUID.randomUUID().toString()); PortablePipelineRunner pipelineRunner = new KafkaStreamsPipelineRunner(kafkaStreamsOptions); - clearRunner(kafkaStreamsOptions); LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); return createJobInvocation( @@ -95,15 +93,4 @@ protected JobInvocation createJobInvocation( PipelineOptionsTranslation.toProto(kafkaStreamsOptions)); return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); } - - /** - * Clears the runner class on the pipeline options before serialization. The Beam {@link - * PipelineOptions#setRunner} API accepts {@code null} to clear the runner — this mirrors the same - * pattern used by Flink/Spark portable runners so the runner class is not required on the SDK - * harness classpath. 
- */ - @SuppressWarnings("nullness") - private static void clearRunner(PipelineOptions options) { - options.setRunner(null); - } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java index f46e0a024ed0..2fa992e66e7e 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.options.Validation; /** Pipeline options for the Kafka Streams runner. */ public interface KafkaStreamsPipelineOptions extends PortablePipelineOptions { @@ -35,8 +36,9 @@ public interface KafkaStreamsPipelineOptions extends PortablePipelineOptions { @Description( "Kafka Streams application.id (must be unique for each distinct topology using the same " - + "input topics in a Kafka cluster).") - @Default.String("beam-kafka-streams-runner") + + "input topics in a Kafka cluster). 
Must be specified explicitly: a shared default " + + "would let concurrent jobs collide on the same Kafka Streams consumer group.") + @Validation.Required String getApplicationId(); void setApplicationId(String applicationId); diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java index 45f96637acfe..27e2a63cc1f5 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptionsTest.java @@ -21,8 +21,10 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.junit.Test; /** Tests for {@link KafkaStreamsPipelineOptions}. 
*/ @@ -31,10 +33,10 @@ public class KafkaStreamsPipelineOptionsTest { @Test public void testDefaultValues() { KafkaStreamsPipelineOptions options = - PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + PipelineOptionsFactory.fromArgs("--applicationId=test-app") + .as(KafkaStreamsPipelineOptions.class); assertThat(options.getBootstrapServers(), is("localhost:9092")); - assertThat(options.getApplicationId(), is("beam-kafka-streams-runner")); assertThat(options.getMaxBundleSize(), is(1000)); assertThat(options.getMaxBundleTimeMs(), is(1000)); assertThat(options.getStateDir(), is(notNullValue())); @@ -42,6 +44,18 @@ public void testDefaultValues() { assertThat(options.getStateDir(), containsString(options.getJobName())); } + @Test + public void testApplicationIdIsRequired() { + KafkaStreamsPipelineOptions options = + PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> PipelineOptionsValidator.validate(KafkaStreamsPipelineOptions.class, options)); + assertThat(ex.getMessage(), containsString("getApplicationId")); + } + @Test public void testOverrides() { KafkaStreamsPipelineOptions options = @@ -63,9 +77,11 @@ public void testOverrides() { @Test public void testStateDirIsolatesByJobName() { KafkaStreamsPipelineOptions optionsA = - PipelineOptionsFactory.fromArgs("--jobName=job-a").as(KafkaStreamsPipelineOptions.class); + PipelineOptionsFactory.fromArgs("--jobName=job-a", "--applicationId=app-a") + .as(KafkaStreamsPipelineOptions.class); KafkaStreamsPipelineOptions optionsB = - PipelineOptionsFactory.fromArgs("--jobName=job-b").as(KafkaStreamsPipelineOptions.class); + PipelineOptionsFactory.fromArgs("--jobName=job-b", "--applicationId=app-b") + .as(KafkaStreamsPipelineOptions.class); assertThat(optionsA.getStateDir(), containsString("job-a")); assertThat(optionsB.getStateDir(), containsString("job-b"));