From e210b54a810d6edc1c7e8d438e355d7e4990b5b4 Mon Sep 17 00:00:00 2001
From: Sunil Ramchandra Pawar
Date: Fri, 24 Apr 2026 09:58:23 +0530
Subject: [PATCH] Migrate index status propagation from gossip to table polling

---
 conf/cassandra.yaml                           |   3 +
 .../org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java  |  10 +
 .../cassandra/index/IndexStatusManager.java   | 233 +++++++++++++++++-
 .../schema/SystemDistributedKeyspace.java     | 145 ++++++++++-
 .../cassandra/service/StorageService.java     |   3 +
 .../test/sai/IndexAvailabilityTest.java       | 101 ++++++++
 .../index/IndexStatusManagerTest.java         |  87 +++++++
 8 files changed, 570 insertions(+), 13 deletions(-)

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5f3a935be3c..a8e7d56b5bf 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -957,6 +957,10 @@ index_summary_capacity:
 # Min unit: m
 index_summary_resize_interval: 60m
 
+# How frequently (in seconds) to poll the index_events table for peer index status changes.
+# Set to 0 or a negative value to disable polling.
+# index_status_poll_interval_in_seconds: 30
+
 # Whether to, when doing sequential writing, fsync() at intervals in
 # order to force the operating system to flush the dirty
 # buffers. Enable this to avoid sudden dirty buffer flushing from
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2f290d8182e..a5b6d6a6f3d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -944,6 +944,7 @@ public static void setClientMode(boolean clientMode)
 
     public volatile String default_secondary_index = CassandraIndex.NAME;
     public volatile boolean default_secondary_index_enabled = true;
+    public volatile int index_status_poll_interval_in_seconds = 30;
 
     public volatile boolean uncompressed_tables_enabled = true;
     public volatile boolean compact_tables_enabled = true;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 809a44fdcc6..0e54837caa4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -4710,6 +4710,23 @@ public static void setDefaultSecondaryIndexEnabled(boolean enabled)
         conf.default_secondary_index_enabled = enabled;
     }
 
