Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,48 @@ public class AgentConfigOptions {
public static final ConfigOption<Integer> KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
new ConfigOption<>("kafkaActionStateTopicReplicationFactor", Integer.class, 1);

/** The config parameter specifies the Fluss bootstrap servers. */
public static final ConfigOption<String> FLUSS_BOOTSTRAP_SERVERS =
new ConfigOption<>("flussBootstrapServers", String.class, "localhost:9123");

/** The config parameter specifies the Fluss database for action state. */
public static final ConfigOption<String> FLUSS_ACTION_STATE_DATABASE =
new ConfigOption<>("flussActionStateDatabase", String.class, "flink_agents");

/** The config parameter specifies the Fluss table name for action state. */
public static final ConfigOption<String> FLUSS_ACTION_STATE_TABLE =
new ConfigOption<>("flussActionStateTable", String.class, "action_states");

/** The config parameter specifies the number of buckets for the Fluss action state table. */
public static final ConfigOption<Integer> FLUSS_ACTION_STATE_TABLE_BUCKETS =
new ConfigOption<>("flussActionStateTableBuckets", Integer.class, 64);

/**
* The config parameter specifies the authentication protocol for Fluss client. Valid values:
* {@code "PLAINTEXT"} (default, no authentication) and {@code "SASL"} (SASL/PLAIN
* authentication). Value is case-insensitive.
*/
public static final ConfigOption<String> FLUSS_SECURITY_PROTOCOL =
new ConfigOption<>("flussSecurityProtocol", String.class, "PLAINTEXT");

/** The config parameter specifies the SASL mechanism for Fluss authentication. */
public static final ConfigOption<String> FLUSS_SASL_MECHANISM =
new ConfigOption<>("flussSaslMechanism", String.class, "PLAIN");

/**
* The config parameter specifies the JAAS configuration string for Fluss SASL authentication.
*/
public static final ConfigOption<String> FLUSS_SASL_JAAS_CONFIG =
new ConfigOption<>("flussSaslJaasConfig", String.class, null);

/** The config parameter specifies the username for Fluss SASL authentication. */
public static final ConfigOption<String> FLUSS_SASL_USERNAME =
new ConfigOption<>("flussSaslUsername", String.class, null);

/** The config parameter specifies the password for Fluss SASL authentication. */
public static final ConfigOption<String> FLUSS_SASL_PASSWORD =
new ConfigOption<>("flussSaslPassword", String.class, null);

/** The config parameter specifies the unique identifier of job. */
public static final ConfigOption<String> JOB_IDENTIFIER =
new ConfigOption<>("job-identifier", String.class, null);
Expand Down
23 changes: 22 additions & 1 deletion docs/content/docs/operations/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,35 @@ Here is the list of all built-in core configuration options.

### Action State Store

#### Common

| Key | Default | Type | Description |
|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
| `actionStateStoreBackend` | (none) | String | The backend for action state store. Supported values: `"kafka"`, `"fluss"`. |

#### Kafka-based Action State Store

Here are the configuration options for Kafka-based Action State Store.

| Key | Default | Type | Description |
|-------------------------------------|--------------------------|---------|-----------------------------------------------------------------------------|
| `actionStateStoreBackend` | (none) | String | The config parameter specifies the backend for action state store. |
| `kafkaBootstrapServers` | "localhost:9092" | String | The config parameter specifies the Kafka bootstrap server. |
| `kafkaActionStateTopic` | (none) | String | The config parameter specifies the Kafka topic for action state. |
| `kafkaActionStateTopicNumPartitions`| 64 | Integer | The config parameter specifies the number of partitions for the Kafka action state topic. |
| `kafkaActionStateTopicReplicationFactor` | 1 | Integer | The config parameter specifies the replication factor for the Kafka action state topic. |

#### Fluss-based Action State Store

Here are the configuration options for Fluss-based Action State Store.

Comment thread
beryllw marked this conversation as resolved.
| Key | Default | Type | Description |
|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
| `flussBootstrapServers` | "localhost:9123" | String | The Fluss bootstrap servers address. |
| `flussActionStateDatabase` | "flink_agents" | String | The Fluss database name for storing action state. |
| `flussActionStateTable` | "action_states" | String | The Fluss table name for storing action state. |
| `flussActionStateTableBuckets` | 64 | Integer | The number of buckets for the Fluss action state table. |
| `flussSecurityProtocol` | "PLAINTEXT" | String | The authentication protocol for Fluss client. Valid values: `PLAINTEXT` (default, no authentication), `SASL` (SASL/PLAIN authentication). |
| `flussSaslMechanism` | "PLAIN" | String | The SASL mechanism for Fluss authentication. |
| `flussSaslJaasConfig` | (none) | String | The JAAS configuration string for Fluss SASL authentication. |
| `flussSaslUsername` | (none) | String | The username for Fluss SASL authentication. |
| `flussSaslPassword` | (none) | String | The password for Fluss SASL authentication. |
2 changes: 1 addition & 1 deletion docs/content/docs/operations/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft
To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint.

{{< hint info >}}
**Note**: Currently, Kafka is supported as the external action state store.
**Note**: Currently, Kafka and Fluss are supported as the external action state store.
{{< /hint >}}

See [Action State Store Configuration]({{< ref "docs/operations/configuration#action-state-store" >}}) for configuration options.
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ under the License.
<spotless.skip>false</spotless.skip>
<flink.version>2.2.0</flink.version>
<kafka.version>4.0.0</kafka.version>
<fluss.version>0.9.0-incubating</fluss.version>
<junit5.version>5.10.1</junit5.version>
<jackson.version>2.18.2</jackson.version>
<pemja.version>0.5.5</pemja.version>
Expand Down Expand Up @@ -309,6 +310,7 @@ under the License.
<configuration>
<argLine>
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED
</argLine>
</configuration>
Expand Down
38 changes: 38 additions & 0 deletions runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,44 @@ under the License.
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- fluss client -->
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>${fluss.version}</version>
</dependency>
<!-- fluss test infrastructure -->
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-server</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-server</artifactId>
<version>${fluss.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Comment thread
beryllw marked this conversation as resolved.
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-test-utils</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>5.4.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- LOG -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,34 @@
*/
package org.apache.flink.agents.runtime.actionstate;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.runtime.operator.ActionTask;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

/**
* Kafka serializer for {@link ActionState}.
*
* <p>This serializer handles the serialization of ActionState instances to byte arrays for storage
* in Kafka. It uses Jackson ObjectMapper with custom serializers to handle polymorphic Event types
* and ensures ActionTask is serialized as null.
* Kafka serializer/deserializer for {@link ActionState}. Delegates to {@link ActionStateSerde} for
* the actual serialization logic.
*/
public class ActionStateKafkaSeder implements Serializer<ActionState>, Deserializer<ActionState> {

private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaSeder.class);
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No configuration needed
}

@Override
public ActionState deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}

