diff --git a/.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json b/.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json new file mode 100644 index 000000000000..5abe02fc09c7 --- /dev/null +++ b/.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +} diff --git a/.github/workflows/README.md b/.github/workflows/README.md index c6a95b29b4c0..e70d5e17d7a7 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -249,6 +249,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Java HBase IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HBase_IO_Direct.yml) | N/A |`Run Java_HBase_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HBase_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java HCatalog IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HCatalog_IO_Direct.yml) | N/A |`Run Java_HCatalog_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_HCatalog_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Kafka IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_IO_Direct.yml) | N/A |`Run Java_Kafka_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_IO_Direct.yml?query=event%3Aschedule) | +| [ PreCommit Java Kafka Streams Runner ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml) | N/A |`Run Java_Kafka_Streams_Runner PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml?query=event%3Aschedule) | | [ PreCommit Java InfluxDb IO Direct 
](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_InfluxDb_IO_Direct.yml) | N/A |`Run Java_InfluxDb_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_InfluxDb_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java IOs Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_IOs_Direct.yml) | N/A |`Run Java_IOs_Direct PreCommit`| N/A | | [ PreCommit Java JDBC IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml) | N/A |`Run Java_JDBC_IO_Direct PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml?query=event%3Aschedule) | diff --git a/.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml b/.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml new file mode 100644 index 000000000000..564b2bbc4bc8 --- /dev/null +++ b/.github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +name: PreCommit Java Kafka Streams Runner + +on: + push: + tags: ['v*'] + branches: ['master', 'release-*'] + paths: + - "runners/kafka-streams/**" + - ".github/workflows/beam_PreCommit_Java_Kafka_Streams_Runner.yml" + pull_request_target: + branches: ['master', 'release-*'] + paths: + - "runners/kafka-streams/**" + - 'release/trigger_all_tests.json' + - '.github/trigger_files/beam_PreCommit_Java_Kafka_Streams_Runner.json' + issue_comment: + types: [created] + schedule: + - cron: '15 2/6 * * *' + workflow_dispatch: + +# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.pull_request.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PreCommit_Java_Kafka_Streams_Runner: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["beam_PreCommit_Java_Kafka_Streams_Runner"] + job_phrase: ["Run Java_Kafka_Streams_Runner PreCommit"] + timeout-minutes: 60 + if: | + github.event_name == 'push' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event_name == 'workflow_dispatch' || + 
github.event.comment.body == 'Run Java_Kafka_Streams_Runner PreCommit' + runs-on: [self-hosted, ubuntu-24.04, main] + steps: + - uses: actions/checkout@v6 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + - name: run Kafka Streams runner build script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :runners:kafka-streams:build + max-workers: 4 + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v7 + if: ${{ !success() }} + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' + large_files: true + - name: Archive SpotBugs Results + uses: actions/upload-artifact@v7 + if: always() + with: + name: SpotBugs Results + path: '**/build/reports/spotbugs/*.html' + - name: Publish SpotBugs Results + uses: jwgmeligmeyling/spotbugs-github-action@v1.2 + if: always() + with: + name: Publish SpotBugs + path: '**/build/reports/spotbugs/*.html' diff --git a/CHANGES.md b/CHANGES.md index 52475a99d8e1..18d2698a2875 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,6 +71,7 @@ ## New Features / Improvements +* (Java) Added a `runners/kafka-streams` Gradle module with portable job server and runner entry points; translation fails fast with an explicit unsupported-URN message until transforms are implemented ([#38465](https://github.com/apache/beam/issues/38465)). 
* Capability introduces an indicator for aggregations and timers firing during a pipeline drain, allowing users and sinks to recognize and appropriately handle potentially incomplete or partial data ([#36884](https://github.com/apache/beam/issues/36884)). * Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go/Python) ([#38349](https://github.com/apache/beam/issues/38349)). * TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to diff --git a/build.gradle.kts b/build.gradle.kts index 4af8fa3f1ab4..b3b9fdd7fdf0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -278,6 +278,7 @@ tasks.register("javaPreCommit") { dependsOn(":runners:java-fn-execution:build") dependsOn(":runners:java-job-service:build") dependsOn(":runners:jet:build") + dependsOn(":runners:kafka-streams:build") dependsOn(":runners:local-java:build") dependsOn(":runners:portability:java:build") dependsOn(":runners:prism:java:build") diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle new file mode 100644 index 000000000000..9204fef4e768 --- /dev/null +++ b/runners/kafka-streams/build.gradle @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +def kafka_version = '3.9.0' + +applyJavaNature( + automaticModuleName: 'org.apache.beam.runners.kafka.streams', +) + +description = "Apache Beam :: Runners :: Kafka Streams" + +evaluationDependsOn(":sdks:java:core") +evaluationDependsOn(":runners:core-java") + +configurations.configureEach { + resolutionStrategy.eachDependency { details -> + if (details.requested.group == "org.apache.kafka") { + details.useVersion(kafka_version) + details.because("Kafka Streams runner is developed against Kafka ${kafka_version}.") + } + } +} + +dependencies { + compileOnly project(":sdks:java:build-tools") + permitUnusedDeclared project(":sdks:java:build-tools") + + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") + implementation project(":runners:core-java") + permitUnusedDeclared project(":runners:core-java") + implementation project(":runners:java-fn-execution") + implementation project(":runners:java-job-service") + implementation project(":runners:portability:java") + implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") + implementation library.java.args4j + implementation library.java.joda_time + implementation library.java.slf4j_api + implementation library.java.vendored_grpc_1_69_0 + implementation library.java.vendored_guava_32_1_2_jre + implementation "org.apache.kafka:kafka-clients:$kafka_version" + implementation "org.apache.kafka:kafka-streams:$kafka_version" + permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version" + permitUnusedDeclared "org.apache.kafka:kafka-streams:$kafka_version" + + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.hamcrest + testImplementation library.java.junit + testImplementation 
library.java.mockito_core +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java new file mode 100644 index 000000000000..a32c7c487737 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobInvoker.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams; + +import java.util.UUID; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.jobsubmission.JobInvocation; +import org.apache.beam.runners.jobsubmission.JobInvoker; +import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Job invoker for the Kafka Streams portable runner. */ +public class KafkaStreamsJobInvoker extends JobInvoker { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsJobInvoker.class); + + public static KafkaStreamsJobInvoker create( + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration serverConfig) { + return new KafkaStreamsJobInvoker(serverConfig); + } + + private final KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration serverConfig; + + protected KafkaStreamsJobInvoker( + KafkaStreamsJobServerDriver.KafkaStreamsServerConfiguration serverConfig) { + super("kafka-streams-runner-job-invoker-%d"); + this.serverConfig = serverConfig; + } + + @Override + protected JobInvocation invokeWithExecutor( + RunnerApi.Pipeline pipeline, + Struct options, + @Nullable String retrievalToken, + ListeningExecutorService executorService) { + + LOG.trace( + "Parsing pipeline options (job server {}:{})", + serverConfig.getHost(), + serverConfig.getPort()); + KafkaStreamsPipelineOptions kafkaStreamsOptions = + PipelineOptionsTranslation.fromProto(options).as(KafkaStreamsPipelineOptions.class); + + String 
invocationId = + String.format("%s_%s", kafkaStreamsOptions.getJobName(), UUID.randomUUID().toString()); + + PortablePipelineRunner pipelineRunner = new KafkaStreamsPipelineRunner(kafkaStreamsOptions); + + LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); + return createJobInvocation( + invocationId, + retrievalToken, + executorService, + pipeline, + kafkaStreamsOptions, + pipelineRunner); + } + + protected JobInvocation createJobInvocation( + String invocationId, + @Nullable String retrievalToken, + ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, + KafkaStreamsPipelineOptions kafkaStreamsOptions, + PortablePipelineRunner pipelineRunner) { + JobInfo jobInfo = + JobInfo.create( + invocationId, + kafkaStreamsOptions.getJobName(), + Strings.nullToEmpty(retrievalToken), + PipelineOptionsTranslation.toProto(kafkaStreamsOptions)); + return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java new file mode 100644 index 000000000000..4f2134689253 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsJobServerDriver.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import org.apache.beam.runners.jobsubmission.JobServerDriver; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.fn.server.ServerFactory; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Driver that starts a Beam job server for the Kafka Streams portable runner. */ +public class KafkaStreamsJobServerDriver extends JobServerDriver { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsJobServerDriver.class); + + /** Runner-specific configuration for the job server process. 
*/ + public static class KafkaStreamsServerConfiguration extends ServerConfiguration {} + + public static void main(String[] args) throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + options.as(GcsOptions.class).setGcsUploadBufferSizeBytes(1024 * 1024); + FileSystems.setDefaultPipelineOptions(options); + fromParams(args).run(); + } + + private static void printUsage(CmdLineParser parser) { + System.err.println( + String.format( + "Usage: java %s arguments...", KafkaStreamsJobServerDriver.class.getSimpleName())); + parser.printUsage(System.err); + System.err.println(); + } + + public static KafkaStreamsServerConfiguration parseArgs(String[] args) { + KafkaStreamsServerConfiguration configuration = new KafkaStreamsServerConfiguration(); + CmdLineParser parser = new CmdLineParser(configuration); + try { + parser.parseArgument(args); + } catch (CmdLineException e) { + LOG.error("Unable to parse command line arguments.", e); + printUsage(parser); + throw new IllegalArgumentException("Unable to parse command line arguments.", e); + } + return configuration; + } + + /** Used by tests and tooling to construct a driver from command-line parameters. 
*/ + public static KafkaStreamsJobServerDriver fromParams(String[] args) { + return fromConfig(parseArgs(args)); + } + + public static KafkaStreamsJobServerDriver fromConfig( + KafkaStreamsServerConfiguration configuration) { + return create( + configuration, + createJobServerFactory(configuration), + createArtifactServerFactory(configuration), + () -> KafkaStreamsJobInvoker.create(configuration)); + } + + public static KafkaStreamsJobServerDriver fromConfig( + KafkaStreamsServerConfiguration configuration, JobInvokerFactory jobInvokerFactory) { + return create( + configuration, + createJobServerFactory(configuration), + createArtifactServerFactory(configuration), + jobInvokerFactory); + } + + private static KafkaStreamsJobServerDriver create( + KafkaStreamsServerConfiguration configuration, + ServerFactory jobServerFactory, + ServerFactory artifactServerFactory, + JobInvokerFactory jobInvokerFactory) { + return new KafkaStreamsJobServerDriver( + configuration, jobServerFactory, artifactServerFactory, jobInvokerFactory); + } + + private KafkaStreamsJobServerDriver( + KafkaStreamsServerConfiguration configuration, + ServerFactory jobServerFactory, + ServerFactory artifactServerFactory, + JobInvokerFactory jobInvokerFactory) { + super(configuration, jobServerFactory, artifactServerFactory, jobInvokerFactory); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java new file mode 100644 index 000000000000..2fa992e66e7e --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineOptions.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.nio.file.Paths; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.options.Validation; + +/** Pipeline options for the Kafka Streams runner. */ +public interface KafkaStreamsPipelineOptions extends PortablePipelineOptions { + + @Description("Comma-separated list of host:port Kafka brokers used by the Kafka Streams client.") + @Default.String("localhost:9092") + String getBootstrapServers(); + + void setBootstrapServers(String bootstrapServers); + + @Description( + "Kafka Streams application.id (must be unique for each distinct topology using the same " + + "input topics in a Kafka cluster). 
Must be specified explicitly: a shared default " + + "would let concurrent jobs collide on the same Kafka Streams consumer group.") + @Validation.Required + String getApplicationId(); + + void setApplicationId(String applicationId); + + @Description("Soft cap on the number of elements per bundle.") + @Default.Integer(1000) + int getMaxBundleSize(); + + void setMaxBundleSize(int maxBundleSize); + + @Description("Soft cap on bundle wall-clock duration in milliseconds.") + @Default.Integer(1000) + int getMaxBundleTimeMs(); + + void setMaxBundleTimeMs(int maxBundleTimeMs); + + @Description("Directory where Kafka Streams stores local state.") + @Default.InstanceFactory(StateDirDefaultFactory.class) + String getStateDir(); + + void setStateDir(String stateDir); + + /** + * Default {@link #getStateDir()} under the JVM temp directory. + * + *
<p>The job name is included in the path so that multiple pipelines running on the same host
+ * (e.g. parallel tests) do not collide on the same Kafka Streams state directory and trigger a
+ * {@code LockException}.
+ */
+ class StateDirDefaultFactory implements DefaultValueFactory The initial implementation only validates the graph and fails fast with an explicit message
+ * for transforms that are not yet supported.
+ */
+public class KafkaStreamsPipelineTranslator {
+
+  /**
+   * Creates the translation context for a job by delegating to {@link
+   * KafkaStreamsTranslationContext#create}.
+   *
+   * @param jobInfo job information handed to the new context
+   * @param pipelineOptions Kafka Streams runner options handed to the new context
+   * @return the context returned by {@code KafkaStreamsTranslationContext.create}
+   */
+  public KafkaStreamsTranslationContext createTranslationContext(
+      JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) {
+    final KafkaStreamsTranslationContext translationContext =
+        KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions);
+    return translationContext;
+  }
+
+  /**
+   * Pre-translation hook. It currently hands the given pipeline back untouched; it exists as a
+   * placeholder for future fusion / expansion steps.
+   *
+   * @param pipeline the pipeline proto to prepare
+   * @return the same {@code pipeline} instance that was passed in
+   */
+  public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) {
+    final RunnerApi.Pipeline prepared = pipeline;
+    return prepared;
+  }
+
+ /**
+ * Translates the pipeline. Throws {@link UnsupportedOperationException} with a clear URN message
+ * for the first unsupported primitive encountered.
+ */
+ public void translate(KafkaStreamsTranslationContext context, RunnerApi.Pipeline pipeline) {
+ Map The skeleton translator does not yet handle any transforms; these tests pin the current
+ * "fail-fast with a clear URN" contract so that follow-up sub-issues can replace the assertions as
+ * real translators are added.
+ */
+public class KafkaStreamsPipelineTranslatorTest {
+
+  /** Job id given to every {@link JobInfo} built by these tests. */
+  private static final String JOB_ID = "kafka-streams-test-job";
+
+  /** A transform with an unhandled URN is rejected, and the message names the URN and job id. */
+  @Test
+  public void translateRejectsUnknownTransformWithUrnInMessage() {
+    KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+    KafkaStreamsTranslationContext context = newContext();
+
+    RunnerApi.PTransform impulse =
+        RunnerApi.PTransform.newBuilder()
+            .setUniqueName("Impulse")
+            .setSpec(
+                RunnerApi.FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN))
+            .build();
+    RunnerApi.Pipeline pipeline =
+        RunnerApi.Pipeline.newBuilder()
+            .setComponents(RunnerApi.Components.newBuilder().putTransforms("impulse", impulse))
+            .build();
+
+    UnsupportedOperationException ex =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> translator.translate(context, translator.prepareForTranslation(pipeline)));
+
+    // Substring checks are deliberate here: only the key fragments of the message are pinned.
+    assertThat(ex.getMessage(), containsString("No translator registered for URN"));
+    assertThat(ex.getMessage(), containsString(PTransformTranslation.IMPULSE_TRANSFORM_URN));
+    assertThat(ex.getMessage(), containsString(JOB_ID));
+  }
+
+  /** A pipeline whose components are empty is rejected with the same exception type. */
+  @Test
+  public void translateRejectsEmptyPipeline() {
+    KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+    KafkaStreamsTranslationContext context = newContext();
+
+    RunnerApi.Pipeline pipeline =
+        RunnerApi.Pipeline.newBuilder()
+            .setComponents(RunnerApi.Components.newBuilder().build())
+            .build();
+
+    UnsupportedOperationException ex =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> translator.translate(context, translator.prepareForTranslation(pipeline)));
+
+    assertThat(ex.getMessage(), containsString("No translator registered"));
+  }
+
+  /** The context built by the translator exposes the job id and option values it was given. */
+  @Test
+  public void createTranslationContextExposesJobInfoAndOptions() {
+    KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+    KafkaStreamsPipelineOptions options = newOptions();
+
+    KafkaStreamsTranslationContext context =
+        translator.createTranslationContext(newJobInfo(options), options);
+
+    // Exact matches: a substring check would also accept a mangled job id or a different host
+    // string that merely contains "localhost".
+    assertThat(context.getJobInfo().jobId(), org.hamcrest.CoreMatchers.equalTo(JOB_ID));
+    assertThat(
+        context.getPipelineOptions().getBootstrapServers(),
+        org.hamcrest.CoreMatchers.equalTo("localhost:9092"));
+  }
+
+  /** Returns default-valued options viewed as {@link KafkaStreamsPipelineOptions}. */
+  private static KafkaStreamsPipelineOptions newOptions() {
+    return PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class);
+  }
+
+  /** Returns a {@link JobInfo} for {@link #JOB_ID} with an empty retrieval token. */
+  private static JobInfo newJobInfo(KafkaStreamsPipelineOptions options) {
+    return JobInfo.create(
+        JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options));
+  }
+
+  /** Returns a translation context built directly from default options and {@link #JOB_ID}. */
+  private static KafkaStreamsTranslationContext newContext() {
+    KafkaStreamsPipelineOptions options = newOptions();
+    return KafkaStreamsTranslationContext.create(newJobInfo(options), options);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index fc5f40c23d17..b92b254981fe 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -143,6 +143,7 @@ include(":runners:google-cloud-dataflow-java:examples-streaming")
include(":runners:java-fn-execution")
include(":runners:java-job-service")
include(":runners:jet")
+include(":runners:kafka-streams")
include(":runners:local-java")
include(":runners:portability:java")
include(":runners:prism")