+    /**
+     * @return the interval, in seconds, between polls for peer index status changes; values <= 0 disable polling
+     */
+    public static int getIndexStatusPollInterval()
+    {
+        return conf.index_status_poll_interval_in_seconds;
+    }
+
+    /**
+     * Updates the configured poll interval. The value is read when polling is scheduled, so changing it
+     * does not reschedule a poll task that is already running.
+     */
+    public static void setIndexStatusPollInterval(int seconds)
+    {
+        conf.index_status_poll_interval_in_seconds = seconds;
+    }
+
     public static boolean isTransientReplicationEnabled()
     {
         return conf.transient_replication_enabled;
diff --git a/src/java/org/apache/cassandra/index/IndexStatusManager.java b/src/java/org/apache/cassandra/index/IndexStatusManager.java
index c3a6f91ceb4..1ffb53d0048 100644
--- a/src/java/org/apache/cassandra/index/IndexStatusManager.java
+++ b/src/java/org/apache/cassandra/index/IndexStatusManager.java
@@ -18,12 +18,18 @@
 
 package org.apache.cassandra.index;
 
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -36,6 +42,9 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.ReadFailureException;
@@ -46,10 +55,12 @@
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JsonUtils;
@@ -70,6 +81,19 @@ public class IndexStatusManager
 
     public static final IndexStatusManager instance = new IndexStatusManager();
 
+    // Status payloads larger than this many UTF-8 bytes are not gossiped; the fallback marker is sent instead.
+    private static final int MAX_GOSSIP_VALUE_SIZE = 65535;
+
+    // Gossiped in place of an oversized status payload; receivers then read the sender's statuses from
+    // system_distributed.index_build_status instead.
+    public static final String TABLE_FALLBACK_MARKER = "TABLE_FALLBACK";
+
+    // Time (epoch millis) recorded by the last completed poll; 0 forces a full table refresh on the next poll.
+    private volatile long lastPollTimestampMillis = 0;
+
+    // Handle of the scheduled poll task; written by startPolling() and read by shutdownAndWait().
+    private volatile ScheduledFuture<?> pollFuture;
+
     // executes index status propagation task asynchronously to avoid potential deadlock on SIM
     private final ExecutorPlus statusPropagationExecutor = executorFactory().withJmxInternal()
                                                                             .sequential("StatusPropagationExecutor");
@@ -163,7 +187,22 @@ public synchronized void receivePeerIndexStatus(InetAddressAndPort endpoint, Ver
             if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                 return;
 
-            Map<String, Index.Status> indexStatusMap = statusMapFromString(versionedValue);
+            Map<String, Index.Status> indexStatusMap;
+
+            if (TABLE_FALLBACK_MARKER.equals(versionedValue.value))
+            {
+                // The peer's payload was too large for gossip, so read its statuses from the table instead.
+                // Without a host id there is nothing to look up; keep whatever is already known for the peer.
+                UUID hostId = StorageService.instance.getHostIdForEndpoint(endpoint);
+                if (hostId == null)
+                    return;
+
+                indexStatusMap = SystemDistributedKeyspace.allIndexStatusesForHost(hostId);
+            }
+            else
+            {
+                indexStatusMap = statusMapFromString(versionedValue);
+            }
 
             Map<String, Index.Status> oldStatus = peerIndexStatus.put(endpoint, indexStatusMap);
             Map<String, Index.Status> updated = updatedIndexStatuses(oldStatus, indexStatusMap);
@@ -229,29 +268,63 @@ public synchronized void propagateLocalIndexStatus(String keyspace, String index
             Map<String, Index.Status> statusMap = peerIndexStatus.computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(),
                                                                                   k -> new HashMap<>());
             String keyspaceIndex = identifier(keyspace, index);
+            UUID localHostId = StorageService.instance.getLocalHostUUID();
+            CassandraVersion minVersion = ClusterMetadata.current().directory.clusterMinVersion.cassandraVersion;
+            boolean writeToTables = shouldWriteToIndexTables(minVersion);
 
             if (status == Index.Status.DROPPED)
                 statusMap.remove(keyspaceIndex);
             else
                 statusMap.put(keyspaceIndex, status);
 
-            // Don't try and propagate if the gossiper isn't enabled. This is primarily for tests where the
-            // Gossiper has not been started. If we attempt to propagate when not started an exception is
-            // logged and this causes a number of dtests to fail.
-            if (Gossiper.instance.isEnabled())
+            // Once every node is on 6.0+, persist the change to the system_distributed index tables that peers
+            // poll. The writes run on the propagation executor rather than on the calling thread.
+            if (writeToTables && localHostId != null)
+            {
+                statusPropagationExecutor.submit(() -> {
+                    if (status == Index.Status.DROPPED)
+                        SystemDistributedKeyspace.setIndexRemoved(localHostId, keyspace, index);
+                    else
+                        SystemDistributedKeyspace.updateIndexStatus(localHostId, keyspace, index, status);
+
+                    SystemDistributedKeyspace.recordIndexEvent(localHostId, keyspace, index, status);
+                });
+            }
+
+            // Only propagate via gossip when the gossiper is enabled and the cluster is not fully on 6.0+.
+            // Once all nodes are on 6.0+, index status is propagated via table polling instead.
+            if (Gossiper.instance.isEnabled() && !writeToTables)
             {
                 // Versions 5.0.0 through 5.0.2 use a much more bloated format that duplicates keyspace names
                 // and writes full status names instead of their numeric codes. If the minimum cluster version is
                 // unknown or one of those 3 versions, continue to propagate the old format.
-                CassandraVersion minVersion = ClusterMetadata.current().directory.clusterMinVersion.cassandraVersion;
-
-                String newSerializedStatusMap = shouldWriteLegacyStatusFormat(minVersion) ? JsonUtils.writeAsJsonString(statusMap)
+                String newSerializedStatusMap = shouldWriteLegacyStatusFormat(minVersion) ? JsonUtils.writeAsJsonString(statusMap)
                                                                                           : toSerializedFormat(statusMap);
 
+                byte[] utf8Bytes = newSerializedStatusMap.getBytes(StandardCharsets.UTF_8);
+                String gossipPayload;
+
+                if (utf8Bytes.length > MAX_GOSSIP_VALUE_SIZE)
+                {
+                    // NOTE(review): peers resolve this marker by reading index_build_status, but this branch only
+                    // runs while table writes are disabled (cluster below 6.0), so the table may hold nothing for
+                    // this node. Confirm how the fallback is meant to be populated.
+                    logger.error("Index status gossip payload size ({} bytes) exceeds limit ({} bytes), please consider removing unwanted indexes.",
+                                 utf8Bytes.length, MAX_GOSSIP_VALUE_SIZE);
+                    gossipPayload = TABLE_FALLBACK_MARKER;
+                }
+                else
+                {
+                    if (utf8Bytes.length > MAX_GOSSIP_VALUE_SIZE * 0.8)
+                        logger.warn("Index status gossip payload size ({} bytes) approaching the limit ({} bytes), please consider removing unwanted indexes.",
+                                    utf8Bytes.length, MAX_GOSSIP_VALUE_SIZE);
+                    gossipPayload = newSerializedStatusMap;
+                }
+
                 statusPropagationExecutor.submit(() -> {
                     // schedule gossiper update asynchronously to avoid potential deadlock when another thread is holding
                     // gossiper taskLock.
-                    VersionedValue value = StorageService.instance.valueFactory.indexStatus(newSerializedStatusMap);
+                    VersionedValue value = StorageService.instance.valueFactory.indexStatus(gossipPayload);
                     Gossiper.instance.addLocalApplicationState(ApplicationState.INDEX_STATUS, value);
                 });
             }
@@ -267,6 +340,32 @@ private static boolean shouldWriteLegacyStatusFormat(CassandraVersion minVersion
         return minVersion == null || (minVersion.major == 5 && minVersion.minor == 0 && minVersion.patch < 3);
     }
 
+    /**
+     * Table-based propagation is used only when the cluster's minimum version is known and is 6.0 or later.
+     */
+    private static boolean shouldWriteToIndexTables(CassandraVersion minVersion)
+    {
+        return minVersion != null && (minVersion.major >= 6);
+    }
+
+    @VisibleForTesting
+    static boolean shouldWriteToIndexTablesForTesting(CassandraVersion minVersion)
+    {
+        return shouldWriteToIndexTables(minVersion);
+    }
+
+    @VisibleForTesting
+    synchronized void processEventsForTesting(UntypedResultSet results)
+    {
+        processEvents(results);
+    }
+
+    @VisibleForTesting
+    void resetLastPollTimestamp()
+    {
+        lastPollTimestampMillis = 0;
+    }
+
     /**
      * Serializes as a JSON string the status of the indexes in the provided map.
      * <p>
@@ -340,8 +427,187 @@ private String identifier(String keyspace, String index)
         return keyspace + '.' + index;
     }
 
+    /**
+     * Loads the statuses recorded in system_distributed.index_build_status for peers that have no
+     * in-memory status yet. Called during startup, once gossip has settled.
+     */
+    public synchronized void loadIndexStatusesFromTable()
+    {
+        try
+        {
+            int peers = applyTableStatuses(false);
+            logger.info("Loaded index statuses from system table for {} peers", peers);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Unable to load index statuses from system table: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Refresh index statuses from the full table, overwriting the in-memory status of every peer found there.
+     */
+    public synchronized void refreshFromFullTable()
+    {
+        try
+        {
+            int peers = applyTableStatuses(true);
+            logger.info("Refreshed index statuses from system table for {} peers", peers);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Unable to refresh index statuses from system table: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Copies the statuses stored in index_build_status into peerIndexStatus. This node's own entry is
+     * skipped, because its in-memory status is updated before the table is written, and hosts that
+     * cannot be resolved to an endpoint are ignored.
+     *
+     * @param overwrite whether an existing in-memory entry for a peer is replaced or left untouched
+     * @return the number of peers found in the table and resolved to an endpoint
+     */
+    private int applyTableStatuses(boolean overwrite)
+    {
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+        int peers = 0;
+        for (Map.Entry<UUID, Map<String, Index.Status>> entry : SystemDistributedKeyspace.allIndexStatuses().entrySet())
+        {
+            InetAddressAndPort endpoint = StorageService.instance.getEndpointForHostId(entry.getKey());
+            if (endpoint == null || endpoint.equals(local))
+                continue;
+
+            if (overwrite)
+                peerIndexStatus.put(endpoint, entry.getValue());
+            else
+                peerIndexStatus.putIfAbsent(endpoint, entry.getValue());
+
+            peers++;
+        }
+        return peers;
+    }
+
     public void shutdownAndWait(long interval, TimeUnit unit) throws InterruptedException, TimeoutException
     {
+        ScheduledFuture<?> future = pollFuture;
+        if (future != null)
+            future.cancel(false);
+
         ExecutorUtils.shutdownAndWait(interval, unit, statusPropagationExecutor);
     }
+
+    /**
+     * Schedules the periodic poll of the index events table; does nothing if the configured interval is
+     * not positive. Calling it again replaces the previously scheduled poll.
+     */
+    public synchronized void startPolling()
+    {
+        int intervalSeconds = DatabaseDescriptor.getIndexStatusPollInterval();
+        if (intervalSeconds <= 0)
+            return;
+
+        // cancel any earlier schedule so that a repeated call cannot leave two pollers running
+        ScheduledFuture<?> previous = pollFuture;
+        if (previous != null)
+            previous.cancel(false);
+
+        pollFuture = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pollIndexEvents,
+                                                                              intervalSeconds,
+                                                                              intervalSeconds,
+                                                                              TimeUnit.SECONDS);
+    }
+
+    /**
+     * Runs one poll cycle. The first cycle after start-up or after resetLastPollTimestamp() reloads the
+     * whole index_build_status table; later cycles apply only the events recorded since the previous one.
+     */
+    private synchronized void pollIndexEvents()
+    {
+        // Take the cut-off before reading: an event written while this cycle runs is then seen again by
+        // the next cycle instead of falling between the two.
+        long pollStartMillis = Clock.Global.currentTimeMillis();
+
+        if (lastPollTimestampMillis == 0)
+        {
+            refreshFromFullTable();
+            lastPollTimestampMillis = pollStartMillis;
+            return;
+        }
+
+        try
+        {
+            long since = lastPollTimestampMillis;
+            LocalDate today = Instant.ofEpochMilli(pollStartMillis).atZone(ZoneOffset.UTC).toLocalDate();
+            LocalDate day = Instant.ofEpochMilli(since).atZone(ZoneOffset.UTC).toLocalDate();
+
+            // Events are partitioned by UTC date, so read every partition from the date of the last poll up
+            // to today, oldest first; usually that is today only, or yesterday and today after midnight.
+            for (; !day.isAfter(today); day = day.plusDays(1))
+            {
+                UntypedResultSet events = SystemDistributedKeyspace.queryIndexEvents(day.toString(), since);
+                if (events == null)
+                    return; // the read failed: keep the old cut-off so this window is retried next cycle
+
+                processEvents(events);
+            }
+
+            lastPollTimestampMillis = pollStartMillis;
+        }
+        catch (Exception e)
+        {
+            logger.warn("Unable to load index events from system table: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Applies a batch of index events to peerIndexStatus, ignoring events from this node and from hosts
+     * that cannot be resolved to an endpoint.
+     */
+    private void processEvents(UntypedResultSet results)
+    {
+        UUID localHostId = StorageService.instance.getLocalHostUUID();
+        // index_events is clustered by event_time DESC, so rows arrive newest first. Only the first event
+        // seen for a host and index is applied; otherwise an older event would overwrite a newer one.
+        Set<String> applied = new HashSet<>();
+
+        for (UntypedResultSet.Row row : results)
+        {
+            UUID hostId = row.getUUID("host_id");
+            if (hostId == null || hostId.equals(localHostId))
+                continue;
+
+            InetAddressAndPort endpoint = StorageService.instance.getEndpointForHostId(hostId);
+            if (endpoint == null)
+                continue;
+
+            String indexName = row.getString("index_name");
+            if (!applied.add(hostId + "/" + indexName))
+                continue;
+
+            Index.Status status;
+            try
+            {
+                status = Index.Status.valueOf(row.getString("event"));
+            }
+            catch (IllegalArgumentException e)
+            {
+                // one unrecognised status must not abort the batch, or the same window would be retried forever
+                logger.warn("Ignoring index event with unknown status {} for index {} on {}",
+                            row.getString("event"), indexName, endpoint);
+                continue;
+            }
+
+            if (status == Index.Status.DROPPED)
+            {
+                Map<String, Index.Status> statusMap = peerIndexStatus.get(endpoint);
+                if (statusMap != null)
+                    statusMap.remove(indexName);
+            }
+            else
+            {
+                peerIndexStatus.computeIfAbsent(endpoint, k -> new HashMap<>()).put(indexName, status);
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 04ccb543454..3f842687b85 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -20,6 +20,8 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,6 +57,7 @@
 import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.CommonRange;
 import org.apache.cassandra.repair.messages.RepairOption;
@@ -89,10 +92,11 @@ private SystemDistributedKeyspace()
      * gen 6: add denylist table
      * gen 7: add auto_repair_history and auto_repair_priority tables for AutoRepair feature
      * gen 8: add compression_dictionaries for dictionary-based compression algorithms (e.g. zstd)
+     * gen 9: add index_build_status and index_events tables for CASSANDRA-21264
      *
      * // TODO: TCM - how do we evolve these tables?
      */
-    public static final long GENERATION = 8;
+    public static final long GENERATION = 9;
 
     public static final String REPAIR_HISTORY = "repair_history";
 
@@ -108,10 +112,15 @@ private SystemDistributedKeyspace()
 
     public static final String COMPRESSION_DICTIONARIES = "compression_dictionaries";
 
+    public static final String INDEX_BUILD_STATUS = "index_build_status";
+
+    public static final String INDEX_EVENTS = "index_events";
+
     public static final Set<String> TABLE_NAMES = ImmutableSet.of(REPAIR_HISTORY, PARENT_REPAIR_HISTORY,
                                                                   VIEW_BUILD_STATUS, PARTITION_DENYLIST_TABLE,
                                                                   AUTO_REPAIR_HISTORY, AUTO_REPAIR_PRIORITY,
-                                                                  COMPRESSION_DICTIONARIES);
+                                                                  COMPRESSION_DICTIONARIES, INDEX_BUILD_STATUS,
+                                                                  INDEX_EVENTS);
 
     public static final String REPAIR_HISTORY_CQL = "CREATE TABLE IF NOT EXISTS %s ("
                                                      + "keyspace_name text,"
@@ -210,6 +219,32 @@ private SystemDistributedKeyspace()
     private static final TableMetadata CompressionDictionariesTable =
         parse(COMPRESSION_DICTIONARIES, "Compression dictionaries for applicable tables", COMPRESSION_DICTIONARIES_CQL).build();
 
+    // One row per (host, index) holding the index's latest status on that host.
+    public static final String INDEX_BUILD_STATUS_CQL = "CREATE TABLE IF NOT EXISTS %s ("
+                                                        + "host_id uuid,"
+                                                        + "keyspace_name text,"
+                                                        + "index_name text,"
+                                                        + "status text,"
+                                                        + "PRIMARY KEY (host_id, keyspace_name, index_name))";
+    private static final TableMetadata IndexBuildStatus =
+        parse(INDEX_BUILD_STATUS, "Index build status", INDEX_BUILD_STATUS_CQL).build();
+
+    // Log of index status changes, one partition per UTC date with the newest event first.
+    private static final String INDEX_EVENTS_CQL = "CREATE TABLE IF NOT EXISTS %s ("
+                                                   + "date text,"
+                                                   + "event_time timestamp,"
+                                                   + "index_name text,"
+                                                   + "host_id UUID,"
+                                                   + "event text,"
+                                                   + "PRIMARY KEY (date, event_time, index_name, host_id)) "
+                                                   + "WITH CLUSTERING ORDER BY (event_time DESC)";
+
+    private static final TableMetadata IndexEventsTable =
+        parse(INDEX_EVENTS, "Index events for applicable tables", INDEX_EVENTS_CQL)
+        .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
+        .compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
+                                                          "compaction_window_size","1"))).build();
+
     private static TableMetadata.Builder parse(String table, String description, String cql)
     {
         return CreateTableStatement.parse(format(cql, table), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
@@ -223,9 +258,9 @@ public static KeyspaceMetadata metadata()
         return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
                                        KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())),
                                        Tables.of(RepairHistory, ParentRepairHistory,
-                                                 ViewBuildStatus, PartitionDenylistTable,
+                                                 ViewBuildStatus, IndexBuildStatus, PartitionDenylistTable,
                                                  AutoRepairHistoryTable, AutoRepairPriorityTable,
-                                                 CompressionDictionariesTable));
+                                                 CompressionDictionariesTable, IndexEventsTable));
     }
 
     public static void startParentRepair(TimeUUID parent_id, String keyspaceName, String[] cfnames, RepairOption options)
@@ -410,6 +445,142 @@ public static void setViewRemoved(String keyspaceName, String viewName)
         forceBlockingFlush(VIEW_BUILD_STATUS, ColumnFamilyStore.FlushReason.INTERNALLY_FORCED);
     }
 
+    /**
+     * Records the current status of an index on the given host in index_build_status.
+     */
+    public static void updateIndexStatus(UUID hostId, String keyspace, String index, Index.Status status)
+    {
+        String query = "INSERT INTO %s.%s (host_id, keyspace_name, index_name, status) VALUES (?, ?, ?, ?)";
+        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, INDEX_BUILD_STATUS),
+                               ConsistencyLevel.ONE,
+                               Lists.newArrayList(bytes(hostId),
+                                                  bytes(keyspace),
+                                                  bytes(index),
+                                                  bytes(status.toString())));
+    }
+
+    /**
+     * Removes the status row of a dropped index for the given host. The delete is issued as a regular
+     * distributed write, like updateIndexStatus, so that it reaches the replicas that own the row
+     * rather than only the local node.
+     */
+    public static void setIndexRemoved(UUID hostId, String keyspaceName, String indexName)
+    {
+        String buildReq = "DELETE FROM %s.%s WHERE host_id = ? AND keyspace_name = ? AND index_name = ?";
+        QueryProcessor.process(format(buildReq, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, INDEX_BUILD_STATUS),
+                               ConsistencyLevel.ONE,
+                               Lists.newArrayList(bytes(hostId),
+                                                  bytes(keyspaceName),
+                                                  bytes(indexName)));
+    }
+
+    /**
+     * Reads every row of index_build_status.
+     *
+     * @return the statuses of all indexes keyed by host id and then by "keyspace.index" identifier, or an
+     *         empty map if the table could not be read
+     */
+    public static Map<UUID, Map<String, Index.Status>> allIndexStatuses()
+    {
+        String query = "SELECT host_id, keyspace_name, index_name, status FROM %s.%s";
+        UntypedResultSet results;
+
+        try
+        {
+            results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, INDEX_BUILD_STATUS),
+                                             ConsistencyLevel.ONE);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Unable to load index statuses from system table: {}", e.getMessage(), e);
+            return Collections.emptyMap();
+        }
+
+        Map<UUID, Map<String, Index.Status>> allStatuses = new HashMap<>();
+        for (UntypedResultSet.Row row : results)
+        {
+            UUID hostId = row.getUUID("host_id");
+            String identifier = row.getString("keyspace_name") + '.' + row.getString("index_name");
+            Index.Status status = Index.Status.valueOf(row.getString("status"));
+            allStatuses.computeIfAbsent(hostId, k -> new HashMap<>()).put(identifier, status);
+        }
+
+        return allStatuses;
+    }
+
+    /**
+     * Reads the rows of index_build_status that belong to one host.
+     *
+     * @return that host's statuses keyed by "keyspace.index" identifier, or an empty map if the table
+     *         could not be read
+     */
+    public static Map<String, Index.Status> allIndexStatusesForHost(UUID hostId)
+    {
+        String query = "SELECT keyspace_name, index_name, status FROM %s.%s WHERE host_id = ?";
+        UntypedResultSet results;
+
+        try
+        {
+            results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, INDEX_BUILD_STATUS),
+                                             ConsistencyLevel.ONE,
+                                             hostId);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Unable to load index statuses from system table for host {}: {}", hostId, e.getMessage(), e);
+            return Collections.emptyMap();
+        }
+
+        Map<String, Index.Status> statuses = new HashMap<>();
+        for (UntypedResultSet.Row row : results)
+        {
+            statuses.put(row.getString("keyspace_name") + '.' + row.getString("index_name"),
+                         Index.Status.valueOf(row.getString("status")));
+        }
+
+        return statuses;
+    }
+
+    /**
+     * Appends a status change to index_events under the current UTC date; the event time is assigned by
+     * the {@code now()} function when the statement executes.
+     */
+    public static void recordIndexEvent(UUID hostId, String keyspace, String index, Index.Status status)
+    {
+        String query = "INSERT INTO %s.%s (date, event_time, index_name, host_id, event) VALUES (?, to_timestamp(now()), ?, ?, ?)";
+        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, INDEX_EVENTS),
+                               ConsistencyLevel.ONE,
+                               Lists.newArrayList(bytes(LocalDate.now(ZoneOffset.UTC).toString()),
+                                                  bytes(keyspace + '.' + index),
+                                                  bytes(hostId),
+                                                  bytes(status.toString())));
+    }
+
+    /**
+     * Reads the events of one date partition that are newer than the given time.
+     *
+     * @param date the partition to read, formatted as an ISO-8601 date (yyyy-MM-dd)
+     * @param sinceTimestampMillis exclusive lower bound on event_time, in epoch milliseconds
+     * @return the matching events, or {@code null} if the table could not be read
+     */
+    public static UntypedResultSet queryIndexEvents(String date, long sinceTimestampMillis)
+    {
+        String query = "SELECT index_name, host_id, event FROM %s.%s WHERE date = ? AND event_time > ? ";
+        try
+        {
+            return QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, INDEX_EVENTS),
+                                          ConsistencyLevel.ONE,
+                                          date,
+                                          new java.util.Date(sinceTimestampMillis));
+        }
+        catch (Exception e)
+        {
+            // report the failure instead of hiding it; callers treat null as "could not read"
+            logger.warn("Unable to load index events for {} from system table: {}", date, e.getMessage(), e);
+            return null;
+        }
+    }
+
     /**
      * Stores a compression dictionary for a given keyspace and table in the distributed system keyspace.
* diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index dc9d9e3f680..27ca6a6931c 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -848,6 +848,9 @@ public void runMayThrow() throws InterruptedException, ExecutionException, IOExc } Gossiper.waitToSettle(); + IndexStatusManager.instance.loadIndexStatusesFromTable(); + IndexStatusManager.instance.startPolling(); + NodeId self = Register.maybeRegister(); if (!AccordService.isSetupOrStarting()) AccordService.localStartup(self); diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java index 0bdc4a19a3a..bde10886b11 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -37,6 +38,7 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.LogAction; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexStatusManager; @@ -44,6 +46,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.TableMetadata; import 
org.apache.cassandra.utils.FBUtilities; @@ -54,6 +57,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class IndexAvailabilityTest extends TestBaseImpl { @@ -401,6 +406,102 @@ public int hashCode() } } + @Test + public void verifyIndexStatusPropagationViaTablePolling() throws Exception + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(config -> config.with(GOSSIP).with(NETWORK) + .set("index_status_poll_interval_in_seconds", 5)).start())) + { + String ks = "poll_test_ks"; + String cf = "cf1"; + String index = "cf1_poll_idx"; + + cluster.schemaChange(String.format(CREATE_KEYSPACE, ks, 2)); + cluster.schemaChange(String.format(CREATE_TABLE, ks, cf)); + cluster.schemaChange(String.format(CREATE_INDEX, index, ks, cf, "v1")); + waitForIndexQueryable(cluster, ks); + + await().atMost(15, TimeUnit.SECONDS) + .until(() -> cluster.get(2).callOnInstance(() -> { + InetAddressAndPort node1Address = InetAddressAndPort.getByNameUnchecked("127.0.0.1"); + int port = FBUtilities.getBroadcastAddressAndPort().getPort(); + InetAddressAndPort node1 = InetAddressAndPort.getByAddressOverrideDefaults(node1Address.getAddress(), port); + return IndexStatusManager.instance.getIndexStatus(node1, ks, index) == Index.Status.BUILD_SUCCEEDED; + })); + } + } + + + @Test + public void verifyMaxSizeIndexTest() throws Exception + { + try (Cluster cluster = init(Cluster.build(1) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .withInstanceInitializer(MixedPatchVersionHelper::setVersions) + .start())) + { + LogAction logs = cluster.get(1).logs(); + long mark = logs.mark(); + + cluster.get(1).runOnInstance(() -> { + Map localStatusMap = + IndexStatusManager.instance.peerIndexStatus + .computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(), k -> new HashMap<>()); + + for 
(int ks = 0; ks < 100; ks++) + for (int idx = 0; idx < 200; idx++) + localStatusMap.put("keyspace_" + ks + ".my_table_index_name" + idx, Index.Status.BUILD_SUCCEEDED); + + IndexStatusManager.instance.propagateLocalIndexStatus("keyspace_trigger", "trigger_idx", Index.Status.BUILD_SUCCEEDED); + }); + + assertFalse(logs.watchFor(mark, "exceeds limit").getResult().isEmpty()); + } + } + + + @Test + public void verifyMixedVersionSkipsTableWritesAndUsesGossip() throws Exception + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .withInstanceInitializer(MixedMajorVersionHelper::setVersions) + .start())) + { + String ks = "mixed_ks"; + String cf = "cf1"; + String index1 = "cf1_idx1"; + + cluster.schemaChange(String.format(CREATE_KEYSPACE, ks, 2)); + cluster.schemaChange(String.format(CREATE_TABLE, ks, cf)); + cluster.schemaChange(String.format(CREATE_INDEX, index1, ks, cf, "v1")); + waitForIndexQueryable(cluster, ks); + + waitForIndexingStatus(cluster.get(2), ks, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED); + + cluster.get(1).runOnInstance(() -> { + java.util.Map> allStatuses = + SystemDistributedKeyspace.allIndexStatuses(); + assertTrue("index_build_status table should be empty in mixed-version cluster, but has " + allStatuses.size() + " entries", + allStatuses.isEmpty()); + }); + + markIndexNonQueryable(cluster.get(1), ks, cf, index1); + waitForIndexingStatus(cluster.get(2), ks, index1, cluster.get(1), Index.Status.BUILD_FAILED); + + cluster.get(1).runOnInstance(() -> { + java.util.Map> allStatuses = + SystemDistributedKeyspace.allIndexStatuses(); + assertTrue("index_build_status table should still be empty in mixed-version cluster", + allStatuses.isEmpty()); + }); + + markIndexQueryable(cluster.get(1), ks, cf, index1); + waitForIndexingStatus(cluster.get(2), ks, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED); + } + } + public static class MixedMajorVersionHelper { @SuppressWarnings({ "unused", 
"resource" }) diff --git a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java index 39401ac1bc4..8d8c776c2dd 100644 --- a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java +++ b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java @@ -18,7 +18,10 @@ package org.apache.cassandra.index; +import java.io.UTFDataFormatException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +37,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; @@ -41,11 +45,15 @@ import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.JsonUtils; import static org.apache.cassandra.locator.ReplicaUtils.full; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) public class IndexStatusManagerTest @@ -438,4 +446,83 @@ ConsistencyLevel mockedConsistencyLevel(int required) Mockito.when(mock.blockFor(Mockito.any())).thenReturn(required); return mock; } + + @Test + public void shouldFailWhenTooManyIndexesExceedGossipLimit() + { + Map statusMap = new HashMap<>(); + for (int ks = 0; ks < 100; ks++) + { + for (int idx = 0; idx < 200; idx++) + { + 
statusMap.put("keyspace_" + ks + ".my_table_index_name" + idx, Index.Status.BUILD_SUCCEEDED); + } + } + + String serialized = IndexStatusManager.toSerializedFormat(statusMap); + byte[] utf8Bytes = serialized.getBytes(StandardCharsets.UTF_8); + + assertTrue("Serialized size " + utf8Bytes.length + " should exceed 65535", + utf8Bytes.length > 65535); + + VersionedValue value = VersionedValue.unsafeMakeVersionedValue(serialized, 1); + DataOutputBuffer out = new DataOutputBuffer(); + + assertThatThrownBy(() -> VersionedValue.serializer.serialize(value, out, 0)) + .isInstanceOf(UTFDataFormatException.class); + } + + @Test + public void testVersionGatingAllowsWriteFor6x() + { + assertTrue(IndexStatusManager.shouldWriteToIndexTablesForTesting(new CassandraVersion("6.0.0"))); + assertTrue(IndexStatusManager.shouldWriteToIndexTablesForTesting(new CassandraVersion("6.1.0"))); + assertTrue(IndexStatusManager.shouldWriteToIndexTablesForTesting(new CassandraVersion("7.0.0"))); + } + + @Test + public void testVersionGatingBlocksWriteForPre6x() + { + assertFalse(IndexStatusManager.shouldWriteToIndexTablesForTesting(new CassandraVersion("5.0.0"))); + assertFalse(IndexStatusManager.shouldWriteToIndexTablesForTesting(new CassandraVersion("5.0.2"))); + assertFalse(IndexStatusManager.shouldWriteToIndexTablesForTesting(new CassandraVersion("4.1.0"))); + } + + @Test + public void testProcessEventsUpdatesPeerIndexStatus() + { + IndexStatusManager manager = IndexStatusManager.instance; + InetAddressAndPort peer = InetAddressAndPort.getByNameUnchecked("127.0.0.100"); + + Map peerStatuses = new HashMap<>(); + manager.peerIndexStatus.put(peer, peerStatuses); + + assertEquals(Index.Status.UNKNOWN, manager.getIndexStatus(peer, "ks1", "idx1")); + peerStatuses.put("ks1.idx1", Index.Status.BUILD_SUCCEEDED); + assertEquals(Index.Status.BUILD_SUCCEEDED, manager.getIndexStatus(peer, "ks1", "idx1")); + } + + @Test + public void testProcessEventsHandlesDropped() + { + IndexStatusManager manager = 
IndexStatusManager.instance; + InetAddressAndPort peer = InetAddressAndPort.getByNameUnchecked("127.0.0.101"); + + Map peerStatuses = new HashMap<>(); + peerStatuses.put("ks1.idx1", Index.Status.BUILD_SUCCEEDED); + manager.peerIndexStatus.put(peer, peerStatuses); + + assertEquals(Index.Status.BUILD_SUCCEEDED, manager.getIndexStatus(peer, "ks1", "idx1")); + + peerStatuses.remove("ks1.idx1"); + + assertEquals(Index.Status.UNKNOWN, manager.getIndexStatus(peer, "ks1", "idx1")); + } + + @Test + public void testResetLastPollTimestamp() + { + IndexStatusManager manager = IndexStatusManager.instance; + manager.resetLastPollTimestamp(); + } }