Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ jobs:
export VELOX_DEPENDENCY_SOURCE=BUNDLED
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
git clone -b feature/native-callback-bridge https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard bba532937efd74d175ea26bf590a945382c138a1
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.gluten.streaming.api.operators;

import org.apache.gluten.table.runtime.operators.GlutenMailboxHolder;
import org.apache.gluten.table.runtime.operators.GlutenSessionResources;

import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.type.RowType;
Expand Down Expand Up @@ -64,19 +63,6 @@ default void scheduleDrainOnMailbox(Runnable drainAction) {
mailboxHolder().get().scheduleDrain(drainAction);
}

/**
 * Called from native Velox code to drain operator output. The operator is resolved by id through
 * {@link GlutenSessionResources} and asked to schedule the drain on its mailbox; the drain is
 * always scheduled on the Flink task mailbox thread.
 *
 * @param operatorId id under which the target operator was registered
 * @throws IllegalArgumentException if no operator is registered under {@code operatorId}
 */
static void processElementByJni(String operatorId) {
  GlutenSessionResources.getInstance()
      .getOperator(operatorId)
      .orElseThrow(() -> new IllegalArgumentException("Operator not found: " + operatorId))
      .scheduleProcessElementOnMailbox();
}

/**
 * Schedules native output drain on the mailbox thread. Implemented by concrete operators.
 *
 * <p>The default implementation is intentionally a no-op, so an operator that does not override
 * this method silently ignores drain requests.
 */
default void scheduleProcessElementOnMailbox() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.github.zhztheplayer.velox4j.query.Query;
import io.github.zhztheplayer.velox4j.query.SerialTask;
import io.github.zhztheplayer.velox4j.serde.Serde;
import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget;
import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
Expand All @@ -49,7 +50,7 @@

