From aa4b1dfd3263c9591b0f008f75faaa312545f0c2 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Tue, 16 Jun 2026 21:49:01 +0200 Subject: [PATCH 01/12] [REFACTOR] CassandraMailboxPathV3DAO: remove unused field --- .../james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java index d6b4217ee9f..31f198b54dc 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java @@ -69,7 +69,6 @@ public class CassandraMailboxPathV3DAO { private final CqlSession session; private final DriverExecutionProfile lwtProfile; private final DriverExecutionProfile readProfile; - private final DriverExecutionProfile writeProfile; @Inject public CassandraMailboxPathV3DAO(CqlSession session) { @@ -82,7 +81,6 @@ public CassandraMailboxPathV3DAO(CqlSession session) { this.selectAll = prepareSelectAll(); this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session); this.readProfile = ProfileLocator.READ.locateProfile(session, "MAILBOXPATHV3"); - this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MAILBOXPATHV3"); } private PreparedStatement prepareDelete() { From f3ef2a493178cf8b8a4168ae56df541a71775c48 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Tue, 16 Jun 2026 15:57:16 +0200 Subject: [PATCH 02/12] [FIX] User rename was returning wrong quota when target address is not empty --- server/container/mailbox-adapter/pom.xml | 4 + .../mailbox/QuotaUsernameChangeTaskStep.java | 50 +----- .../QuotaUsernameChangeTaskStepTest.java | 161 +++++++++--------- 3 files changed, 89 insertions(+), 126 deletions(-) diff --git a/server/container/mailbox-adapter/pom.xml b/server/container/mailbox-adapter/pom.xml index 80fcbda7cf8..f69974e71f3 100644 --- a/server/container/mailbox-adapter/pom.xml +++ b/server/container/mailbox-adapter/pom.xml @@ -58,6 +58,10 @@ ${james.groupId} apache-james-mailbox-store + + ${james.groupId} + apache-james-mailbox-tools-quota-recompute + ${james.groupId} event-bus-api diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java index abc439de45d..7614771f5ca 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java @@ -19,49 +19,37 @@ package org.apache.james.adapter.mailbox; -import java.time.Instant; import java.util.Optional; import jakarta.inject.Inject; import org.apache.james.core.Username; -import org.apache.james.events.EventBus; -import org.apache.james.events.RegistrationKey; -import org.apache.james.mailbox.model.CurrentQuotas; import org.apache.james.mailbox.model.Quota; -import org.apache.james.mailbox.model.QuotaOperation; import org.apache.james.mailbox.model.QuotaRoot; -import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.MaxQuotaManager; import org.apache.james.mailbox.quota.QuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; -import org.apache.james.mailbox.store.event.EventFactory; +import org.apache.james.mailbox.quota.task.RecomputeMailboxCurrentQuotasService; import org.apache.james.user.api.UsernameChangeTaskStep; import org.reactivestreams.Publisher; -import com.google.common.collect.ImmutableSet; - import reactor.core.publisher.Mono; public class QuotaUsernameChangeTaskStep implements UsernameChangeTaskStep { - private static final ImmutableSet NO_REGISTRATION_KEYS = ImmutableSet.of(); private final QuotaManager quotaManager; private final MaxQuotaManager maxQuotaManager; - private final CurrentQuotaManager currentQuotaManager; + private final RecomputeMailboxCurrentQuotasService recomputeMailboxCurrentQuotasService; private final UserQuotaRootResolver userQuotaRootResolver; - private final EventBus eventBus; @Inject public QuotaUsernameChangeTaskStep(QuotaManager quotaManager, - CurrentQuotaManager currentQuotaManager, + RecomputeMailboxCurrentQuotasService recomputeMailboxCurrentQuotasService, UserQuotaRootResolver userQuotaRootResolver, - MaxQuotaManager maxQuotaManager, - EventBus eventBus) { + MaxQuotaManager maxQuotaManager) { this.quotaManager = quotaManager; - this.currentQuotaManager = currentQuotaManager; + this.recomputeMailboxCurrentQuotasService = recomputeMailboxCurrentQuotasService; this.userQuotaRootResolver = userQuotaRootResolver; this.maxQuotaManager = maxQuotaManager; - this.eventBus = eventBus; } @Override @@ -78,13 +66,8 @@ public int priority() { public Publisher changeUsername(Username oldUsername, Username newUsername) { return Mono.from(quotaManager.getQuotasReactive(userQuotaRootResolver.forUser(oldUsername))) .flatMap(quotas -> Mono.fromCallable(() -> userQuotaRootResolver.forUser(newUsername)) - .flatMap(newUserQuotaRoot -> setQuotaForNewUser(newUserQuotaRoot, quotas) - .then(dispatchNewEventQuota(newUserQuotaRoot, newUsername)))); - } - - private Mono setQuotaForNewUser(QuotaRoot quotaRoot, QuotaManager.Quotas quotas) { - return setMaxQuota(quotaRoot, quotas) - .then(setCurrentQuota(quotaRoot, quotas)); + .flatMap(newUserQuotaRoot -> setMaxQuota(newUserQuotaRoot, quotas))) + .then(Mono.from(recomputeMailboxCurrentQuotasService.recomputeCurrentQuotas(newUsername))); } private Mono setMaxQuota(QuotaRoot quotaRoot, QuotaManager.Quotas quotas) { @@ -104,23 +87,4 @@ private Mono setMaxMessagesQuota(QuotaRoot quotaRoot, QuotaManager.Quotas .get(Quota.Scope.User))) .flatMap(quotaCountLimit -> Mono.from(maxQuotaManager.setMaxMessageReactive(quotaRoot, quotaCountLimit))); } - - private Mono setCurrentQuota(QuotaRoot quotaRoot, QuotaManager.Quotas quotas) { - return Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(quotaRoot, - new CurrentQuotas(quotas.getMessageQuota().getUsed(), quotas.getStorageQuota().getUsed())))); - } - - private Mono dispatchNewEventQuota(QuotaRoot quotaRoot, Username username) { - return Mono.from(quotaManager.getQuotasReactive(quotaRoot)) - .flatMap(quotas -> eventBus.dispatch( - EventFactory.quotaUpdated() - .randomEventId() - .user(username) - .quotaRoot(quotaRoot) - .quotaCount(quotas.getMessageQuota()) - .quotaSize(quotas.getStorageQuota()) - .instant(Instant.now()) - .build(), - NO_REGISTRATION_KEYS)); - } } diff --git a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java index 8c789c6f0b4..d1bf2ee9e14 100644 --- a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java +++ b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java @@ -24,6 +24,7 @@ import static org.assertj.core.api.SoftAssertions.assertSoftly; import static org.awaitility.Durations.TEN_SECONDS; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import org.apache.james.core.Username; @@ -35,17 +36,22 @@ import org.apache.james.events.EventBus; import org.apache.james.events.EventListener; import org.apache.james.events.Group; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.events.GenericGroup; import org.apache.james.mailbox.events.MailboxEvents; +import org.apache.james.mailbox.inmemory.InMemoryMailboxManager; import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; import org.apache.james.mailbox.model.CurrentQuotas; +import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.Quota; -import org.apache.james.mailbox.model.QuotaOperation; import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.MaxQuotaManager; import org.apache.james.mailbox.quota.QuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; +import org.apache.james.mailbox.quota.task.RecomputeMailboxCurrentQuotasService; +import org.apache.james.mime4j.dom.Message; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,8 +61,11 @@ class QuotaUsernameChangeTaskStepTest { private static final Username ALICE = Username.of("alice"); private static final Username BOB = Username.of("bob"); + // Quotas resulting from a single appended message, see appendAMessage. + private static final CurrentQuotas ONE_MESSAGE_QUOTAS = new CurrentQuotas(QuotaCountUsage.count(1L), QuotaSizeUsage.size(103L)); private QuotaUsernameChangeTaskStep testee; + private InMemoryMailboxManager mailboxManager; private QuotaManager quotaManager; private CurrentQuotaManager currentQuotaManager; private UserQuotaRootResolver quotaRootResolver; @@ -67,6 +76,7 @@ class QuotaUsernameChangeTaskStepTest { @BeforeEach void setUp() { InMemoryIntegrationResources resources = InMemoryIntegrationResources.defaultResources(); + mailboxManager = resources.getMailboxManager(); quotaManager = resources.getQuotaManager(); currentQuotaManager = resources.getCurrentQuotaManager(); quotaRootResolver = resources.getDefaultUserQuotaRootResolver(); @@ -88,84 +98,85 @@ public void event(Event event) { } }); - testee = new QuotaUsernameChangeTaskStep( - quotaManager, + RecomputeMailboxCurrentQuotasService recomputeMailboxCurrentQuotasService = new RecomputeMailboxCurrentQuotasService( currentQuotaManager, + resources.getCurrentQuotaCalculator(), quotaRootResolver, - maxQuotaManager, + mailboxManager.getSessionProvider(), + mailboxManager, + quotaManager, eventBus); + + testee = new QuotaUsernameChangeTaskStep( + quotaManager, + recomputeMailboxCurrentQuotasService, + quotaRootResolver, + maxQuotaManager); } @Test - void shouldMigrateQuotas() throws Exception { + void shouldMigrateMaxQuotas() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); Mono.from(testee.changeUsername(BOB, ALICE)).block(); QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.count(50)).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.size(100)).build()); + softly.assertThat(aliceQuotas.getMessageQuota().getLimit()).isEqualTo(QuotaCountLimit.count(50)); + softly.assertThat(aliceQuotas.getStorageQuota().getLimit()).isEqualTo(QuotaSizeLimit.size(100)); }); } @Test - void migrateShouldNotThrowWhenNoQuotas() throws Exception { - assertThatCode(() -> Mono.from(testee.changeUsername(BOB, ALICE)).block()) - .doesNotThrowAnyException(); - QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(0)).computedLimit(QuotaCountLimit.unlimited()).build()); + void shouldRecomputeCurrentQuotasFromNewUserMailboxes() throws Exception { + appendAMessage(ALICE); - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(0)).computedLimit(QuotaSizeLimit.unlimited()).build()); - }); + Mono.from(testee.changeUsername(BOB, ALICE)).block(); + + assertThat(Mono.from(currentQuotaManager.getCurrentQuotas(quotaRootResolver.forUser(ALICE))).block()) + .isEqualTo(ONE_MESSAGE_QUOTAS); } @Test - void migrateShouldSucceedWhenUnlimitedQuotas() throws Exception { + void shouldNotOverwriteDestinationCurrentQuotasWithSourceOnes() throws Exception { + // Source (BOB) holds a current quota value... QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); - maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.unlimited()); - maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.unlimited()); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); + maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); + maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); + + // ...while the destination (ALICE) already has its own content. + appendAMessage(ALICE); Mono.from(testee.changeUsername(BOB, ALICE)).block(); - QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.unlimited()).build()); + // ALICE current quotas reflect her actual mailboxes content, not BOB's figures. + assertThat(Mono.from(currentQuotaManager.getCurrentQuotas(quotaRootResolver.forUser(ALICE))).block()) + .isEqualTo(ONE_MESSAGE_QUOTAS); + } - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.unlimited()).build()); - }); + @Test + void migrateShouldNotThrowWhenNoQuotas() throws Exception { + assertThatCode(() -> Mono.from(testee.changeUsername(BOB, ALICE)).block()) + .doesNotThrowAnyException(); + + assertThat(Mono.from(currentQuotaManager.getCurrentQuotas(quotaRootResolver.forUser(ALICE))).block()) + .isEqualTo(CurrentQuotas.emptyQuotas()); } @Test void migrateShouldSucceedWhenOnlyMessagesQuota() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(10)); + Mono.from(testee.changeUsername(BOB, ALICE)).block(); QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(0)).computedLimit(QuotaCountLimit.count(10)).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(0)).computedLimit(QuotaSizeLimit.unlimited()).build()); + softly.assertThat(aliceQuotas.getMessageQuota().getLimit()).isEqualTo(QuotaCountLimit.count(10)); + softly.assertThat(aliceQuotas.getStorageQuota().getLimit()).isEqualTo(QuotaSizeLimit.unlimited()); }); } @@ -173,44 +184,13 @@ void migrateShouldSucceedWhenOnlyMessagesQuota() throws Exception { void migrateShouldSucceedWhenOnlyStorageQuota() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(10)); - Mono.from(testee.changeUsername(BOB, ALICE)).block(); - - QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(0)).computedLimit(QuotaCountLimit.unlimited()).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(0)).computedLimit(QuotaSizeLimit.size(10)).build()); - }); - } - - @Test - void migrateShouldSucceedWhenAliceAlreadyQuotas() throws Exception { - QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); - maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); - maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); - - QuotaRoot aliceQuotaRoot = quotaRootResolver.forUser(ALICE); - maxQuotaManager.setMaxMessage(aliceQuotaRoot, QuotaCountLimit.count(55)); - maxQuotaManager.setMaxStorage(aliceQuotaRoot, QuotaSizeLimit.size(150)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(aliceQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(7), QuotaSizeUsage.size(8) - )))).block(); Mono.from(testee.changeUsername(BOB, ALICE)).block(); QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.count(50)).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.size(100)).build()); + softly.assertThat(aliceQuotas.getMessageQuota().getLimit()).isEqualTo(QuotaCountLimit.unlimited()); + softly.assertThat(aliceQuotas.getStorageQuota().getLimit()).isEqualTo(QuotaSizeLimit.size(10)); }); } @@ -219,28 +199,43 @@ void migrateShouldDispatchQuotaUpdateEvent() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); - Mono.from(testee.changeUsername(BOB, ALICE)).block(); + appendAMessage(ALICE); + Mono.from(testee.changeUsername(BOB, ALICE)).block(); Awaitility.await() .atMost(TEN_SECONDS) - .untilAsserted(() -> assertThat(eventStore.size()).isEqualTo(1)); + .untilAsserted(() -> assertThat(eventStore).anyMatch(event -> event instanceof MailboxEvents.QuotaUsageUpdatedEvent)); - MailboxEvents.QuotaUsageUpdatedEvent quotaUsageUpdatedEvent = (MailboxEvents.QuotaUsageUpdatedEvent) eventStore.get(0); + MailboxEvents.QuotaUsageUpdatedEvent quotaUsageUpdatedEvent = eventStore.stream() + .filter(event -> event instanceof MailboxEvents.QuotaUsageUpdatedEvent) + .map(MailboxEvents.QuotaUsageUpdatedEvent.class::cast) + .reduce((first, second) -> second) + .orElseThrow(); assertSoftly(softly -> { softly.assertThat(quotaUsageUpdatedEvent.getCountQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.count(50)).build()); + .isEqualTo(Quota.builder().used(QuotaCountUsage.count(1)).computedLimit(QuotaCountLimit.count(50)).build()); softly.assertThat(quotaUsageUpdatedEvent.getSizeQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.size(100)).build()); + .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(103)).computedLimit(QuotaSizeLimit.size(100)).build()); softly.assertThat(quotaUsageUpdatedEvent.getUsername()) .isEqualTo(ALICE); softly.assertThat(quotaUsageUpdatedEvent.getQuotaRoot()) .isEqualTo(quotaRootResolver.forUser(ALICE)); }); } -} \ No newline at end of file + + private void appendAMessage(Username username) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(username); + MailboxPath inbox = MailboxPath.inbox(username); + mailboxManager.createMailbox(inbox, session); + MessageManager messageManager = mailboxManager.getMailbox(inbox, session); + messageManager.appendMessage(MessageManager.AppendCommand.from( + Message.Builder.of() + .setTo("test@localhost.com") + .setBody("This is a message", StandardCharsets.UTF_8)), + session); + mailboxManager.endProcessingRequest(session); + } +} From 9fb14ad21d83b91a1c4e44f24314409ab24124af Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Tue, 16 Jun 2026 16:17:05 +0200 Subject: [PATCH 03/12] [FIX] User rename: Simplify rename dance when destination exist --- .../MailboxUsernameChangeTaskStep.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java index 45523ec5995..74441266f78 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java @@ -84,8 +84,7 @@ private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toS if (!exist) { return renameMailboxAndRenameSubscriptionForDelegatee(fromSession, toSession, mailbox, renamedPath); } else { - return renameWhenMailboxExist(toSession, renamedPath, - renameMailboxAndRenameSubscriptionForDelegatee(fromSession, toSession, mailbox, renamedPath)); + return moveWhenMailboxExist(fromSession, toSession, mailbox, renamedPath); } }); } @@ -98,18 +97,15 @@ private Mono renameMailboxAndRenameSubscriptionForDelegatee(MailboxSession .then(); } - // rename: renamedPath -> temporaryPath - // rename: mailbox.getPath -> renamedPath - // copy messages: temporaryPath -> renamedPath - // delete: temporaryPath - private Mono renameWhenMailboxExist(MailboxSession toSession, MailboxPath renamedPath, Mono renamePublisher) { + // The destination mailbox already exists: bring the source mailbox into the destination + // account under a temporary name, MOVE its messages into the destination, then drop the + // emptied temporary mailbox. + private Mono moveWhenMailboxExist(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, MailboxPath renamedPath) { MailboxPath temporaryPath = new MailboxPath(renamedPath.getNamespace(), renamedPath.getUser(), renamedPath.getName() + "tmp"); - return mailboxManager.renameMailboxReactive(renamedPath, temporaryPath, - MailboxManager.RenameOption.NONE, toSession) - .then(renamePublisher) - .then(mailboxManager.copyMessagesReactive(MessageRange.all(), - temporaryPath, renamedPath, toSession) - .then()) + return mailboxManager.renameMailboxReactive(mailbox.getPath(), temporaryPath, + MailboxManager.RenameOption.NONE, fromSession, toSession) + .thenMany(mailboxManager.moveMessagesReactive(MessageRange.all(), temporaryPath, renamedPath, toSession)) + .then(renameSubscriptionsForDelegatee(mailbox, renamedPath)) .then(mailboxManager.deleteMailboxReactive(temporaryPath, toSession)); } From 914f588cf9005d7ce16fcc82c23ac377e130dffd Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Tue, 16 Jun 2026 17:35:09 +0200 Subject: [PATCH 04/12] [FIX] User rename: per rename temporary mailbox layout --- .../mailbox/MailboxUsernameChangeTaskStep.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java index 74441266f78..b23334c7861 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java @@ -19,6 +19,8 @@ package org.apache.james.adapter.mailbox; +import java.util.concurrent.ThreadLocalRandom; + import jakarta.inject.Inject; import org.apache.james.core.Username; @@ -69,22 +71,27 @@ public Publisher changeUsername(Username oldUsername, Username newUsername .user(fromSession.getUser()) .build(); + // A single random suffix is generated for the whole rename operation so that temporary + // mailboxes never collide with a leftover from a previous run (we thus do not loose the + // previous content) while staying recognizable. + int renameOperationId = ThreadLocalRandom.current().nextInt(1000); + return mailboxManager.search(queryUser, MailboxManager.MailboxSearchFetchType.Minimal, fromSession) // Only keep top level, rename takes care of sub mailboxes .filter(mailbox -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 1) - .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox)) + .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox, renameOperationId)) .doFinally(any -> mailboxManager.endProcessingRequest(fromSession)) .doFinally(any -> mailboxManager.endProcessingRequest(toSession)); } - private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox) { + private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox, int renameOperationId) { MailboxPath renamedPath = mailbox.getPath().withUser(toSession.getUser()); return mailboxManager.mailboxExists(renamedPath, toSession) .flatMap(exist -> { if (!exist) { return renameMailboxAndRenameSubscriptionForDelegatee(fromSession, toSession, mailbox, renamedPath); } else { - return moveWhenMailboxExist(fromSession, toSession, mailbox, renamedPath); + return moveWhenMailboxExist(fromSession, toSession, mailbox, renamedPath, renameOperationId); } }); } @@ -100,8 +107,8 @@ private Mono renameMailboxAndRenameSubscriptionForDelegatee(MailboxSession // The destination mailbox already exists: bring the source mailbox into the destination // account under a temporary name, MOVE its messages into the destination, then drop the // emptied temporary mailbox. - private Mono moveWhenMailboxExist(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, MailboxPath renamedPath) { - MailboxPath temporaryPath = new MailboxPath(renamedPath.getNamespace(), renamedPath.getUser(), renamedPath.getName() + "tmp"); + private Mono moveWhenMailboxExist(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, MailboxPath renamedPath, int renameOperationId) { + MailboxPath temporaryPath = new MailboxPath(renamedPath.getNamespace(), renamedPath.getUser(), renamedPath.getName() + "-tmp-" + renameOperationId); return mailboxManager.renameMailboxReactive(mailbox.getPath(), temporaryPath, MailboxManager.RenameOption.NONE, fromSession, toSession) .thenMany(mailboxManager.moveMessagesReactive(MessageRange.all(), temporaryPath, renamedPath, toSession)) From 6d9c419c9e6e73d364d09043067c309104b607c9 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Tue, 16 Jun 2026 22:03:41 +0200 Subject: [PATCH 05/12] [FIX] User rename: handle submailbox edge cases when mailbox exist --- .../MailboxUsernameChangeTaskStep.java | 21 ++- .../MailboxUsernameChangeTaskStepTest.java | 177 ++++++++++++++++++ 2 files changed, 192 insertions(+), 6 deletions(-) diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java index b23334c7861..016e662c935 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java @@ -19,6 +19,7 @@ package org.apache.james.adapter.mailbox; +import java.util.Comparator; import java.util.concurrent.ThreadLocalRandom; import jakarta.inject.Inject; @@ -77,14 +78,20 @@ public Publisher changeUsername(Username oldUsername, Username newUsername int renameOperationId = ThreadLocalRandom.current().nextInt(1000); return mailboxManager.search(queryUser, MailboxManager.MailboxSearchFetchType.Minimal, fromSession) - // Only keep top level, rename takes care of sub mailboxes - .filter(mailbox -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 1) + // Process the deepest mailboxes first. As renameMailbox is recursive (it also renames + // sub mailboxes), handling a mailbox only once all of its descendants have already been + // migrated guarantees it is effectively a leaf at that point: the rename then carries no + // sub mailbox along, which lets us migrate every node individually - including when the + // destination mailbox already exists and a plain rename is therefore not possible. + .sort(Comparator.comparingInt( + (MailboxMetaData mailbox) -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size()) + .reversed()) .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox, renameOperationId)) .doFinally(any -> mailboxManager.endProcessingRequest(fromSession)) .doFinally(any -> mailboxManager.endProcessingRequest(toSession)); } - private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox, int renameOperationId) { + private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, int renameOperationId) { MailboxPath renamedPath = mailbox.getPath().withUser(toSession.getUser()); return mailboxManager.mailboxExists(renamedPath, toSession) .flatMap(exist -> { @@ -104,9 +111,11 @@ private Mono renameMailboxAndRenameSubscriptionForDelegatee(MailboxSession .then(); } - // The destination mailbox already exists: bring the source mailbox into the destination - // account under a temporary name, MOVE its messages into the destination, then drop the - // emptied temporary mailbox. + // The destination mailbox already exists: a plain rename is impossible, so we bring the source + // mailbox into the destination account under a temporary name, MOVE its messages into the + // destination, then drop the emptied temporary mailbox. Sub mailboxes are not handled here: as + // mailboxes are migrated deepest first, the source mailbox no longer has any descendant when we + // reach this point. private Mono moveWhenMailboxExist(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, MailboxPath renamedPath, int renameOperationId) { MailboxPath temporaryPath = new MailboxPath(renamedPath.getNamespace(), renamedPath.getUser(), renamedPath.getName() + "-tmp-" + renameOperationId); return mailboxManager.renameMailboxReactive(mailbox.getPath(), temporaryPath, diff --git a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java index 18c51871164..439f77d02ba 100644 --- a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java +++ b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java @@ -38,6 +38,7 @@ import org.apache.james.mailbox.store.StoreSubscriptionManager; import org.apache.james.mime4j.dom.Message; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -198,4 +199,180 @@ void shouldMigrateMailboxesWhenNewUserHasAlreadyOtherMailboxes() throws Exceptio MailboxPath.forUser(BOB, "test.child"), bobInbox); } + + private void appendMessage(MailboxPath path, MailboxSession session, String body) throws Exception { + mailboxManager.getMailbox(path, session) + .appendMessage(MessageManager.AppendCommand.from(Message.Builder.of() + .setSubject("subject") + .setBody(body, StandardCharsets.UTF_8) + .build()), session); + } + + private long messageCount(MailboxPath path, MailboxSession session) throws Exception { + return mailboxManager.getMailbox(path, session).getMessageCount(session); + } + + @Nested + class WhenDestinationParentMailboxAlreadyExists { + @Test + void shouldMigrateSubMailboxesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child")); + } + + @Test + void shouldNotLeaveTemporaryMailboxesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .noneMatch(path -> path.getName().contains("-tmp-")); + } + + @Test + void shouldNotLooseMessagesOfSourceOnlyTopLevelMailboxWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + appendMessage(MailboxPath.forUser(ALICE, "test"), aliceSession, "alice"); + appendMessage(MailboxPath.forUser(BOB, "test"), bobSession, "bob"); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(messageCount(MailboxPath.forUser(BOB, "test"), bobSession)).isEqualTo(2); + } + + @Test + void shouldMoveSubMailboxMessagesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + appendMessage(MailboxPath.forUser(ALICE, "test.child"), aliceSession, "alice-child"); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(messageCount(MailboxPath.forUser(BOB, "test.child"), bobSession)).isEqualTo(1); + } + + @Test + void shouldMergeSubMailboxMessagesWhenBothUsersHaveTheSameSubMailbox() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test.child"), MailboxManager.CreateOption.NONE, bobSession); + appendMessage(MailboxPath.forUser(ALICE, "test"), aliceSession, "alice-parent"); + appendMessage(MailboxPath.forUser(ALICE, "test.child"), aliceSession, "alice-child"); + appendMessage(MailboxPath.forUser(BOB, "test"), bobSession, "bob-parent"); + appendMessage(MailboxPath.forUser(BOB, "test.child"), bobSession, "bob-child"); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child")); + assertThat(messageCount(MailboxPath.forUser(BOB, "test"), bobSession)).isEqualTo(2); + assertThat(messageCount(MailboxPath.forUser(BOB, "test.child"), bobSession)).isEqualTo(2); + } + + @Test + void shouldMigrateDeepHierarchyWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child.grandchild"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child"), + MailboxPath.forUser(BOB, "test.child.grandchild")); + } + + @Test + void shouldMigrateSubMailboxesWhenOnlyDeepestChildConflicts() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.other"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test.child"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child"), + MailboxPath.forUser(BOB, "test.other")); + } + + @Test + void shouldRemoveSourceMailboxesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .filteredOn(path -> ALICE.equals(path.getUser())) + .isEmpty(); + } + + @Test + void shouldTransferSubscriptionOfSubMailboxWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.CREATE_SUBSCRIPTION, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(subscriptionManager.subscriptions(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test.child")); + } + + @Test + void shouldMigrateDelegationAclOfSubMailboxWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + mailboxManager.applyRightsCommand(MailboxPath.forUser(ALICE, "test.child"), + MailboxACL.command().forUser(CEDRIC).rights(MailboxACL.FULL_RIGHTS).asAddition(), + aliceSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + MailboxSession cedricSession = mailboxManager.createSystemSession(CEDRIC); + assertThatCode(() -> mailboxManager.getMailbox(MailboxPath.forUser(BOB, "test.child"), cedricSession)) + .doesNotThrowAnyException(); + } + } } \ No newline at end of file From 945a6c97f0c0776bee514fb46261afe9c23525b9 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Tue, 16 Jun 2026 17:36:28 +0200 Subject: [PATCH 06/12] [FIX] Cassandra folder rename: base decision on truth table Carry over truth table from MailboxManager --- .../mail/CassandraMailboxMapper.java | 28 +++++++++---------- .../mailbox/store/StoreMailboxManager.java | 4 +-- .../mailbox/store/mail/MailboxMapper.java | 6 +++- .../org/apache/james/util/ReactorUtils.java | 11 ++++++++ 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index cb7e2b0eed8..eceec29a4b8 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -44,7 +44,7 @@ import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.mail.MailboxMapper; -import org.apache.james.util.FunctionalUtils; +import org.apache.james.util.ReactorUtils; import com.google.common.base.Preconditions; @@ -221,23 +221,23 @@ public Mono rename(Mailbox mailbox) { Preconditions.checkNotNull(mailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId"); CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); - return tryRename(mailbox, cassandraId) - .filter(FunctionalUtils.identityPredicate()) - .switchIfEmpty(Mono.error(() -> new MailboxExistsException(mailbox.generateAssociatedPath().asString()))) - .thenReturn(cassandraId); - } - - private Mono tryRename(Mailbox cassandraMailbox, CassandraId cassandraId) { return mailboxDAO.retrieveMailbox(cassandraId) - .flatMap(mailbox -> mailboxPathV3DAO.save(cassandraMailbox) - .filter(isCreated -> isCreated) - .flatMap(mailboxHasCreated -> deletePreviousMailboxPathReference(mailbox.generateAssociatedPath()) - .then(persistMailboxEntity(cassandraMailbox)) - .thenReturn(true)) - .defaultIfEmpty(false)) + .flatMap(storedMailbox -> rename(mailbox, storedMailbox.generateAssociatedPath())) .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(cassandraId))); } + @Override + public Mono rename(Mailbox cassandraMailbox, MailboxPath previousPath) { + Preconditions.checkNotNull(cassandraMailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId"); + CassandraId cassandraId = (CassandraId) cassandraMailbox.getMailboxId(); + + return mailboxPathV3DAO.save(cassandraMailbox) + .handle(ReactorUtils.raiseErrorIfFalse(() -> new MailboxExistsException(cassandraMailbox.generateAssociatedPath().asString()))) + .flatMap(applied -> deletePreviousMailboxPathReference(previousPath) + .then(persistMailboxEntity(cassandraMailbox))) + .thenReturn(cassandraId); + } + private Mono persistMailboxEntity(Mailbox cassandraMailbox) { return mailboxDAO.save(cassandraMailbox) .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 475c1ba0036..44ce721635e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -723,7 +723,7 @@ private Mono> doRenameMailbox(Mailbox mailbox, Mailbo .build() .asUserBound(); - return mapper.rename(mailbox) + return mapper.rename(mailbox, from) .map(mailboxId -> { resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath)); return mailboxId; @@ -754,7 +754,7 @@ private Publisher renameSubMailboxes(MailboxPath newMailboxPath, MailboxMa MailboxPath fromPath = new MailboxPath(from, subOriginalName); sub.setName(subNewName); sub.setUser(newMailboxPath.getUser()); - return mapper.rename(sub) + return mapper.rename(sub, fromPath) .map(mailboxId -> { resultBuilder.add(new MailboxRenamedResult(sub.getMailboxId(), fromPath, sub.generateAssociatedPath())); return mailboxId; diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java index a88d6a10123..3412efe295f 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java @@ -50,8 +50,12 @@ public interface MailboxMapper extends Mapper { /** * Rename the given {@link Mailbox} to the underlying storage */ + default Mono rename(Mailbox mailbox, MailboxPath previousPath) { + return rename(mailbox); + } + Mono rename(Mailbox mailbox); - + /** * Delete the given {@link Mailbox} from the underlying storage */ diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index 0f69d237cf8..4845ba437e9 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -28,6 +28,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.reactivestreams.Publisher; @@ -107,6 +108,16 @@ public static BiConsumer, SynchronousSink> publishIfPresent() return (element, sink) -> element.ifPresent(sink::next); } + public static BiConsumer> raiseErrorIfFalse(Supplier exceptionSupplier) { + return (element, sink) -> { + if (!element) { + sink.error(exceptionSupplier.get()); + } else { + sink.next(element); + } + }; + } + public static InputStream toInputStream(Flux byteArrays) { return new StreamInputStream(byteArrays.toStream(DEFAULT_INPUT_STREAM_PREFETCH)); } From 46a425e81bf41b18f3c7dada9000cb18bae3edf1 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Wed, 17 Jun 2026 00:22:05 +0200 Subject: [PATCH 07/12] [FIX] Enhance SolveMailboxInconsistencies to fix duplicated mailboxPathV3 registration and recover from failed mailboxPathV3 deletion upon rename --- .../mail/CassandraMailboxMapper.java | 8 +- .../mail/CassandraMailboxPathV3DAO.java | 21 ++++ .../SolveMailboxInconsistenciesService.java | 101 +++++++++++++++--- .../mail/CassandraMailboxMapperTest.java | 38 ++++++- ...olveMailboxInconsistenciesServiceTest.java | 39 +++++-- 5 files changed, 180 insertions(+), 27 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index eceec29a4b8..b5fd64291e4 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -233,8 +233,12 @@ public Mono rename(Mailbox cassandraMailbox, MailboxPath previousPath return mailboxPathV3DAO.save(cassandraMailbox) .handle(ReactorUtils.raiseErrorIfFalse(() -> new MailboxExistsException(cassandraMailbox.generateAssociatedPath().asString()))) - .flatMap(applied -> deletePreviousMailboxPathReference(previousPath) - .then(persistMailboxEntity(cassandraMailbox))) + // Additive writes first (the new path reference and the projection), subtractive write + // last (drop the old path reference). At any crash point the mailbox thus stays + // reachable under its new path with a consistent projection; only a stale old path + // reference may linger (the delete is retried to absorb transient failures). + .flatMap(applied -> persistMailboxEntity(cassandraMailbox) + .then(deletePreviousMailboxPathReference(previousPath))) .thenReturn(cassandraId); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java index 31f198b54dc..266e9193d4a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java @@ -64,6 +64,7 @@ public class CassandraMailboxPathV3DAO { private final PreparedStatement delete; private final PreparedStatement insert; private final PreparedStatement select; + private final PreparedStatement selectWriteTime; private final PreparedStatement selectUser; private final PreparedStatement selectAll; private final CqlSession session; @@ -77,6 +78,7 @@ public CassandraMailboxPathV3DAO(CqlSession session) { this.insert = prepareInsert(); this.delete = prepareDelete(); this.select = prepareSelect(); + this.selectWriteTime = prepareSelectWriteTime(); this.selectUser = prepareSelectUser(); this.selectAll = prepareSelectAll(); this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session); @@ -112,6 +114,15 @@ private PreparedStatement prepareSelect() { .build()); } + private PreparedStatement prepareSelectWriteTime() { + return session.prepare(selectFrom(TABLE_NAME) + .writeTime(MAILBOX_ID) + .where(column(NAMESPACE).isEqualTo(bindMarker(NAMESPACE)), + column(USER).isEqualTo(bindMarker(USER)), + column(MAILBOX_NAME).isEqualTo(bindMarker(MAILBOX_NAME))) + .build()); + } + private PreparedStatement prepareSelectUser() { return session.prepare(selectFrom(TABLE_NAME) .columns(MAILBOX_ID, UIDVALIDITY, MAILBOX_NAME) @@ -142,6 +153,16 @@ public Mono retrieve(MailboxPath mailboxPath, JamesExecutionProfiles.Co .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath))); } + public Mono writeTime(MailboxPath mailboxPath) { + BoundStatement statement = selectWriteTime.bind() + .set(NAMESPACE, mailboxPath.getNamespace(), TypeCodecs.TEXT) + .set(USER, sanitizeUser(mailboxPath.getUser()), TypeCodecs.TEXT) + .set(MAILBOX_NAME, mailboxPath.getName(), TypeCodecs.TEXT); + + return cassandraAsyncExecutor.executeSingleRow(setExecutionProfileIfNeeded(statement, STRONG)) + .map(row -> row.getLong(0)); + } + public Flux listUserMailboxes(String namespace, Username user, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) { BoundStatementBuilder statementBuilder = selectUser.boundStatementBuilder() .set(NAMESPACE, namespace, TypeCodecs.TEXT) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index d1d5fcf4140..0a76fab0d9d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; +import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.task.Task; import org.apache.james.task.Task.Result; import org.slf4j.Logger; @@ -61,9 +62,7 @@ static Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, M return NO_INCONSISTENCY; } // Path entry references another mailbox. - return new ConflictingEntryInconsistency(ConflictingEntry.builder() - .mailboxDaoEntry(mailboxEntry) - .mailboxPathDaoEntry(mailboxByPath)); + return new ConflictingEntryInconsistency(mailboxEntry, mailboxByPath); }) .defaultIfEmpty(new OrphanMailboxDAOEntry(mailboxEntry)); } @@ -76,9 +75,7 @@ static Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPa return NO_INCONSISTENCY; } // Mailbox references another path - return new ConflictingEntryInconsistency(ConflictingEntry.builder() - .mailboxDaoEntry(mailboxById) - .mailboxPathDaoEntry(mailboxByPathEntry)); + return new ConflictingEntryInconsistency(mailboxById, mailboxByPathEntry); }) .defaultIfEmpty(new OrphanMailboxPathDAOEntry(mailboxByPathEntry)); } @@ -181,19 +178,87 @@ public Mono fix(Context context, CassandraMailboxDAO mailboxDAO, Cassand * See https://github.com/apache/james-project/blob/master/src/site/markdown/server/manage-webadmin.md#correcting-ghost-mailbox */ private static class ConflictingEntryInconsistency implements Inconsistency { - private final ConflictingEntry conflictingEntry; + private final Mailbox mailboxDaoEntry; + private final Mailbox mailboxPathEntry; - private ConflictingEntryInconsistency(ConflictingEntry conflictingEntry) { - this.conflictingEntry = conflictingEntry; + private ConflictingEntryInconsistency(Mailbox mailboxDaoEntry, Mailbox mailboxPathEntry) { + this.mailboxDaoEntry = mailboxDaoEntry; + this.mailboxPathEntry = mailboxPathEntry; } @Override public Mono fix(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO) { + if (mailboxDaoEntry.getMailboxId().equals(mailboxPathEntry.getMailboxId())) { + // Same mailbox referenced by two different paths. This happens when a rename (or any + // partial path update) failed to drop the old path reference. As both registrations + // point to the same mailbox id, the messages (keyed by id) are safe regardless of the + // path we keep: we keep the most recently written path reference, realign the + // projection on it and drop the stale one. + return fixSameMailboxConflict(context, mailboxDAO, pathV3DAO); + } + + return reportConflict(context); + } + + // Auto-resolution is restricted to the case where BOTH the conflicting path entry and the + // projection's path are registered to the same mailbox id: only then are they two genuine + // aliases of a single mailbox, hence safe to deduplicate without data loss. If the + // projection's path is unregistered or held by another mailbox (e.g. a reference loop), we + // fall back to the conservative reporting so an admin can merge. + // + // As fixes are applied sequentially, we re-read the current state first: a previous fix may + // already have reconciled this mailbox, in which case there is nothing left to do. + private Mono fixSameMailboxConflict(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO) { + CassandraId mailboxId = (CassandraId) mailboxPathEntry.getMailboxId(); + MailboxPath conflictingPath = mailboxPathEntry.generateAssociatedPath(); + + return pathV3DAO.retrieve(conflictingPath, STRONG) + .filter(stillRegistered -> stillRegistered.getMailboxId().equals(mailboxId)) + .flatMap(conflictingEntry -> mailboxDAO.retrieveMailbox(mailboxId) + .flatMap(currentProjection -> resolveSameMailboxConflict(context, mailboxDAO, pathV3DAO, currentProjection, conflictingEntry))) + // The conflicting path entry is gone (already reconciled): nothing left to do. + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Mono resolveSameMailboxConflict(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO, + Mailbox currentProjection, Mailbox conflictingEntry) { + MailboxPath projectionPath = currentProjection.generateAssociatedPath(); + MailboxPath conflictingPath = conflictingEntry.generateAssociatedPath(); + if (projectionPath.equals(conflictingPath)) { + // The projection now matches this path: no longer inconsistent. + return Mono.just(Result.COMPLETED); + } + return pathV3DAO.retrieve(projectionPath, STRONG) + .filter(projectionRegistration -> projectionRegistration.getMailboxId().equals(currentProjection.getMailboxId())) + .flatMap(projectionRegistration -> Mono.zip( + pathV3DAO.writeTime(conflictingPath), + pathV3DAO.writeTime(projectionPath)) + .flatMap(writeTimes -> { + boolean projectionWins = writeTimes.getT2() >= writeTimes.getT1(); + Mailbox winner = projectionWins ? currentProjection : conflictingEntry; + Mailbox loser = projectionWins ? conflictingEntry : currentProjection; + return mailboxDAO.save(winner) + .then(pathV3DAO.delete(loser.generateAssociatedPath())) + .then(Mono.fromRunnable(() -> { + LOGGER.info("Inconsistency fixed for mailbox {}: kept path {}, dropped stale path {}", + winner.getMailboxId().serialize(), + winner.generateAssociatedPath().asString(), + loser.generateAssociatedPath().asString()); + context.addFixedInconsistency(winner.getMailboxId()); + })) + .thenReturn(Result.COMPLETED); + })) + .switchIfEmpty(Mono.defer(() -> reportConflict(context))); + } + + private Mono reportConflict(Context context) { LOGGER.error("MailboxDAO contains mailbox {} {} which conflict with corresponding registration {} {}. " + "We recommend merging these mailboxes together to prevent mail data loss.", - conflictingEntry.getMailboxDaoEntry().getMailboxId(), conflictingEntry.getMailboxDaoEntry().getMailboxPath(), - conflictingEntry.getMailboxPathDaoEntry().getMailboxId(), conflictingEntry.getMailboxPathDaoEntry().getMailboxPath()); - context.addConflictingEntries(conflictingEntry); + mailboxDaoEntry.getMailboxId(), mailboxDaoEntry.generateAssociatedPath(), + mailboxPathEntry.getMailboxId(), mailboxPathEntry.generateAssociatedPath()); + context.addConflictingEntries(ConflictingEntry.builder() + .mailboxDaoEntry(mailboxDaoEntry) + .mailboxPathDaoEntry(mailboxPathEntry)); return Mono.just(Result.PARTIAL); } } @@ -413,8 +478,16 @@ private void assertValidVersion() { private Flux processMailboxPathDaoInconsistencies(Context context) { return mailboxPathV3DAO.listAll() .flatMap(this::detectMailboxPathDaoInconsistency, DEFAULT_CONCURRENCY) - .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO), DEFAULT_CONCURRENCY) - .doOnNext(any -> context.incrementProcessedMailboxPathEntries()); + .doOnNext(any -> context.incrementProcessedMailboxPathEntries()) + // Detect every inconsistency first, then fix them one at a time. Resolving a same-mailbox + // conflict may realign the projection, so a fully materialized detection set fixed + // sequentially prevents a fix from racing with the detection or resolution of a sibling + // path entry. Consistent entries are filtered out to keep the materialized set small; + // each fix re-confirms the inconsistency against the current state before acting. + .filter(inconsistency -> inconsistency != NO_INCONSISTENCY) + .collectList() + .flatMapMany(inconsistencies -> Flux.fromIterable(inconsistencies) + .concatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))); } private Flux processMailboxDaoInconsistencies(Context context) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java index e13e452c7ee..c27ffd68b02 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java @@ -464,10 +464,14 @@ void renameThenFailToDeleteMailboxPathShouldBeConsistentWhenFindByInbox(Cassandr doQuietly(() -> testee.rename(inboxRenamed).block()); + // rename writes additively first (new path reference + projection) and removes the old + // path reference last. When that removal fails, the projection already reflects the + // rename, so the mailbox is consistent under its new identity; the old INBOX path + // reference merely lingers (the delete is retried to absorb transient failures). SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly(softly) .assertThat(testee.findMailboxById(inboxId).block()) - .isEqualTo(inbox); + .isEqualTo(inboxRenamed); softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); @@ -479,6 +483,38 @@ void renameThenFailToDeleteMailboxPathShouldBeConsistentWhenFindByInbox(Cassandr })); } + @Test + void renameThenFailToDeleteMailboxPathShouldExposeRenamedMailbox(CassandraCluster cassandra) { + Mailbox inbox = testee.create(inboxPath, UID_VALIDITY).block(); + CassandraId inboxId = (CassandraId) inbox.getMailboxId(); + Mailbox inboxRenamed = createInboxRenamedMailbox(inboxId); + + cassandra.getConf() + .registerScenario(fail() + .times(TRY_COUNT_BEFORE_FAILURE) + .whenQueryStartsWith("DELETE FROM mailboxpathv3 WHERE namespace=:namespace AND user=:user AND mailboxname=:mailboxname IF EXISTS")); + + doQuietly(() -> testee.rename(inboxRenamed).block()); + + // Counterpart of the previous test: even though the old path cleanup failed, the rename + // is fully observable under its new identity (by id and by renamed path). With the legacy + // ordering - projection written last, after the delete - the projection would still read + // INBOX here, contradicting the already committed renamed path reference. + SoftAssertions.assertSoftly(Throwing.consumer(softly -> { + softly(softly) + .assertThat(testee.findMailboxById(inboxId).block()) + .isEqualTo(inboxRenamed); + softly(softly) + .assertThat(testee.findMailboxByPath(inboxPathRenamed).block()) + .isEqualTo(inboxRenamed); + softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery) + .collectList().block()) + .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) + .assertThat(searchMailbox) + .isEqualTo(inboxRenamed)); + })); + } + @Disabled("JAMES-3056 returning two mailboxes with same name and id") @Test void renameThenFailToDeleteMailboxPathShouldBeConsistentWhenFindAll(CassandraCluster cassandra) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java index 2d3437a9a92..8179fc883e8 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java @@ -154,12 +154,13 @@ void fixMailboxInconsistenciesShouldReturnPartialWhenDAOMisMatchOnId() { } @Test - void fixMailboxInconsistenciesShouldReturnPartialWhenDAOMisMatchOnPath() { + void fixMailboxInconsistenciesShouldReturnCompletedWhenDAOMisMatchOnPath() { + // Same mailbox id referenced by two paths: auto-resolvable without data loss. mailboxDAO.save(MAILBOX).block(); mailboxPathV3DAO.save(MAILBOX_NEW_PATH).block(); assertThat(testee.fixMailboxInconsistencies(new Context()).block()) - .isEqualTo(Result.PARTIAL); + .isEqualTo(Result.COMPLETED); } @Test @@ -248,14 +249,14 @@ void fixMailboxInconsistenciesShouldUpdateContextWhenDAOMisMatchOnPath() { testee.fixMailboxInconsistencies(context).block(); + // The orphan mailbox pass re-registers the projection path, then the same-id conflict pass + // keeps the most recent path and drops the stale one: two fixes, no conflicting entry. assertThat(context.snapshot()) .isEqualTo(Context.builder() .processedMailboxEntries(1) .processedMailboxPathEntries(2) .addFixedInconsistencies(CASSANDRA_ID_1) - .addConflictingEntry(ConflictingEntry.builder() - .mailboxDaoEntry(MAILBOX) - .mailboxPathDaoEntry(MAILBOX_NEW_PATH)) + .addFixedInconsistencies(CASSANDRA_ID_1) .build() .snapshot()); } @@ -335,8 +336,8 @@ void fixMailboxInconsistenciesShouldNotAlterStateWhenLoop() { @Test void fixMailboxInconsistenciesShouldAlterStateWhenDaoMisMatchOnPath() { - // Note that CASSANDRA_ID_1 becomes usable - // However in order to avoid data loss, merging CASSANDRA_ID_1 and CASSANDRA_ID_2 is still required + // Same mailbox id referenced by two paths (a partial rename leftover): the solver keeps the + // most recently written path and drops the stale one, leaving a single consistent registration. mailboxDAO.save(MAILBOX).block(); mailboxPathV3DAO.save(MAILBOX_NEW_PATH).block(); @@ -346,9 +347,27 @@ void fixMailboxInconsistenciesShouldAlterStateWhenDaoMisMatchOnPath() { softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) .containsExactlyInAnyOrder(MAILBOX); softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) - .containsExactlyInAnyOrder( - MAILBOX_NEW_PATH, - MAILBOX); + .containsExactlyInAnyOrder(MAILBOX); + }); + } + + @Test + void fixMailboxInconsistenciesShouldKeepMostRecentPathWhenSameMailboxRegisteredUnderTwoPaths() { + // Real partial rename leftover: the projection points to the new path, and both the old and + // the new path are still registered to the same id. The most recently written path (the + // rename target) wins and the stale old path reference is dropped. + mailboxDAO.save(MAILBOX_NEW_PATH).block(); + mailboxPathV3DAO.save(MAILBOX).block(); + mailboxPathV3DAO.save(MAILBOX_NEW_PATH).block(); + + Result result = testee.fixMailboxInconsistencies(new Context()).block(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(result).isEqualTo(Result.COMPLETED); + softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_NEW_PATH); + softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_NEW_PATH); }); } From 05fba16e34d7a055c5ca924f68d8aa1c07eeca85 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Wed, 17 Jun 2026 09:24:21 +0200 Subject: [PATCH 08/12] [FIX] Folder rename: upfront read of the mailbox and its children --- .../mailbox/store/StoreMailboxManager.java | 101 +++++++++--------- 1 file changed, 48 insertions(+), 53 deletions(-) diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 44ce721635e..4f6912cb015 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -24,6 +24,7 @@ import java.time.Clock; import java.time.Duration; +import java.util.Comparator; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -618,13 +619,31 @@ public Mono> renameMailboxReactive(MailboxPath from, MailboxSession fromSession, MailboxSession toSession) { LOGGER.debug("renameMailbox {} to {}", from, to); MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(fromSession); - - Mono fromMailboxPublisher = assertCanDeleteWhenRename(fromSession, from) - .then(mapper.findMailboxByPath(from) - .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(from)))); + Mono> mailboxesToBeRenamed = retrieveSubMailboxes(from, fromSession, mapper); return sanitizedMailboxPath(to, toSession) - .flatMap(sanitizedPath -> processRename(fromMailboxPublisher, sanitizedPath, option, fromSession, toSession)); + .flatMap(sanitizedPath -> processRename(mailboxesToBeRenamed, sanitizedPath, option, fromSession, toSession)); + } + + private Mono> retrieveSubMailboxes(MailboxPath from, MailboxSession fromSession, MailboxMapper mapper) { + MailboxQuery.UserBound query = MailboxQuery.builder() + .userAndNamespaceFrom(from) + .expression(new PrefixedWildcard(from.getName())) + .build() + .asUserBound(); + + return assertCanDeleteWhenRename(fromSession, from) + .then(mapper.findMailboxWithPathLike(query) + .filter(mailbox -> mailbox.generateAssociatedPath().getHierarchyLevels(fromSession.getPathDelimiter()).contains(from)) + .sort(Comparator.comparing(mailbox -> mailbox.generateAssociatedPath().getHierarchyLevels(fromSession.getPathDelimiter()).size())) + .collectList() + .handle((mailboxes, sink) -> { + if (mailboxes.stream().map(mailbox -> mailbox.generateAssociatedPath()).anyMatch(from::equals)) { + sink.next(mailboxes); + } else { + sink.error(new MailboxNotFoundException(from)); + } + })); } private Mono> renameSubscriptionsIfNeeded(List renamedResults, @@ -666,9 +685,8 @@ public Mono> renameMailboxReactive(MailboxId mailboxI LOGGER.debug("renameMailbox {} to {}", mailboxId, newMailboxPath); MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session); - Mono fromMailboxPublisher = mapper.findMailboxById(mailboxId) - .flatMap(mailbox -> assertCanDeleteWhenRename(session, mailbox.generateAssociatedPath()) - .thenReturn(mailbox)) + Mono> fromMailboxPublisher = mapper.findMailboxById(mailboxId) + .flatMap(from -> retrieveSubMailboxes(from.generateAssociatedPath(), session, mapper)) .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(mailboxId))); return sanitizedMailboxPath(newMailboxPath, session) @@ -689,47 +707,28 @@ private Mono sanitizedMailboxPath(MailboxPath mailboxPath, MailboxS .flatMap(newMailboxPath -> Mono.fromCallable(() -> newMailboxPath.assertAcceptable(session.getPathDelimiter()))); } - private Mono> processRename(Mono fromMailboxPublisher, MailboxPath to, RenameOption option, + private Mono> processRename(Mono> fromMailboxPublisher, MailboxPath to, RenameOption option, MailboxSession fromSession, MailboxSession toSession) { MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(fromSession); return mapper.executeReactive(fromMailboxPublisher - .flatMap(mailbox -> doRenameMailbox(mailbox, to, fromSession, toSession, mapper) - .doOnEach(ReactorUtils.logFinally(() -> AuditTrail.entry() + .flatMap(mailboxes -> doRenameMailboxes(mailboxes, to, fromSession, toSession, mapper) + .doOnNext(result -> AuditTrail.entry() .username(() -> fromSession.getUser().asString()) .sessionId(() -> String.valueOf(fromSession.getSessionId().getValue())) .protocol("mailbox") .action("rename") - .parameters(Throwing.supplier(() -> ImmutableMap.of("mailboxId", mailbox.getMailboxId().serialize(), - "fromMailboxPath", mailbox.generateAssociatedPath().asString(), - "toMailboxPath", to.asString()))) - .log("Mailbox Rename"))) + .parameters(Throwing.supplier(() -> ImmutableMap.of("mailboxId", result.getMailboxId().serialize(), + "fromMailboxPath", result.getOriginPath().asString(), + "toMailboxPath", result.getDestinationPath().asString()))) + .log("Mailbox Rename")) + .collectList() .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, fromSession, toSession)))); } - private Mono> doRenameMailbox(Mailbox mailbox, MailboxPath newMailboxPath, MailboxSession fromSession, MailboxSession toSession, MailboxMapper mapper) { - // TODO put this into a serilizable transaction - - ImmutableList.Builder resultBuilder = ImmutableList.builder(); - - MailboxPath from = mailbox.generateAssociatedPath(); - mailbox.setNamespace(newMailboxPath.getNamespace()); - mailbox.setUser(newMailboxPath.getUser()); - mailbox.setName(newMailboxPath.getName()); - // Find submailboxes - MailboxQuery.UserBound query = MailboxQuery.builder() - .userAndNamespaceFrom(from) - .expression(new PrefixedWildcard(from.getName() + fromSession.getPathDelimiter())) - .build() - .asUserBound(); - - return mapper.rename(mailbox, from) - .map(mailboxId -> { - resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath)); - return mailboxId; - }) - .then(Mono.from(renameSubMailboxes(newMailboxPath, mapper, from, query, resultBuilder))) - .then(Mono.defer(() -> Flux.fromIterable(resultBuilder.build()) + private Flux doRenameMailboxes(List mailboxes, MailboxPath newMailboxPath, MailboxSession fromSession, MailboxSession toSession, MailboxMapper mapper) { + return renameSubMailboxes(newMailboxPath, mapper, mailboxes.get(0).generateAssociatedPath(), mailboxes) + .flatMapIterable(results -> results) .concatMap(result -> eventBus.dispatch(EventFactory.mailboxRenamed() .randomEventId() .mailboxSession(fromSession) @@ -737,17 +736,16 @@ private Mono> doRenameMailbox(Mailbox mailbox, Mailbo .oldPath(result.getOriginPath()) .newPath(result.getDestinationPath()) .build(), - new MailboxIdRegistrationKey(result.getMailboxId()))) - .then())) - .then(Mono.fromCallable(resultBuilder::build)); + new MailboxIdRegistrationKey(result.getMailboxId())) + .thenReturn(result)); } - private Publisher renameSubMailboxes(MailboxPath newMailboxPath, MailboxMapper mapper, - MailboxPath from, MailboxQuery.UserBound query, ImmutableList.Builder resultBuilder) { - if (DefaultMailboxes.INBOX.equalsIgnoreCase(from.getName())) { - return Mono.empty(); - } - return locker.executeReactiveWithLockReactive(from, mapper.findMailboxWithPathLike(query) + private Mono> renameSubMailboxes(MailboxPath newMailboxPath, MailboxMapper mapper, + MailboxPath from, List mailboxes) { + // Renaming INBOX renames the INBOX mailbox itself but leaves its sub-mailboxes in place. + // The list is sorted shallowest-first, so the first entry is the renamed root. + List mailboxesToRename = DefaultMailboxes.INBOX.equalsIgnoreCase(from.getName()) ? mailboxes.subList(0, 1) : mailboxes; + return Mono.from(locker.executeReactiveWithLockReactive(from, Flux.fromIterable(mailboxesToRename) .concatMap(sub -> { String subOriginalName = sub.getName(); String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length()); @@ -755,14 +753,11 @@ private Publisher renameSubMailboxes(MailboxPath newMailboxPath, MailboxMa sub.setName(subNewName); sub.setUser(newMailboxPath.getUser()); return mapper.rename(sub, fromPath) - .map(mailboxId -> { - resultBuilder.add(new MailboxRenamedResult(sub.getMailboxId(), fromPath, sub.generateAssociatedPath())); - return mailboxId; - }) + .map(mailboxId -> new MailboxRenamedResult(sub.getMailboxId(), fromPath, sub.generateAssociatedPath())) .retryWhen(Retry.backoff(5, Duration.ofMillis(10))) - .then(Mono.fromRunnable(() -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName))); + .doOnSuccess(any -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName)); }, LOW_CONCURRENCY) - .then(), MailboxPathLocker.LockType.Write); + .collectList(), MailboxPathLocker.LockType.Write)); } @Override From 3480d0226d5583ff32bcc9b57562040de74f8526 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Wed, 17 Jun 2026 09:58:05 +0200 Subject: [PATCH 09/12] [FIX] Cassandra folder rename: use SERIAL for initial picture Improve overall atomicity Please note that we do not address TOCTOU races. We also do not need serial for destination mailbix as it is gated by an idempotent DELETE...IF NOT EXIST --- .../mailbox/cassandra/CassandraMailboxManager.java | 13 +++++++++++++ .../cassandra/mail/CassandraMailboxMapper.java | 13 +++++-------- .../james/mailbox/store/StoreMailboxManager.java | 6 +++++- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java index fe1dac1827c..51ee7225a96 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java @@ -24,24 +24,29 @@ import jakarta.inject.Inject; +import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles; import org.apache.james.events.EventBus; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxPathLocker; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.SessionProvider; +import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.MailboxManagerConfiguration; import org.apache.james.mailbox.store.PreDeletionHooks; import org.apache.james.mailbox.store.StoreMailboxAnnotationManager; import org.apache.james.mailbox.store.StoreMailboxManager; import org.apache.james.mailbox.store.StoreMessageManager; import org.apache.james.mailbox.store.StoreRightManager; +import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm; import org.apache.james.mailbox.store.mail.model.impl.MessageParser; import org.apache.james.mailbox.store.quota.QuotaComponents; import org.apache.james.mailbox.store.search.MessageSearchIndex; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -118,4 +123,12 @@ protected StoreMessageManager createMessageManager(Mailbox mailboxRow, MailboxSe public Mono manageProcessing(Mono toBeWrapped, MailboxSession mailboxSession) { return toBeWrapped; } + + @Override + protected Flux getMailboxWithPathLikeUponRename(MailboxMapper mapper, MailboxQuery.UserBound query) { + if (mapper instanceof CassandraMailboxMapper cassandraMailboxMapper) { + return cassandraMailboxMapper.findMailboxWithPathLike(query, JamesExecutionProfiles.ConsistencyChoice.STRONG); + } + return super.getMailboxWithPathLikeUponRename(mapper, query); + } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index b5fd64291e4..125d142781a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -192,18 +192,15 @@ private Mailbox addAcl(MailboxACL acl, Mailbox mailbox) { @Override public Flux findMailboxWithPathLike(MailboxQuery.UserBound query) { - String fixedNamespace = query.getFixedNamespace(); - Username fixedUser = query.getFixedUser(); + return findMailboxWithPathLike(query, consistencyChoice()); + } - return performReadRepair(listMailboxes(fixedNamespace, fixedUser)) + public Flux findMailboxWithPathLike(MailboxQuery.UserBound query, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) { + return performReadRepair(mailboxPathV3DAO.listUserMailboxes(query.getFixedNamespace(), query.getFixedUser(), consistencyChoice)) .filter(mailbox -> query.isPathMatch(mailbox.generateAssociatedPath())) .flatMap(this::addAcl, CONCURRENCY); } - private Flux listMailboxes(String fixedNamespace, Username fixedUser) { - return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice()); - } - @Override public Mono create(MailboxPath mailboxPath, UidValidity uidValidity) { CassandraId cassandraId = CassandraId.timeBased(); @@ -254,7 +251,7 @@ private Mono deletePreviousMailboxPathReference(MailboxPath mailboxPath) { @Override public Mono hasChildren(Mailbox mailbox, char delimiter) { - return performReadRepair(listMailboxes(mailbox.getNamespace(), mailbox.getUser())) + return performReadRepair(mailboxPathV3DAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser(), consistencyChoice())) .filter(idAndPath -> isPathChildOfMailbox(idAndPath, mailbox, delimiter)) .hasElements(); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 4f6912cb015..64a66db2de4 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -633,7 +633,7 @@ private Mono> retrieveSubMailboxes(MailboxPath from, MailboxSessio .asUserBound(); return assertCanDeleteWhenRename(fromSession, from) - .then(mapper.findMailboxWithPathLike(query) + .then(getMailboxWithPathLikeUponRename(mapper, query) .filter(mailbox -> mailbox.generateAssociatedPath().getHierarchyLevels(fromSession.getPathDelimiter()).contains(from)) .sort(Comparator.comparing(mailbox -> mailbox.generateAssociatedPath().getHierarchyLevels(fromSession.getPathDelimiter()).size())) .collectList() @@ -646,6 +646,10 @@ private Mono> retrieveSubMailboxes(MailboxPath from, MailboxSessio })); } + protected Flux getMailboxWithPathLikeUponRename(MailboxMapper mapper, MailboxQuery.UserBound query) { + return mapper.findMailboxWithPathLike(query); + } + private Mono> renameSubscriptionsIfNeeded(List renamedResults, RenameOption option, MailboxSession fromSession, MailboxSession toSession) { if (option == RenameOption.RENAME_SUBSCRIPTIONS) { From 2902e5f04f64687f4b6c961c648cc32d348fb1d4 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Wed, 17 Jun 2026 10:44:24 +0200 Subject: [PATCH 10/12] [FIX] SolveMailboxInconsistencies: several runs until convergence --- .../SolveMailboxInconsistenciesService.java | 56 +++++++++++++++++++ .../task/SolveMailboxInconsistenciesTask.java | 22 ++++++-- ...istenciesTaskAdditionalInformationDTO.java | 19 ++++++- .../SolveMailboxInconsistenciesTaskDTO.java | 20 ++++++- ...xInconsistenciesTaskSerializationTest.java | 11 ++-- ...veMailboxInconsistenciesRequestToTask.java | 7 ++- ...ilboxInconsistenciesRequestToTaskTest.java | 2 +- 7 files changed, 121 insertions(+), 16 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index 0a76fab0d9d..d7f71b20425 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -263,6 +263,36 @@ private Mono reportConflict(Context context) { } } + public static class RunningOptions { + public static final int DEFAULT_MAX_ITERATIONS = 1; + public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_MAX_ITERATIONS); + + private final int maxIterations; + + public RunningOptions(int maxIterations) { + Preconditions.checkArgument(maxIterations >= 1, "'maxIterations' must be strictly positive"); + this.maxIterations = maxIterations; + } + + public int getMaxIterations() { + return maxIterations; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RunningOptions) { + RunningOptions that = (RunningOptions) o; + return this.maxIterations == that.maxIterations; + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(maxIterations); + } + } + public static class Context { static class Builder { private Optional processedMailboxEntries; @@ -457,7 +487,33 @@ Snapshot snapshot() { } public Mono fixMailboxInconsistencies(Context context) { + return fixMailboxInconsistencies(context, RunningOptions.DEFAULT); + } + + public Mono fixMailboxInconsistencies(Context context, RunningOptions runningOptions) { assertValidVersion(); + return fixUntilStable(context, runningOptions.getMaxIterations()); + } + + // Reconciliation is run to a fixpoint: fixing an inconsistency in one pass (dropping a stale + // path, merging a ghost mailbox...) can surface a new inconsistency that only a subsequent pass + // detects. We re-run as long as a pass keeps applying fixes, bounded by maxIterations to guard + // against oscillation. As fixes only ever grow the fixedInconsistencies list, a pass that adds + // none means we reached a stable state. + private Mono fixUntilStable(Context context, int remainingIterations) { + int fixedBefore = context.snapshot().getFixedInconsistencies().size(); + return runOnePass(context) + .flatMap(result -> { + boolean appliedFixes = context.snapshot().getFixedInconsistencies().size() > fixedBefore; + if (appliedFixes && remainingIterations > 1) { + return fixUntilStable(context, remainingIterations - 1) + .map(nextResult -> Task.combine(result, nextResult)); + } + return Mono.just(result); + }); + } + + private Mono runOnePass(Context context) { return Flux.concat( processMailboxDaoInconsistencies(context), processMailboxPathDaoInconsistencies(context)) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java index 8c4d679384d..6e34f7e250f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.Optional; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; @@ -44,15 +45,17 @@ public static class Details implements TaskExecutionDetails.AdditionalInformatio private final ImmutableList fixedInconsistencies; private final ImmutableList conflictingEntries; private final long errors; + private final RunningOptions runningOptions; Details(Instant instant, long processedMailboxEntries, long processedMailboxPathEntries, ImmutableList fixedInconsistencies, - ImmutableList conflictingEntries, long errors) { + ImmutableList conflictingEntries, long errors, RunningOptions runningOptions) { this.instant = instant; this.processedMailboxEntries = processedMailboxEntries; this.processedMailboxPathEntries = processedMailboxPathEntries; this.fixedInconsistencies = fixedInconsistencies; this.conflictingEntries = conflictingEntries; this.errors = errors; + this.runningOptions = runningOptions; } @Override @@ -79,18 +82,24 @@ ImmutableList getConflictingEntries() { long getErrors() { return errors; } + + public RunningOptions getRunningOptions() { + return runningOptions; + } } private final SolveMailboxInconsistenciesService service; + private final RunningOptions runningOptions; - public SolveMailboxInconsistenciesTask(SolveMailboxInconsistenciesService service) { + public SolveMailboxInconsistenciesTask(SolveMailboxInconsistenciesService service, RunningOptions runningOptions) { this.service = service; + this.runningOptions = runningOptions; this.context = new SolveMailboxInconsistenciesService.Context(); } @Override public Result run() { - return service.fixMailboxInconsistencies(context) + return service.fixMailboxInconsistencies(context, runningOptions) .block(); } @@ -99,6 +108,10 @@ public TaskType type() { return SOLVE_MAILBOX_INCONSISTENCIES; } + public RunningOptions getRunningOptions() { + return runningOptions; + } + @Override public Optional details() { SolveMailboxInconsistenciesService.Context.Snapshot snapshot = context.snapshot(); @@ -109,6 +122,7 @@ public Optional details() { .map(MailboxId::serialize) .collect(ImmutableList.toImmutableList()), snapshot.getConflictingEntries(), - snapshot.getErrors())); + snapshot.getErrors(), + runningOptions)); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java index 5811a68fcf5..e484e540d3f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java @@ -20,8 +20,10 @@ package org.apache.james.mailbox.cassandra.mail.task; import java.time.Instant; +import java.util.Optional; import org.apache.james.json.DTOModule; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.server.task.json.dto.AdditionalInformationDTO; import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule; @@ -37,7 +39,8 @@ private static SolveMailboxInconsistenciesTaskAdditionalInformationDTO fromDomai details.getFixedInconsistencies(), details.getConflictingEntries(), details.getErrors(), - details.timestamp()); + details.timestamp(), + Optional.of(SolveMailboxInconsistenciesRunningOptionsDTO.asDTO(details.getRunningOptions()))); } public static AdditionalInformationDTOModule module() { @@ -57,6 +60,7 @@ public static AdditionalInformationDTOModule conflictingEntries; private final long errors; private final Instant timestamp; + private final Optional runningOptions; public SolveMailboxInconsistenciesTaskAdditionalInformationDTO(@JsonProperty("type") String type, @JsonProperty("processedMailboxEntries") long processedMailboxEntries, @@ -64,7 +68,8 @@ public SolveMailboxInconsistenciesTaskAdditionalInformationDTO(@JsonProperty("ty @JsonProperty("fixedInconsistencies") ImmutableList fixedInconsistencies, @JsonProperty("conflictingEntries") ImmutableList conflictingEntries, @JsonProperty("errors") long errors, - @JsonProperty("timestamp") Instant timestamp) { + @JsonProperty("timestamp") Instant timestamp, + @JsonProperty("runningOptions") Optional runningOptions) { this.type = type; this.processedMailboxEntries = processedMailboxEntries; this.timestamp = timestamp; @@ -72,6 +77,7 @@ public SolveMailboxInconsistenciesTaskAdditionalInformationDTO(@JsonProperty("ty this.fixedInconsistencies = fixedInconsistencies; this.conflictingEntries = conflictingEntries; this.errors = errors; + this.runningOptions = runningOptions; } public long getProcessedMailboxEntries() { @@ -94,6 +100,10 @@ public long getErrors() { return errors; } + public Optional getRunningOptions() { + return runningOptions; + } + @Override public Instant getTimestamp() { return timestamp; @@ -110,6 +120,9 @@ private SolveMailboxInconsistenciesTask.Details toDomainObject() { processedMailboxPathEntries, fixedInconsistencies, conflictingEntries, - errors); + errors, + runningOptions + .map(SolveMailboxInconsistenciesRunningOptionsDTO::asDomainObject) + .orElse(RunningOptions.DEFAULT)); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java index aa22280989f..265d1a3030f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java @@ -18,7 +18,10 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra.mail.task; +import java.util.Optional; + import org.apache.james.json.DTOModule; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.server.task.json.dto.TaskDTO; import org.apache.james.server.task.json.dto.TaskDTOModule; @@ -26,7 +29,8 @@ public class SolveMailboxInconsistenciesTaskDTO implements TaskDTO { private static SolveMailboxInconsistenciesTaskDTO toDTO(SolveMailboxInconsistenciesTask domainObject, String typeName) { - return new SolveMailboxInconsistenciesTaskDTO(typeName); + return new SolveMailboxInconsistenciesTaskDTO(typeName, + Optional.of(SolveMailboxInconsistenciesRunningOptionsDTO.asDTO(domainObject.getRunningOptions()))); } public static TaskDTOModule module(SolveMailboxInconsistenciesService service) { @@ -40,17 +44,27 @@ public static TaskDTOModule runningOptions; - public SolveMailboxInconsistenciesTaskDTO(@JsonProperty("type") String type) { + public SolveMailboxInconsistenciesTaskDTO(@JsonProperty("type") String type, + @JsonProperty("runningOptions") Optional runningOptions) { this.type = type; + this.runningOptions = runningOptions; } private SolveMailboxInconsistenciesTask toDomainObject(SolveMailboxInconsistenciesService service) { - return new SolveMailboxInconsistenciesTask(service); + return new SolveMailboxInconsistenciesTask(service, + runningOptions + .map(SolveMailboxInconsistenciesRunningOptionsDTO::asDomainObject) + .orElse(RunningOptions.DEFAULT)); } @Override public String getType() { return type; } + + public Optional getRunningOptions() { + return runningOptions; + } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java index e44d3474ad3..85d25369eb0 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java @@ -41,14 +41,16 @@ class SolveMailboxInconsistenciesTaskSerializationTest { public static final String MAILBOX_ID_AS_STRING = "464765a0-e4e7-11e4-aba4-710c1de3782b"; private static final CassandraId MAILBOX_ID = CassandraId.of(UUID.fromString(MAILBOX_ID_AS_STRING)); + private static final SolveMailboxInconsistenciesService.RunningOptions RUNNING_OPTIONS = new SolveMailboxInconsistenciesService.RunningOptions(3); + private static final SolveMailboxInconsistenciesService SERVICE = mock(SolveMailboxInconsistenciesService.class); - private static final SolveMailboxInconsistenciesTask TASK = new SolveMailboxInconsistenciesTask(SERVICE); - private static final String SERIALIZED_TASK = "{\"type\": \"solve-mailbox-inconsistencies\"}"; + private static final SolveMailboxInconsistenciesTask TASK = new SolveMailboxInconsistenciesTask(SERVICE, RUNNING_OPTIONS); + private static final String SERIALIZED_TASK = "{\"type\": \"solve-mailbox-inconsistencies\", \"runningOptions\":{\"maxIterations\":3}}"; private static final ConflictingEntry CONFLICTING_ENTRY = ConflictingEntry.builder() .mailboxDaoEntry(MAILBOX_PATH, MAILBOX_ID) .mailboxPathDaoEntry(MAILBOX_PATH_2, MAILBOX_ID); private static final ImmutableList CONFLICTING_ENTRIES = ImmutableList.of(CONFLICTING_ENTRY); - private static final SolveMailboxInconsistenciesTask.Details DETAILS = new SolveMailboxInconsistenciesTask.Details(TIMESTAMP, 0, 1, ImmutableList.of(MAILBOX_ID_AS_STRING), CONFLICTING_ENTRIES, 3); + private static final SolveMailboxInconsistenciesTask.Details DETAILS = new SolveMailboxInconsistenciesTask.Details(TIMESTAMP, 0, 1, ImmutableList.of(MAILBOX_ID_AS_STRING), CONFLICTING_ENTRIES, 3, RUNNING_OPTIONS); private static final String SERIALIZED_ADDITIONAL_INFORMATION = "{" + " \"type\":\"solve-mailbox-inconsistencies\"," + " \"processedMailboxEntries\":0," + @@ -65,7 +67,8 @@ class SolveMailboxInconsistenciesTaskSerializationTest { " }" + " }]," + " \"errors\":3," + - " \"timestamp\":\"2018-11-13T12:00:55Z\"" + + " \"timestamp\":\"2018-11-13T12:00:55Z\"," + + " \"runningOptions\":{\"maxIterations\":3}" + "}"; @Test diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java index 0df2783b142..6ec5928b9cb 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java @@ -22,6 +22,7 @@ import jakarta.inject.Inject; import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesTask; import org.apache.james.webadmin.tasks.TaskFromRequestRegistry; import org.apache.james.webadmin.tasks.TaskRegistrationKey; @@ -42,7 +43,11 @@ public SolveMailboxInconsistenciesRequestToTask(SolveMailboxInconsistenciesServi "`ALL-SERVICES-ARE-OFFLINE` in order to prevent accidental calls. " + "Check the documentation for details."); - return new SolveMailboxInconsistenciesTask(service); + RunningOptions runningOptions = RunningOptionsParser.intQueryParameter(request, "maxIterations") + .map(RunningOptions::new) + .orElse(RunningOptions.DEFAULT); + + return new SolveMailboxInconsistenciesTask(service, runningOptions); }); } } diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java index b0a3cc7b898..5e34d34fe3e 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java @@ -86,7 +86,7 @@ void setUp() { taskManager = new MemoryTaskManager(new Hostname("foo")); service = mock(SolveMailboxInconsistenciesService.class); - Mockito.when(service.fixMailboxInconsistencies(any())).thenReturn(Mono.just(Task.Result.COMPLETED)); + Mockito.when(service.fixMailboxInconsistencies(any(), any())).thenReturn(Mono.just(Task.Result.COMPLETED)); webAdminServer = WebAdminUtils.createWebAdminServer( new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(SolveMessageInconsistenciesTaskAdditionalInformationDTO.module())), From 95a6e87b9555bcb2f448b878fc7420546873570a Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Wed, 17 Jun 2026 14:42:10 +0200 Subject: [PATCH 11/12] [ENHANCEMENT] Add a autoMerge mode (default to false) to SolveMailboxInconsistencies --- .../mail/task/MailboxMergingTaskRunner.java | 8 +- ...ilboxInconsistenciesRunningOptionsDTO.java | 59 ++++++++ .../SolveMailboxInconsistenciesService.java | 134 ++++++++++++++---- ...olveMailboxInconsistenciesServiceTest.java | 57 +++++++- ...xInconsistenciesTaskSerializationTest.java | 6 +- ...veMailboxInconsistenciesRequestToTask.java | 12 +- 6 files changed, 241 insertions(+), 35 deletions(-) create mode 100644 mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java index 20d8e28054e..916075b4a34 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java @@ -68,11 +68,15 @@ public MailboxMergingTaskRunner(MailboxManager mailboxManager, StoreMessageIdMan } public Task.Result run(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxMergingTask.Context context) { - return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context) - .flatMap(onMoveCompleteOperations(oldMailboxId, newMailboxId)) + return runReactive(oldMailboxId, newMailboxId, context) .block(); } + public Mono runReactive(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxMergingTask.Context context) { + return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context) + .flatMap(onMoveCompleteOperations(oldMailboxId, newMailboxId)); + } + private Function> onMoveCompleteOperations(CassandraId oldMailboxId, CassandraId newMailboxId) { return result -> { if (result == Task.Result.COMPLETED) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java new file mode 100644 index 00000000000..b53ab20fdc1 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java @@ -0,0 +1,59 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.task; + +import java.util.Optional; + +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class SolveMailboxInconsistenciesRunningOptionsDTO { + public static SolveMailboxInconsistenciesRunningOptionsDTO asDTO(RunningOptions domainObject) { + return new SolveMailboxInconsistenciesRunningOptionsDTO( + Optional.of(domainObject.getMaxIterations()), + Optional.of(domainObject.isAutoMerge())); + } + + private final Optional maxIterations; + private final Optional autoMerge; + + @JsonCreator + public SolveMailboxInconsistenciesRunningOptionsDTO(@JsonProperty("maxIterations") Optional maxIterations, + @JsonProperty("autoMerge") Optional autoMerge) { + this.maxIterations = maxIterations; + this.autoMerge = autoMerge; + } + + public Optional getMaxIterations() { + return maxIterations; + } + + public Optional getAutoMerge() { + return autoMerge; + } + + public RunningOptions asDomainObject() { + return new RunningOptions( + maxIterations.orElse(RunningOptions.DEFAULT_MAX_ITERATIONS), + autoMerge.orElse(RunningOptions.DEFAULT_AUTO_MERGE)); + } +} diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index d7f71b20425..e345faa0d70 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -56,26 +56,39 @@ public class SolveMailboxInconsistenciesService { @FunctionalInterface public interface Inconsistency { static Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, Mono pathEntry) { + // Read-repair entry point: never auto-merges (no merging runner available, and read paths + // must not trigger destructive ghost merges). + return detectMailboxDaoInconsistency(mailboxEntry, pathEntry, null, false); + } + + static Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, Mono pathEntry, + MailboxMergingTaskRunner mergingRunner, boolean autoMerge) { return pathEntry .map(mailboxByPath -> { if (mailboxByPath.getMailboxId().equals(mailboxEntry.getMailboxId())) { return NO_INCONSISTENCY; } // Path entry references another mailbox. - return new ConflictingEntryInconsistency(mailboxEntry, mailboxByPath); + return new ConflictingEntryInconsistency(mailboxEntry, mailboxByPath, mergingRunner, autoMerge); }) .defaultIfEmpty(new OrphanMailboxDAOEntry(mailboxEntry)); } static Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry, Mono mailboxEntry) { + // Read-repair entry point: never auto-merges (this site only ever yields same-id conflicts anyway). + return detectMailboxPathDaoInconsistency(mailboxByPathEntry, mailboxEntry, null, false); + } + + static Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry, Mono mailboxEntry, + MailboxMergingTaskRunner mergingRunner, boolean autoMerge) { return mailboxEntry .map(mailboxById -> { if (mailboxByPathEntry.generateAssociatedPath().equals(mailboxById.generateAssociatedPath())) { return NO_INCONSISTENCY; } // Mailbox references another path - return new ConflictingEntryInconsistency(mailboxById, mailboxByPathEntry); + return new ConflictingEntryInconsistency(mailboxById, mailboxByPathEntry, mergingRunner, autoMerge); }) .defaultIfEmpty(new OrphanMailboxPathDAOEntry(mailboxByPathEntry)); } @@ -180,10 +193,15 @@ public Mono fix(Context context, CassandraMailboxDAO mailboxDAO, Cassand private static class ConflictingEntryInconsistency implements Inconsistency { private final Mailbox mailboxDaoEntry; private final Mailbox mailboxPathEntry; + private final MailboxMergingTaskRunner mergingRunner; + private final boolean autoMerge; - private ConflictingEntryInconsistency(Mailbox mailboxDaoEntry, Mailbox mailboxPathEntry) { + private ConflictingEntryInconsistency(Mailbox mailboxDaoEntry, Mailbox mailboxPathEntry, + MailboxMergingTaskRunner mergingRunner, boolean autoMerge) { this.mailboxDaoEntry = mailboxDaoEntry; this.mailboxPathEntry = mailboxPathEntry; + this.mergingRunner = mergingRunner; + this.autoMerge = autoMerge; } @Override @@ -197,9 +215,59 @@ public Mono fix(Context context, CassandraMailboxDAO mailboxDAO, Cassand return fixSameMailboxConflict(context, mailboxDAO, pathV3DAO); } + if (autoMerge) { + return autoMergeConflict(context, mailboxDAO, pathV3DAO); + } return reportConflict(context); } + // Two *different* mailboxes resolve to the same path (the historical "ghost mailbox"): the + // path table registers the winner, while the loser is a projection entry squatting that path. + // The path table being the source of truth, the registered mailbox wins and the squatting + // projection is merged into it (its messages and rights are moved over, then its projection + // row is dropped) using the same machinery as MailboxMergingTask. + // + // We only merge once re-reading (STRONG) confirms the clean-ghost picture: the winner still + // owns the path, the loser still resolves to it, and -- crucially -- the path table vouches + // for the loser *nowhere*. If the loser owns another path, it is a genuine mailbox with its + // own (same-id) conflict to resolve first, so we report rather than destroy it. + private Mono autoMergeConflict(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO) { + CassandraId winnerId = (CassandraId) mailboxPathEntry.getMailboxId(); + CassandraId loserId = (CassandraId) mailboxDaoEntry.getMailboxId(); + MailboxPath conflictingPath = mailboxPathEntry.generateAssociatedPath(); + + Mono pathStillOwnedByWinner = pathV3DAO.retrieve(conflictingPath, STRONG) + .map(entry -> entry.getMailboxId().equals(winnerId)) + .defaultIfEmpty(false); + Mono loserStillResolvesToPath = mailboxDAO.retrieveMailbox(loserId) + .map(projection -> projection.generateAssociatedPath().equals(conflictingPath)) + .defaultIfEmpty(false); + Mono loserIsUnregistered = pathV3DAO.listUserMailboxes(mailboxDaoEntry.getNamespace(), mailboxDaoEntry.getUser(), STRONG) + .filter(entry -> entry.getMailboxId().equals(loserId)) + .hasElements() + .map(referenced -> !referenced); + + return Mono.zip(pathStillOwnedByWinner, loserStillResolvesToPath, loserIsUnregistered) + .flatMap(state -> { + boolean cleanGhost = state.getT1() && state.getT2() && state.getT3(); + if (!cleanGhost) { + // State no longer matches the clean-ghost picture (already reconciled, or the + // loser owns another path): do not merge. + if (state.getT1() && state.getT2()) { + // Still conflicting but the loser is registered elsewhere: leave it to an admin. + return reportConflict(context); + } + return Mono.just(Result.COMPLETED); + } + return mergingRunner.runReactive(loserId, winnerId, new MailboxMergingTask.Context(0)) + .doOnNext(result -> { + LOGGER.info("Auto-merged ghost mailbox {} into {} at path {}", + loserId.serialize(), winnerId.serialize(), conflictingPath.asString()); + context.addFixedInconsistency(winnerId); + }); + }); + } + // Auto-resolution is restricted to the case where BOTH the conflicting path entry and the // projection's path are registered to the same mailbox id: only then are they two genuine // aliases of a single mailbox, hence safe to deduplicate without data loss. If the @@ -265,31 +333,39 @@ private Mono reportConflict(Context context) { public static class RunningOptions { public static final int DEFAULT_MAX_ITERATIONS = 1; - public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_MAX_ITERATIONS); + public static final boolean DEFAULT_AUTO_MERGE = false; + public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_MAX_ITERATIONS, DEFAULT_AUTO_MERGE); private final int maxIterations; + private final boolean autoMerge; - public RunningOptions(int maxIterations) { + public RunningOptions(int maxIterations, boolean autoMerge) { Preconditions.checkArgument(maxIterations >= 1, "'maxIterations' must be strictly positive"); this.maxIterations = maxIterations; + this.autoMerge = autoMerge; } public int getMaxIterations() { return maxIterations; } + public boolean isAutoMerge() { + return autoMerge; + } + @Override public final boolean equals(Object o) { if (o instanceof RunningOptions) { RunningOptions that = (RunningOptions) o; - return this.maxIterations == that.maxIterations; + return this.maxIterations == that.maxIterations + && this.autoMerge == that.autoMerge; } return false; } @Override public final int hashCode() { - return Objects.hash(maxIterations); + return Objects.hash(maxIterations, autoMerge); } } @@ -477,13 +553,15 @@ Snapshot snapshot() { private final CassandraMailboxDAO mailboxDAO; private final CassandraMailboxPathV3DAO mailboxPathV3DAO; private final CassandraSchemaVersionManager versionManager; + private final MailboxMergingTaskRunner mergingRunner; @Inject SolveMailboxInconsistenciesService(CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO mailboxPathV3DAO, - CassandraSchemaVersionManager versionManager) { + CassandraSchemaVersionManager versionManager, MailboxMergingTaskRunner mergingRunner) { this.mailboxDAO = mailboxDAO; this.mailboxPathV3DAO = mailboxPathV3DAO; this.versionManager = versionManager; + this.mergingRunner = mergingRunner; } public Mono fixMailboxInconsistencies(Context context) { @@ -492,7 +570,7 @@ public Mono fixMailboxInconsistencies(Context context) { public Mono fixMailboxInconsistencies(Context context, RunningOptions runningOptions) { assertValidVersion(); - return fixUntilStable(context, runningOptions.getMaxIterations()); + return fixUntilStable(context, runningOptions.getMaxIterations(), runningOptions.isAutoMerge()); } // Reconciliation is run to a fixpoint: fixing an inconsistency in one pass (dropping a stale @@ -500,23 +578,23 @@ public Mono fixMailboxInconsistencies(Context context, RunningOptions ru // detects. We re-run as long as a pass keeps applying fixes, bounded by maxIterations to guard // against oscillation. As fixes only ever grow the fixedInconsistencies list, a pass that adds // none means we reached a stable state. - private Mono fixUntilStable(Context context, int remainingIterations) { + private Mono fixUntilStable(Context context, int remainingIterations, boolean autoMerge) { int fixedBefore = context.snapshot().getFixedInconsistencies().size(); - return runOnePass(context) + return runOnePass(context, autoMerge) .flatMap(result -> { boolean appliedFixes = context.snapshot().getFixedInconsistencies().size() > fixedBefore; if (appliedFixes && remainingIterations > 1) { - return fixUntilStable(context, remainingIterations - 1) + return fixUntilStable(context, remainingIterations - 1, autoMerge) .map(nextResult -> Task.combine(result, nextResult)); } return Mono.just(result); }); } - private Mono runOnePass(Context context) { + private Mono runOnePass(Context context, boolean autoMerge) { return Flux.concat( - processMailboxDaoInconsistencies(context), - processMailboxPathDaoInconsistencies(context)) + processMailboxDaoInconsistencies(context, autoMerge), + processMailboxPathDaoInconsistencies(context, autoMerge)) .reduce(Result.COMPLETED, Task::combine); } @@ -531,9 +609,9 @@ private void assertValidVersion() { version.getValue()); } - private Flux processMailboxPathDaoInconsistencies(Context context) { + private Flux processMailboxPathDaoInconsistencies(Context context, boolean autoMerge) { return mailboxPathV3DAO.listAll() - .flatMap(this::detectMailboxPathDaoInconsistency, DEFAULT_CONCURRENCY) + .flatMap(entry -> detectMailboxPathDaoInconsistency(entry, autoMerge), DEFAULT_CONCURRENCY) .doOnNext(any -> context.incrementProcessedMailboxPathEntries()) // Detect every inconsistency first, then fix them one at a time. Resolving a same-mailbox // conflict may realign the projection, so a fully materialized detection set fixed @@ -546,22 +624,28 @@ private Flux processMailboxPathDaoInconsistencies(Context context) { .concatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))); } - private Flux processMailboxDaoInconsistencies(Context context) { + private Flux processMailboxDaoInconsistencies(Context context, boolean autoMerge) { return mailboxDAO.retrieveAllMailboxes() - .flatMap(this::detectMailboxDaoInconsistency, DEFAULT_CONCURRENCY) - .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO), DEFAULT_CONCURRENCY) - .doOnNext(any -> context.incrementProcessedMailboxEntries()); + .flatMap(entry -> detectMailboxDaoInconsistency(entry, autoMerge), DEFAULT_CONCURRENCY) + .doOnNext(any -> context.incrementProcessedMailboxEntries()) + // Detect every inconsistency first, then fix them one at a time. Auto-merging a ghost + // mailbox mutates message and projection state, so a fully materialized detection set + // fixed sequentially prevents a fix from racing with the detection of a sibling entry. + .filter(inconsistency -> inconsistency != NO_INCONSISTENCY) + .collectList() + .flatMapMany(inconsistencies -> Flux.fromIterable(inconsistencies) + .concatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))); } - private Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry) { + private Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, boolean autoMerge) { Mono pathEntry = mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath(), STRONG); - return Inconsistency.detectMailboxDaoInconsistency(mailboxEntry, pathEntry); + return Inconsistency.detectMailboxDaoInconsistency(mailboxEntry, pathEntry, mergingRunner, autoMerge); } - private Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry) { + private Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry, boolean autoMerge) { CassandraId cassandraId = (CassandraId) mailboxByPathEntry.getMailboxId(); Mono mailboxEntry = mailboxDAO.retrieveMailbox(cassandraId); - return Inconsistency.detectMailboxPathDaoInconsistency(mailboxByPathEntry, mailboxEntry); + return Inconsistency.detectMailboxPathDaoInconsistency(mailboxByPathEntry, mailboxEntry, mergingRunner, autoMerge); } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java index 8179fc883e8..ea92010ccc7 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java @@ -23,6 +23,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -70,6 +76,7 @@ class SolveMailboxInconsistenciesServiceTest { CassandraMailboxDAO mailboxDAO; CassandraMailboxPathV3DAO mailboxPathV3DAO; CassandraSchemaVersionDAO versionDAO; + MailboxMergingTaskRunner mergingRunner; SolveMailboxInconsistenciesService testee; @BeforeEach @@ -80,7 +87,8 @@ void setUp(CassandraCluster cassandra) { mailboxPathV3DAO = new CassandraMailboxPathV3DAO( cassandra.getConf()); versionDAO = new CassandraSchemaVersionDAO(cassandra.getConf()); - testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV3DAO, new CassandraSchemaVersionManager(versionDAO)); + mergingRunner = mock(MailboxMergingTaskRunner.class); + testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV3DAO, new CassandraSchemaVersionManager(versionDAO), mergingRunner); versionDAO.updateVersion(new SchemaVersion(8)).block(); } @@ -420,4 +428,51 @@ void fixMailboxInconsistenciesShouldNotAlterStateWhenTwoEntriesWithSamePath() { .containsExactlyInAnyOrder(MAILBOX_2); }); } + + @Test + void fixMailboxInconsistenciesShouldAutoMergeGhostMailboxWhenAutoMergeEnabled() { + // Two different mailboxes resolve to the same path "abc": CASSANDRA_ID_2 owns the path (path + // table = source of truth), CASSANDRA_ID_1 is a ghost projection squatting it with no path + // registration of its own. With autoMerge on, the ghost is merged into the path owner. + Mailbox pathOwnerProjection = new Mailbox(MAILBOX_PATH, UID_VALIDITY_2, CASSANDRA_ID_2); + mailboxDAO.save(MAILBOX).block(); + mailboxDAO.save(pathOwnerProjection).block(); + mailboxPathV3DAO.save(MAILBOX_2).block(); + + // The merging runner moves messages then drops the loser's projection row: simulate that side effect. + when(mergingRunner.runReactive(eq(CASSANDRA_ID_1), eq(CASSANDRA_ID_2), any())) + .thenReturn(mailboxDAO.delete(CASSANDRA_ID_1).thenReturn(Result.COMPLETED)); + + Context context = new Context(); + testee.fixMailboxInconsistencies(context, new SolveMailboxInconsistenciesService.RunningOptions(1, true)).block(); + + SoftAssertions.assertSoftly(softly -> { + verify(mergingRunner).runReactive(eq(CASSANDRA_ID_1), eq(CASSANDRA_ID_2), any()); + softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .containsExactlyInAnyOrder(pathOwnerProjection); + softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_2); + softly.assertThat(context.snapshot().getFixedInconsistencies()) + .containsExactly(CASSANDRA_ID_2); + }); + } + + @Test + void fixMailboxInconsistenciesShouldNotAutoMergeWhenLoserIsRegisteredElsewhere() { + // CASSANDRA_ID_1 squats path "abc" (owned by CASSANDRA_ID_2) but also legitimately owns path + // "xyz": it is not a clean ghost, so the path table vouches for it and it must not be merged away. + Mailbox pathOwnerProjection = new Mailbox(MAILBOX_PATH, UID_VALIDITY_2, CASSANDRA_ID_2); + Mailbox loserOtherPath = new Mailbox(NEW_MAILBOX_PATH, UID_VALIDITY_1, CASSANDRA_ID_1); + mailboxDAO.save(MAILBOX).block(); + mailboxDAO.save(pathOwnerProjection).block(); + mailboxPathV3DAO.save(MAILBOX_2).block(); + mailboxPathV3DAO.save(loserOtherPath).block(); + + testee.fixMailboxInconsistencies(new Context(), new SolveMailboxInconsistenciesService.RunningOptions(1, true)).block(); + + verify(mergingRunner, never()).runReactive(any(), any(), any()); + // The ghost projection is preserved (not destroyed by an auto-merge). + assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .contains(MAILBOX); + } } \ No newline at end of file diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java index 85d25369eb0..97fec53ee58 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java @@ -41,11 +41,11 @@ class SolveMailboxInconsistenciesTaskSerializationTest { public static final String MAILBOX_ID_AS_STRING = "464765a0-e4e7-11e4-aba4-710c1de3782b"; private static final CassandraId MAILBOX_ID = CassandraId.of(UUID.fromString(MAILBOX_ID_AS_STRING)); - private static final SolveMailboxInconsistenciesService.RunningOptions RUNNING_OPTIONS = new SolveMailboxInconsistenciesService.RunningOptions(3); + private static final SolveMailboxInconsistenciesService.RunningOptions RUNNING_OPTIONS = new SolveMailboxInconsistenciesService.RunningOptions(3, true); private static final SolveMailboxInconsistenciesService SERVICE = mock(SolveMailboxInconsistenciesService.class); private static final SolveMailboxInconsistenciesTask TASK = new SolveMailboxInconsistenciesTask(SERVICE, RUNNING_OPTIONS); - private static final String SERIALIZED_TASK = "{\"type\": \"solve-mailbox-inconsistencies\", \"runningOptions\":{\"maxIterations\":3}}"; + private static final String SERIALIZED_TASK = "{\"type\": \"solve-mailbox-inconsistencies\", \"runningOptions\":{\"maxIterations\":3,\"autoMerge\":true}}"; private static final ConflictingEntry CONFLICTING_ENTRY = ConflictingEntry.builder() .mailboxDaoEntry(MAILBOX_PATH, MAILBOX_ID) .mailboxPathDaoEntry(MAILBOX_PATH_2, MAILBOX_ID); @@ -68,7 +68,7 @@ class SolveMailboxInconsistenciesTaskSerializationTest { " }]," + " \"errors\":3," + " \"timestamp\":\"2018-11-13T12:00:55Z\"," + - " \"runningOptions\":{\"maxIterations\":3}" + + " \"runningOptions\":{\"maxIterations\":3,\"autoMerge\":true}" + "}"; @Test diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java index 6ec5928b9cb..988e342e25d 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java @@ -19,6 +19,8 @@ package org.apache.james.webadmin.routes; +import java.util.Optional; + import jakarta.inject.Inject; import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService; @@ -43,11 +45,13 @@ public SolveMailboxInconsistenciesRequestToTask(SolveMailboxInconsistenciesServi "`ALL-SERVICES-ARE-OFFLINE` in order to prevent accidental calls. " + "Check the documentation for details."); - RunningOptions runningOptions = RunningOptionsParser.intQueryParameter(request, "maxIterations") - .map(RunningOptions::new) - .orElse(RunningOptions.DEFAULT); + int maxIterations = RunningOptionsParser.intQueryParameter(request, "maxIterations") + .orElse(RunningOptions.DEFAULT_MAX_ITERATIONS); + boolean autoMerge = Optional.ofNullable(request.queryParams("autoMerge")) + .map(Boolean::parseBoolean) + .orElse(RunningOptions.DEFAULT_AUTO_MERGE); - return new SolveMailboxInconsistenciesTask(service, runningOptions); + return new SolveMailboxInconsistenciesTask(service, new RunningOptions(maxIterations, autoMerge)); }); } } From b0d6c043270947c13619db919dbec031bd9690c7 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Wed, 17 Jun 2026 15:02:17 +0200 Subject: [PATCH 12/12] [ENHANCEMENT] have SolveMailboxInconsistencies address all possible failures --- .../webadmin/admin-mailboxes-extend.adoc | 40 ++++++++++++++++--- .../SolveMailboxInconsistenciesService.java | 32 +++++++++++---- ...olveMailboxInconsistenciesServiceTest.java | 21 ++++++---- src/site/markdown/server/manage-webadmin.md | 32 +++++++++++++-- 4 files changed, 102 insertions(+), 23 deletions(-) diff --git a/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc b/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc index ecdb8b2006a..a1dabd72909 100644 --- a/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc +++ b/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc @@ -13,6 +13,32 @@ a task]. The `I-KNOW-WHAT-I-M-DOING` header is mandatory (you can read more information about it in the warning section below). +Optional query parameters: + +* `maxIterations` strictly positive integer, defaults to `1`. +Reconciliation is run up to a fixpoint: fixing an inconsistency in one +pass (dropping a stale path, merging a ghost mailbox...) can surface a +new inconsistency that only a subsequent pass detects. The task re-runs +as long as a pass keeps applying fixes, bounded by `maxIterations` to +guard against oscillation. A value of `1` keeps the historical +single-pass behaviour. +* `autoMerge` boolean, defaults to `false`. When `true`, conflicting +entries where two *different* mailboxes resolve to the same path (the +historical "ghost mailbox") are resolved automatically: the mailbox +registered in the path table (the source of truth) is kept, and the +squatting one is merged into it (its messages and rights are moved over, +then its projection is dropped), reusing the +link:#_correcting_ghost_mailbox[ghost mailbox] merging machinery. This +is *destructive* (mail data is moved), only attempted once a strong read +confirms the loser is a genuine ghost not registered anywhere else, and +left `false` by default so that an admin explicitly opts in. Combine it +with `maxIterations > 1` so that any residual left by a merge converges +within the same task run. + +.... +curl -XPOST '/mailboxes?task=SolveInconsistencies&maxIterations=5&autoMerge=true' +.... + The scheduled task will have the following type `solve-mailbox-inconsistencies` and the following `additionalInformation`: @@ -33,14 +59,18 @@ The scheduled task will have the following type "mailboxPath":"#private:user:mailboxName2", "mailboxId":"464765a0-e4e7-11e4-aba4-710c1de3782b" } - }] + }], + "runningOptions":{ + "maxIterations": 5, + "autoMerge": true + } } .... -Note that conflicting entry inconsistencies will not be fixed and will -require to explicitly use link:#_correcting_ghost_mailbox[ghost mailbox] -endpoint in order to merge the conflicting mailboxes and prevent any -message loss. +Note that, unless `autoMerge` is enabled, conflicting entry +inconsistencies will not be fixed and will require to explicitly use +link:#_correcting_ghost_mailbox[ghost mailbox] endpoint in order to +merge the conflicting mailboxes and prevent any message loss. *WARNING*: this task can cancel concurrently running legitimate user operations upon dirty read. As such this task should be run offline. diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index e345faa0d70..9061e2d8181 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -251,12 +251,11 @@ private Mono autoMergeConflict(Context context, CassandraMailboxDAO mail .flatMap(state -> { boolean cleanGhost = state.getT1() && state.getT2() && state.getT3(); if (!cleanGhost) { - // State no longer matches the clean-ghost picture (already reconciled, or the - // loser owns another path): do not merge. - if (state.getT1() && state.getT2()) { - // Still conflicting but the loser is registered elsewhere: leave it to an admin. - return reportConflict(context); - } + // State no longer matches the clean-ghost picture: either already reconciled, + // or the loser owns another path. In the latter case the loser is a genuine + // mailbox whose stale projection gets realigned onto its registered path by the + // same-mailbox conflict resolution, making this ghost disappear without a merge. + // We therefore leave it to that resolution rather than destroying or reporting it. return Mono.just(Result.COMPLETED); } return mergingRunner.runReactive(loserId, winnerId, new MailboxMergingTask.Context(0)) @@ -316,7 +315,26 @@ private Mono resolveSameMailboxConflict(Context context, CassandraMailbo })) .thenReturn(Result.COMPLETED); })) - .switchIfEmpty(Mono.defer(() -> reportConflict(context))); + // The projection points to a path it does not own (unregistered, or held by another + // mailbox), while the conflicting path *is* registered to this mailbox: the projection + // is simply stale. Realign it onto the path the source of truth vouches for. + .switchIfEmpty(Mono.defer(() -> realignStaleProjection(context, mailboxDAO, currentProjection, conflictingEntry))); + } + + private Mono realignStaleProjection(Context context, CassandraMailboxDAO mailboxDAO, + Mailbox currentProjection, Mailbox conflictingEntry) { + MailboxPath stalePath = currentProjection.generateAssociatedPath(); + Mailbox realigned = new Mailbox(conflictingEntry.generateAssociatedPath(), + currentProjection.getUidValidity(), currentProjection.getMailboxId()); + return mailboxDAO.save(realigned) + .then(Mono.fromRunnable(() -> { + LOGGER.info("Inconsistency fixed for mailbox {}: realigned stale projection from path {} to registered path {}", + realigned.getMailboxId().serialize(), + stalePath.asString(), + realigned.generateAssociatedPath().asString()); + context.addFixedInconsistency(realigned.getMailboxId()); + })) + .thenReturn(Result.COMPLETED); } private Mono reportConflict(Context context) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java index ea92010ccc7..15125954116 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java @@ -458,9 +458,11 @@ void fixMailboxInconsistenciesShouldAutoMergeGhostMailboxWhenAutoMergeEnabled() } @Test - void fixMailboxInconsistenciesShouldNotAutoMergeWhenLoserIsRegisteredElsewhere() { + void fixMailboxInconsistenciesShouldRealignLoserInsteadOfMergingWhenLoserIsRegisteredElsewhere() { // CASSANDRA_ID_1 squats path "abc" (owned by CASSANDRA_ID_2) but also legitimately owns path - // "xyz": it is not a clean ghost, so the path table vouches for it and it must not be merged away. + // "xyz": it is not a clean ghost. Rather than being merged away, its stale projection is + // realigned onto its registered path "xyz", which makes the ghost on "abc" disappear without + // any merge. The whole picture becomes consistent within a single run. Mailbox pathOwnerProjection = new Mailbox(MAILBOX_PATH, UID_VALIDITY_2, CASSANDRA_ID_2); Mailbox loserOtherPath = new Mailbox(NEW_MAILBOX_PATH, UID_VALIDITY_1, CASSANDRA_ID_1); mailboxDAO.save(MAILBOX).block(); @@ -468,11 +470,16 @@ void fixMailboxInconsistenciesShouldNotAutoMergeWhenLoserIsRegisteredElsewhere() mailboxPathV3DAO.save(MAILBOX_2).block(); mailboxPathV3DAO.save(loserOtherPath).block(); - testee.fixMailboxInconsistencies(new Context(), new SolveMailboxInconsistenciesService.RunningOptions(1, true)).block(); + Context context = new Context(); + testee.fixMailboxInconsistencies(context, new SolveMailboxInconsistenciesService.RunningOptions(1, true)).block(); - verify(mergingRunner, never()).runReactive(any(), any(), any()); - // The ghost projection is preserved (not destroyed by an auto-merge). - assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) - .contains(MAILBOX); + SoftAssertions.assertSoftly(softly -> { + verify(mergingRunner, never()).runReactive(any(), any(), any()); + softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .containsExactlyInAnyOrder(pathOwnerProjection, loserOtherPath); + softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_2, loserOtherPath); + softly.assertThat(context.snapshot().getConflictingEntries()).isEmpty(); + }); } } \ No newline at end of file diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md index e338ebc313e..24773c9063d 100644 --- a/src/site/markdown/server/manage-webadmin.md +++ b/src/site/markdown/server/manage-webadmin.md @@ -849,6 +849,26 @@ Will schedule a task for fixing inconsistencies for the mailbox deduplicated obj The `I-KNOW-WHAT-I-M-DOING` header is mandatory (you can read more information about it in the warning section below). +Optional query parameters: + + - `maxIterations` strictly positive integer, defaults to `1`. Reconciliation is run up to a fixpoint: + fixing an inconsistency in one pass (dropping a stale path, merging a ghost mailbox...) can surface a + new inconsistency that only a subsequent pass detects. The task re-runs as long as a pass keeps applying + fixes, bounded by `maxIterations` to guard against oscillation. A value of `1` keeps the historical + single-pass behaviour. + - `autoMerge` boolean, defaults to `false`. When `true`, conflicting entries where two **different** + mailboxes resolve to the same path (the historical "ghost mailbox") are resolved automatically: the + mailbox registered in the path table (the source of truth) is kept, and the squatting one is merged into + it (its messages and rights are moved over, then its projection is dropped), reusing the + [ghost mailbox](#correcting-ghost-mailbox) merging machinery. This is **destructive** (mail data is + moved), only attempted once a strong read confirms the loser is a genuine ghost not registered anywhere + else, and left `false` by default so that an admin explicitly opts in. Combine it with `maxIterations > 1` + so that any residual left by a merge converges within the same task run. + +``` +curl -XPOST '/mailboxes?task=SolveInconsistencies&maxIterations=5&autoMerge=true' +``` + The scheduled task will have the following type `solve-mailbox-inconsistencies` and the following `additionalInformation`: ``` @@ -867,13 +887,17 @@ The scheduled task will have the following type `solve-mailbox-inconsistencies` "mailboxPath":"#private:user:mailboxName2", "mailboxId":"464765a0-e4e7-11e4-aba4-710c1de3782b" } - }] + }], + "runningOptions":{ + "maxIterations": 5, + "autoMerge": true + } } ``` -Note that conflicting entry inconsistencies will not be fixed and will require to explicitly use -[ghost mailbox](#correcting-ghost-mailbox) endpoint in order to merge the conflicting mailboxes and prevent any message -loss. +Note that, unless `autoMerge` is enabled, conflicting entry inconsistencies will not be fixed and will +require to explicitly use [ghost mailbox](#correcting-ghost-mailbox) endpoint in order to merge the +conflicting mailboxes and prevent any message loss. **WARNING**: this task can cancel concurrently running legitimate user operations upon dirty read. As such this task should be run offline.