From e08f264f3257d13e9257654bb33bd793159e558d Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 11 Jun 2026 10:34:08 +0800 Subject: [PATCH 1/3] [FLINK] Bind native callback target for Gluten operators --- .github/workflows/flink.yml | 4 ++-- .../table/runtime/operators/GlutenOneInputOperator.java | 9 ++++++++- .../table/runtime/operators/GlutenTwoInputOperator.java | 9 ++++++++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f30..faee520081 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -69,8 +69,8 @@ jobs: export VELOX_DEPENDENCY_SOURCE=BUNDLED export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED - git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + git clone -b feature/native-callback-bridge https://github.com/bigo-sg/velox4j.git + cd velox4j && git reset --hard c2c193ead7bf88f632838bca58921ef699c89c11 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 09b059f3bc..5eeae3c4cc 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -31,6 +31,7 @@ import io.github.zhztheplayer.velox4j.query.Query; import io.github.zhztheplayer.velox4j.query.SerialTask; import io.github.zhztheplayer.velox4j.serde.Serde; +import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget; import io.github.zhztheplayer.velox4j.stateful.StatefulElement; import io.github.zhztheplayer.velox4j.stateful.StatefulRecord; import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark; @@ -49,7 +50,7 @@ /** Calculate operator in gluten, which will call Velox to run. */ public class GlutenOneInputOperator extends TableStreamOperator - implements OneInputStreamOperator, GlutenOperator { + implements OneInputStreamOperator, GlutenOperator, NativeCallbackTarget { private final StatefulPlanNode glutenPlan; private final String id; @@ -143,6 +144,7 @@ void initSession() { VeloxQueryConfig.getConfig(getRuntimeContext()), VeloxConnectorConfig.getConfig(getRuntimeContext())); task = sessionResource.getSession().queryOps().execute(query); + task.bindNativeCallbackTarget(this); task.addSplit( id, new ExternalStreamConnectorSplit("connector-external-stream", inputQueue.id())); task.noMoreSplits(id); @@ -186,6 +188,11 @@ public void scheduleProcessElementOnMailbox() { scheduleDrainOnMailbox(this::drainTaskOutput); } + @Override + public void onProcessElement() { + scheduleProcessElementOnMailbox(); + } + @Override public void processElementInternal() { drainOutput(this::drainTaskOutput); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 3f73ad4bbd..7118b6803b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -28,6 +28,7 @@ import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; import io.github.zhztheplayer.velox4j.query.Query; import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget; import io.github.zhztheplayer.velox4j.stateful.StatefulElement; import io.github.zhztheplayer.velox4j.stateful.StatefulRecord; import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark; @@ -50,7 +51,7 @@ * instead of flink RowData. */ public class GlutenTwoInputOperator extends AbstractStreamOperator - implements TwoInputStreamOperator, GlutenOperator { + implements TwoInputStreamOperator, GlutenOperator, NativeCallbackTarget { private static final Logger LOG = LoggerFactory.getLogger(GlutenTwoInputOperator.class); @@ -139,6 +140,11 @@ public void scheduleProcessElementOnMailbox() { scheduleDrainOnMailbox(this::drainTaskOutput); } + @Override + public void onProcessElement() { + scheduleProcessElementOnMailbox(); + } + @Override public void processElement1(StreamRecord element) { StatefulRecord statefulRecord = @@ -296,6 +302,7 @@ private void initSession() { VeloxQueryConfig.getConfig(getRuntimeContext()), VeloxConnectorConfig.getConfig(getRuntimeContext())); task = sessionResource.getSession().queryOps().execute(query); + task.bindNativeCallbackTarget(this); ExternalStreamConnectorSplit leftSplit = new ExternalStreamConnectorSplit("connector-external-stream", leftInputQueue.id()); From 7bc8c4efdba523a440ccf2b927224a7a183c698b Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 11 Jun 2026 10:46:15 +0800 Subject: [PATCH 2/3] [FLINK] Rename native processing time callback --- .github/workflows/flink.yml | 2 +- .../gluten/table/runtime/operators/GlutenOneInputOperator.java | 2 +- .../gluten/table/runtime/operators/GlutenTwoInputOperator.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index faee520081..a3e68b7977 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b feature/native-callback-bridge https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard c2c193ead7bf88f632838bca58921ef699c89c11 + cd velox4j && git reset --hard bba532937efd74d175ea26bf590a945382c138a1 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 5eeae3c4cc..914ace2228 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -189,7 +189,7 @@ public void scheduleProcessElementOnMailbox() { } @Override - public void onProcessElement() { + public void onProcessingTime(long timestamp) { scheduleProcessElementOnMailbox(); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 7118b6803b..6d0a308c25 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -141,7 +141,7 @@ public void scheduleProcessElementOnMailbox() { } @Override - public void onProcessElement() { + public void onProcessingTime(long timestamp) { scheduleProcessElementOnMailbox(); } From 1a26fc9c16a788efa9c9de384144c07f57ec3385 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 11 Jun 2026 10:50:24 +0800 Subject: [PATCH 3/3] [FLINK] Remove legacy JNI operator callback registry --- .../streaming/api/operators/GlutenOperator.java | 14 -------------- .../runtime/operators/GlutenOneInputOperator.java | 1 - .../runtime/operators/GlutenSessionResources.java | 14 -------------- .../runtime/operators/GlutenTwoInputOperator.java | 1 - 4 files changed, 30 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java index bb6bc3bf54..666fd47024 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java @@ -17,7 +17,6 @@ package org.apache.gluten.streaming.api.operators; import org.apache.gluten.table.runtime.operators.GlutenMailboxHolder; -import org.apache.gluten.table.runtime.operators.GlutenSessionResources; import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; import io.github.zhztheplayer.velox4j.type.RowType; @@ -64,19 +63,6 @@ default void scheduleDrainOnMailbox(Runnable drainAction) { mailboxHolder().get().scheduleDrain(drainAction); } - /** - * Called from native Velox code to drain operator output. Drain is always scheduled on the Flink - * task mailbox thread. - */ - static void processElementByJni(String operatorId) { - GlutenOperator operator = - GlutenSessionResources.getInstance().getOperator(operatorId).orElse(null); - if (operator == null) { - throw new IllegalArgumentException("Operator not found: " + operatorId); - } - operator.scheduleProcessElementOnMailbox(); - } - /** Schedules native output drain on the mailbox thread. Implemented by concrete operators. */ default void scheduleProcessElementOnMailbox() {} } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 914ace2228..fc5aed0712 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -120,7 +120,6 @@ void initSession() { } sessionResource = new GlutenSessionResource(); GlutenSessionResources.getInstance().addSessionResource(id, sessionResource); - GlutenSessionResources.getInstance().addOperator(this.getClass().getSimpleName(), this); inputQueue = sessionResource.getSession().externalStreamOps().newBlockingQueue(); // add a mock input as velox not allow the source is empty. if (inputType == null) { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java index fac0784a24..4305adc755 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java @@ -16,8 +16,6 @@ */ package org.apache.gluten.table.runtime.operators; -import org.apache.gluten.streaming.api.operators.GlutenOperator; - import io.github.zhztheplayer.velox4j.Velox4j; import io.github.zhztheplayer.velox4j.memory.AllocationListener; import io.github.zhztheplayer.velox4j.memory.MemoryManager; @@ -30,7 +28,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.Optional; // Manage the session and resource for Velox. class GlutenSessionResource { @@ -84,7 +81,6 @@ public void setKeyedStateBackend(KeyedStateBackend keyedStateBackend) { public class GlutenSessionResources { private static final GlutenSessionResources instance = new GlutenSessionResources(); private Map sessionResources = new HashMap<>(); - private Map operators = new HashMap<>(); private GlutenSessionResources() {} @@ -104,14 +100,4 @@ public Session getSession(String id) { return sessionResources.get(id).getSession(); } - public void addOperator(String id, GlutenOperator operator) { - operators.put(id, operator); - } - - public Optional getOperator(String id) { - if (operators.containsKey(id)) { - return Optional.of(operators.get(id)); - } - return Optional.empty(); - } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 6d0a308c25..883232057c 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -292,7 +292,6 @@ private void initSession() { sessionResource = new GlutenSessionResource(); GlutenSessionResources.getInstance().addSessionResource(getId(), sessionResource); - GlutenSessionResources.getInstance().addOperator(getId(), this); leftInputQueue = sessionResource.getSession().externalStreamOps().newBlockingQueue(); rightInputQueue = sessionResource.getSession().externalStreamOps().newBlockingQueue();