/** Calculate operator in gluten, which will call Velox to run. */
public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>, GlutenOperator {
implements OneInputStreamOperator<IN, OUT>, GlutenOperator, NativeCallbackTarget {

private final StatefulPlanNode glutenPlan;
private final String id;
Expand Down Expand Up @@ -119,7 +120,6 @@ void initSession() {
}
sessionResource = new GlutenSessionResource();
GlutenSessionResources.getInstance().addSessionResource(id, sessionResource);
GlutenSessionResources.getInstance().addOperator(this.getClass().getSimpleName(), this);
inputQueue = sessionResource.getSession().externalStreamOps().newBlockingQueue();
// add a mock input as velox not allow the source is empty.
if (inputType == null) {
Expand All @@ -143,6 +143,7 @@ void initSession() {
VeloxQueryConfig.getConfig(getRuntimeContext()),
VeloxConnectorConfig.getConfig(getRuntimeContext()));
task = sessionResource.getSession().queryOps().execute(query);
task.bindNativeCallbackTarget(this);
task.addSplit(
id, new ExternalStreamConnectorSplit("connector-external-stream", inputQueue.id()));
task.noMoreSplits(id);
Expand Down Expand Up @@ -186,6 +187,11 @@ public void scheduleProcessElementOnMailbox() {
scheduleDrainOnMailbox(this::drainTaskOutput);
}

/**
 * Timer callback that requests a drain of the task output by delegating to
 * {@link #scheduleProcessElementOnMailbox()}.
 *
 * <p>NOTE(review): presumably declared by {@code NativeCallbackTarget} and invoked from native
 * code after {@code task.bindNativeCallbackTarget(this)} — confirm against velox4j.
 *
 * @param timestamp the time reported by the caller; not used by this implementation
 */
@Override
public void onProcessingTime(long timestamp) {
// The timestamp is ignored: the callback only triggers a drain on the mailbox.
scheduleProcessElementOnMailbox();
}

@Override
public void processElementInternal() {
drainOutput(this::drainTaskOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.gluten.table.runtime.operators;

import org.apache.gluten.streaming.api.operators.GlutenOperator;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.memory.AllocationListener;
import io.github.zhztheplayer.velox4j.memory.MemoryManager;
Expand All @@ -30,7 +28,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Manage the session and resource for Velox.
class GlutenSessionResource {
Expand Down Expand Up @@ -84,7 +81,6 @@ public void setKeyedStateBackend(KeyedStateBackend<?> keyedStateBackend) {
public class GlutenSessionResources {
private static final GlutenSessionResources instance = new GlutenSessionResources();
private Map<String, GlutenSessionResource> sessionResources = new HashMap<>();
private Map<String, GlutenOperator> operators = new HashMap<>();

private GlutenSessionResources() {}

Expand All @@ -104,14 +100,4 @@ public Session getSession(String id) {
return sessionResources.get(id).getSession();
}

/**
 * Registers an operator under the given id. An operator previously stored under the same id is
 * replaced.
 *
 * <p>NOTE(review): the backing map is a plain {@code HashMap} held by a static singleton and no
 * synchronization is visible here — confirm that registration and lookup never race.
 *
 * @param id key used later to look the operator up
 * @param operator operator to register
 */
public void addOperator(String id, GlutenOperator operator) {
operators.put(id, operator);
}

/**
 * Looks up the operator registered under the given id.
 *
 * @param id key the operator was registered with via {@link #addOperator}
 * @return the registered operator, or an empty {@code Optional} when nothing (or a null value) is
 *     stored under {@code id}
 */
public Optional<GlutenOperator> getOperator(String id) {
  // Single map lookup. ofNullable also avoids the NullPointerException that
  // Optional.of(...) would raise for a key that is present but mapped to null.
  return Optional.ofNullable(operators.get(id));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.query.Query;
import io.github.zhztheplayer.velox4j.query.SerialTask;
import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget;
import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
Expand All @@ -50,7 +51,7 @@
* instead of flink RowData.
*/
public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
implements TwoInputStreamOperator<IN, IN, OUT>, GlutenOperator {
implements TwoInputStreamOperator<IN, IN, OUT>, GlutenOperator, NativeCallbackTarget {

private static final Logger LOG = LoggerFactory.getLogger(GlutenTwoInputOperator.class);

Expand Down Expand Up @@ -139,6 +140,11 @@ public void scheduleProcessElementOnMailbox() {
scheduleDrainOnMailbox(this::drainTaskOutput);
}

/**
 * Timer callback that requests a drain of the task output by delegating to
 * {@link #scheduleProcessElementOnMailbox()}.
 *
 * <p>NOTE(review): presumably declared by {@code NativeCallbackTarget} and invoked from native
 * code once the task has this operator bound as its callback target — confirm against velox4j.
 *
 * @param timestamp the time reported by the caller; not used by this implementation
 */
@Override
public void onProcessingTime(long timestamp) {
// The timestamp is ignored: the callback only triggers a drain on the mailbox.
scheduleProcessElementOnMailbox();
}

@Override
public void processElement1(StreamRecord<IN> element) {
StatefulRecord statefulRecord =
Expand Down Expand Up @@ -286,7 +292,6 @@ private void initSession() {

sessionResource = new GlutenSessionResource();
GlutenSessionResources.getInstance().addSessionResource(getId(), sessionResource);
GlutenSessionResources.getInstance().addOperator(getId(), this);
leftInputQueue = sessionResource.getSession().externalStreamOps().newBlockingQueue();
rightInputQueue = sessionResource.getSession().externalStreamOps().newBlockingQueue();

Expand All @@ -296,6 +301,7 @@ private void initSession() {
VeloxQueryConfig.getConfig(getRuntimeContext()),
VeloxConnectorConfig.getConfig(getRuntimeContext()));
task = sessionResource.getSession().queryOps().execute(query);
task.bindNativeCallbackTarget(this);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call task.unbindCallbackTarget when the operator is closed?


ExternalStreamConnectorSplit leftSplit =
new ExternalStreamConnectorSplit("connector-external-stream", leftInputQueue.id());
Expand Down
Loading