36 commits
d8490b0
meatadata store bits part 1
capistrant Dec 13, 2025
3d2d423
annotate segments with compaction fingerprint before persist
capistrant Dec 13, 2025
48854f4
Add ability to generate compaction state fingerprint
capistrant Dec 13, 2025
c6a3367
add fingerprint to task context and make legacy last compaction state…
capistrant Dec 13, 2025
f3b706e
update embedded tests for compaction supervisors to flex fingerprints
capistrant Dec 13, 2025
46fb807
checkpoint with persisting compaction states
capistrant Dec 13, 2025
0fef358
add duty to clean up unused compaction states
capistrant Dec 14, 2025
edeaf30
take fingerprints into account in CompactionStatus
capistrant Dec 14, 2025
97daf3f
Add and improve tests
capistrant Dec 15, 2025
dbcdfcf
get rid of some todo comments
capistrant Dec 15, 2025
38f6d15
fix checkstyle
capistrant Dec 15, 2025
4cf1197
cleanup some more TODO
capistrant Dec 15, 2025
ba269bd
Add some docs
capistrant Dec 15, 2025
f168bc9
update web console
capistrant Dec 15, 2025
2292b15
make cache size configurable and fix some spelling
capistrant Dec 15, 2025
74c8ebc
fixup use of deprecated builder
capistrant Dec 15, 2025
adac5ec
fix checktyle
capistrant Dec 15, 2025
4fb3a9c
fix coordinator compactsegments duty and respond to self review comments
capistrant Dec 15, 2025
708c6f8
fix spellchecker
capistrant Dec 15, 2025
03bb14a
predates is a word
capistrant Dec 16, 2025
a262f79
improve some javadocs
capistrant Dec 16, 2025
6126e22
simplify some test assertions based on review
capistrant Dec 16, 2025
b78ec13
better naming
capistrant Dec 16, 2025
78f115e
controller impl cleanup
capistrant Dec 16, 2025
f06d715
For compaction supervisors, take persisting pending compaction states…
capistrant Dec 16, 2025
d571e43
use Configs.valueOrDefault helper in data segment
capistrant Dec 16, 2025
07afc2f
Refactor where fingerprinting happens and how the object mapper is wi…
capistrant Dec 16, 2025
12ea741
refactor CompactionStateManager into an interface with a persisted an…
capistrant Dec 16, 2025
f57527a
Merge branch 'master' into compaction-fingerprinting
capistrant Dec 22, 2025
858cbd3
remove fingerprinting support from the coordinator compact segments duty
capistrant Dec 22, 2025
9afab2f
Move on heap compaction state manager to test sources
capistrant Dec 22, 2025
34a8a11
CompactionStateManager is now overlord only
capistrant Dec 22, 2025
7214418
Refactor how the compaction state fingerprint cache is wired up
capistrant Jan 2, 2026
6a9743a
Merge branch 'master' into compaction-fingerprinting
capistrant Jan 2, 2026
cd55b0e
prettify
capistrant Jan 4, 2026
58724cc
small changes after self-review
capistrant Jan 4, 2026
@@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.metadata.HeapMemoryCompactionStateManager;
import org.apache.druid.segment.metadata.NoopCompactionStateCache;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
@@ -135,7 +137,9 @@ public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
policy,
compactionConfigs,
dataSources,
Collections.emptyMap()
Collections.emptyMap(),
new HeapMemoryCompactionStateManager(),
new NoopCompactionStateCache()
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
1 change: 1 addition & 0 deletions docs/api-reference/automatic-compaction-api.md
@@ -889,6 +889,7 @@ This includes the following fields:
|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
|`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
|`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
|`legacyPersistLastCompactionStateInSegments`|Whether to persist the full compaction state in segment metadata. When `true` (default), compaction state is stored in both the segment metadata and the compaction states table. This is historically how Druid has worked. When `false`, only a fingerprint reference is stored in the segment metadata, reducing storage overhead in the segments table. The actual compaction state is stored in the compaction states table and can be referenced with the aforementioned fingerprint. Eventually this configuration will be removed and all compaction will use the fingerprint method only. This configuration exists for operators to opt into this future pattern early. **WARNING: if you set this to false and then compact data, rolling back to a Druid version that predates compaction state fingerprinting (< Druid 36) will result in missing compaction states and trigger compaction on segments that may already be compacted.**|`true`|
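
For illustration, a hedged sketch of a cluster-level compaction config payload that opts into fingerprint-only persistence; every field value here is a placeholder except `legacyPersistLastCompactionStateInSegments`:

```json
{
  "compactionTaskSlotRatio": 0.5,
  "maxCompactionTaskSlots": 10,
  "useSupervisors": true,
  "engine": "native",
  "legacyPersistLastCompactionStateInSegments": false
}
```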

#### Compaction policy `newestSegmentFirst`

1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -389,6 +389,7 @@ These properties specify the JDBC connection and other configuration around the
|`druid.metadata.storage.tables.segments`|The table to use to look for segments.|`druid_segments`|
|`druid.metadata.storage.tables.rules`|The table to use to look for segment load/drop rules.|`druid_rules`|
|`druid.metadata.storage.tables.config`|The table to use to look for configs.|`druid_config`|
|`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state fingerprints (see the sketch after this table).|`druid_compactionStates`|
|`druid.metadata.storage.tables.tasks`|Used by the indexing service to store tasks.|`druid_tasks`|
|`druid.metadata.storage.tables.taskLog`|Used by the indexing service to store task logs.|`druid_tasklogs`|
|`druid.metadata.storage.tables.taskLock`|Used by the indexing service to store task locks.|`druid_tasklocks`|
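
As a sketch, the new compaction states table name can be overridden in metadata store configuration alongside the existing table properties; the values shown below are simply the documented defaults:

```properties
druid.metadata.storage.tables.segments=druid_segments
druid.metadata.storage.tables.compactionStates=druid_compactionStates
```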
@@ -1508,7 +1508,7 @@ public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) th
@ParameterizedTest(name = "useSupervisors={0}")
public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exception
{
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));

loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1552,7 +1552,7 @@ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exce
@ParameterizedTest(name = "useSupervisors={0}")
public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws Exception
{
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));

loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1854,7 +1854,7 @@ private void forceTriggerAutoCompaction(
).collect(Collectors.toList())
);
updateClusterConfig(
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null)
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null, true)
);

// Wait for scheduler to pick up the compaction job
@@ -1864,7 +1864,7 @@

// Disable all compaction
updateClusterConfig(
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null)
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null, true)
);
} else {
forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction);
@@ -1956,7 +1956,8 @@ private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCom
maxCompactionTaskSlots,
oldConfig.getCompactionPolicy(),
oldConfig.isUseSupervisors(),
oldConfig.getEngine()
oldConfig.getEngine(),
oldConfig.isLegacyPersistLastCompactionStateInSegments()
)
);

@@ -31,6 +31,8 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.metadata.CompactionStateManager;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -98,14 +100,14 @@ public EmbeddedDruidCluster createCluster()
private void configureCompaction(CompactionEngine compactionEngine)
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, true))
);
Assertions.assertTrue(updateResponse.isSuccess());
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineConfig(CompactionEngine compactionEngine)
public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(CompactionEngine compactionEngine)
{
configureCompaction(compactionEngine);

@@ -119,7 +121,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));

// Create a compaction config with MONTH granularity
InlineSchemaDataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig monthGranularityConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
@@ -152,11 +154,159 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
)
.build();

runCompactionWithSpec(compactionConfig);
runCompactionWithSpec(monthGranularityConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.MONTH));

verifyCompactedSegmentsHaveFingerprints(monthGranularityConfig);

InlineSchemaDataSourceCompactionConfig yearGranConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

overlord.latchableEmitter().flush(); // flush emitted events so that waiting on the next round of compaction works correctly
runCompactionWithSpec(yearGranConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(0, getNumSegmentsWith(Granularities.MONTH));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.YEAR));

verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
throws InterruptedException
{
// Configure cluster with persistLastCompactionState=false
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(
new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, false)
)
);
Assertions.assertTrue(updateResponse.isSuccess());

// Ingest data at DAY granularity
runIngestionAtGranularity(
"DAY",
"2025-06-01T00:00:00.000Z,shirt,105\n"
+ "2025-06-02T00:00:00.000Z,trousers,210"
);
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

// Create compaction config to compact to MONTH granularity
InlineSchemaDataSourceCompactionConfig monthConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(1000, null, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

runCompactionWithSpec(monthConfig);
waitForAllCompactionTasksToFinish();

verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
}

private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
{
overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.forEach(segment -> {
Assertions.assertNull(
segment.getLastCompactionState(),
"Segment " + segment.getId() + " should have null lastCompactionState"
);
Assertions.assertNotNull(
segment.getCompactionStateFingerprint(),
"Segment " + segment.getId() + " should have non-null compactionStateFingerprint"
);
});
}

private void verifyCompactedSegmentsHaveFingerprints(DataSourceCompactionConfig compactionConfig)
{
CompactionStateManager compactionStateManager = overlord
.bindings()
.getInstance(CompactionStateManager.class);

String expectedFingerprint = compactionStateManager.generateCompactionStateFingerprint(
CompactionStatus.createCompactionStateFromConfig(compactionConfig),
dataSource
);

overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.forEach(segment -> {
Assertions.assertEquals(
expectedFingerprint,
segment.getCompactionStateFingerprint(),
"Segment " + segment.getId() + " fingerprint should match expected fingerprint"
);
});
}
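
The helpers above depend on `CompactionStateManager.generateCompactionStateFingerprint`, whose implementation is not part of this diff. As a self-contained, hypothetical sketch of what such a fingerprint could be, one might hash the JSON-serialized compaction state together with the datasource name (the class name, serialization scheme, and hash choice below are all assumptions, not the PR's actual algorithm):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical sketch only; the real CompactionStateManager may fingerprint differently.
public class CompactionStateFingerprintSketch
{
  private final ObjectMapper objectMapper = new ObjectMapper();

  public String fingerprint(Object compactionState, String dataSource) throws Exception
  {
    // Bind the serialized state to the datasource so identical configs on
    // different datasources produce distinct fingerprints. A real implementation
    // would need a canonical (deterministic) serialization of the state.
    final String payload = dataSource + ":" + objectMapper.writeValueAsString(compactionState);
    final byte[] digest = MessageDigest.getInstance("SHA-256")
                                       .digest(payload.getBytes(StandardCharsets.UTF_8));
    final StringBuilder hex = new StringBuilder(digest.length * 2);
    for (byte b : digest) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}
```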

private void runCompactionWithSpec(DataSourceCompactionConfig config)
@@ -212,7 +212,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null);
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -323,7 +323,7 @@ public void test_ingestClusterMetrics_compactionSkipsLockedIntervals()
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null);
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -164,7 +164,7 @@ private IndexTask createIndexTaskForInlineData(String taskId)
private void enableCompactionSupervisor()
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 10, null, true, null))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 10, null, true, null, null))
);
Assertions.assertTrue(updateResponse.isSuccess());
}
@@ -99,6 +99,7 @@ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
null,
null,
null,
null,
null
);
}
@@ -641,6 +641,25 @@ public static boolean isGuaranteedRollup(
return tuningConfig.isForceGuaranteedRollup();
}

/**
* Returns a function that adds the given compaction state fingerprint to all segments.
* If the fingerprint is null, returns an identity function that leaves segments unchanged.
*/
public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateFingerprintToSegments(
String compactionStateFingerprint
)
{
if (compactionStateFingerprint != null) {
return segments -> segments.stream()
.map(
segment -> segment.withCompactionStateFingerprint(compactionStateFingerprint)
)
.collect(Collectors.toSet());
} else {
return Function.identity();
}
}
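
For context, this helper is designed to compose with the existing `addCompactionStateToSegments` via `Function.andThen`, mirroring the task-side wiring shown later in this diff (`storeCompactionState`, `toolbox`, `ingestionSchema`, `compactionStateFingerprint`, and `publishedSegments` are assumed to be in scope):

```java
// Segments are first annotated with the full compaction state (when enabled),
// then with the fingerprint (when one was supplied in the task context).
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
    addCompactionStateToSegments(storeCompactionState, toolbox, ingestionSchema)
        .andThen(addCompactionStateFingerprintToSegments(compactionStateFingerprint));
final Set<DataSegment> annotated = annotateFunction.apply(publishedSegments);
```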

public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
boolean storeCompactionState,
TaskToolbox toolbox,
@@ -902,11 +902,19 @@ private TaskStatus generateAndPublishSegments(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

final String compactionStateFingerprint = getContextValue(
Tasks.COMPACTION_STATE_FINGERPRINT_KEY,
null
);

final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema
).andThen(
addCompactionStateFingerprintToSegments(compactionStateFingerprint)
);

Set<DataSegment> tombStones = Collections.emptySet();
@@ -68,4 +68,13 @@ public class Tasks
static {
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
}

/**
* Task context key whose value holds the fingerprint of the compaction state to be stored with the segment
*/
public static final String COMPACTION_STATE_FINGERPRINT_KEY = "compactionStateFingerprint";

static {
Verify.verify(COMPACTION_STATE_FINGERPRINT_KEY.equals(CompactSegments.COMPACTION_STATE_FINGERPRINT_KEY));
}
}
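
As a hedged illustration of how this key is meant to be used, a scheduler could attach the fingerprint to a compaction task's context map before submitting the task (the map and `computedFingerprint` below are assumptions; the task changes in this diff read the value back via `getContextValue`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: pass the fingerprint to the task through its context. Inside the task
// it is read back with getContextValue(Tasks.COMPACTION_STATE_FINGERPRINT_KEY, null).
final Map<String, Object> context = new HashMap<>();
context.put(Tasks.COMPACTION_STATE_FINGERPRINT_KEY, computedFingerprint);
```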
@@ -1162,12 +1162,20 @@ private void publishSegments(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema
final String compactionStateFingerprint = getContextValue(
Tasks.COMPACTION_STATE_FINGERPRINT_KEY,
null
);

final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema
).andThen(
addCompactionStateFingerprintToSegments(compactionStateFingerprint)
);

Set<DataSegment> tombStones = Collections.emptySet();
if (getIngestionMode() == IngestionMode.REPLACE) {
TombstoneHelper tombstoneHelper = new TombstoneHelper(toolbox.getTaskActionClient());