try {
return OBJECT_MAPPER.readValue(data, ActionState.class);
} catch (Exception e) {
LOG.error("Failed to deserialize ActionState for topic: {}", topic, e);
throw new RuntimeException("Failed to deserialize ActionState", e);
}
public byte[] serialize(String topic, ActionState data) {
return data == null ? null : ActionStateSerde.serialize(data);
}

@Override
public byte[] serialize(String topic, ActionState data) {
if (data == null) {
return null;
}

try {
return OBJECT_MAPPER.writeValueAsBytes(data);
} catch (Exception e) {
LOG.error("Failed to serialize ActionState for topic: {}", topic, e);
throw new RuntimeException("Failed to serialize ActionState", e);
}
public ActionState deserialize(String topic, byte[] data) {
return data == null ? null : ActionStateSerde.deserialize(data);
}

@Override
public void close() {
// No resources to close
}

/** Creates and configures the ObjectMapper for ActionState serialization. */
private static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();

// Add type information for polymorphic Event deserialization
mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);

// Create a module for custom serializers
SimpleModule module = new SimpleModule();

// Custom serializer for ActionTask - always serialize as null
module.addSerializer(ActionTask.class, new ActionTaskSerializer());

mapper.registerModule(module);

return mapper;
}

/** Mixin to add type information for Event hierarchy. */
@JsonTypeInfo(
use = JsonTypeInfo.Id.CLASS,
include = JsonTypeInfo.As.PROPERTY,
property = "@class")
public abstract static class EventTypeInfoMixin {}

/** Custom serializer for ActionTask that always serializes as null. */
public static class ActionTaskSerializer extends JsonSerializer<ActionTask> {
@Override
public void serialize(ActionTask value, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
gen.writeNull();
}
}
}
Loading
Loading