diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 724247456..e936ce5a5 100644
--- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -51,6 +51,48 @@ public class AgentConfigOptions {
     public static final ConfigOption<Integer> KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
             new ConfigOption<>("kafkaActionStateTopicReplicationFactor", Integer.class, 1);
 
+    /** The config parameter specifies the Fluss bootstrap servers. */
+    public static final ConfigOption<String> FLUSS_BOOTSTRAP_SERVERS =
+            new ConfigOption<>("flussBootstrapServers", String.class, "localhost:9123");
+
+    /** The config parameter specifies the Fluss database for action state. */
+    public static final ConfigOption<String> FLUSS_ACTION_STATE_DATABASE =
+            new ConfigOption<>("flussActionStateDatabase", String.class, "flink_agents");
+
+    /** The config parameter specifies the Fluss table name for action state. */
+    public static final ConfigOption<String> FLUSS_ACTION_STATE_TABLE =
+            new ConfigOption<>("flussActionStateTable", String.class, "action_states");
+
+    /** The config parameter specifies the number of buckets for the Fluss action state table. */
+    public static final ConfigOption<Integer> FLUSS_ACTION_STATE_TABLE_BUCKETS =
+            new ConfigOption<>("flussActionStateTableBuckets", Integer.class, 64);
+
+    /**
+     * The config parameter specifies the authentication protocol for Fluss client. Valid values:
+     * {@code "PLAINTEXT"} (default, no authentication) and {@code "SASL"} (SASL/PLAIN
+     * authentication). Value is case-insensitive.
+     */
+    public static final ConfigOption<String> FLUSS_SECURITY_PROTOCOL =
+            new ConfigOption<>("flussSecurityProtocol", String.class, "PLAINTEXT");
+
+    /** The config parameter specifies the SASL mechanism for Fluss authentication. */
+    public static final ConfigOption<String> FLUSS_SASL_MECHANISM =
+            new ConfigOption<>("flussSaslMechanism", String.class, "PLAIN");
+
+    /**
+     * The config parameter specifies the JAAS configuration string for Fluss SASL authentication.
+     */
+    public static final ConfigOption<String> FLUSS_SASL_JAAS_CONFIG =
+            new ConfigOption<>("flussSaslJaasConfig", String.class, null);
+
+    /** The config parameter specifies the username for Fluss SASL authentication. */
+    public static final ConfigOption<String> FLUSS_SASL_USERNAME =
+            new ConfigOption<>("flussSaslUsername", String.class, null);
+
+    /** The config parameter specifies the password for Fluss SASL authentication. */
+    public static final ConfigOption<String> FLUSS_SASL_PASSWORD =
+            new ConfigOption<>("flussSaslPassword", String.class, null);
+
     /** The config parameter specifies the unique identifier of job. */
     public static final ConfigOption<String> JOB_IDENTIFIER =
             new ConfigOption<>("job-identifier", String.class, null);
diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md
index 67fed224a..1b9b477ab 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -140,14 +140,35 @@ Here is the list of all built-in core configuration options.
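Taken together, these options select and secure the Fluss backend. Below is a minimal sketch of how a job might set them, assuming a no-arg `AgentConfiguration` constructor and a plain `setString(key, value)` setter; both are assumptions for illustration, since this PR only shows the `get(ConfigOption)` read path used by `FlussActionStateStore`.

```java
import org.apache.flink.agents.plan.AgentConfiguration;

import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;

public class FlussBackendConfigSketch {
    public static void main(String[] args) {
        // Assumed constructor and setter; only get(ConfigOption) appears in this PR.
        AgentConfiguration config = new AgentConfiguration();

        // Select the Fluss backend; the value matches BackendType.FLUSS ("fluss").
        config.setString("actionStateStoreBackend", "fluss");

        // Point the store at a real cluster instead of the localhost:9123 default.
        config.setString("flussBootstrapServers", "fluss-coordinator:9123");

        // Opt in to SASL/PLAIN; under the default PLAINTEXT protocol the
        // FlussActionStateStore constructor ignores these credentials.
        config.setString("flussSecurityProtocol", "SASL");
        config.setString("flussSaslUsername", "agents");
        config.setString("flussSaslPassword", "agents-secret");

        // Read path as used when the store builds its Fluss client configuration.
        System.out.println(config.get(FLUSS_BOOTSTRAP_SERVERS));
    }
}
```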
 ### Action State Store
 
+#### Common
+
+| Key | Default | Type | Description |
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `actionStateStoreBackend` | (none) | String | The backend for the action state store. Supported values: `"kafka"`, `"fluss"`. |
+
 #### Kafka-based Action State Store
 
 Here are the configuration options for Kafka-based Action State Store.
 
 | Key | Default | Type | Description |
 |-------------------------------------|--------------------------|---------|-----------------------------------------------------------------------------|
-| `actionStateStoreBackend` | (none) | String | The config parameter specifies the backend for action state store. |
 | `kafkaBootstrapServers` | "localhost:9092" | String | The config parameter specifies the Kafka bootstrap server. |
 | `kafkaActionStateTopic` | (none) | String | The config parameter specifies the Kafka topic for action state. |
 | `kafkaActionStateTopicNumPartitions`| 64 | Integer | The config parameter specifies the number of partitions for the Kafka action state topic. |
 | `kafkaActionStateTopicReplicationFactor` | 1 | Integer | The config parameter specifies the replication factor for the Kafka action state topic. |
+
+#### Fluss-based Action State Store
+
+Here are the configuration options for Fluss-based Action State Store.
+
+| Key | Default | Type | Description |
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `flussBootstrapServers` | "localhost:9123" | String | The Fluss bootstrap servers address. |
+| `flussActionStateDatabase` | "flink_agents" | String | The Fluss database name for storing action state. |
+| `flussActionStateTable` | "action_states" | String | The Fluss table name for storing action state. |
+| `flussActionStateTableBuckets` | 64 | Integer | The number of buckets for the Fluss action state table. |
+| `flussSecurityProtocol` | "PLAINTEXT" | String | The authentication protocol for the Fluss client. Valid values: `PLAINTEXT` (default, no authentication) and `SASL` (SASL/PLAIN authentication). |
+| `flussSaslMechanism` | "PLAIN" | String | The SASL mechanism for Fluss authentication. |
+| `flussSaslJaasConfig` | (none) | String | The JAAS configuration string for Fluss SASL authentication. |
+| `flussSaslUsername` | (none) | String | The username for Fluss SASL authentication. |
+| `flussSaslPassword` | (none) | String | The password for Fluss SASL authentication. |
diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md
index bfe609c2a..ca2efc232 100644
--- a/docs/content/docs/operations/deployment.md
+++ b/docs/content/docs/operations/deployment.md
@@ -172,7 +172,7 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft
 
 To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint.
 
 {{< hint info >}}
-**Note**: Currently, Kafka is supported as the external action state store.
+**Note**: Currently, Kafka and Fluss are supported as external action state stores.
{{< /hint >}} See [Action State Store Configuration]({{< ref "docs/operations/configuration#action-state-store" >}}) for configuration options. diff --git a/pom.xml b/pom.xml index cd7744797..e77b0a51c 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ under the License. false 2.2.0 4.0.0 + 0.9.0-incubating 5.10.1 2.18.2 0.5.5 @@ -309,6 +310,7 @@ under the License. --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports=java.base/jdk.internal.vm=ALL-UNNAMED diff --git a/runtime/pom.xml b/runtime/pom.xml index 384eeea82..b37a44550 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -116,6 +116,44 @@ under the License. kafka-clients ${kafka.version} + + + org.apache.fluss + fluss-client + ${fluss.version} + + + + org.apache.fluss + fluss-server + ${fluss.version} + test + + + org.apache.fluss + fluss-server + ${fluss.version} + test-jar + test + + + org.apache.fluss + fluss-test-utils + ${fluss.version} + test + + + org.apache.curator + curator-test + 5.4.0 + test + + + org.junit.jupiter + junit-jupiter-api + + + org.slf4j diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java deleted file mode 100644 index 38e7b6ad7..000000000 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.agents.runtime.actionstate; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import org.apache.flink.agents.api.Event; -import org.apache.flink.agents.api.InputEvent; -import org.apache.flink.agents.api.OutputEvent; -import org.apache.flink.agents.runtime.operator.ActionTask; -import org.apache.kafka.common.serialization.Deserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; - -/** - * Kafka deserializer for {@link ActionState}. - * - *
<p>
This deserializer handles the deserialization of byte arrays from Kafka back to ActionState - * instances. It uses Jackson ObjectMapper with custom deserializers to handle polymorphic Event - * types and ensures ActionTask is deserialized as null. - */ -public class ActionStateKafkaDeserializer implements Deserializer { - - private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaDeserializer.class); - private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); - - @Override - public void configure(Map configs, boolean isKey) { - // No configuration needed - } - - @Override - public ActionState deserialize(String topic, byte[] data) { - if (data == null) { - return null; - } - - try { - return OBJECT_MAPPER.readValue(data, ActionState.class); - } catch (Exception e) { - LOG.error("Failed to deserialize ActionState for topic: {}", topic, e); - throw new RuntimeException("Failed to deserialize ActionState", e); - } - } - - @Override - public void close() { - // No resources to close - } - - /** Creates and configures the ObjectMapper for ActionState deserialization. */ - private static ObjectMapper createObjectMapper() { - ObjectMapper mapper = new ObjectMapper(); - - // Add type information for polymorphic Event deserialization - mapper.addMixIn(Event.class, EventTypeInfoMixin.class); - mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class); - mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class); - - // Create a module for custom deserializers - SimpleModule module = new SimpleModule(); - - // Custom deserializer for ActionTask - always deserialize as null - module.addDeserializer(ActionTask.class, new ActionTaskDeserializer()); - - mapper.registerModule(module); - - return mapper; - } - - /** Mixin to add type information for Event hierarchy. */ - @JsonTypeInfo( - use = JsonTypeInfo.Id.CLASS, - include = JsonTypeInfo.As.PROPERTY, - property = "@class") - public abstract static class EventTypeInfoMixin {} - - /** Custom deserializer for ActionTask that always deserializes as null. 
*/ - public static class ActionTaskDeserializer extends JsonDeserializer { - @Override - public ActionTask deserialize(JsonParser p, DeserializationContext ctxt) - throws IOException { - // Skip the value and return null - p.skipChildren(); - return null; - } - } -} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java index b225eb09c..9af3dc3f6 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java @@ -17,107 +17,34 @@ */ package org.apache.flink.agents.runtime.actionstate; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.module.SimpleModule; -import org.apache.flink.agents.api.Event; -import org.apache.flink.agents.api.InputEvent; -import org.apache.flink.agents.api.OutputEvent; -import org.apache.flink.agents.runtime.operator.ActionTask; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Map; /** - * Kafka serializer for {@link ActionState}. - * - *
<p>
This serializer handles the serialization of ActionState instances to byte arrays for storage - * in Kafka. It uses Jackson ObjectMapper with custom serializers to handle polymorphic Event types - * and ensures ActionTask is serialized as null. + * Kafka serializer/deserializer for {@link ActionState}. Delegates to {@link ActionStateSerde} for + * the actual serialization logic. */ public class ActionStateKafkaSeder implements Serializer, Deserializer { - private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaSeder.class); - private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); - @Override public void configure(Map configs, boolean isKey) { // No configuration needed } @Override - public ActionState deserialize(String topic, byte[] data) { - if (data == null) { - return null; - } - - try { - return OBJECT_MAPPER.readValue(data, ActionState.class); - } catch (Exception e) { - LOG.error("Failed to deserialize ActionState for topic: {}", topic, e); - throw new RuntimeException("Failed to deserialize ActionState", e); - } + public byte[] serialize(String topic, ActionState data) { + return data == null ? null : ActionStateSerde.serialize(data); } @Override - public byte[] serialize(String topic, ActionState data) { - if (data == null) { - return null; - } - - try { - return OBJECT_MAPPER.writeValueAsBytes(data); - } catch (Exception e) { - LOG.error("Failed to serialize ActionState for topic: {}", topic, e); - throw new RuntimeException("Failed to serialize ActionState", e); - } + public ActionState deserialize(String topic, byte[] data) { + return data == null ? null : ActionStateSerde.deserialize(data); } @Override public void close() { // No resources to close } - - /** Creates and configures the ObjectMapper for ActionState serialization. */ - private static ObjectMapper createObjectMapper() { - ObjectMapper mapper = new ObjectMapper(); - - // Add type information for polymorphic Event deserialization - mapper.addMixIn(Event.class, EventTypeInfoMixin.class); - mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class); - mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class); - - // Create a module for custom serializers - SimpleModule module = new SimpleModule(); - - // Custom serializer for ActionTask - always serialize as null - module.addSerializer(ActionTask.class, new ActionTaskSerializer()); - - mapper.registerModule(module); - - return mapper; - } - - /** Mixin to add type information for Event hierarchy. */ - @JsonTypeInfo( - use = JsonTypeInfo.Id.CLASS, - include = JsonTypeInfo.As.PROPERTY, - property = "@class") - public abstract static class EventTypeInfoMixin {} - - /** Custom serializer for ActionTask that always serializes as null. 
*/ - public static class ActionTaskSerializer extends JsonSerializer { - @Override - public void serialize(ActionTask value, JsonGenerator gen, SerializerProvider serializers) - throws IOException { - gen.writeNull(); - } - } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java similarity index 67% rename from runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java rename to runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java index 881c5293c..02c9a06b3 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java @@ -27,50 +27,40 @@ import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; import org.apache.flink.agents.runtime.operator.ActionTask; -import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; /** - * Kafka serializer for {@link ActionState}. + * Backend-agnostic serializer/deserializer for {@link ActionState}. * - *
<p>
This serializer handles the serialization of ActionState instances to byte arrays for storage - * in Kafka. It uses Jackson ObjectMapper with custom serializers to handle polymorphic Event types - * and ensures ActionTask is serialized as null. + *
<p>
Uses Jackson {@link ObjectMapper} configured with polymorphic type information for the {@link + * Event} hierarchy and a custom null-serializer for {@link ActionTask}. Both Kafka and Fluss + * ActionStateStore backends delegate to this class for consistent serialization format. */ -public class ActionStateKafkaSerializer implements Serializer { +public final class ActionStateSerde { - private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaSerializer.class); private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); - @Override - public void configure(Map configs, boolean isKey) { - // No configuration needed - } - - @Override - public byte[] serialize(String topic, ActionState data) { - if (data == null) { - return null; - } + private ActionStateSerde() {} + /** Serializes an {@link ActionState} to a JSON byte array. */ + public static byte[] serialize(ActionState state) { try { - return OBJECT_MAPPER.writeValueAsBytes(data); + return OBJECT_MAPPER.writeValueAsBytes(state); } catch (Exception e) { - LOG.error("Failed to serialize ActionState for topic: {}", topic, e); throw new RuntimeException("Failed to serialize ActionState", e); } } - @Override - public void close() { - // No resources to close + /** Deserializes an {@link ActionState} from a JSON byte array. */ + public static ActionState deserialize(byte[] data) { + try { + return OBJECT_MAPPER.readValue(data, ActionState.class); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize ActionState", e); + } } - /** Creates and configures the ObjectMapper for ActionState serialization. */ private static ObjectMapper createObjectMapper() { ObjectMapper mapper = new ObjectMapper(); @@ -79,12 +69,9 @@ private static ObjectMapper createObjectMapper() { mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class); mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class); - // Create a module for custom serializers - SimpleModule module = new SimpleModule(); - // Custom serializer for ActionTask - always serialize as null + SimpleModule module = new SimpleModule(); module.addSerializer(ActionTask.class, new ActionTaskSerializer()); - mapper.registerModule(module); return mapper; @@ -95,10 +82,10 @@ private static ObjectMapper createObjectMapper() { use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") - public abstract static class EventTypeInfoMixin {} + abstract static class EventTypeInfoMixin {} /** Custom serializer for ActionTask that always serializes as null. */ - public static class ActionTaskSerializer extends JsonSerializer { + static class ActionTaskSerializer extends JsonSerializer { @Override public void serialize(ActionTask value, JsonGenerator gen, SerializerProvider serializers) throws IOException { diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java index 433e2beac..e29557c0d 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java @@ -26,7 +26,8 @@ /** Interface for storing and retrieving the state of actions performed by agents. 
*/ public interface ActionStateStore extends AutoCloseable { enum BackendType { - KAFKA("kafka"); + KAFKA("kafka"), + FLUSS("fluss"); private final String type; diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java new file mode 100644 index 000000000..e91db0630 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.actionstate; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.plan.AgentConfiguration; +import org.apache.flink.agents.plan.actions.Action; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.admin.OffsetSpec; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.client.table.writer.AppendWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD; +import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SECURITY_PROTOCOL;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log table as the
+ * backend. All state is maintained in an in-memory map for fast lookups, with the Fluss log table
+ * providing durability and recovery support.
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlussActionStateStore.class);
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    private static final String SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+
+    // Column names in the Fluss table schema
+    private static final String COL_NAME_STATE_KEY = "state_key";
+    private static final String COL_NAME_STATE_PAYLOAD = "state_payload";
+    private static final String COL_NAME_AGENT_KEY = "agent_key";
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from the Fluss log on recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    @VisibleForTesting
+    FlussActionStateStore(
+            Map<String, ActionState> actionStates,
+            Connection connection,
+            Table table,
+            AppendWriter writer) {
+        this.agentConfiguration = null;
+        this.databaseName = null;
+        this.tableName = null;
+        this.tablePath = null;
+        this.actionStates = actionStates;
+        this.connection = connection;
+        this.table = table;
+        this.writer = writer;
+    }
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE);
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString(
+                BOOTSTRAP_SERVERS.key(), agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+        // Minimize latency for synchronous put(): setting batch linger time to zero ensures
+        // that each append is sent immediately without waiting for additional records to batch.
+        flussConf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ZERO);
+
+        // Only set security/SASL parameters when the protocol requires authentication.
+        // When PLAINTEXT (the default), SASL parameters are semantically invalid and may
+        // cause the Fluss client to attempt an unwanted SASL handshake.
+        String securityProtocol = agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        flussConf.setString(CLIENT_SECURITY_PROTOCOL, securityProtocol);
+        if (!SECURITY_PROTOCOL_PLAINTEXT.equalsIgnoreCase(securityProtocol)) {
+            flussConf.setString(
+                    CLIENT_SASL_MECHANISM, agentConfiguration.get(FLUSS_SASL_MECHANISM));
+
+            String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+            if (jaasConfig != null) {
+                flussConf.setString(CLIENT_SASL_JAAS_CONFIG, jaasConfig);
+            }
+            String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+            if (username != null) {
+                flussConf.setString(CLIENT_SASL_JAAS_USERNAME, username);
+            }
+            String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+            if (password != null) {
+                flussConf.setString(CLIENT_SASL_JAAS_PASSWORD, password);
+            }
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        maybeCreateDatabaseAndTable();
+        this.table = connection.getTable(tablePath);
+        this.writer = table.newAppend().createWriter();
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: {}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = ActionStateSerde.serialize(state);
+
+        GenericRow row =
+                GenericRow.of(
+                        BinaryString.fromString(stateKey),
+                        payload,
+                        BinaryString.fromString(key.toString()));
+
+        // Append to Fluss log for durability, then update in-memory cache.
+        // Synchronous write ensures the record is durable before returning.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+
+        boolean hasDivergence = checkDivergence(key.toString(), seqNum);
+
+        if (!actionStates.containsKey(stateKey) || hasDivergence) {
+            actionStates
+                    .entrySet()
+                    .removeIf(
+                            entry -> {
+                                try {
+                                    List<String> parts = ActionStateUtil.parseKey(entry.getKey());
+                                    if (parts.size() >= 2) {
+                                        long stateSeqNum = Long.parseLong(parts.get(1));
+                                        return stateSeqNum > seqNum;
+                                    }
+                                } catch (NumberFormatException e) {
+                                    LOG.warn(
+                                            "Failed to parse sequence number from state key: {}",
+                                            entry.getKey());
+                                }
+                                return false;
+                            });
+        }
+
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != null);
+        return state;
+    }
+
+    private boolean checkDivergence(String key, long seqNum) {
+        return actionStates.keySet().stream()
+                        .filter(k -> k.startsWith(key + "_" + seqNum + "_"))
+                        .count()
+                > 1;
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery markers are provided,
+     * computes the minimum offset per bucket across all markers and subscribes from those offsets.
+     * Otherwise, skips rebuild since there is no checkpointed position to recover from. Reads from
+     * the start offset up to the latest offset captured at rebuild start. For the same state key
+     * appearing multiple times in the log, the latest record wins (last-write-wins).
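+     *
+     * <p>For example, given recovery markers {0=5, 1=7} and {0=3} (illustrative values of the
+     * bucket-to-offset maps produced by {@link #getRecoveryMarker}), the merged start offsets are
+     * {0=3, 1=7}: bucket 0 is subscribed from offset 3 and bucket 1 from offset 7, subject to the
+     * no-new-data and data-loss checks.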
+ */ + @Override + public void rebuildState(List recoveryMarkers) { + LOG.info( + "Rebuilding action state from Fluss log table with {} recovery markers", + recoveryMarkers.size()); + + if (recoveryMarkers.isEmpty()) { + LOG.info("No recovery markers, skipping state rebuild"); + return; + } + + actionStates.clear(); + + Map bucketStartOffsets = mergeRecoveryMarkerOffsets(recoveryMarkers); + if (bucketStartOffsets.isEmpty()) { + LOG.info("No valid bucket offsets in recovery markers, skipping state rebuild"); + return; + } + + Map bucketEndOffsets = getBucketEndOffsets(); + Map bucketEarliestOffsets = getBucketEarliestOffsets(); + LOG.debug( + "Rebuild window: startOffsets={}, earliestOffsets={}, endOffsets={}", + bucketStartOffsets, + bucketEarliestOffsets, + bucketEndOffsets); + + try (LogScanner scanner = table.newScan().createLogScanner()) { + Map remainingBuckets = + subscribeEffectiveOffsets( + scanner, bucketStartOffsets, bucketEndOffsets, bucketEarliestOffsets); + LOG.debug("Subscribed buckets for rebuild: {}", remainingBuckets); + + pollAndReplay(scanner, remainingBuckets); + } catch (Exception e) { + throw new RuntimeException("Failed to rebuild state from Fluss log table", e); + } + + LOG.info("Completed rebuilding state, recovered {} states", actionStates.size()); + } + + /** + * Merges recovery markers into a per-bucket start offset map. For each bucket, the minimum + * offset across all markers is used to cover the widest recovery window. + */ + private Map mergeRecoveryMarkerOffsets(List recoveryMarkers) { + Map bucketStartOffsets = new HashMap<>(); + for (Object marker : recoveryMarkers) { + if (marker instanceof Map) { + @SuppressWarnings("unchecked") + Map markerMap = (Map) marker; + for (Map.Entry entry : markerMap.entrySet()) { + bucketStartOffsets.merge(entry.getKey(), entry.getValue(), Math::min); + } + } else if (marker != null) { + LOG.warn( + "Ignoring unrecognized recovery marker type: {}", + marker.getClass().getName()); + } + } + return bucketStartOffsets; + } + + /** + * Validates effective offsets for each bucket and subscribes the scanner. Buckets with no new + * data are skipped; buckets with data loss (retention cleaned the recovery window) cause an + * immediate failure. + * + * @return a map of bucket-id to end-offset for buckets that need to be scanned + */ + private Map subscribeEffectiveOffsets( + LogScanner scanner, + Map bucketStartOffsets, + Map bucketEndOffsets, + Map bucketEarliestOffsets) { + Map remainingBuckets = new HashMap<>(); + for (Map.Entry entry : bucketStartOffsets.entrySet()) { + int bucket = entry.getKey(); + long startOffset = entry.getValue(); + long endOffset = bucketEndOffsets.get(bucket); + long earliestOffset = bucketEarliestOffsets.get(bucket); + + // No new data since checkpoint (includes empty buckets that never had writes: + // endOffset=0, startOffset=0) + if (endOffset == startOffset) { + LOG.info( + "Skipping bucket {} for rebuild: no new data " + + "(endOffset={} = startOffset={})", + bucket, + endOffset, + startOffset); + continue; + } + + // Data loss: retention cleaned the recovery window, or log was truncated/reset + if (earliestOffset > startOffset || endOffset < startOffset) { + throw new IllegalStateException( + String.format( + "Data loss detected for bucket %d: required data is no longer " + + "available in the log (startOffset=%d, endOffset=%d, " + + "earliestOffset=%d). 
Increase log retention or reduce " + + "checkpoint interval.", + bucket, startOffset, endOffset, earliestOffset)); + } + + scanner.subscribe(bucket, startOffset); + remainingBuckets.put(bucket, endOffset); + } + return remainingBuckets; + } + + /** + * Polls the scanner and replays deserialized action states until all subscribed buckets have + * been fully consumed up to their end offsets. + */ + private void pollAndReplay(LogScanner scanner, Map remainingBuckets) { + while (!remainingBuckets.isEmpty()) { + ScanRecords records = scanner.poll(POLL_TIMEOUT); + for (TableBucket bucket : records.buckets()) { + Long endOffset = remainingBuckets.get(bucket.getBucket()); + long lastSeenOffset = replayRecords(records.records(bucket), endOffset); + if (lastSeenOffset + 1 >= endOffset) { + remainingBuckets.remove(bucket.getBucket()); + scanner.unsubscribe(bucket.getBucket()); + } + } + } + } + + /** + * Replays records from a single bucket, deserializing and applying each action state. Returns + * the highest log offset seen (including records past endOffset), used to detect bucket + * completion. + */ + private long replayRecords(Iterable records, long endOffset) { + long lastSeenOffset = -1; + for (ScanRecord record : records) { + lastSeenOffset = record.logOffset(); + if (record.logOffset() >= endOffset) { + break; + } + InternalRow row = record.getRow(); + String stateKey = row.getString(COL_STATE_KEY).toString(); + byte[] payload = row.getBytes(COL_STATE_PAYLOAD); + ActionState state = ActionStateSerde.deserialize(payload); + actionStates.put(stateKey, state); + } + return lastSeenOffset; + } + + private Map getBucketEndOffsets() { + return getBucketOffsets(new OffsetSpec.LatestSpec()); + } + + private Map getBucketEarliestOffsets() { + return getBucketOffsets(new OffsetSpec.EarliestSpec()); + } + + private Map getBucketOffsets(OffsetSpec offsetSpec) { + int numBuckets = table.getTableInfo().getNumBuckets(); + try (Admin admin = connection.getAdmin()) { + List buckets = new ArrayList<>(); + for (int b = 0; b < numBuckets; b++) { + buckets.add(b); + } + return admin.listOffsets(tablePath, buckets, offsetSpec).all().get(); + } catch (Exception e) { + throw new RuntimeException("Failed to get offsets for Fluss table: " + tablePath, e); + } + } + + /** + * Returns the end offsets of each bucket as a recovery marker. Similar to Kafka's + * implementation, this captures the current log position so that {@link #rebuildState} can + * resume from these offsets instead of scanning from the beginning. + */ + @Override + public Object getRecoveryMarker() { + return getBucketEndOffsets(); + } + + /** + * Evicts pruned states from the in-memory cache. The Fluss log is append-only; physical cleanup + * relies on Fluss log retention configuration. 
+ */ + @Override + public void pruneState(Object key, long seqNum) { + LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key, seqNum); + + actionStates + .entrySet() + .removeIf( + entry -> { + String stateKey = entry.getKey(); + if (stateKey.startsWith(key.toString() + "_")) { + try { + List parts = ActionStateUtil.parseKey(stateKey); + if (parts.size() >= 2) { + long stateSeqNum = Long.parseLong(parts.get(1)); + return stateSeqNum <= seqNum; + } + } catch (Exception e) { + LOG.warn("Failed to parse state key: {}", stateKey, e); + } + } + return false; + }); + } + + @Override + public void close() throws Exception { + try { + if (table != null) { + table.close(); + } + } finally { + if (connection != null) { + connection.close(); + } + } + } + + private void maybeCreateDatabaseAndTable() { + try (Admin admin = connection.getAdmin()) { + if (!admin.databaseExists(databaseName).get()) { + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + LOG.info("Created Fluss database: {}", databaseName); + } + + if (!admin.tableExists(tablePath).get()) { + // No primaryKey() call — this creates an append-only log table in Fluss. + Schema schema = + Schema.newBuilder() + .column(COL_NAME_STATE_KEY, DataTypes.STRING()) + .column(COL_NAME_STATE_PAYLOAD, DataTypes.BYTES()) + .column(COL_NAME_AGENT_KEY, DataTypes.STRING()) + .build(); + + int buckets = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE_BUCKETS); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(buckets, COL_NAME_AGENT_KEY) + .comment("Flink Agents action state log") + .build(); + + admin.createTable(tablePath, descriptor, true).get(); + LOG.info("Created Fluss log table: {}", tablePath); + } else { + LOG.info("Fluss table {} already exists", tablePath); + } + } catch (Exception e) { + LOG.error( + "Failed to create or verify Fluss database/table: {}.{}", + databaseName, + tableName, + e); + throw new RuntimeException("Failed to create or verify Fluss database/table", e); + } + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java index 648f146be..f09e5cd29 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java @@ -194,7 +194,9 @@ public ActionState get(Object key, long seqNum, Action action, Event event) thro } private boolean checkDivergence(String key, long seqNum) { - return actionStates.keySet().stream().filter(k -> k.startsWith(key + "_" + seqNum)).count() + return actionStates.keySet().stream() + .filter(k -> k.startsWith(key + "_" + seqNum + "_")) + .count() > 1; } @@ -211,6 +213,7 @@ public void rebuildState(List recoveryMarkers) { // Process recovery markers to get the smallest offsets for each partition for (Object marker : recoveryMarkers) { if (marker instanceof Map) { + @SuppressWarnings("unchecked") Map markerMap = (Map) marker; for (Map.Entry entry : markerMap.entrySet()) { Long offset = diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index e5015a3a5..76ac47fc3 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -39,6 +39,7 @@ import org.apache.flink.agents.runtime.ResourceCache; import org.apache.flink.agents.runtime.actionstate.ActionState; import org.apache.flink.agents.runtime.actionstate.ActionStateStore; +import org.apache.flink.agents.runtime.actionstate.FlussActionStateStore; import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore; import org.apache.flink.agents.runtime.async.ContinuationActionExecutor; import org.apache.flink.agents.runtime.async.ContinuationContext; @@ -106,6 +107,7 @@ import static org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR; import static org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER; import static org.apache.flink.agents.api.configuration.AgentConfigOptions.PRETTY_PRINT; +import static org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.FLUSS; import static org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.KAFKA; import static org.apache.flink.agents.runtime.utils.StateUtil.*; import static org.apache.flink.util.Preconditions.checkState; @@ -1136,11 +1138,16 @@ private EventLogger createEventLogger(AgentPlan agentPlan) { } private void maybeInitActionStateStore() { - if (actionStateStore == null - && KAFKA.getType() - .equalsIgnoreCase(agentPlan.getConfig().get(ACTION_STATE_STORE_BACKEND))) { + if (actionStateStore != null) { + return; + } + String backend = agentPlan.getConfig().get(ACTION_STATE_STORE_BACKEND); + if (KAFKA.getType().equalsIgnoreCase(backend)) { LOG.info("Using Kafka as backend of action state store."); actionStateStore = new KafkaActionStateStore(agentPlan.getConfig()); + } else if (FLUSS.getType().equalsIgnoreCase(backend)) { + LOG.info("Using Fluss as backend of action state store."); + actionStateStore = new FlussActionStateStore(agentPlan.getConfig()); } } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java index ca5103321..e0c4de0bf 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java @@ -53,16 +53,13 @@ public void testActionStateSerializationDeserialization() throws Exception { originalState.addShortTermMemoryUpdate(shortTermMemoryUpdate); originalState.addEvent(outputEvent); - // Test Kafka seder/deserializer - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - // Serialize - byte[] serialized = seder.serialize("test-topic", originalState); + byte[] serialized = ActionStateSerde.serialize(originalState); assertNotNull(serialized); assertTrue(serialized.length > 0); // Deserialize - ActionState deserializedState = seder.deserialize("test-topic", serialized); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); assertNotNull(deserializedState); // Verify taskEvent @@ -102,10 +99,8 @@ public void testActionStateWithNullTaskEvent() throws Exception { originalState.addSensoryMemoryUpdate(memoryUpdate); // Test serialization/deserialization - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); 
+ ActionState deserializedState = ActionStateSerde.deserialize(serialized); // Verify taskEvent is null assertNull(deserializedState.getTaskEvent()); @@ -128,10 +123,8 @@ public void testActionStateWithComplexAttributes() throws Exception { ActionState originalState = new ActionState(inputEvent); // Test serialization/deserialization - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); // Verify complex attributes are preserved InputEvent deserializedInputEvent = (InputEvent) deserializedState.getTaskEvent(); @@ -157,10 +150,8 @@ public void testActionStateWithCallResults() throws Exception { originalState.addCallResult(result2); // Test serialization/deserialization - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); // Verify call results assertEquals(2, deserializedState.getCallResultCount()); @@ -186,10 +177,8 @@ public void testActionStateWithPendingCallResult() throws Exception { ActionState originalState = new ActionState(inputEvent); originalState.addCallResult(CallResult.pending("module.func", "digest")); - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); assertEquals(1, deserializedState.getCallResultCount()); CallResult result = deserializedState.getCallResult(0); @@ -215,10 +204,8 @@ public void testActionStateWithCompletedFlag() throws Exception { inputEvent, sensoryUpdates, shortTermUpdates, outputEvents, null, true); // Test serialization/deserialization - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); // Verify completed flag assertTrue(deserializedState.isCompleted()); @@ -242,10 +229,8 @@ public void testActionStateInProgressWithCallResults() throws Exception { new ActionState(inputEvent, null, null, null, callResults, false); // Test serialization/deserialization - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); // Verify state assertFalse(deserializedState.isCompleted()); @@ -261,10 +246,8 @@ public void testCallResultWithNullPayloads() throws Exception { ActionState originalState = new ActionState(inputEvent); originalState.addCallResult(new CallResult("func", "digest", null, null)); - 
ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); - - byte[] serialized = seder.serialize("test-topic", originalState); - ActionState deserializedState = seder.deserialize("test-topic", serialized); + byte[] serialized = ActionStateSerde.serialize(originalState); + ActionState deserializedState = ActionStateSerde.deserialize(serialized); assertEquals(1, deserializedState.getCallResultCount()); CallResult result = deserializedState.getCallResult(0); @@ -310,9 +293,8 @@ public void testDeserializeLegacyCallResultWithoutStatus() throws Exception { + "\"completed\":false" + "}"; - ActionStateKafkaSeder seder = new ActionStateKafkaSeder(); ActionState deserializedState = - seder.deserialize("test-topic", json.getBytes(StandardCharsets.UTF_8)); + ActionStateSerde.deserialize(json.getBytes(StandardCharsets.UTF_8)); assertEquals(2, deserializedState.getCallResultCount()); @@ -326,4 +308,29 @@ public void testDeserializeLegacyCallResultWithoutStatus() throws Exception { assertArrayEquals( "exception".getBytes(StandardCharsets.UTF_8), legacyFailure.getExceptionPayload()); } + + @Test + public void testKafkaSederDelegatesToActionStateSerde() throws Exception { + InputEvent inputEvent = new InputEvent("test delegation"); + ActionState originalState = new ActionState(inputEvent); + + // Serialize via ActionStateSerde, deserialize via Kafka seder (and vice versa) + ActionStateKafkaSeder kafkaSeder = new ActionStateKafkaSeder(); + + byte[] serializedBySerde = ActionStateSerde.serialize(originalState); + byte[] serializedByKafka = kafkaSeder.serialize("test-topic", originalState); + + ActionState fromSerde = ActionStateSerde.deserialize(serializedByKafka); + ActionState fromKafka = kafkaSeder.deserialize("test-topic", serializedBySerde); + + // Both should produce identical results + assertArrayEquals(serializedBySerde, serializedByKafka); + assertEquals( + ((InputEvent) fromSerde.getTaskEvent()).getInput(), + ((InputEvent) fromKafka.getTaskEvent()).getInput()); + + // Kafka seder should handle nulls + assertNull(kafkaSeder.serialize("test-topic", null)); + assertNull(kafkaSeder.deserialize("test-topic", null)); + } } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java index fbe500b36..2a90c1f15 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java @@ -17,10 +17,7 @@ */ package org.apache.flink.agents.runtime.actionstate; -import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.InputEvent; -import org.apache.flink.agents.api.context.RunnerContext; -import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.actions.Action; import org.junit.jupiter.api.Test; @@ -38,7 +35,7 @@ public class ActionStateUtilTest { public void testGenerateKeyConsistency() throws Exception { // Create test data Object key = "consistency-test"; - Action action = new TestAction("consistency-action"); + Action action = new NoOpAction("consistency-action"); InputEvent inputEvent = new InputEvent("same-input"); InputEvent inputEvent2 = new InputEvent("same-input"); @@ -54,7 +51,7 @@ public void testGenerateKeyConsistency() throws Exception { public void testGenerateKeyDifferentInputs() throws Exception { // Create test data Object key = "diff-test"; - Action action = new 
TestAction("diff-action"); + Action action = new NoOpAction("diff-action"); InputEvent inputEvent1 = new InputEvent("input1"); InputEvent inputEvent2 = new InputEvent("input2"); @@ -68,7 +65,7 @@ public void testGenerateKeyDifferentInputs() throws Exception { @Test public void testGenerateKeyWithNullKey() throws Exception { - Action action = new TestAction("test-action"); + Action action = new NoOpAction("test-action"); InputEvent inputEvent = new InputEvent("test-input"); assertThrows( @@ -93,7 +90,7 @@ public void testGenerateKeyWithNullAction() { @Test public void testGenerateKeyWithNullEvent() throws Exception { Object key = "test-key"; - Action action = new TestAction("test-action"); + Action action = new NoOpAction("test-action"); assertThrows( NullPointerException.class, @@ -106,7 +103,7 @@ public void testGenerateKeyWithNullEvent() throws Exception { public void testParseKeyValidKey() throws Exception { // Create test data and generate a key Object key = "test-key"; - Action action = new TestAction("test-action"); + Action action = new NoOpAction("test-action"); InputEvent inputEvent = new InputEvent("test-input"); long seqNum = 123; @@ -128,7 +125,7 @@ public void testParseKeyValidKey() throws Exception { public void testParseKeyRoundTrip() throws Exception { // Test that generate -> parse -> values match original inputs Object originalKey = "round-trip-test"; - Action action = new TestAction("round-trip-action"); + Action action = new NoOpAction("round-trip-action"); InputEvent inputEvent = new InputEvent("round-trip-input"); long seqNum = 456; @@ -176,7 +173,7 @@ public void testParseKeyWithInvalidFormat() { public void testParseKeyWithSpecialCharacters() throws Exception { // Test with keys containing special characters (but not the separator) Object key = "key-with-special@chars#123"; - Action action = new TestAction("action-with-special@chars"); + Action action = new NoOpAction("action-with-special@chars"); InputEvent inputEvent = new InputEvent("input-with-special@chars"); long seqNum = 789; @@ -190,7 +187,7 @@ public void testParseKeyWithSpecialCharacters() throws Exception { @Test public void testParseKeyConsistencyWithDifferentKeys() throws Exception { // Generate keys with different inputs and verify parsing consistency - Action action = new TestAction("consistency-action"); + Action action = new NoOpAction("consistency-action"); InputEvent inputEvent = new InputEvent("consistency-input"); String key1 = ActionStateUtil.generateKey("key1", 100, action, inputEvent); @@ -207,21 +204,4 @@ public void testParseKeyConsistencyWithDifferentKeys() throws Exception { assertEquals(parsed1.get(2), parsed2.get(2)); // Event UUID assertEquals(parsed1.get(3), parsed2.get(3)); // Action UUID } - - private static class TestAction extends Action { - - public static void doNothing(Event event, RunnerContext context) { - // No operation - } - - public TestAction(String name) throws Exception { - super( - name, - new JavaFunction( - TestAction.class.getName(), - "doNothing", - new Class[] {Event.class, RunnerContext.class}), - List.of(InputEvent.class.getName())); - } - } } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java new file mode 100644 index 000000000..0d4ddd062 --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.actionstate; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.plan.AgentConfiguration; +import org.apache.flink.agents.plan.actions.Action; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for {@link FlussActionStateStore} against an embedded Fluss cluster. 
+public class FlussActionStateStoreIT {
+
+    private static final String TEST_DATABASE = "test_flink_agents";
+    private static final String TEST_TABLE = "action_state_it";
+    private static final String TEST_KEY = "test-key";
+
+    @RegisterExtension
+    static final FlussClusterExtension FLUSS_CLUSTER =
+            FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+
+    private FlussActionStateStore store;
+    private Action testAction;
+    private Event testEvent;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        AgentConfiguration config = createAgentConfiguration();
+        store = new FlussActionStateStore(config);
+
+        // Wait for table to be ready in the cluster
+        waitForTableReady();
+
+        testAction = new NoOpAction("test-action");
+        testEvent = new InputEvent("test-data");
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    // ==================== Basic CRUD ====================
+
+    @Test
+    void testPutAndGet() throws Exception {
+        ActionState state = new ActionState(testEvent);
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, state);
+        ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+
+        assertThat(retrieved).isNotNull();
+        assertThat(retrieved.getTaskEvent()).isEqualTo(testEvent);
+        assertThat(retrieved.isCompleted()).isFalse();
+    }
+
+    @Test
+    void testGetNonExistent() throws Exception {
+        ActionState result = store.get(TEST_KEY, 999L, testAction, testEvent);
+
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testMultipleSeqNums() throws Exception {
+        InputEvent event1 = new InputEvent("data-1");
+        InputEvent event2 = new InputEvent("data-2");
+        InputEvent event3 = new InputEvent("data-3");
+        ActionState state1 = new ActionState(event1);
+        ActionState state2 = new ActionState(event2);
+        ActionState state3 = new ActionState(event3);
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, state1);
+        store.put(TEST_KEY, 2L, testAction, testEvent, state2);
+        store.put(TEST_KEY, 3L, testAction, testEvent, state3);
+
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent).getTaskEvent()).isEqualTo(event1);
+        assertThat(store.get(TEST_KEY, 2L, testAction, testEvent).getTaskEvent()).isEqualTo(event2);
+        assertThat(store.get(TEST_KEY, 3L, testAction, testEvent).getTaskEvent()).isEqualTo(event3);
+    }
+
+    @Test
+    void testUpsertOverwrite() throws Exception {
+        ActionState original = new ActionState(new InputEvent("original"));
+        store.put(TEST_KEY, 1L, testAction, testEvent, original);
+
+        InputEvent updatedEvent = new InputEvent("updated");
+        ActionState updated = new ActionState(updatedEvent);
+        store.put(TEST_KEY, 1L, testAction, testEvent, updated);
+
+        ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+        assertThat(retrieved).isNotNull();
+        assertThat(retrieved.getTaskEvent()).isEqualTo(updatedEvent);
+    }
+
+    // ==================== Pruning ====================
+
+    @Test
+    void testPruneSingleKey() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 3L, testAction, testEvent, new ActionState(testEvent));
+
+        store.pruneState(TEST_KEY, 2L);
+
+        // pruneState is synchronous (in-memory eviction)
+        // Check surviving entry first: get() with a missing key triggers divergence cleanup
+        // that removes entries with higher seqNums (same as Kafka backend behavior).
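+        // For example, if get(TEST_KEY, 1L) ran first and missed, that cleanup would also
+        // evict the surviving seqNum-3 entry before it could be asserted on; hence the order.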
+        assertThat(store.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+        assertThat(store.get(TEST_KEY, 2L, testAction, testEvent)).isNull();
+    }
+
+    // ==================== Recovery ====================
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testRecoveryMarkerReturnsBucketOffsets() {
+        Object marker = store.getRecoveryMarker();
+        assertThat(marker).isNotNull();
+        assertThat(marker).isInstanceOf(Map.class);
+        Map<Integer, Long> bucketOffsets = (Map<Integer, Long>) marker;
+        // With 1 bucket configured, we should have exactly 1 entry
+        assertThat(bucketOffsets).hasSize(1);
+        assertThat(bucketOffsets).containsKey(0);
+        assertThat(bucketOffsets.get(0)).isGreaterThanOrEqualTo(0L);
+    }
+
+    @Test
+    void testRebuildStateWithEmptyMarkersSkipsRebuild() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+
+        // rebuildState with empty markers should skip rebuild (aligned with Kafka backend)
+        store.rebuildState(Collections.emptyList());
+
+        // The in-memory cache is not cleared when rebuild is skipped,
+        // so the state should still be accessible
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testRebuildStateWithRecoveryMarkers() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+
+        // Capture recovery marker after writing data (simulates checkpoint boundary)
+        Object marker = store.getRecoveryMarker();
+
+        // Write more data after the marker (simulates writes between checkpoint and crash)
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+
+        // Close to ensure all writes are fully committed before recovery
+        store.close();
+
+        // Simulate recovery: new store instance
+        FlussActionStateStore recoveredStore =
+                new FlussActionStateStore(createAgentConfiguration());
+        try {
+            // Rebuild using the marker; should replay from marker offset to current end
+            recoveredStore.rebuildState(List.of(marker));
+
+            // Check surviving entry first: get() with a missing key triggers divergence cleanup
+            // that removes entries with higher seqNums (same as Kafka backend behavior).
+            // Data written after the marker should be recovered
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+            // Data written before the marker should NOT be in the rebuilt cache
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+        } finally {
+            recoveredStore.close();
+            // Prevent double-close in tearDown
+            store = null;
+        }
+    }
+
+    @Test
+    void testPruneWorksAfterRecovery() throws Exception {
+        // Capture recovery marker BEFORE writing data.
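+        // Unlike testRebuildStateWithRecoveryMarkers above, the marker here precedes all of
+        // the writes, so rebuildState on the recovered store should replay all three entries.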
+        Object marker = store.getRecoveryMarker();
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 3L, testAction, testEvent, new ActionState(testEvent));
+        store.close();
+
+        // Simulate recovery: new store instance
+        FlussActionStateStore recoveredStore =
+                new FlussActionStateStore(createAgentConfiguration());
+        try {
+            // Rebuild state from the log using recovery markers
+            recoveredStore.rebuildState(List.of(marker));
+
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+            assertThat(recoveredStore.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+
+            recoveredStore.pruneState(TEST_KEY, 2L);
+
+            // Check surviving entry first (get() divergence cleanup side-effect)
+            assertThat(recoveredStore.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNull();
+        } finally {
+            recoveredStore.close();
+        }
+    }
+
+    // ==================== Multi-bucket ====================
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testMultiBucketRecovery() throws Exception {
+        // Use a separate database/table with 4 buckets to test the multi-bucket scenario
+        String multiDb = "test_flink_agents_multi";
+        String multiTable = "action_state_multi";
+        AgentConfiguration multiConfig = createAgentConfiguration(multiDb, multiTable, 4);
+        FlussActionStateStore multiStore = new FlussActionStateStore(multiConfig);
+        try {
+            waitForTableReady(multiDb, multiTable);
+
+            // Write states with different keys (likely distributed across buckets)
+            Action action1 = new NoOpAction("multi-action");
+            for (int i = 0; i < 10; i++) {
+                String key = "multi-key-" + i;
+                multiStore.put(key, 1L, action1, testEvent, new ActionState(testEvent));
+            }
+
+            // Verify all states are retrievable
+            for (int i = 0; i < 10; i++) {
+                String key = "multi-key-" + i;
+                assertThat(multiStore.get(key, 1L, action1, testEvent)).isNotNull();
+            }
+
+            // Recovery marker should contain all 4 buckets
+            Object marker = multiStore.getRecoveryMarker();
+            assertThat(marker).isInstanceOf(Map.class);
+            Map<Integer, Long> bucketOffsets = (Map<Integer, Long>) marker;
+            assertThat(bucketOffsets).hasSize(4);
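+            // The marker is assumed to be a Map from bucket id to next log offset, e.g. with
+            // four buckets something like {0=3, 1=2, 2=4, 3=1} (offsets illustrative only);
+            // rebuildState then replays each bucket from its recorded offset to the log end.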
+
+            // Write more data after marker
+            for (int i = 10; i < 15; i++) {
+                String key = "multi-key-" + i;
+                multiStore.put(key, 1L, action1, testEvent, new ActionState(testEvent));
+            }
+            multiStore.close();
+
+            // Recover into a new store instance
+            FlussActionStateStore recoveredStore = new FlussActionStateStore(multiConfig);
+            try {
+                recoveredStore.rebuildState(List.of(marker));
+
+                // Data written after marker should be recovered
+                for (int i = 10; i < 15; i++) {
+                    String key = "multi-key-" + i;
+                    assertThat(recoveredStore.get(key, 1L, action1, testEvent)).isNotNull();
+                }
+            } finally {
+                recoveredStore.close();
+            }
+        } finally {
+            multiStore.close();
+            // Prevent double-close in tearDown
+            store = null;
+        }
+    }
+
+    // ==================== Helpers ====================
+
+    private AgentConfiguration createAgentConfiguration() {
+        return createAgentConfiguration(TEST_DATABASE, TEST_TABLE, 1);
+    }
+
+    private AgentConfiguration createAgentConfiguration(
+            String database, String table, int buckets) {
+        AgentConfiguration config = new AgentConfiguration();
+        config.set(FLUSS_BOOTSTRAP_SERVERS, FLUSS_CLUSTER.getBootstrapServers());
+        config.set(FLUSS_ACTION_STATE_DATABASE, database);
+        config.set(FLUSS_ACTION_STATE_TABLE, table);
+        config.set(FLUSS_ACTION_STATE_TABLE_BUCKETS, buckets);
+        return config;
+    }
+
+    private void waitForTableReady() throws Exception {
+        waitForTableReady(TEST_DATABASE, TEST_TABLE);
+    }
+
+    private void waitForTableReady(String database, String table) throws Exception {
+        TablePath tablePath = TablePath.of(database, table);
+        try (org.apache.fluss.client.Connection conn =
+                        org.apache.fluss.client.ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER.getClientConfig());
+                Admin admin = conn.getAdmin()) {
+            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+            FLUSS_CLUSTER.waitUntilTableReady(tableInfo.getTableId());
+        }
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
new file mode 100644
index 000000000..547a83fb4
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link FlussActionStateStore} with SASL/PLAIN authentication against an
+ * embedded Fluss cluster.
+ */
+public class FlussActionStateStoreSaslIT {
+
+    private static final String TEST_DATABASE = "test_flink_agents_sasl";
+    private static final String TEST_TABLE = "action_state_sasl_it";
+    private static final String TEST_KEY = "test-key";
+    private static final String SASL_USERNAME = "testuser";
+    private static final String SASL_PASSWORD = "testpass";
+
+    @RegisterExtension
+    static final FlussClusterExtension FLUSS_CLUSTER =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(1)
+                    .setCoordinatorServerListeners("FLUSS://localhost:0, CLIENT://localhost:0")
+                    .setTabletServerListeners("FLUSS://localhost:0, CLIENT://localhost:0")
+                    .setClusterConf(createSaslClusterConfig())
+                    .build();
+
+    private FlussActionStateStore store;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        AgentConfiguration config = createSaslAgentConfiguration();
+        store = new FlussActionStateStore(config);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    @Test
+    void testPutAndGetWithSaslAuth() throws Exception {
+        Action testAction = new NoOpAction("sasl-action");
+        InputEvent testEvent = new InputEvent("sasl-data");
+        ActionState state = new ActionState(testEvent);
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, state);
+        ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+
+        assertThat(retrieved).isNotNull();
+        assertThat(retrieved.getTaskEvent()).isEqualTo(testEvent);
+    }
+
+    @Test
+    void testRecoveryWithSaslAuth() throws Exception {
+        Action testAction = new NoOpAction("sasl-recovery-action");
+        InputEvent testEvent = new InputEvent("sasl-recovery-data");
+
+        // Write data and capture recovery marker
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+        Object marker = store.getRecoveryMarker();
+
+        // Write more data after marker
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+        store.close();
+
+        // Recover into a new store instance with SASL
+        FlussActionStateStore recoveredStore =
+                new FlussActionStateStore(createSaslAgentConfiguration());
+        try {
+            recoveredStore.rebuildState(java.util.List.of(marker));
+
+            // Data written after marker should be recovered
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+            // Data written before marker should NOT be in the rebuilt cache
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+        } finally {
+            recoveredStore.close();
+            store = null;
+        }
+    }
+
+    @Test
+    void testMultipleWritesWithSaslAuth() throws Exception {
+        Action testAction = new NoOpAction("sasl-multi-action");
+        InputEvent testEvent = new InputEvent("sasl-multi-data");
+
+        for (int i = 0; i < 5; i++) {
+            store.put("key-" + i, (long) i, testAction, testEvent, new ActionState(testEvent));
+        }
+
+        for (int i = 0; i < 5; i++) {
+            assertThat(store.get("key-" + i, (long) i, testAction, testEvent)).isNotNull();
+        }
+    }
+
+    // ==================== Helpers ====================
+
+    private static Configuration createSaslClusterConfig() {
+        Configuration conf = new Configuration();
+        conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
+        conf.setString("security.sasl.enabled.mechanisms", "plain");
+        conf.setString(
+                "security.sasl.plain.jaas.config",
+                "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required "
+                        + " user_"
+                        + SASL_USERNAME
+                        + "=\""
+                        + SASL_PASSWORD
+                        + "\";");
+        return conf;
+    }
+
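+    // The cluster config above registers the server-side credential user_testuser=testpass.
+    // The client is assumed to authenticate with an equivalent SASL/PLAIN login, roughly:
+    //   org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required
+    //       username="testuser" password="testpass";
+    // built by FlussActionStateStore from flussSaslUsername/flussSaslPassword (or taken from
+    // flussSaslJaasConfig when that option is set).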
+    private AgentConfiguration createSaslAgentConfiguration() {
+        AgentConfiguration config = new AgentConfiguration();
+        String bootstrapServers =
+                String.join(
+                        ",",
+                        FLUSS_CLUSTER
+                                .getClientConfig("CLIENT")
+                                .get(ConfigOptions.BOOTSTRAP_SERVERS));
+        config.set(FLUSS_BOOTSTRAP_SERVERS, bootstrapServers);
+        config.set(FLUSS_ACTION_STATE_DATABASE, TEST_DATABASE);
+        config.set(FLUSS_ACTION_STATE_TABLE, TEST_TABLE);
+        config.set(FLUSS_ACTION_STATE_TABLE_BUCKETS, 1);
+        config.set(FLUSS_SECURITY_PROTOCOL, "SASL");
+        config.set(FLUSS_SASL_MECHANISM, "PLAIN");
+        config.set(FLUSS_SASL_USERNAME, SASL_USERNAME);
+        config.set(FLUSS_SASL_PASSWORD, SASL_PASSWORD);
+        return config;
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
new file mode 100644
index 000000000..308ad3d42
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.row.InternalRow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link FlussActionStateStore} store-level behavior. */
+public class FlussActionStateStoreTest {
+
+    private static final String TEST_KEY = "test-key";
+
+    private AppendWriter mockWriter;
+    private FlussActionStateStore store;
+    private Action testAction;
+    private Event testEvent;
+    private ActionState testActionState;
+    private Map<String, ActionState> actionStates;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mockWriter = mock(AppendWriter.class);
+        when(mockWriter.append(any(InternalRow.class)))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        actionStates = new HashMap<>();
+        store =
+                new FlussActionStateStore(
+                        actionStates, mock(Connection.class), mock(Table.class), mockWriter);
+
+        testAction = new NoOpAction("test-action");
+        testEvent = new InputEvent("test data");
+        testActionState = new ActionState(testEvent);
+    }
+
+    @Test
+    void testPutActionState() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, testActionState);
+
+        verify(mockWriter).append(any(InternalRow.class));
+
+        String stateKey = ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent);
+        assertThat(actionStates).containsKey(stateKey);
+        assertThat(actionStates.get(stateKey)).isEqualTo(testActionState);
+    }
+
+    @Test
+    void testPutActionStateWriterFailure() throws Exception {
+        when(mockWriter.append(any(InternalRow.class)))
+                .thenReturn(CompletableFuture.failedFuture(new IOException("connection lost")));
+
+        FlussActionStateStore failStore =
+                new FlussActionStateStore(
+                        actionStates, mock(Connection.class), mock(Table.class), mockWriter);
+
+        assertThatThrownBy(
+                        () -> failStore.put(TEST_KEY, 1L, testAction, testEvent, testActionState))
+                .isInstanceOf(Exception.class);
+
+        // Cache should NOT be updated on write failure
+        String stateKey = ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent);
+        assertThat(actionStates).doesNotContainKey(stateKey);
+    }
+
+    @Test
+    void testGetTriggersDivergenceCleanup() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 2L, testAction, testEvent), testActionState);
+        // diverge: same key+seqNum, different action
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 2L, new NoOpAction("test-2"), testEvent),
+                testActionState);
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 3L, testAction, testEvent), testActionState);
+
+        store.get(TEST_KEY, 2L, new NoOpAction("test-1"), testEvent);
+
+        // Divergence detected at seqNum 2 → removeIf clears seqNum > 2
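+        // i.e. the entry for (TEST_KEY, 3L) written before the divergence is dropped, while
+        // entries at or below the divergence point survive.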
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+        assertThat(store.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+        assertThat(store.get(TEST_KEY, 3L, testAction, testEvent)).isNull();
+    }
+
+    // ==================== rebuildState tests ====================
+
+    @Test
+    void testRebuildStateSkipsOnEmptyMarkers() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+
+        store.rebuildState(Collections.emptyList());
+
+        // Empty markers → rebuild is skipped, cache is NOT cleared
+        assertThat(actionStates).isNotEmpty();
+    }
+
+    @Test
+    void testRebuildStateSkipsOnNonMapMarker() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+
+        // A non-Map marker is ignored, resulting in empty bucketStartOffsets.
+        // Note: rebuildState clears the cache before checking offsets,
+        // so the cache will be empty even though no actual rebuild occurs.
+        store.rebuildState(List.of("invalid-marker"));
+
+        assertThat(actionStates).isEmpty();
+    }
+
+    @Test
+    void testRebuildStateSkipsOnEmptyBucketOffsets() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+
+        // Empty map marker → no valid bucket offsets.
+        // Same as above: cache is cleared before the early-return check.
+        store.rebuildState(List.of(Map.of()));
+
+        assertThat(actionStates).isEmpty();
+    }
+
+    @Test
+    void testCloseClosesResources() throws Exception {
+        Table mockTable = mock(Table.class);
+        Connection mockConnection = mock(Connection.class);
+
+        FlussActionStateStore closeableStore =
+                new FlussActionStateStore(actionStates, mockConnection, mockTable, mockWriter);
+
+        closeableStore.close();
+
+        verify(mockTable).close();
+        verify(mockConnection).close();
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
index cd32524be..7cf829c00 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
@@ -19,9 +19,7 @@
 
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.plan.AgentConfiguration;
-import org.apache.flink.agents.plan.JavaFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -76,7 +74,7 @@ void setUp() throws Exception {
                 TEST_TOPIC);
 
         // Create test objects
-        testAction = new TestAction("test-action");
+        testAction = new NoOpAction("test-action");
         testEvent = new InputEvent("test data");
         testActionState = new ActionState(testEvent);
     }
@@ -107,7 +105,7 @@ void testGetNonExistentActionState() throws Exception {
         actionStates.put(
                 ActionStateUtil.generateKey(TEST_KEY, 4L, testAction, testEvent), testActionState);
 
-        actionStateStore.get(TEST_KEY, 2L, new TestAction("test-1"), testEvent);
+        actionStateStore.get(TEST_KEY, 2L, new NoOpAction("test-1"), testEvent);
 
         assertNotNull(actionStateStore.get(TEST_KEY, 1L, testAction, testEvent));
         assertNotNull(actionStateStore.get(TEST_KEY, 2L, testAction, testEvent));
@@ -123,7 +121,7 @@ void testGetActionStateWithDiverge() throws Exception {
                 ActionStateUtil.generateKey(TEST_KEY, 2L, testAction, testEvent), testActionState);
         // diverge here
         actionStates.put(
-                ActionStateUtil.generateKey(TEST_KEY, 2L, new TestAction("test-2"), testEvent),
+                ActionStateUtil.generateKey(TEST_KEY, 2L, new NoOpAction("test-2"), testEvent),
                 testActionState);
         actionStates.put(
                 ActionStateUtil.generateKey(TEST_KEY, 3L, testAction, testEvent), testActionState);
@@ -256,21 +254,4 @@ void testRebuildState() throws Exception {
                         ActionStateUtil.generateKey(TEST_KEY, 3L, testAction, testEvent)))
                 .isEqualTo(thirdState);
     }
-
-    private static class TestAction extends Action {
-
-        public static void doNothing(Event event, RunnerContext context) {
-            // No operation
-        }
-
-        public TestAction(String name) throws Exception {
-            super(
-                    name,
-                    new JavaFunction(
-                            TestAction.class.getName(),
-                            "doNothing",
-                            new Class[] {Event.class, RunnerContext.class}),
-                    List.of(InputEvent.class.getName()));
-        }
-    }
 }
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
new file mode 100644
index 000000000..cfc1f5d20
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.JavaFunction;
+import org.apache.flink.agents.plan.actions.Action;
+
+import java.util.List;
+
+/** A no-op {@link Action} for use in action state store tests. */
+public class NoOpAction extends Action {
+
+    public static void doNothing(Event event, RunnerContext context) {
+        // No operation
+    }
+
+    public NoOpAction(String name) throws Exception {
+        super(
+                name,
+                new JavaFunction(
+                        NoOpAction.class.getName(),
+                        "doNothing",
+                        new Class[] {Event.class, RunnerContext.class}),
+                List.of(InputEvent.class.getName()));
+    }
+}