diff --git a/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc b/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc index ecdb8b2006a..a1dabd72909 100644 --- a/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc +++ b/docs/modules/servers/pages/distributed/operate/webadmin/admin-mailboxes-extend.adoc @@ -13,6 +13,32 @@ a task]. The `I-KNOW-WHAT-I-M-DOING` header is mandatory (you can read more information about it in the warning section below). +Optional query parameters: + +* `maxIterations` strictly positive integer, defaults to `1`. +Reconciliation is run up to a fixpoint: fixing an inconsistency in one +pass (dropping a stale path, merging a ghost mailbox...) can surface a +new inconsistency that only a subsequent pass detects. The task re-runs +as long as a pass keeps applying fixes, bounded by `maxIterations` to +guard against oscillation. A value of `1` keeps the historical +single-pass behaviour. +* `autoMerge` boolean, defaults to `false`. When `true`, conflicting +entries where two *different* mailboxes resolve to the same path (the +historical "ghost mailbox") are resolved automatically: the mailbox +registered in the path table (the source of truth) is kept, and the +squatting one is merged into it (its messages and rights are moved over, +then its projection is dropped), reusing the +link:#_correcting_ghost_mailbox[ghost mailbox] merging machinery. This +is *destructive* (mail data is moved), only attempted once a strong read +confirms the loser is a genuine ghost not registered anywhere else, and +left `false` by default so that an admin explicitly opts in. Combine it +with `maxIterations > 1` so that any residual left by a merge converges +within the same task run. + +.... +curl -XPOST '/mailboxes?task=SolveInconsistencies&maxIterations=5&autoMerge=true' +.... + The scheduled task will have the following type `solve-mailbox-inconsistencies` and the following `additionalInformation`: @@ -33,14 +59,18 @@ The scheduled task will have the following type "mailboxPath":"#private:user:mailboxName2", "mailboxId":"464765a0-e4e7-11e4-aba4-710c1de3782b" } - }] + }], + "runningOptions":{ + "maxIterations": 5, + "autoMerge": true + } } .... -Note that conflicting entry inconsistencies will not be fixed and will -require to explicitly use link:#_correcting_ghost_mailbox[ghost mailbox] -endpoint in order to merge the conflicting mailboxes and prevent any -message loss. +Note that, unless `autoMerge` is enabled, conflicting entry +inconsistencies will not be fixed and will require to explicitly use +link:#_correcting_ghost_mailbox[ghost mailbox] endpoint in order to +merge the conflicting mailboxes and prevent any message loss. *WARNING*: this task can cancel concurrently running legitimate user operations upon dirty read. As such this task should be run offline. diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java index fe1dac1827c..51ee7225a96 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxManager.java @@ -24,24 +24,29 @@ import jakarta.inject.Inject; +import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles; import org.apache.james.events.EventBus; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxPathLocker; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.SessionProvider; +import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.MailboxManagerConfiguration; import org.apache.james.mailbox.store.PreDeletionHooks; import org.apache.james.mailbox.store.StoreMailboxAnnotationManager; import org.apache.james.mailbox.store.StoreMailboxManager; import org.apache.james.mailbox.store.StoreMessageManager; import org.apache.james.mailbox.store.StoreRightManager; +import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm; import org.apache.james.mailbox.store.mail.model.impl.MessageParser; import org.apache.james.mailbox.store.quota.QuotaComponents; import org.apache.james.mailbox.store.search.MessageSearchIndex; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -118,4 +123,12 @@ protected StoreMessageManager createMessageManager(Mailbox mailboxRow, MailboxSe public Mono manageProcessing(Mono toBeWrapped, MailboxSession mailboxSession) { return toBeWrapped; } + + @Override + protected Flux getMailboxWithPathLikeUponRename(MailboxMapper mapper, MailboxQuery.UserBound query) { + if (mapper instanceof CassandraMailboxMapper cassandraMailboxMapper) { + return cassandraMailboxMapper.findMailboxWithPathLike(query, JamesExecutionProfiles.ConsistencyChoice.STRONG); + } + return super.getMailboxWithPathLikeUponRename(mapper, query); + } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index cb7e2b0eed8..125d142781a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -44,7 +44,7 @@ import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.mail.MailboxMapper; -import org.apache.james.util.FunctionalUtils; +import org.apache.james.util.ReactorUtils; import com.google.common.base.Preconditions; @@ -192,18 +192,15 @@ private Mailbox addAcl(MailboxACL acl, Mailbox mailbox) { @Override public Flux findMailboxWithPathLike(MailboxQuery.UserBound query) { - String fixedNamespace = query.getFixedNamespace(); - Username fixedUser = query.getFixedUser(); + return findMailboxWithPathLike(query, consistencyChoice()); + } - return performReadRepair(listMailboxes(fixedNamespace, fixedUser)) + public Flux findMailboxWithPathLike(MailboxQuery.UserBound query, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) { + return performReadRepair(mailboxPathV3DAO.listUserMailboxes(query.getFixedNamespace(), query.getFixedUser(), consistencyChoice)) .filter(mailbox -> query.isPathMatch(mailbox.generateAssociatedPath())) .flatMap(this::addAcl, CONCURRENCY); } - private Flux listMailboxes(String fixedNamespace, Username fixedUser) { - return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice()); - } - @Override public Mono create(MailboxPath mailboxPath, UidValidity uidValidity) { CassandraId cassandraId = CassandraId.timeBased(); @@ -221,23 +218,27 @@ public Mono rename(Mailbox mailbox) { Preconditions.checkNotNull(mailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId"); CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); - return tryRename(mailbox, cassandraId) - .filter(FunctionalUtils.identityPredicate()) - .switchIfEmpty(Mono.error(() -> new MailboxExistsException(mailbox.generateAssociatedPath().asString()))) - .thenReturn(cassandraId); - } - - private Mono tryRename(Mailbox cassandraMailbox, CassandraId cassandraId) { return mailboxDAO.retrieveMailbox(cassandraId) - .flatMap(mailbox -> mailboxPathV3DAO.save(cassandraMailbox) - .filter(isCreated -> isCreated) - .flatMap(mailboxHasCreated -> deletePreviousMailboxPathReference(mailbox.generateAssociatedPath()) - .then(persistMailboxEntity(cassandraMailbox)) - .thenReturn(true)) - .defaultIfEmpty(false)) + .flatMap(storedMailbox -> rename(mailbox, storedMailbox.generateAssociatedPath())) .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(cassandraId))); } + @Override + public Mono rename(Mailbox cassandraMailbox, MailboxPath previousPath) { + Preconditions.checkNotNull(cassandraMailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId"); + CassandraId cassandraId = (CassandraId) cassandraMailbox.getMailboxId(); + + return mailboxPathV3DAO.save(cassandraMailbox) + .handle(ReactorUtils.raiseErrorIfFalse(() -> new MailboxExistsException(cassandraMailbox.generateAssociatedPath().asString()))) + // Additive writes first (the new path reference and the projection), subtractive write + // last (drop the old path reference). At any crash point the mailbox thus stays + // reachable under its new path with a consistent projection; only a stale old path + // reference may linger (the delete is retried to absorb transient failures). + .flatMap(applied -> persistMailboxEntity(cassandraMailbox) + .then(deletePreviousMailboxPathReference(previousPath))) + .thenReturn(cassandraId); + } + private Mono persistMailboxEntity(Mailbox cassandraMailbox) { return mailboxDAO.save(cassandraMailbox) .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)); @@ -250,7 +251,7 @@ private Mono deletePreviousMailboxPathReference(MailboxPath mailboxPath) { @Override public Mono hasChildren(Mailbox mailbox, char delimiter) { - return performReadRepair(listMailboxes(mailbox.getNamespace(), mailbox.getUser())) + return performReadRepair(mailboxPathV3DAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser(), consistencyChoice())) .filter(idAndPath -> isPathChildOfMailbox(idAndPath, mailbox, delimiter)) .hasElements(); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java index d6b4217ee9f..266e9193d4a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java @@ -64,12 +64,12 @@ public class CassandraMailboxPathV3DAO { private final PreparedStatement delete; private final PreparedStatement insert; private final PreparedStatement select; + private final PreparedStatement selectWriteTime; private final PreparedStatement selectUser; private final PreparedStatement selectAll; private final CqlSession session; private final DriverExecutionProfile lwtProfile; private final DriverExecutionProfile readProfile; - private final DriverExecutionProfile writeProfile; @Inject public CassandraMailboxPathV3DAO(CqlSession session) { @@ -78,11 +78,11 @@ public CassandraMailboxPathV3DAO(CqlSession session) { this.insert = prepareInsert(); this.delete = prepareDelete(); this.select = prepareSelect(); + this.selectWriteTime = prepareSelectWriteTime(); this.selectUser = prepareSelectUser(); this.selectAll = prepareSelectAll(); this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session); this.readProfile = ProfileLocator.READ.locateProfile(session, "MAILBOXPATHV3"); - this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MAILBOXPATHV3"); } private PreparedStatement prepareDelete() { @@ -114,6 +114,15 @@ private PreparedStatement prepareSelect() { .build()); } + private PreparedStatement prepareSelectWriteTime() { + return session.prepare(selectFrom(TABLE_NAME) + .writeTime(MAILBOX_ID) + .where(column(NAMESPACE).isEqualTo(bindMarker(NAMESPACE)), + column(USER).isEqualTo(bindMarker(USER)), + column(MAILBOX_NAME).isEqualTo(bindMarker(MAILBOX_NAME))) + .build()); + } + private PreparedStatement prepareSelectUser() { return session.prepare(selectFrom(TABLE_NAME) .columns(MAILBOX_ID, UIDVALIDITY, MAILBOX_NAME) @@ -144,6 +153,16 @@ public Mono retrieve(MailboxPath mailboxPath, JamesExecutionProfiles.Co .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath))); } + public Mono writeTime(MailboxPath mailboxPath) { + BoundStatement statement = selectWriteTime.bind() + .set(NAMESPACE, mailboxPath.getNamespace(), TypeCodecs.TEXT) + .set(USER, sanitizeUser(mailboxPath.getUser()), TypeCodecs.TEXT) + .set(MAILBOX_NAME, mailboxPath.getName(), TypeCodecs.TEXT); + + return cassandraAsyncExecutor.executeSingleRow(setExecutionProfileIfNeeded(statement, STRONG)) + .map(row -> row.getLong(0)); + } + public Flux listUserMailboxes(String namespace, Username user, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) { BoundStatementBuilder statementBuilder = selectUser.boundStatementBuilder() .set(NAMESPACE, namespace, TypeCodecs.TEXT) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java index 20d8e28054e..916075b4a34 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java @@ -68,11 +68,15 @@ public MailboxMergingTaskRunner(MailboxManager mailboxManager, StoreMessageIdMan } public Task.Result run(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxMergingTask.Context context) { - return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context) - .flatMap(onMoveCompleteOperations(oldMailboxId, newMailboxId)) + return runReactive(oldMailboxId, newMailboxId, context) .block(); } + public Mono runReactive(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxMergingTask.Context context) { + return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context) + .flatMap(onMoveCompleteOperations(oldMailboxId, newMailboxId)); + } + private Function> onMoveCompleteOperations(CassandraId oldMailboxId, CassandraId newMailboxId) { return result -> { if (result == Task.Result.COMPLETED) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java new file mode 100644 index 00000000000..b53ab20fdc1 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesRunningOptionsDTO.java @@ -0,0 +1,59 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.task; + +import java.util.Optional; + +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class SolveMailboxInconsistenciesRunningOptionsDTO { + public static SolveMailboxInconsistenciesRunningOptionsDTO asDTO(RunningOptions domainObject) { + return new SolveMailboxInconsistenciesRunningOptionsDTO( + Optional.of(domainObject.getMaxIterations()), + Optional.of(domainObject.isAutoMerge())); + } + + private final Optional maxIterations; + private final Optional autoMerge; + + @JsonCreator + public SolveMailboxInconsistenciesRunningOptionsDTO(@JsonProperty("maxIterations") Optional maxIterations, + @JsonProperty("autoMerge") Optional autoMerge) { + this.maxIterations = maxIterations; + this.autoMerge = autoMerge; + } + + public Optional getMaxIterations() { + return maxIterations; + } + + public Optional getAutoMerge() { + return autoMerge; + } + + public RunningOptions asDomainObject() { + return new RunningOptions( + maxIterations.orElse(RunningOptions.DEFAULT_MAX_ITERATIONS), + autoMerge.orElse(RunningOptions.DEFAULT_AUTO_MERGE)); + } +} diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index d1d5fcf4140..9061e2d8181 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; +import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.task.Task; import org.apache.james.task.Task.Result; import org.slf4j.Logger; @@ -55,30 +56,39 @@ public class SolveMailboxInconsistenciesService { @FunctionalInterface public interface Inconsistency { static Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, Mono pathEntry) { + // Read-repair entry point: never auto-merges (no merging runner available, and read paths + // must not trigger destructive ghost merges). + return detectMailboxDaoInconsistency(mailboxEntry, pathEntry, null, false); + } + + static Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, Mono pathEntry, + MailboxMergingTaskRunner mergingRunner, boolean autoMerge) { return pathEntry .map(mailboxByPath -> { if (mailboxByPath.getMailboxId().equals(mailboxEntry.getMailboxId())) { return NO_INCONSISTENCY; } // Path entry references another mailbox. - return new ConflictingEntryInconsistency(ConflictingEntry.builder() - .mailboxDaoEntry(mailboxEntry) - .mailboxPathDaoEntry(mailboxByPath)); + return new ConflictingEntryInconsistency(mailboxEntry, mailboxByPath, mergingRunner, autoMerge); }) .defaultIfEmpty(new OrphanMailboxDAOEntry(mailboxEntry)); } static Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry, Mono mailboxEntry) { + // Read-repair entry point: never auto-merges (this site only ever yields same-id conflicts anyway). + return detectMailboxPathDaoInconsistency(mailboxByPathEntry, mailboxEntry, null, false); + } + + static Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry, Mono mailboxEntry, + MailboxMergingTaskRunner mergingRunner, boolean autoMerge) { return mailboxEntry .map(mailboxById -> { if (mailboxByPathEntry.generateAssociatedPath().equals(mailboxById.generateAssociatedPath())) { return NO_INCONSISTENCY; } // Mailbox references another path - return new ConflictingEntryInconsistency(ConflictingEntry.builder() - .mailboxDaoEntry(mailboxById) - .mailboxPathDaoEntry(mailboxByPathEntry)); + return new ConflictingEntryInconsistency(mailboxById, mailboxByPathEntry, mergingRunner, autoMerge); }) .defaultIfEmpty(new OrphanMailboxPathDAOEntry(mailboxByPathEntry)); } @@ -181,23 +191,202 @@ public Mono fix(Context context, CassandraMailboxDAO mailboxDAO, Cassand * See https://github.com/apache/james-project/blob/master/src/site/markdown/server/manage-webadmin.md#correcting-ghost-mailbox */ private static class ConflictingEntryInconsistency implements Inconsistency { - private final ConflictingEntry conflictingEntry; - - private ConflictingEntryInconsistency(ConflictingEntry conflictingEntry) { - this.conflictingEntry = conflictingEntry; + private final Mailbox mailboxDaoEntry; + private final Mailbox mailboxPathEntry; + private final MailboxMergingTaskRunner mergingRunner; + private final boolean autoMerge; + + private ConflictingEntryInconsistency(Mailbox mailboxDaoEntry, Mailbox mailboxPathEntry, + MailboxMergingTaskRunner mergingRunner, boolean autoMerge) { + this.mailboxDaoEntry = mailboxDaoEntry; + this.mailboxPathEntry = mailboxPathEntry; + this.mergingRunner = mergingRunner; + this.autoMerge = autoMerge; } @Override public Mono fix(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO) { + if (mailboxDaoEntry.getMailboxId().equals(mailboxPathEntry.getMailboxId())) { + // Same mailbox referenced by two different paths. This happens when a rename (or any + // partial path update) failed to drop the old path reference. As both registrations + // point to the same mailbox id, the messages (keyed by id) are safe regardless of the + // path we keep: we keep the most recently written path reference, realign the + // projection on it and drop the stale one. + return fixSameMailboxConflict(context, mailboxDAO, pathV3DAO); + } + + if (autoMerge) { + return autoMergeConflict(context, mailboxDAO, pathV3DAO); + } + return reportConflict(context); + } + + // Two *different* mailboxes resolve to the same path (the historical "ghost mailbox"): the + // path table registers the winner, while the loser is a projection entry squatting that path. + // The path table being the source of truth, the registered mailbox wins and the squatting + // projection is merged into it (its messages and rights are moved over, then its projection + // row is dropped) using the same machinery as MailboxMergingTask. + // + // We only merge once re-reading (STRONG) confirms the clean-ghost picture: the winner still + // owns the path, the loser still resolves to it, and -- crucially -- the path table vouches + // for the loser *nowhere*. If the loser owns another path, it is a genuine mailbox with its + // own (same-id) conflict to resolve first, so we report rather than destroy it. + private Mono autoMergeConflict(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO) { + CassandraId winnerId = (CassandraId) mailboxPathEntry.getMailboxId(); + CassandraId loserId = (CassandraId) mailboxDaoEntry.getMailboxId(); + MailboxPath conflictingPath = mailboxPathEntry.generateAssociatedPath(); + + Mono pathStillOwnedByWinner = pathV3DAO.retrieve(conflictingPath, STRONG) + .map(entry -> entry.getMailboxId().equals(winnerId)) + .defaultIfEmpty(false); + Mono loserStillResolvesToPath = mailboxDAO.retrieveMailbox(loserId) + .map(projection -> projection.generateAssociatedPath().equals(conflictingPath)) + .defaultIfEmpty(false); + Mono loserIsUnregistered = pathV3DAO.listUserMailboxes(mailboxDaoEntry.getNamespace(), mailboxDaoEntry.getUser(), STRONG) + .filter(entry -> entry.getMailboxId().equals(loserId)) + .hasElements() + .map(referenced -> !referenced); + + return Mono.zip(pathStillOwnedByWinner, loserStillResolvesToPath, loserIsUnregistered) + .flatMap(state -> { + boolean cleanGhost = state.getT1() && state.getT2() && state.getT3(); + if (!cleanGhost) { + // State no longer matches the clean-ghost picture: either already reconciled, + // or the loser owns another path. In the latter case the loser is a genuine + // mailbox whose stale projection gets realigned onto its registered path by the + // same-mailbox conflict resolution, making this ghost disappear without a merge. + // We therefore leave it to that resolution rather than destroying or reporting it. + return Mono.just(Result.COMPLETED); + } + return mergingRunner.runReactive(loserId, winnerId, new MailboxMergingTask.Context(0)) + .doOnNext(result -> { + LOGGER.info("Auto-merged ghost mailbox {} into {} at path {}", + loserId.serialize(), winnerId.serialize(), conflictingPath.asString()); + context.addFixedInconsistency(winnerId); + }); + }); + } + + // Auto-resolution is restricted to the case where BOTH the conflicting path entry and the + // projection's path are registered to the same mailbox id: only then are they two genuine + // aliases of a single mailbox, hence safe to deduplicate without data loss. If the + // projection's path is unregistered or held by another mailbox (e.g. a reference loop), we + // fall back to the conservative reporting so an admin can merge. + // + // As fixes are applied sequentially, we re-read the current state first: a previous fix may + // already have reconciled this mailbox, in which case there is nothing left to do. + private Mono fixSameMailboxConflict(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO) { + CassandraId mailboxId = (CassandraId) mailboxPathEntry.getMailboxId(); + MailboxPath conflictingPath = mailboxPathEntry.generateAssociatedPath(); + + return pathV3DAO.retrieve(conflictingPath, STRONG) + .filter(stillRegistered -> stillRegistered.getMailboxId().equals(mailboxId)) + .flatMap(conflictingEntry -> mailboxDAO.retrieveMailbox(mailboxId) + .flatMap(currentProjection -> resolveSameMailboxConflict(context, mailboxDAO, pathV3DAO, currentProjection, conflictingEntry))) + // The conflicting path entry is gone (already reconciled): nothing left to do. + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Mono resolveSameMailboxConflict(Context context, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO pathV3DAO, + Mailbox currentProjection, Mailbox conflictingEntry) { + MailboxPath projectionPath = currentProjection.generateAssociatedPath(); + MailboxPath conflictingPath = conflictingEntry.generateAssociatedPath(); + if (projectionPath.equals(conflictingPath)) { + // The projection now matches this path: no longer inconsistent. + return Mono.just(Result.COMPLETED); + } + return pathV3DAO.retrieve(projectionPath, STRONG) + .filter(projectionRegistration -> projectionRegistration.getMailboxId().equals(currentProjection.getMailboxId())) + .flatMap(projectionRegistration -> Mono.zip( + pathV3DAO.writeTime(conflictingPath), + pathV3DAO.writeTime(projectionPath)) + .flatMap(writeTimes -> { + boolean projectionWins = writeTimes.getT2() >= writeTimes.getT1(); + Mailbox winner = projectionWins ? currentProjection : conflictingEntry; + Mailbox loser = projectionWins ? conflictingEntry : currentProjection; + return mailboxDAO.save(winner) + .then(pathV3DAO.delete(loser.generateAssociatedPath())) + .then(Mono.fromRunnable(() -> { + LOGGER.info("Inconsistency fixed for mailbox {}: kept path {}, dropped stale path {}", + winner.getMailboxId().serialize(), + winner.generateAssociatedPath().asString(), + loser.generateAssociatedPath().asString()); + context.addFixedInconsistency(winner.getMailboxId()); + })) + .thenReturn(Result.COMPLETED); + })) + // The projection points to a path it does not own (unregistered, or held by another + // mailbox), while the conflicting path *is* registered to this mailbox: the projection + // is simply stale. Realign it onto the path the source of truth vouches for. + .switchIfEmpty(Mono.defer(() -> realignStaleProjection(context, mailboxDAO, currentProjection, conflictingEntry))); + } + + private Mono realignStaleProjection(Context context, CassandraMailboxDAO mailboxDAO, + Mailbox currentProjection, Mailbox conflictingEntry) { + MailboxPath stalePath = currentProjection.generateAssociatedPath(); + Mailbox realigned = new Mailbox(conflictingEntry.generateAssociatedPath(), + currentProjection.getUidValidity(), currentProjection.getMailboxId()); + return mailboxDAO.save(realigned) + .then(Mono.fromRunnable(() -> { + LOGGER.info("Inconsistency fixed for mailbox {}: realigned stale projection from path {} to registered path {}", + realigned.getMailboxId().serialize(), + stalePath.asString(), + realigned.generateAssociatedPath().asString()); + context.addFixedInconsistency(realigned.getMailboxId()); + })) + .thenReturn(Result.COMPLETED); + } + + private Mono reportConflict(Context context) { LOGGER.error("MailboxDAO contains mailbox {} {} which conflict with corresponding registration {} {}. " + "We recommend merging these mailboxes together to prevent mail data loss.", - conflictingEntry.getMailboxDaoEntry().getMailboxId(), conflictingEntry.getMailboxDaoEntry().getMailboxPath(), - conflictingEntry.getMailboxPathDaoEntry().getMailboxId(), conflictingEntry.getMailboxPathDaoEntry().getMailboxPath()); - context.addConflictingEntries(conflictingEntry); + mailboxDaoEntry.getMailboxId(), mailboxDaoEntry.generateAssociatedPath(), + mailboxPathEntry.getMailboxId(), mailboxPathEntry.generateAssociatedPath()); + context.addConflictingEntries(ConflictingEntry.builder() + .mailboxDaoEntry(mailboxDaoEntry) + .mailboxPathDaoEntry(mailboxPathEntry)); return Mono.just(Result.PARTIAL); } } + public static class RunningOptions { + public static final int DEFAULT_MAX_ITERATIONS = 1; + public static final boolean DEFAULT_AUTO_MERGE = false; + public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_MAX_ITERATIONS, DEFAULT_AUTO_MERGE); + + private final int maxIterations; + private final boolean autoMerge; + + public RunningOptions(int maxIterations, boolean autoMerge) { + Preconditions.checkArgument(maxIterations >= 1, "'maxIterations' must be strictly positive"); + this.maxIterations = maxIterations; + this.autoMerge = autoMerge; + } + + public int getMaxIterations() { + return maxIterations; + } + + public boolean isAutoMerge() { + return autoMerge; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RunningOptions) { + RunningOptions that = (RunningOptions) o; + return this.maxIterations == that.maxIterations + && this.autoMerge == that.autoMerge; + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(maxIterations, autoMerge); + } + } + public static class Context { static class Builder { private Optional processedMailboxEntries; @@ -382,20 +571,48 @@ Snapshot snapshot() { private final CassandraMailboxDAO mailboxDAO; private final CassandraMailboxPathV3DAO mailboxPathV3DAO; private final CassandraSchemaVersionManager versionManager; + private final MailboxMergingTaskRunner mergingRunner; @Inject SolveMailboxInconsistenciesService(CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV3DAO mailboxPathV3DAO, - CassandraSchemaVersionManager versionManager) { + CassandraSchemaVersionManager versionManager, MailboxMergingTaskRunner mergingRunner) { this.mailboxDAO = mailboxDAO; this.mailboxPathV3DAO = mailboxPathV3DAO; this.versionManager = versionManager; + this.mergingRunner = mergingRunner; } public Mono fixMailboxInconsistencies(Context context) { + return fixMailboxInconsistencies(context, RunningOptions.DEFAULT); + } + + public Mono fixMailboxInconsistencies(Context context, RunningOptions runningOptions) { assertValidVersion(); + return fixUntilStable(context, runningOptions.getMaxIterations(), runningOptions.isAutoMerge()); + } + + // Reconciliation is run to a fixpoint: fixing an inconsistency in one pass (dropping a stale + // path, merging a ghost mailbox...) can surface a new inconsistency that only a subsequent pass + // detects. We re-run as long as a pass keeps applying fixes, bounded by maxIterations to guard + // against oscillation. As fixes only ever grow the fixedInconsistencies list, a pass that adds + // none means we reached a stable state. + private Mono fixUntilStable(Context context, int remainingIterations, boolean autoMerge) { + int fixedBefore = context.snapshot().getFixedInconsistencies().size(); + return runOnePass(context, autoMerge) + .flatMap(result -> { + boolean appliedFixes = context.snapshot().getFixedInconsistencies().size() > fixedBefore; + if (appliedFixes && remainingIterations > 1) { + return fixUntilStable(context, remainingIterations - 1, autoMerge) + .map(nextResult -> Task.combine(result, nextResult)); + } + return Mono.just(result); + }); + } + + private Mono runOnePass(Context context, boolean autoMerge) { return Flux.concat( - processMailboxDaoInconsistencies(context), - processMailboxPathDaoInconsistencies(context)) + processMailboxDaoInconsistencies(context, autoMerge), + processMailboxPathDaoInconsistencies(context, autoMerge)) .reduce(Result.COMPLETED, Task::combine); } @@ -410,29 +627,43 @@ private void assertValidVersion() { version.getValue()); } - private Flux processMailboxPathDaoInconsistencies(Context context) { + private Flux processMailboxPathDaoInconsistencies(Context context, boolean autoMerge) { return mailboxPathV3DAO.listAll() - .flatMap(this::detectMailboxPathDaoInconsistency, DEFAULT_CONCURRENCY) - .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO), DEFAULT_CONCURRENCY) - .doOnNext(any -> context.incrementProcessedMailboxPathEntries()); + .flatMap(entry -> detectMailboxPathDaoInconsistency(entry, autoMerge), DEFAULT_CONCURRENCY) + .doOnNext(any -> context.incrementProcessedMailboxPathEntries()) + // Detect every inconsistency first, then fix them one at a time. Resolving a same-mailbox + // conflict may realign the projection, so a fully materialized detection set fixed + // sequentially prevents a fix from racing with the detection or resolution of a sibling + // path entry. Consistent entries are filtered out to keep the materialized set small; + // each fix re-confirms the inconsistency against the current state before acting. + .filter(inconsistency -> inconsistency != NO_INCONSISTENCY) + .collectList() + .flatMapMany(inconsistencies -> Flux.fromIterable(inconsistencies) + .concatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))); } - private Flux processMailboxDaoInconsistencies(Context context) { + private Flux processMailboxDaoInconsistencies(Context context, boolean autoMerge) { return mailboxDAO.retrieveAllMailboxes() - .flatMap(this::detectMailboxDaoInconsistency, DEFAULT_CONCURRENCY) - .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO), DEFAULT_CONCURRENCY) - .doOnNext(any -> context.incrementProcessedMailboxEntries()); + .flatMap(entry -> detectMailboxDaoInconsistency(entry, autoMerge), DEFAULT_CONCURRENCY) + .doOnNext(any -> context.incrementProcessedMailboxEntries()) + // Detect every inconsistency first, then fix them one at a time. Auto-merging a ghost + // mailbox mutates message and projection state, so a fully materialized detection set + // fixed sequentially prevents a fix from racing with the detection of a sibling entry. + .filter(inconsistency -> inconsistency != NO_INCONSISTENCY) + .collectList() + .flatMapMany(inconsistencies -> Flux.fromIterable(inconsistencies) + .concatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))); } - private Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry) { + private Mono detectMailboxDaoInconsistency(Mailbox mailboxEntry, boolean autoMerge) { Mono pathEntry = mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath(), STRONG); - return Inconsistency.detectMailboxDaoInconsistency(mailboxEntry, pathEntry); + return Inconsistency.detectMailboxDaoInconsistency(mailboxEntry, pathEntry, mergingRunner, autoMerge); } - private Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry) { + private Mono detectMailboxPathDaoInconsistency(Mailbox mailboxByPathEntry, boolean autoMerge) { CassandraId cassandraId = (CassandraId) mailboxByPathEntry.getMailboxId(); Mono mailboxEntry = mailboxDAO.retrieveMailbox(cassandraId); - return Inconsistency.detectMailboxPathDaoInconsistency(mailboxByPathEntry, mailboxEntry); + return Inconsistency.detectMailboxPathDaoInconsistency(mailboxByPathEntry, mailboxEntry, mergingRunner, autoMerge); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java index 8c4d679384d..6e34f7e250f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTask.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.Optional; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; @@ -44,15 +45,17 @@ public static class Details implements TaskExecutionDetails.AdditionalInformatio private final ImmutableList fixedInconsistencies; private final ImmutableList conflictingEntries; private final long errors; + private final RunningOptions runningOptions; Details(Instant instant, long processedMailboxEntries, long processedMailboxPathEntries, ImmutableList fixedInconsistencies, - ImmutableList conflictingEntries, long errors) { + ImmutableList conflictingEntries, long errors, RunningOptions runningOptions) { this.instant = instant; this.processedMailboxEntries = processedMailboxEntries; this.processedMailboxPathEntries = processedMailboxPathEntries; this.fixedInconsistencies = fixedInconsistencies; this.conflictingEntries = conflictingEntries; this.errors = errors; + this.runningOptions = runningOptions; } @Override @@ -79,18 +82,24 @@ ImmutableList getConflictingEntries() { long getErrors() { return errors; } + + public RunningOptions getRunningOptions() { + return runningOptions; + } } private final SolveMailboxInconsistenciesService service; + private final RunningOptions runningOptions; - public SolveMailboxInconsistenciesTask(SolveMailboxInconsistenciesService service) { + public SolveMailboxInconsistenciesTask(SolveMailboxInconsistenciesService service, RunningOptions runningOptions) { this.service = service; + this.runningOptions = runningOptions; this.context = new SolveMailboxInconsistenciesService.Context(); } @Override public Result run() { - return service.fixMailboxInconsistencies(context) + return service.fixMailboxInconsistencies(context, runningOptions) .block(); } @@ -99,6 +108,10 @@ public TaskType type() { return SOLVE_MAILBOX_INCONSISTENCIES; } + public RunningOptions getRunningOptions() { + return runningOptions; + } + @Override public Optional details() { SolveMailboxInconsistenciesService.Context.Snapshot snapshot = context.snapshot(); @@ -109,6 +122,7 @@ public Optional details() { .map(MailboxId::serialize) .collect(ImmutableList.toImmutableList()), snapshot.getConflictingEntries(), - snapshot.getErrors())); + snapshot.getErrors(), + runningOptions)); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java index 5811a68fcf5..e484e540d3f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskAdditionalInformationDTO.java @@ -20,8 +20,10 @@ package org.apache.james.mailbox.cassandra.mail.task; import java.time.Instant; +import java.util.Optional; import org.apache.james.json.DTOModule; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.server.task.json.dto.AdditionalInformationDTO; import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule; @@ -37,7 +39,8 @@ private static SolveMailboxInconsistenciesTaskAdditionalInformationDTO fromDomai details.getFixedInconsistencies(), details.getConflictingEntries(), details.getErrors(), - details.timestamp()); + details.timestamp(), + Optional.of(SolveMailboxInconsistenciesRunningOptionsDTO.asDTO(details.getRunningOptions()))); } public static AdditionalInformationDTOModule module() { @@ -57,6 +60,7 @@ public static AdditionalInformationDTOModule conflictingEntries; private final long errors; private final Instant timestamp; + private final Optional runningOptions; public SolveMailboxInconsistenciesTaskAdditionalInformationDTO(@JsonProperty("type") String type, @JsonProperty("processedMailboxEntries") long processedMailboxEntries, @@ -64,7 +68,8 @@ public SolveMailboxInconsistenciesTaskAdditionalInformationDTO(@JsonProperty("ty @JsonProperty("fixedInconsistencies") ImmutableList fixedInconsistencies, @JsonProperty("conflictingEntries") ImmutableList conflictingEntries, @JsonProperty("errors") long errors, - @JsonProperty("timestamp") Instant timestamp) { + @JsonProperty("timestamp") Instant timestamp, + @JsonProperty("runningOptions") Optional runningOptions) { this.type = type; this.processedMailboxEntries = processedMailboxEntries; this.timestamp = timestamp; @@ -72,6 +77,7 @@ public SolveMailboxInconsistenciesTaskAdditionalInformationDTO(@JsonProperty("ty this.fixedInconsistencies = fixedInconsistencies; this.conflictingEntries = conflictingEntries; this.errors = errors; + this.runningOptions = runningOptions; } public long getProcessedMailboxEntries() { @@ -94,6 +100,10 @@ public long getErrors() { return errors; } + public Optional getRunningOptions() { + return runningOptions; + } + @Override public Instant getTimestamp() { return timestamp; @@ -110,6 +120,9 @@ private SolveMailboxInconsistenciesTask.Details toDomainObject() { processedMailboxPathEntries, fixedInconsistencies, conflictingEntries, - errors); + errors, + runningOptions + .map(SolveMailboxInconsistenciesRunningOptionsDTO::asDomainObject) + .orElse(RunningOptions.DEFAULT)); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java index aa22280989f..265d1a3030f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskDTO.java @@ -18,7 +18,10 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra.mail.task; +import java.util.Optional; + import org.apache.james.json.DTOModule; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.server.task.json.dto.TaskDTO; import org.apache.james.server.task.json.dto.TaskDTOModule; @@ -26,7 +29,8 @@ public class SolveMailboxInconsistenciesTaskDTO implements TaskDTO { private static SolveMailboxInconsistenciesTaskDTO toDTO(SolveMailboxInconsistenciesTask domainObject, String typeName) { - return new SolveMailboxInconsistenciesTaskDTO(typeName); + return new SolveMailboxInconsistenciesTaskDTO(typeName, + Optional.of(SolveMailboxInconsistenciesRunningOptionsDTO.asDTO(domainObject.getRunningOptions()))); } public static TaskDTOModule module(SolveMailboxInconsistenciesService service) { @@ -40,17 +44,27 @@ public static TaskDTOModule runningOptions; - public SolveMailboxInconsistenciesTaskDTO(@JsonProperty("type") String type) { + public SolveMailboxInconsistenciesTaskDTO(@JsonProperty("type") String type, + @JsonProperty("runningOptions") Optional runningOptions) { this.type = type; + this.runningOptions = runningOptions; } private SolveMailboxInconsistenciesTask toDomainObject(SolveMailboxInconsistenciesService service) { - return new SolveMailboxInconsistenciesTask(service); + return new SolveMailboxInconsistenciesTask(service, + runningOptions + .map(SolveMailboxInconsistenciesRunningOptionsDTO::asDomainObject) + .orElse(RunningOptions.DEFAULT)); } @Override public String getType() { return type; } + + public Optional getRunningOptions() { + return runningOptions; + } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java index e13e452c7ee..c27ffd68b02 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java @@ -464,10 +464,14 @@ void renameThenFailToDeleteMailboxPathShouldBeConsistentWhenFindByInbox(Cassandr doQuietly(() -> testee.rename(inboxRenamed).block()); + // rename writes additively first (new path reference + projection) and removes the old + // path reference last. When that removal fails, the projection already reflects the + // rename, so the mailbox is consistent under its new identity; the old INBOX path + // reference merely lingers (the delete is retried to absorb transient failures). SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly(softly) .assertThat(testee.findMailboxById(inboxId).block()) - .isEqualTo(inbox); + .isEqualTo(inboxRenamed); softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); @@ -479,6 +483,38 @@ void renameThenFailToDeleteMailboxPathShouldBeConsistentWhenFindByInbox(Cassandr })); } + @Test + void renameThenFailToDeleteMailboxPathShouldExposeRenamedMailbox(CassandraCluster cassandra) { + Mailbox inbox = testee.create(inboxPath, UID_VALIDITY).block(); + CassandraId inboxId = (CassandraId) inbox.getMailboxId(); + Mailbox inboxRenamed = createInboxRenamedMailbox(inboxId); + + cassandra.getConf() + .registerScenario(fail() + .times(TRY_COUNT_BEFORE_FAILURE) + .whenQueryStartsWith("DELETE FROM mailboxpathv3 WHERE namespace=:namespace AND user=:user AND mailboxname=:mailboxname IF EXISTS")); + + doQuietly(() -> testee.rename(inboxRenamed).block()); + + // Counterpart of the previous test: even though the old path cleanup failed, the rename + // is fully observable under its new identity (by id and by renamed path). With the legacy + // ordering - projection written last, after the delete - the projection would still read + // INBOX here, contradicting the already committed renamed path reference. + SoftAssertions.assertSoftly(Throwing.consumer(softly -> { + softly(softly) + .assertThat(testee.findMailboxById(inboxId).block()) + .isEqualTo(inboxRenamed); + softly(softly) + .assertThat(testee.findMailboxByPath(inboxPathRenamed).block()) + .isEqualTo(inboxRenamed); + softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery) + .collectList().block()) + .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) + .assertThat(searchMailbox) + .isEqualTo(inboxRenamed)); + })); + } + @Disabled("JAMES-3056 returning two mailboxes with same name and id") @Test void renameThenFailToDeleteMailboxPathShouldBeConsistentWhenFindAll(CassandraCluster cassandra) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java index 2d3437a9a92..15125954116 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java @@ -23,6 +23,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -70,6 +76,7 @@ class SolveMailboxInconsistenciesServiceTest { CassandraMailboxDAO mailboxDAO; CassandraMailboxPathV3DAO mailboxPathV3DAO; CassandraSchemaVersionDAO versionDAO; + MailboxMergingTaskRunner mergingRunner; SolveMailboxInconsistenciesService testee; @BeforeEach @@ -80,7 +87,8 @@ void setUp(CassandraCluster cassandra) { mailboxPathV3DAO = new CassandraMailboxPathV3DAO( cassandra.getConf()); versionDAO = new CassandraSchemaVersionDAO(cassandra.getConf()); - testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV3DAO, new CassandraSchemaVersionManager(versionDAO)); + mergingRunner = mock(MailboxMergingTaskRunner.class); + testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV3DAO, new CassandraSchemaVersionManager(versionDAO), mergingRunner); versionDAO.updateVersion(new SchemaVersion(8)).block(); } @@ -154,12 +162,13 @@ void fixMailboxInconsistenciesShouldReturnPartialWhenDAOMisMatchOnId() { } @Test - void fixMailboxInconsistenciesShouldReturnPartialWhenDAOMisMatchOnPath() { + void fixMailboxInconsistenciesShouldReturnCompletedWhenDAOMisMatchOnPath() { + // Same mailbox id referenced by two paths: auto-resolvable without data loss. mailboxDAO.save(MAILBOX).block(); mailboxPathV3DAO.save(MAILBOX_NEW_PATH).block(); assertThat(testee.fixMailboxInconsistencies(new Context()).block()) - .isEqualTo(Result.PARTIAL); + .isEqualTo(Result.COMPLETED); } @Test @@ -248,14 +257,14 @@ void fixMailboxInconsistenciesShouldUpdateContextWhenDAOMisMatchOnPath() { testee.fixMailboxInconsistencies(context).block(); + // The orphan mailbox pass re-registers the projection path, then the same-id conflict pass + // keeps the most recent path and drops the stale one: two fixes, no conflicting entry. assertThat(context.snapshot()) .isEqualTo(Context.builder() .processedMailboxEntries(1) .processedMailboxPathEntries(2) .addFixedInconsistencies(CASSANDRA_ID_1) - .addConflictingEntry(ConflictingEntry.builder() - .mailboxDaoEntry(MAILBOX) - .mailboxPathDaoEntry(MAILBOX_NEW_PATH)) + .addFixedInconsistencies(CASSANDRA_ID_1) .build() .snapshot()); } @@ -335,8 +344,8 @@ void fixMailboxInconsistenciesShouldNotAlterStateWhenLoop() { @Test void fixMailboxInconsistenciesShouldAlterStateWhenDaoMisMatchOnPath() { - // Note that CASSANDRA_ID_1 becomes usable - // However in order to avoid data loss, merging CASSANDRA_ID_1 and CASSANDRA_ID_2 is still required + // Same mailbox id referenced by two paths (a partial rename leftover): the solver keeps the + // most recently written path and drops the stale one, leaving a single consistent registration. mailboxDAO.save(MAILBOX).block(); mailboxPathV3DAO.save(MAILBOX_NEW_PATH).block(); @@ -346,9 +355,27 @@ void fixMailboxInconsistenciesShouldAlterStateWhenDaoMisMatchOnPath() { softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) .containsExactlyInAnyOrder(MAILBOX); softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) - .containsExactlyInAnyOrder( - MAILBOX_NEW_PATH, - MAILBOX); + .containsExactlyInAnyOrder(MAILBOX); + }); + } + + @Test + void fixMailboxInconsistenciesShouldKeepMostRecentPathWhenSameMailboxRegisteredUnderTwoPaths() { + // Real partial rename leftover: the projection points to the new path, and both the old and + // the new path are still registered to the same id. The most recently written path (the + // rename target) wins and the stale old path reference is dropped. + mailboxDAO.save(MAILBOX_NEW_PATH).block(); + mailboxPathV3DAO.save(MAILBOX).block(); + mailboxPathV3DAO.save(MAILBOX_NEW_PATH).block(); + + Result result = testee.fixMailboxInconsistencies(new Context()).block(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(result).isEqualTo(Result.COMPLETED); + softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_NEW_PATH); + softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_NEW_PATH); }); } @@ -401,4 +428,58 @@ void fixMailboxInconsistenciesShouldNotAlterStateWhenTwoEntriesWithSamePath() { .containsExactlyInAnyOrder(MAILBOX_2); }); } + + @Test + void fixMailboxInconsistenciesShouldAutoMergeGhostMailboxWhenAutoMergeEnabled() { + // Two different mailboxes resolve to the same path "abc": CASSANDRA_ID_2 owns the path (path + // table = source of truth), CASSANDRA_ID_1 is a ghost projection squatting it with no path + // registration of its own. With autoMerge on, the ghost is merged into the path owner. + Mailbox pathOwnerProjection = new Mailbox(MAILBOX_PATH, UID_VALIDITY_2, CASSANDRA_ID_2); + mailboxDAO.save(MAILBOX).block(); + mailboxDAO.save(pathOwnerProjection).block(); + mailboxPathV3DAO.save(MAILBOX_2).block(); + + // The merging runner moves messages then drops the loser's projection row: simulate that side effect. + when(mergingRunner.runReactive(eq(CASSANDRA_ID_1), eq(CASSANDRA_ID_2), any())) + .thenReturn(mailboxDAO.delete(CASSANDRA_ID_1).thenReturn(Result.COMPLETED)); + + Context context = new Context(); + testee.fixMailboxInconsistencies(context, new SolveMailboxInconsistenciesService.RunningOptions(1, true)).block(); + + SoftAssertions.assertSoftly(softly -> { + verify(mergingRunner).runReactive(eq(CASSANDRA_ID_1), eq(CASSANDRA_ID_2), any()); + softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .containsExactlyInAnyOrder(pathOwnerProjection); + softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_2); + softly.assertThat(context.snapshot().getFixedInconsistencies()) + .containsExactly(CASSANDRA_ID_2); + }); + } + + @Test + void fixMailboxInconsistenciesShouldRealignLoserInsteadOfMergingWhenLoserIsRegisteredElsewhere() { + // CASSANDRA_ID_1 squats path "abc" (owned by CASSANDRA_ID_2) but also legitimately owns path + // "xyz": it is not a clean ghost. Rather than being merged away, its stale projection is + // realigned onto its registered path "xyz", which makes the ghost on "abc" disappear without + // any merge. The whole picture becomes consistent within a single run. + Mailbox pathOwnerProjection = new Mailbox(MAILBOX_PATH, UID_VALIDITY_2, CASSANDRA_ID_2); + Mailbox loserOtherPath = new Mailbox(NEW_MAILBOX_PATH, UID_VALIDITY_1, CASSANDRA_ID_1); + mailboxDAO.save(MAILBOX).block(); + mailboxDAO.save(pathOwnerProjection).block(); + mailboxPathV3DAO.save(MAILBOX_2).block(); + mailboxPathV3DAO.save(loserOtherPath).block(); + + Context context = new Context(); + testee.fixMailboxInconsistencies(context, new SolveMailboxInconsistenciesService.RunningOptions(1, true)).block(); + + SoftAssertions.assertSoftly(softly -> { + verify(mergingRunner, never()).runReactive(any(), any(), any()); + softly.assertThat(mailboxDAO.retrieveAllMailboxes().collectList().block()) + .containsExactlyInAnyOrder(pathOwnerProjection, loserOtherPath); + softly.assertThat(mailboxPathV3DAO.listAll().collectList().block()) + .containsExactlyInAnyOrder(MAILBOX_2, loserOtherPath); + softly.assertThat(context.snapshot().getConflictingEntries()).isEmpty(); + }); + } } \ No newline at end of file diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java index e44d3474ad3..97fec53ee58 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesTaskSerializationTest.java @@ -41,14 +41,16 @@ class SolveMailboxInconsistenciesTaskSerializationTest { public static final String MAILBOX_ID_AS_STRING = "464765a0-e4e7-11e4-aba4-710c1de3782b"; private static final CassandraId MAILBOX_ID = CassandraId.of(UUID.fromString(MAILBOX_ID_AS_STRING)); + private static final SolveMailboxInconsistenciesService.RunningOptions RUNNING_OPTIONS = new SolveMailboxInconsistenciesService.RunningOptions(3, true); + private static final SolveMailboxInconsistenciesService SERVICE = mock(SolveMailboxInconsistenciesService.class); - private static final SolveMailboxInconsistenciesTask TASK = new SolveMailboxInconsistenciesTask(SERVICE); - private static final String SERIALIZED_TASK = "{\"type\": \"solve-mailbox-inconsistencies\"}"; + private static final SolveMailboxInconsistenciesTask TASK = new SolveMailboxInconsistenciesTask(SERVICE, RUNNING_OPTIONS); + private static final String SERIALIZED_TASK = "{\"type\": \"solve-mailbox-inconsistencies\", \"runningOptions\":{\"maxIterations\":3,\"autoMerge\":true}}"; private static final ConflictingEntry CONFLICTING_ENTRY = ConflictingEntry.builder() .mailboxDaoEntry(MAILBOX_PATH, MAILBOX_ID) .mailboxPathDaoEntry(MAILBOX_PATH_2, MAILBOX_ID); private static final ImmutableList CONFLICTING_ENTRIES = ImmutableList.of(CONFLICTING_ENTRY); - private static final SolveMailboxInconsistenciesTask.Details DETAILS = new SolveMailboxInconsistenciesTask.Details(TIMESTAMP, 0, 1, ImmutableList.of(MAILBOX_ID_AS_STRING), CONFLICTING_ENTRIES, 3); + private static final SolveMailboxInconsistenciesTask.Details DETAILS = new SolveMailboxInconsistenciesTask.Details(TIMESTAMP, 0, 1, ImmutableList.of(MAILBOX_ID_AS_STRING), CONFLICTING_ENTRIES, 3, RUNNING_OPTIONS); private static final String SERIALIZED_ADDITIONAL_INFORMATION = "{" + " \"type\":\"solve-mailbox-inconsistencies\"," + " \"processedMailboxEntries\":0," + @@ -65,7 +67,8 @@ class SolveMailboxInconsistenciesTaskSerializationTest { " }" + " }]," + " \"errors\":3," + - " \"timestamp\":\"2018-11-13T12:00:55Z\"" + + " \"timestamp\":\"2018-11-13T12:00:55Z\"," + + " \"runningOptions\":{\"maxIterations\":3,\"autoMerge\":true}" + "}"; @Test diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 475c1ba0036..64a66db2de4 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -24,6 +24,7 @@ import java.time.Clock; import java.time.Duration; +import java.util.Comparator; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -618,13 +619,35 @@ public Mono> renameMailboxReactive(MailboxPath from, MailboxSession fromSession, MailboxSession toSession) { LOGGER.debug("renameMailbox {} to {}", from, to); MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(fromSession); - - Mono fromMailboxPublisher = assertCanDeleteWhenRename(fromSession, from) - .then(mapper.findMailboxByPath(from) - .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(from)))); + Mono> mailboxesToBeRenamed = retrieveSubMailboxes(from, fromSession, mapper); return sanitizedMailboxPath(to, toSession) - .flatMap(sanitizedPath -> processRename(fromMailboxPublisher, sanitizedPath, option, fromSession, toSession)); + .flatMap(sanitizedPath -> processRename(mailboxesToBeRenamed, sanitizedPath, option, fromSession, toSession)); + } + + private Mono> retrieveSubMailboxes(MailboxPath from, MailboxSession fromSession, MailboxMapper mapper) { + MailboxQuery.UserBound query = MailboxQuery.builder() + .userAndNamespaceFrom(from) + .expression(new PrefixedWildcard(from.getName())) + .build() + .asUserBound(); + + return assertCanDeleteWhenRename(fromSession, from) + .then(getMailboxWithPathLikeUponRename(mapper, query) + .filter(mailbox -> mailbox.generateAssociatedPath().getHierarchyLevels(fromSession.getPathDelimiter()).contains(from)) + .sort(Comparator.comparing(mailbox -> mailbox.generateAssociatedPath().getHierarchyLevels(fromSession.getPathDelimiter()).size())) + .collectList() + .handle((mailboxes, sink) -> { + if (mailboxes.stream().map(mailbox -> mailbox.generateAssociatedPath()).anyMatch(from::equals)) { + sink.next(mailboxes); + } else { + sink.error(new MailboxNotFoundException(from)); + } + })); + } + + protected Flux getMailboxWithPathLikeUponRename(MailboxMapper mapper, MailboxQuery.UserBound query) { + return mapper.findMailboxWithPathLike(query); } private Mono> renameSubscriptionsIfNeeded(List renamedResults, @@ -666,9 +689,8 @@ public Mono> renameMailboxReactive(MailboxId mailboxI LOGGER.debug("renameMailbox {} to {}", mailboxId, newMailboxPath); MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session); - Mono fromMailboxPublisher = mapper.findMailboxById(mailboxId) - .flatMap(mailbox -> assertCanDeleteWhenRename(session, mailbox.generateAssociatedPath()) - .thenReturn(mailbox)) + Mono> fromMailboxPublisher = mapper.findMailboxById(mailboxId) + .flatMap(from -> retrieveSubMailboxes(from.generateAssociatedPath(), session, mapper)) .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(mailboxId))); return sanitizedMailboxPath(newMailboxPath, session) @@ -689,47 +711,28 @@ private Mono sanitizedMailboxPath(MailboxPath mailboxPath, MailboxS .flatMap(newMailboxPath -> Mono.fromCallable(() -> newMailboxPath.assertAcceptable(session.getPathDelimiter()))); } - private Mono> processRename(Mono fromMailboxPublisher, MailboxPath to, RenameOption option, + private Mono> processRename(Mono> fromMailboxPublisher, MailboxPath to, RenameOption option, MailboxSession fromSession, MailboxSession toSession) { MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(fromSession); return mapper.executeReactive(fromMailboxPublisher - .flatMap(mailbox -> doRenameMailbox(mailbox, to, fromSession, toSession, mapper) - .doOnEach(ReactorUtils.logFinally(() -> AuditTrail.entry() + .flatMap(mailboxes -> doRenameMailboxes(mailboxes, to, fromSession, toSession, mapper) + .doOnNext(result -> AuditTrail.entry() .username(() -> fromSession.getUser().asString()) .sessionId(() -> String.valueOf(fromSession.getSessionId().getValue())) .protocol("mailbox") .action("rename") - .parameters(Throwing.supplier(() -> ImmutableMap.of("mailboxId", mailbox.getMailboxId().serialize(), - "fromMailboxPath", mailbox.generateAssociatedPath().asString(), - "toMailboxPath", to.asString()))) - .log("Mailbox Rename"))) + .parameters(Throwing.supplier(() -> ImmutableMap.of("mailboxId", result.getMailboxId().serialize(), + "fromMailboxPath", result.getOriginPath().asString(), + "toMailboxPath", result.getDestinationPath().asString()))) + .log("Mailbox Rename")) + .collectList() .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, fromSession, toSession)))); } - private Mono> doRenameMailbox(Mailbox mailbox, MailboxPath newMailboxPath, MailboxSession fromSession, MailboxSession toSession, MailboxMapper mapper) { - // TODO put this into a serilizable transaction - - ImmutableList.Builder resultBuilder = ImmutableList.builder(); - - MailboxPath from = mailbox.generateAssociatedPath(); - mailbox.setNamespace(newMailboxPath.getNamespace()); - mailbox.setUser(newMailboxPath.getUser()); - mailbox.setName(newMailboxPath.getName()); - // Find submailboxes - MailboxQuery.UserBound query = MailboxQuery.builder() - .userAndNamespaceFrom(from) - .expression(new PrefixedWildcard(from.getName() + fromSession.getPathDelimiter())) - .build() - .asUserBound(); - - return mapper.rename(mailbox) - .map(mailboxId -> { - resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath)); - return mailboxId; - }) - .then(Mono.from(renameSubMailboxes(newMailboxPath, mapper, from, query, resultBuilder))) - .then(Mono.defer(() -> Flux.fromIterable(resultBuilder.build()) + private Flux doRenameMailboxes(List mailboxes, MailboxPath newMailboxPath, MailboxSession fromSession, MailboxSession toSession, MailboxMapper mapper) { + return renameSubMailboxes(newMailboxPath, mapper, mailboxes.get(0).generateAssociatedPath(), mailboxes) + .flatMapIterable(results -> results) .concatMap(result -> eventBus.dispatch(EventFactory.mailboxRenamed() .randomEventId() .mailboxSession(fromSession) @@ -737,32 +740,28 @@ private Mono> doRenameMailbox(Mailbox mailbox, Mailbo .oldPath(result.getOriginPath()) .newPath(result.getDestinationPath()) .build(), - new MailboxIdRegistrationKey(result.getMailboxId()))) - .then())) - .then(Mono.fromCallable(resultBuilder::build)); + new MailboxIdRegistrationKey(result.getMailboxId())) + .thenReturn(result)); } - private Publisher renameSubMailboxes(MailboxPath newMailboxPath, MailboxMapper mapper, - MailboxPath from, MailboxQuery.UserBound query, ImmutableList.Builder resultBuilder) { - if (DefaultMailboxes.INBOX.equalsIgnoreCase(from.getName())) { - return Mono.empty(); - } - return locker.executeReactiveWithLockReactive(from, mapper.findMailboxWithPathLike(query) + private Mono> renameSubMailboxes(MailboxPath newMailboxPath, MailboxMapper mapper, + MailboxPath from, List mailboxes) { + // Renaming INBOX renames the INBOX mailbox itself but leaves its sub-mailboxes in place. + // The list is sorted shallowest-first, so the first entry is the renamed root. + List mailboxesToRename = DefaultMailboxes.INBOX.equalsIgnoreCase(from.getName()) ? mailboxes.subList(0, 1) : mailboxes; + return Mono.from(locker.executeReactiveWithLockReactive(from, Flux.fromIterable(mailboxesToRename) .concatMap(sub -> { String subOriginalName = sub.getName(); String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length()); MailboxPath fromPath = new MailboxPath(from, subOriginalName); sub.setName(subNewName); sub.setUser(newMailboxPath.getUser()); - return mapper.rename(sub) - .map(mailboxId -> { - resultBuilder.add(new MailboxRenamedResult(sub.getMailboxId(), fromPath, sub.generateAssociatedPath())); - return mailboxId; - }) + return mapper.rename(sub, fromPath) + .map(mailboxId -> new MailboxRenamedResult(sub.getMailboxId(), fromPath, sub.generateAssociatedPath())) .retryWhen(Retry.backoff(5, Duration.ofMillis(10))) - .then(Mono.fromRunnable(() -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName))); + .doOnSuccess(any -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName)); }, LOW_CONCURRENCY) - .then(), MailboxPathLocker.LockType.Write); + .collectList(), MailboxPathLocker.LockType.Write)); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java index a88d6a10123..3412efe295f 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java @@ -50,8 +50,12 @@ public interface MailboxMapper extends Mapper { /** * Rename the given {@link Mailbox} to the underlying storage */ + default Mono rename(Mailbox mailbox, MailboxPath previousPath) { + return rename(mailbox); + } + Mono rename(Mailbox mailbox); - + /** * Delete the given {@link Mailbox} from the underlying storage */ diff --git a/server/container/mailbox-adapter/pom.xml b/server/container/mailbox-adapter/pom.xml index 80fcbda7cf8..f69974e71f3 100644 --- a/server/container/mailbox-adapter/pom.xml +++ b/server/container/mailbox-adapter/pom.xml @@ -58,6 +58,10 @@ ${james.groupId} apache-james-mailbox-store + + ${james.groupId} + apache-james-mailbox-tools-quota-recompute + ${james.groupId} event-bus-api diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java index 45523ec5995..016e662c935 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java @@ -19,6 +19,9 @@ package org.apache.james.adapter.mailbox; +import java.util.Comparator; +import java.util.concurrent.ThreadLocalRandom; + import jakarta.inject.Inject; import org.apache.james.core.Username; @@ -69,23 +72,33 @@ public Publisher changeUsername(Username oldUsername, Username newUsername .user(fromSession.getUser()) .build(); + // A single random suffix is generated for the whole rename operation so that temporary + // mailboxes never collide with a leftover from a previous run (we thus do not loose the + // previous content) while staying recognizable. + int renameOperationId = ThreadLocalRandom.current().nextInt(1000); + return mailboxManager.search(queryUser, MailboxManager.MailboxSearchFetchType.Minimal, fromSession) - // Only keep top level, rename takes care of sub mailboxes - .filter(mailbox -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 1) - .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox)) + // Process the deepest mailboxes first. As renameMailbox is recursive (it also renames + // sub mailboxes), handling a mailbox only once all of its descendants have already been + // migrated guarantees it is effectively a leaf at that point: the rename then carries no + // sub mailbox along, which lets us migrate every node individually - including when the + // destination mailbox already exists and a plain rename is therefore not possible. + .sort(Comparator.comparingInt( + (MailboxMetaData mailbox) -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size()) + .reversed()) + .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox, renameOperationId)) .doFinally(any -> mailboxManager.endProcessingRequest(fromSession)) .doFinally(any -> mailboxManager.endProcessingRequest(toSession)); } - private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox) { + private Mono migrateMailbox(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, int renameOperationId) { MailboxPath renamedPath = mailbox.getPath().withUser(toSession.getUser()); return mailboxManager.mailboxExists(renamedPath, toSession) .flatMap(exist -> { if (!exist) { return renameMailboxAndRenameSubscriptionForDelegatee(fromSession, toSession, mailbox, renamedPath); } else { - return renameWhenMailboxExist(toSession, renamedPath, - renameMailboxAndRenameSubscriptionForDelegatee(fromSession, toSession, mailbox, renamedPath)); + return moveWhenMailboxExist(fromSession, toSession, mailbox, renamedPath, renameOperationId); } }); } @@ -98,18 +111,17 @@ private Mono renameMailboxAndRenameSubscriptionForDelegatee(MailboxSession .then(); } - // rename: renamedPath -> temporaryPath - // rename: mailbox.getPath -> renamedPath - // copy messages: temporaryPath -> renamedPath - // delete: temporaryPath - private Mono renameWhenMailboxExist(MailboxSession toSession, MailboxPath renamedPath, Mono renamePublisher) { - MailboxPath temporaryPath = new MailboxPath(renamedPath.getNamespace(), renamedPath.getUser(), renamedPath.getName() + "tmp"); - return mailboxManager.renameMailboxReactive(renamedPath, temporaryPath, - MailboxManager.RenameOption.NONE, toSession) - .then(renamePublisher) - .then(mailboxManager.copyMessagesReactive(MessageRange.all(), - temporaryPath, renamedPath, toSession) - .then()) + // The destination mailbox already exists: a plain rename is impossible, so we bring the source + // mailbox into the destination account under a temporary name, MOVE its messages into the + // destination, then drop the emptied temporary mailbox. Sub mailboxes are not handled here: as + // mailboxes are migrated deepest first, the source mailbox no longer has any descendant when we + // reach this point. + private Mono moveWhenMailboxExist(MailboxSession fromSession, MailboxSession toSession, MailboxMetaData mailbox, MailboxPath renamedPath, int renameOperationId) { + MailboxPath temporaryPath = new MailboxPath(renamedPath.getNamespace(), renamedPath.getUser(), renamedPath.getName() + "-tmp-" + renameOperationId); + return mailboxManager.renameMailboxReactive(mailbox.getPath(), temporaryPath, + MailboxManager.RenameOption.NONE, fromSession, toSession) + .thenMany(mailboxManager.moveMessagesReactive(MessageRange.all(), temporaryPath, renamedPath, toSession)) + .then(renameSubscriptionsForDelegatee(mailbox, renamedPath)) .then(mailboxManager.deleteMailboxReactive(temporaryPath, toSession)); } diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java index abc439de45d..7614771f5ca 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStep.java @@ -19,49 +19,37 @@ package org.apache.james.adapter.mailbox; -import java.time.Instant; import java.util.Optional; import jakarta.inject.Inject; import org.apache.james.core.Username; -import org.apache.james.events.EventBus; -import org.apache.james.events.RegistrationKey; -import org.apache.james.mailbox.model.CurrentQuotas; import org.apache.james.mailbox.model.Quota; -import org.apache.james.mailbox.model.QuotaOperation; import org.apache.james.mailbox.model.QuotaRoot; -import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.MaxQuotaManager; import org.apache.james.mailbox.quota.QuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; -import org.apache.james.mailbox.store.event.EventFactory; +import org.apache.james.mailbox.quota.task.RecomputeMailboxCurrentQuotasService; import org.apache.james.user.api.UsernameChangeTaskStep; import org.reactivestreams.Publisher; -import com.google.common.collect.ImmutableSet; - import reactor.core.publisher.Mono; public class QuotaUsernameChangeTaskStep implements UsernameChangeTaskStep { - private static final ImmutableSet NO_REGISTRATION_KEYS = ImmutableSet.of(); private final QuotaManager quotaManager; private final MaxQuotaManager maxQuotaManager; - private final CurrentQuotaManager currentQuotaManager; + private final RecomputeMailboxCurrentQuotasService recomputeMailboxCurrentQuotasService; private final UserQuotaRootResolver userQuotaRootResolver; - private final EventBus eventBus; @Inject public QuotaUsernameChangeTaskStep(QuotaManager quotaManager, - CurrentQuotaManager currentQuotaManager, + RecomputeMailboxCurrentQuotasService recomputeMailboxCurrentQuotasService, UserQuotaRootResolver userQuotaRootResolver, - MaxQuotaManager maxQuotaManager, - EventBus eventBus) { + MaxQuotaManager maxQuotaManager) { this.quotaManager = quotaManager; - this.currentQuotaManager = currentQuotaManager; + this.recomputeMailboxCurrentQuotasService = recomputeMailboxCurrentQuotasService; this.userQuotaRootResolver = userQuotaRootResolver; this.maxQuotaManager = maxQuotaManager; - this.eventBus = eventBus; } @Override @@ -78,13 +66,8 @@ public int priority() { public Publisher changeUsername(Username oldUsername, Username newUsername) { return Mono.from(quotaManager.getQuotasReactive(userQuotaRootResolver.forUser(oldUsername))) .flatMap(quotas -> Mono.fromCallable(() -> userQuotaRootResolver.forUser(newUsername)) - .flatMap(newUserQuotaRoot -> setQuotaForNewUser(newUserQuotaRoot, quotas) - .then(dispatchNewEventQuota(newUserQuotaRoot, newUsername)))); - } - - private Mono setQuotaForNewUser(QuotaRoot quotaRoot, QuotaManager.Quotas quotas) { - return setMaxQuota(quotaRoot, quotas) - .then(setCurrentQuota(quotaRoot, quotas)); + .flatMap(newUserQuotaRoot -> setMaxQuota(newUserQuotaRoot, quotas))) + .then(Mono.from(recomputeMailboxCurrentQuotasService.recomputeCurrentQuotas(newUsername))); } private Mono setMaxQuota(QuotaRoot quotaRoot, QuotaManager.Quotas quotas) { @@ -104,23 +87,4 @@ private Mono setMaxMessagesQuota(QuotaRoot quotaRoot, QuotaManager.Quotas .get(Quota.Scope.User))) .flatMap(quotaCountLimit -> Mono.from(maxQuotaManager.setMaxMessageReactive(quotaRoot, quotaCountLimit))); } - - private Mono setCurrentQuota(QuotaRoot quotaRoot, QuotaManager.Quotas quotas) { - return Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(quotaRoot, - new CurrentQuotas(quotas.getMessageQuota().getUsed(), quotas.getStorageQuota().getUsed())))); - } - - private Mono dispatchNewEventQuota(QuotaRoot quotaRoot, Username username) { - return Mono.from(quotaManager.getQuotasReactive(quotaRoot)) - .flatMap(quotas -> eventBus.dispatch( - EventFactory.quotaUpdated() - .randomEventId() - .user(username) - .quotaRoot(quotaRoot) - .quotaCount(quotas.getMessageQuota()) - .quotaSize(quotas.getStorageQuota()) - .instant(Instant.now()) - .build(), - NO_REGISTRATION_KEYS)); - } } diff --git a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java index 18c51871164..439f77d02ba 100644 --- a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java +++ b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStepTest.java @@ -38,6 +38,7 @@ import org.apache.james.mailbox.store.StoreSubscriptionManager; import org.apache.james.mime4j.dom.Message; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -198,4 +199,180 @@ void shouldMigrateMailboxesWhenNewUserHasAlreadyOtherMailboxes() throws Exceptio MailboxPath.forUser(BOB, "test.child"), bobInbox); } + + private void appendMessage(MailboxPath path, MailboxSession session, String body) throws Exception { + mailboxManager.getMailbox(path, session) + .appendMessage(MessageManager.AppendCommand.from(Message.Builder.of() + .setSubject("subject") + .setBody(body, StandardCharsets.UTF_8) + .build()), session); + } + + private long messageCount(MailboxPath path, MailboxSession session) throws Exception { + return mailboxManager.getMailbox(path, session).getMessageCount(session); + } + + @Nested + class WhenDestinationParentMailboxAlreadyExists { + @Test + void shouldMigrateSubMailboxesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child")); + } + + @Test + void shouldNotLeaveTemporaryMailboxesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .noneMatch(path -> path.getName().contains("-tmp-")); + } + + @Test + void shouldNotLooseMessagesOfSourceOnlyTopLevelMailboxWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + appendMessage(MailboxPath.forUser(ALICE, "test"), aliceSession, "alice"); + appendMessage(MailboxPath.forUser(BOB, "test"), bobSession, "bob"); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(messageCount(MailboxPath.forUser(BOB, "test"), bobSession)).isEqualTo(2); + } + + @Test + void shouldMoveSubMailboxMessagesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + appendMessage(MailboxPath.forUser(ALICE, "test.child"), aliceSession, "alice-child"); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(messageCount(MailboxPath.forUser(BOB, "test.child"), bobSession)).isEqualTo(1); + } + + @Test + void shouldMergeSubMailboxMessagesWhenBothUsersHaveTheSameSubMailbox() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test.child"), MailboxManager.CreateOption.NONE, bobSession); + appendMessage(MailboxPath.forUser(ALICE, "test"), aliceSession, "alice-parent"); + appendMessage(MailboxPath.forUser(ALICE, "test.child"), aliceSession, "alice-child"); + appendMessage(MailboxPath.forUser(BOB, "test"), bobSession, "bob-parent"); + appendMessage(MailboxPath.forUser(BOB, "test.child"), bobSession, "bob-child"); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child")); + assertThat(messageCount(MailboxPath.forUser(BOB, "test"), bobSession)).isEqualTo(2); + assertThat(messageCount(MailboxPath.forUser(BOB, "test.child"), bobSession)).isEqualTo(2); + } + + @Test + void shouldMigrateDeepHierarchyWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child.grandchild"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child"), + MailboxPath.forUser(BOB, "test.child.grandchild")); + } + + @Test + void shouldMigrateSubMailboxesWhenOnlyDeepestChildConflicts() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.other"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test.child"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test"), + MailboxPath.forUser(BOB, "test.child"), + MailboxPath.forUser(BOB, "test.other")); + } + + @Test + void shouldRemoveSourceMailboxesWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(mailboxManager.list(mailboxManager.createSystemSession(BOB))) + .filteredOn(path -> ALICE.equals(path.getUser())) + .isEmpty(); + } + + @Test + void shouldTransferSubscriptionOfSubMailboxWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.CREATE_SUBSCRIPTION, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + assertThat(subscriptionManager.subscriptions(mailboxManager.createSystemSession(BOB))) + .containsOnly(MailboxPath.forUser(BOB, "test.child")); + } + + @Test + void shouldMigrateDelegationAclOfSubMailboxWhenDestinationParentAlreadyExists() throws Exception { + MailboxSession aliceSession = mailboxManager.createSystemSession(ALICE); + MailboxSession bobSession = mailboxManager.createSystemSession(BOB); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(ALICE, "test.child"), MailboxManager.CreateOption.NONE, aliceSession); + mailboxManager.createMailbox(MailboxPath.forUser(BOB, "test"), MailboxManager.CreateOption.NONE, bobSession); + mailboxManager.applyRightsCommand(MailboxPath.forUser(ALICE, "test.child"), + MailboxACL.command().forUser(CEDRIC).rights(MailboxACL.FULL_RIGHTS).asAddition(), + aliceSession); + + Mono.from(testee.changeUsername(ALICE, BOB)).block(); + + MailboxSession cedricSession = mailboxManager.createSystemSession(CEDRIC); + assertThatCode(() -> mailboxManager.getMailbox(MailboxPath.forUser(BOB, "test.child"), cedricSession)) + .doesNotThrowAnyException(); + } + } } \ No newline at end of file diff --git a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java index 8c789c6f0b4..d1bf2ee9e14 100644 --- a/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java +++ b/server/container/mailbox-adapter/src/test/java/org/apache/james/adapter/mailbox/QuotaUsernameChangeTaskStepTest.java @@ -24,6 +24,7 @@ import static org.assertj.core.api.SoftAssertions.assertSoftly; import static org.awaitility.Durations.TEN_SECONDS; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import org.apache.james.core.Username; @@ -35,17 +36,22 @@ import org.apache.james.events.EventBus; import org.apache.james.events.EventListener; import org.apache.james.events.Group; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.events.GenericGroup; import org.apache.james.mailbox.events.MailboxEvents; +import org.apache.james.mailbox.inmemory.InMemoryMailboxManager; import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; import org.apache.james.mailbox.model.CurrentQuotas; +import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.Quota; -import org.apache.james.mailbox.model.QuotaOperation; import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.MaxQuotaManager; import org.apache.james.mailbox.quota.QuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; +import org.apache.james.mailbox.quota.task.RecomputeMailboxCurrentQuotasService; +import org.apache.james.mime4j.dom.Message; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,8 +61,11 @@ class QuotaUsernameChangeTaskStepTest { private static final Username ALICE = Username.of("alice"); private static final Username BOB = Username.of("bob"); + // Quotas resulting from a single appended message, see appendAMessage. + private static final CurrentQuotas ONE_MESSAGE_QUOTAS = new CurrentQuotas(QuotaCountUsage.count(1L), QuotaSizeUsage.size(103L)); private QuotaUsernameChangeTaskStep testee; + private InMemoryMailboxManager mailboxManager; private QuotaManager quotaManager; private CurrentQuotaManager currentQuotaManager; private UserQuotaRootResolver quotaRootResolver; @@ -67,6 +76,7 @@ class QuotaUsernameChangeTaskStepTest { @BeforeEach void setUp() { InMemoryIntegrationResources resources = InMemoryIntegrationResources.defaultResources(); + mailboxManager = resources.getMailboxManager(); quotaManager = resources.getQuotaManager(); currentQuotaManager = resources.getCurrentQuotaManager(); quotaRootResolver = resources.getDefaultUserQuotaRootResolver(); @@ -88,84 +98,85 @@ public void event(Event event) { } }); - testee = new QuotaUsernameChangeTaskStep( - quotaManager, + RecomputeMailboxCurrentQuotasService recomputeMailboxCurrentQuotasService = new RecomputeMailboxCurrentQuotasService( currentQuotaManager, + resources.getCurrentQuotaCalculator(), quotaRootResolver, - maxQuotaManager, + mailboxManager.getSessionProvider(), + mailboxManager, + quotaManager, eventBus); + + testee = new QuotaUsernameChangeTaskStep( + quotaManager, + recomputeMailboxCurrentQuotasService, + quotaRootResolver, + maxQuotaManager); } @Test - void shouldMigrateQuotas() throws Exception { + void shouldMigrateMaxQuotas() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); Mono.from(testee.changeUsername(BOB, ALICE)).block(); QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.count(50)).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.size(100)).build()); + softly.assertThat(aliceQuotas.getMessageQuota().getLimit()).isEqualTo(QuotaCountLimit.count(50)); + softly.assertThat(aliceQuotas.getStorageQuota().getLimit()).isEqualTo(QuotaSizeLimit.size(100)); }); } @Test - void migrateShouldNotThrowWhenNoQuotas() throws Exception { - assertThatCode(() -> Mono.from(testee.changeUsername(BOB, ALICE)).block()) - .doesNotThrowAnyException(); - QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(0)).computedLimit(QuotaCountLimit.unlimited()).build()); + void shouldRecomputeCurrentQuotasFromNewUserMailboxes() throws Exception { + appendAMessage(ALICE); - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(0)).computedLimit(QuotaSizeLimit.unlimited()).build()); - }); + Mono.from(testee.changeUsername(BOB, ALICE)).block(); + + assertThat(Mono.from(currentQuotaManager.getCurrentQuotas(quotaRootResolver.forUser(ALICE))).block()) + .isEqualTo(ONE_MESSAGE_QUOTAS); } @Test - void migrateShouldSucceedWhenUnlimitedQuotas() throws Exception { + void shouldNotOverwriteDestinationCurrentQuotasWithSourceOnes() throws Exception { + // Source (BOB) holds a current quota value... QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); - maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.unlimited()); - maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.unlimited()); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); + maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); + maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); + + // ...while the destination (ALICE) already has its own content. + appendAMessage(ALICE); Mono.from(testee.changeUsername(BOB, ALICE)).block(); - QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.unlimited()).build()); + // ALICE current quotas reflect her actual mailboxes content, not BOB's figures. + assertThat(Mono.from(currentQuotaManager.getCurrentQuotas(quotaRootResolver.forUser(ALICE))).block()) + .isEqualTo(ONE_MESSAGE_QUOTAS); + } - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.unlimited()).build()); - }); + @Test + void migrateShouldNotThrowWhenNoQuotas() throws Exception { + assertThatCode(() -> Mono.from(testee.changeUsername(BOB, ALICE)).block()) + .doesNotThrowAnyException(); + + assertThat(Mono.from(currentQuotaManager.getCurrentQuotas(quotaRootResolver.forUser(ALICE))).block()) + .isEqualTo(CurrentQuotas.emptyQuotas()); } @Test void migrateShouldSucceedWhenOnlyMessagesQuota() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(10)); + Mono.from(testee.changeUsername(BOB, ALICE)).block(); QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(0)).computedLimit(QuotaCountLimit.count(10)).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(0)).computedLimit(QuotaSizeLimit.unlimited()).build()); + softly.assertThat(aliceQuotas.getMessageQuota().getLimit()).isEqualTo(QuotaCountLimit.count(10)); + softly.assertThat(aliceQuotas.getStorageQuota().getLimit()).isEqualTo(QuotaSizeLimit.unlimited()); }); } @@ -173,44 +184,13 @@ void migrateShouldSucceedWhenOnlyMessagesQuota() throws Exception { void migrateShouldSucceedWhenOnlyStorageQuota() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(10)); - Mono.from(testee.changeUsername(BOB, ALICE)).block(); - - QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(0)).computedLimit(QuotaCountLimit.unlimited()).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(0)).computedLimit(QuotaSizeLimit.size(10)).build()); - }); - } - - @Test - void migrateShouldSucceedWhenAliceAlreadyQuotas() throws Exception { - QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); - maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); - maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); - - QuotaRoot aliceQuotaRoot = quotaRootResolver.forUser(ALICE); - maxQuotaManager.setMaxMessage(aliceQuotaRoot, QuotaCountLimit.count(55)); - maxQuotaManager.setMaxStorage(aliceQuotaRoot, QuotaSizeLimit.size(150)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(aliceQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(7), QuotaSizeUsage.size(8) - )))).block(); Mono.from(testee.changeUsername(BOB, ALICE)).block(); QuotaManager.Quotas aliceQuotas = quotaManager.getQuotas(quotaRootResolver.forUser(ALICE)); - assertSoftly(softly -> { - softly.assertThat(aliceQuotas.getMessageQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.count(50)).build()); - - softly.assertThat(aliceQuotas.getStorageQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.size(100)).build()); + softly.assertThat(aliceQuotas.getMessageQuota().getLimit()).isEqualTo(QuotaCountLimit.unlimited()); + softly.assertThat(aliceQuotas.getStorageQuota().getLimit()).isEqualTo(QuotaSizeLimit.size(10)); }); } @@ -219,28 +199,43 @@ void migrateShouldDispatchQuotaUpdateEvent() throws Exception { QuotaRoot bobQuotaRoot = quotaRootResolver.forUser(BOB); maxQuotaManager.setMaxMessage(bobQuotaRoot, QuotaCountLimit.count(50)); maxQuotaManager.setMaxStorage(bobQuotaRoot, QuotaSizeLimit.size(100)); - Mono.from(currentQuotaManager.setCurrentQuotas(QuotaOperation.from(bobQuotaRoot, new CurrentQuotas( - QuotaCountUsage.count(5), QuotaSizeUsage.size(10) - )))).block(); - Mono.from(testee.changeUsername(BOB, ALICE)).block(); + appendAMessage(ALICE); + Mono.from(testee.changeUsername(BOB, ALICE)).block(); Awaitility.await() .atMost(TEN_SECONDS) - .untilAsserted(() -> assertThat(eventStore.size()).isEqualTo(1)); + .untilAsserted(() -> assertThat(eventStore).anyMatch(event -> event instanceof MailboxEvents.QuotaUsageUpdatedEvent)); - MailboxEvents.QuotaUsageUpdatedEvent quotaUsageUpdatedEvent = (MailboxEvents.QuotaUsageUpdatedEvent) eventStore.get(0); + MailboxEvents.QuotaUsageUpdatedEvent quotaUsageUpdatedEvent = eventStore.stream() + .filter(event -> event instanceof MailboxEvents.QuotaUsageUpdatedEvent) + .map(MailboxEvents.QuotaUsageUpdatedEvent.class::cast) + .reduce((first, second) -> second) + .orElseThrow(); assertSoftly(softly -> { softly.assertThat(quotaUsageUpdatedEvent.getCountQuota()) - .isEqualTo(Quota.builder().used(QuotaCountUsage.count(5)).computedLimit(QuotaCountLimit.count(50)).build()); + .isEqualTo(Quota.builder().used(QuotaCountUsage.count(1)).computedLimit(QuotaCountLimit.count(50)).build()); softly.assertThat(quotaUsageUpdatedEvent.getSizeQuota()) - .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(10)).computedLimit(QuotaSizeLimit.size(100)).build()); + .isEqualTo(Quota.builder().used(QuotaSizeUsage.size(103)).computedLimit(QuotaSizeLimit.size(100)).build()); softly.assertThat(quotaUsageUpdatedEvent.getUsername()) .isEqualTo(ALICE); softly.assertThat(quotaUsageUpdatedEvent.getQuotaRoot()) .isEqualTo(quotaRootResolver.forUser(ALICE)); }); } -} \ No newline at end of file + + private void appendAMessage(Username username) throws Exception { + MailboxSession session = mailboxManager.createSystemSession(username); + MailboxPath inbox = MailboxPath.inbox(username); + mailboxManager.createMailbox(inbox, session); + MessageManager messageManager = mailboxManager.getMailbox(inbox, session); + messageManager.appendMessage(MessageManager.AppendCommand.from( + Message.Builder.of() + .setTo("test@localhost.com") + .setBody("This is a message", StandardCharsets.UTF_8)), + session); + mailboxManager.endProcessingRequest(session); + } +} diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index 0f69d237cf8..4845ba437e9 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -28,6 +28,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.reactivestreams.Publisher; @@ -107,6 +108,16 @@ public static BiConsumer, SynchronousSink> publishIfPresent() return (element, sink) -> element.ifPresent(sink::next); } + public static BiConsumer> raiseErrorIfFalse(Supplier exceptionSupplier) { + return (element, sink) -> { + if (!element) { + sink.error(exceptionSupplier.get()); + } else { + sink.next(element); + } + }; + } + public static InputStream toInputStream(Flux byteArrays) { return new StreamInputStream(byteArrays.toStream(DEFAULT_INPUT_STREAM_PREFETCH)); } diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java index 0df2783b142..988e342e25d 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java @@ -19,9 +19,12 @@ package org.apache.james.webadmin.routes; +import java.util.Optional; + import jakarta.inject.Inject; import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService; +import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions; import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesTask; import org.apache.james.webadmin.tasks.TaskFromRequestRegistry; import org.apache.james.webadmin.tasks.TaskRegistrationKey; @@ -42,7 +45,13 @@ public SolveMailboxInconsistenciesRequestToTask(SolveMailboxInconsistenciesServi "`ALL-SERVICES-ARE-OFFLINE` in order to prevent accidental calls. " + "Check the documentation for details."); - return new SolveMailboxInconsistenciesTask(service); + int maxIterations = RunningOptionsParser.intQueryParameter(request, "maxIterations") + .orElse(RunningOptions.DEFAULT_MAX_ITERATIONS); + boolean autoMerge = Optional.ofNullable(request.queryParams("autoMerge")) + .map(Boolean::parseBoolean) + .orElse(RunningOptions.DEFAULT_AUTO_MERGE); + + return new SolveMailboxInconsistenciesTask(service, new RunningOptions(maxIterations, autoMerge)); }); } } diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java index b0a3cc7b898..5e34d34fe3e 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTaskTest.java @@ -86,7 +86,7 @@ void setUp() { taskManager = new MemoryTaskManager(new Hostname("foo")); service = mock(SolveMailboxInconsistenciesService.class); - Mockito.when(service.fixMailboxInconsistencies(any())).thenReturn(Mono.just(Task.Result.COMPLETED)); + Mockito.when(service.fixMailboxInconsistencies(any(), any())).thenReturn(Mono.just(Task.Result.COMPLETED)); webAdminServer = WebAdminUtils.createWebAdminServer( new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(SolveMessageInconsistenciesTaskAdditionalInformationDTO.module())), diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md index e338ebc313e..24773c9063d 100644 --- a/src/site/markdown/server/manage-webadmin.md +++ b/src/site/markdown/server/manage-webadmin.md @@ -849,6 +849,26 @@ Will schedule a task for fixing inconsistencies for the mailbox deduplicated obj The `I-KNOW-WHAT-I-M-DOING` header is mandatory (you can read more information about it in the warning section below). +Optional query parameters: + + - `maxIterations` strictly positive integer, defaults to `1`. Reconciliation is run up to a fixpoint: + fixing an inconsistency in one pass (dropping a stale path, merging a ghost mailbox...) can surface a + new inconsistency that only a subsequent pass detects. The task re-runs as long as a pass keeps applying + fixes, bounded by `maxIterations` to guard against oscillation. A value of `1` keeps the historical + single-pass behaviour. + - `autoMerge` boolean, defaults to `false`. When `true`, conflicting entries where two **different** + mailboxes resolve to the same path (the historical "ghost mailbox") are resolved automatically: the + mailbox registered in the path table (the source of truth) is kept, and the squatting one is merged into + it (its messages and rights are moved over, then its projection is dropped), reusing the + [ghost mailbox](#correcting-ghost-mailbox) merging machinery. This is **destructive** (mail data is + moved), only attempted once a strong read confirms the loser is a genuine ghost not registered anywhere + else, and left `false` by default so that an admin explicitly opts in. Combine it with `maxIterations > 1` + so that any residual left by a merge converges within the same task run. + +``` +curl -XPOST '/mailboxes?task=SolveInconsistencies&maxIterations=5&autoMerge=true' +``` + The scheduled task will have the following type `solve-mailbox-inconsistencies` and the following `additionalInformation`: ``` @@ -867,13 +887,17 @@ The scheduled task will have the following type `solve-mailbox-inconsistencies` "mailboxPath":"#private:user:mailboxName2", "mailboxId":"464765a0-e4e7-11e4-aba4-710c1de3782b" } - }] + }], + "runningOptions":{ + "maxIterations": 5, + "autoMerge": true + } } ``` -Note that conflicting entry inconsistencies will not be fixed and will require to explicitly use -[ghost mailbox](#correcting-ghost-mailbox) endpoint in order to merge the conflicting mailboxes and prevent any message -loss. +Note that, unless `autoMerge` is enabled, conflicting entry inconsistencies will not be fixed and will +require to explicitly use [ghost mailbox](#correcting-ghost-mailbox) endpoint in order to merge the +conflicting mailboxes and prevent any message loss. **WARNING**: this task can cancel concurrently running legitimate user operations upon dirty read. As such this task should be run offline.