Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@ a task].
The `I-KNOW-WHAT-I-M-DOING` header is mandatory (you can read more
information about it in the warning section below).

Optional query parameters:

* `maxIterations` strictly positive integer, defaults to `1`.
Reconciliation is run up to a fixpoint: fixing an inconsistency in one
pass (dropping a stale path, merging a ghost mailbox...) can surface a
new inconsistency that only a subsequent pass detects. The task re-runs
as long as a pass keeps applying fixes, bounded by `maxIterations` to
guard against oscillation. A value of `1` keeps the historical
single-pass behaviour.
* `autoMerge` boolean, defaults to `false`. When `true`, conflicting
entries where two *different* mailboxes resolve to the same path (the
historical "ghost mailbox") are resolved automatically: the mailbox
registered in the path table (the source of truth) is kept, and the
squatting one is merged into it (its messages and rights are moved over,
then its projection is dropped), reusing the
link:#_correcting_ghost_mailbox[ghost mailbox] merging machinery. This
is *destructive* (mail data is moved), only attempted once a strong read
confirms the loser is a genuine ghost not registered anywhere else, and
left `false` by default so that an admin explicitly opts in. Combine it
with `maxIterations > 1` so that any residual left by a merge converges
within the same task run.

....
curl -XPOST '/mailboxes?task=SolveInconsistencies&maxIterations=5&autoMerge=true'
....

The scheduled task will have the following type
`solve-mailbox-inconsistencies` and the following
`additionalInformation`:
Expand All @@ -33,14 +59,18 @@ The scheduled task will have the following type
"mailboxPath":"#private:user:mailboxName2",
"mailboxId":"464765a0-e4e7-11e4-aba4-710c1de3782b"
}
}]
}],
"runningOptions":{
"maxIterations": 5,
"autoMerge": true
}
}
....

Note that conflicting entry inconsistencies will not be fixed and will
require to explicitly use link:#_correcting_ghost_mailbox[ghost mailbox]
endpoint in order to merge the conflicting mailboxes and prevent any
message loss.
Note that, unless `autoMerge` is enabled, conflicting entry
inconsistencies will not be fixed and will require to explicitly use
link:#_correcting_ghost_mailbox[ghost mailbox] endpoint in order to
merge the conflicting mailboxes and prevent any message loss.

*WARNING*: this task can cancel concurrently running legitimate user
operations upon dirty read. As such this task should be run offline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,29 @@

import jakarta.inject.Inject;

import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.events.EventBus;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxPathLocker;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.SessionProvider;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.MailboxManagerConfiguration;
import org.apache.james.mailbox.store.PreDeletionHooks;
import org.apache.james.mailbox.store.StoreMailboxAnnotationManager;
import org.apache.james.mailbox.store.StoreMailboxManager;
import org.apache.james.mailbox.store.StoreMessageManager;
import org.apache.james.mailbox.store.StoreRightManager;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm;
import org.apache.james.mailbox.store.mail.model.impl.MessageParser;
import org.apache.james.mailbox.store.quota.QuotaComponents;
import org.apache.james.mailbox.store.search.MessageSearchIndex;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -118,4 +123,12 @@ protected StoreMessageManager createMessageManager(Mailbox mailboxRow, MailboxSe
public <T> Mono<T> manageProcessing(Mono<T> toBeWrapped, MailboxSession mailboxSession) {
return toBeWrapped;
}

@Override
protected Flux<Mailbox> getMailboxWithPathLikeUponRename(MailboxMapper mapper, MailboxQuery.UserBound query) {
if (mapper instanceof CassandraMailboxMapper cassandraMailboxMapper) {
return cassandraMailboxMapper.findMailboxWithPathLike(query, JamesExecutionProfiles.ConsistencyChoice.STRONG);
}
return super.getMailboxWithPathLikeUponRename(mapper, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.util.FunctionalUtils;
import org.apache.james.util.ReactorUtils;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -192,18 +192,15 @@ private Mailbox addAcl(MailboxACL acl, Mailbox mailbox) {

@Override
public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
String fixedNamespace = query.getFixedNamespace();
Username fixedUser = query.getFixedUser();
return findMailboxWithPathLike(query, consistencyChoice());
}

return performReadRepair(listMailboxes(fixedNamespace, fixedUser))
public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) {
return performReadRepair(mailboxPathV3DAO.listUserMailboxes(query.getFixedNamespace(), query.getFixedUser(), consistencyChoice))
.filter(mailbox -> query.isPathMatch(mailbox.generateAssociatedPath()))
.flatMap(this::addAcl, CONCURRENCY);
}

private Flux<Mailbox> listMailboxes(String fixedNamespace, Username fixedUser) {
return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice());
}

@Override
public Mono<Mailbox> create(MailboxPath mailboxPath, UidValidity uidValidity) {
CassandraId cassandraId = CassandraId.timeBased();
Expand All @@ -221,23 +218,27 @@ public Mono<MailboxId> rename(Mailbox mailbox) {
Preconditions.checkNotNull(mailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId");

CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
return tryRename(mailbox, cassandraId)
.filter(FunctionalUtils.identityPredicate())
.switchIfEmpty(Mono.error(() -> new MailboxExistsException(mailbox.generateAssociatedPath().asString())))
.thenReturn(cassandraId);
}

private Mono<Boolean> tryRename(Mailbox cassandraMailbox, CassandraId cassandraId) {
return mailboxDAO.retrieveMailbox(cassandraId)
.flatMap(mailbox -> mailboxPathV3DAO.save(cassandraMailbox)
.filter(isCreated -> isCreated)
.flatMap(mailboxHasCreated -> deletePreviousMailboxPathReference(mailbox.generateAssociatedPath())
.then(persistMailboxEntity(cassandraMailbox))
.thenReturn(true))
.defaultIfEmpty(false))
.flatMap(storedMailbox -> rename(mailbox, storedMailbox.generateAssociatedPath()))
.switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(cassandraId)));
}

@Override
public Mono<MailboxId> rename(Mailbox cassandraMailbox, MailboxPath previousPath) {
Preconditions.checkNotNull(cassandraMailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId");
CassandraId cassandraId = (CassandraId) cassandraMailbox.getMailboxId();

return mailboxPathV3DAO.save(cassandraMailbox)
.handle(ReactorUtils.raiseErrorIfFalse(() -> new MailboxExistsException(cassandraMailbox.generateAssociatedPath().asString())))
// Additive writes first (the new path reference and the projection), subtractive write
// last (drop the old path reference). At any crash point the mailbox thus stays
// reachable under its new path with a consistent projection; only a stale old path
// reference may linger (the delete is retried to absorb transient failures).
.flatMap(applied -> persistMailboxEntity(cassandraMailbox)
.then(deletePreviousMailboxPathReference(previousPath)))
.thenReturn(cassandraId);
}

private Mono<Void> persistMailboxEntity(Mailbox cassandraMailbox) {
return mailboxDAO.save(cassandraMailbox)
.retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF));
Expand All @@ -250,7 +251,7 @@ private Mono<Void> deletePreviousMailboxPathReference(MailboxPath mailboxPath) {

@Override
public Mono<Boolean> hasChildren(Mailbox mailbox, char delimiter) {
return performReadRepair(listMailboxes(mailbox.getNamespace(), mailbox.getUser()))
return performReadRepair(mailboxPathV3DAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser(), consistencyChoice()))
.filter(idAndPath -> isPathChildOfMailbox(idAndPath, mailbox, delimiter))
.hasElements();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public class CassandraMailboxPathV3DAO {
private final PreparedStatement delete;
private final PreparedStatement insert;
private final PreparedStatement select;
private final PreparedStatement selectWriteTime;
private final PreparedStatement selectUser;
private final PreparedStatement selectAll;
private final CqlSession session;
private final DriverExecutionProfile lwtProfile;
private final DriverExecutionProfile readProfile;
private final DriverExecutionProfile writeProfile;

@Inject
public CassandraMailboxPathV3DAO(CqlSession session) {
Expand All @@ -78,11 +78,11 @@ public CassandraMailboxPathV3DAO(CqlSession session) {
this.insert = prepareInsert();
this.delete = prepareDelete();
this.select = prepareSelect();
this.selectWriteTime = prepareSelectWriteTime();
this.selectUser = prepareSelectUser();
this.selectAll = prepareSelectAll();
this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session);
this.readProfile = ProfileLocator.READ.locateProfile(session, "MAILBOXPATHV3");
this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "MAILBOXPATHV3");
}

private PreparedStatement prepareDelete() {
Expand Down Expand Up @@ -114,6 +114,15 @@ private PreparedStatement prepareSelect() {
.build());
}

private PreparedStatement prepareSelectWriteTime() {
return session.prepare(selectFrom(TABLE_NAME)
.writeTime(MAILBOX_ID)
.where(column(NAMESPACE).isEqualTo(bindMarker(NAMESPACE)),
column(USER).isEqualTo(bindMarker(USER)),
column(MAILBOX_NAME).isEqualTo(bindMarker(MAILBOX_NAME)))
.build());
}

private PreparedStatement prepareSelectUser() {
return session.prepare(selectFrom(TABLE_NAME)
.columns(MAILBOX_ID, UIDVALIDITY, MAILBOX_NAME)
Expand Down Expand Up @@ -144,6 +153,16 @@ public Mono<Mailbox> retrieve(MailboxPath mailboxPath, JamesExecutionProfiles.Co
.switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath)));
}

public Mono<Long> writeTime(MailboxPath mailboxPath) {
BoundStatement statement = selectWriteTime.bind()
.set(NAMESPACE, mailboxPath.getNamespace(), TypeCodecs.TEXT)
.set(USER, sanitizeUser(mailboxPath.getUser()), TypeCodecs.TEXT)
.set(MAILBOX_NAME, mailboxPath.getName(), TypeCodecs.TEXT);

return cassandraAsyncExecutor.executeSingleRow(setExecutionProfileIfNeeded(statement, STRONG))
.map(row -> row.getLong(0));
}

public Flux<Mailbox> listUserMailboxes(String namespace, Username user, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) {
BoundStatementBuilder statementBuilder = selectUser.boundStatementBuilder()
.set(NAMESPACE, namespace, TypeCodecs.TEXT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ public MailboxMergingTaskRunner(MailboxManager mailboxManager, StoreMessageIdMan
}

public Task.Result run(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxMergingTask.Context context) {
return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context)
.flatMap(onMoveCompleteOperations(oldMailboxId, newMailboxId))
return runReactive(oldMailboxId, newMailboxId, context)
.block();
}

public Mono<Task.Result> runReactive(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxMergingTask.Context context) {
return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context)
.flatMap(onMoveCompleteOperations(oldMailboxId, newMailboxId));
}

private Function<Task.Result, Mono<Task.Result>> onMoveCompleteOperations(CassandraId oldMailboxId, CassandraId newMailboxId) {
return result -> {
if (result == Task.Result.COMPLETED) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.cassandra.mail.task;

import java.util.Optional;

import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesService.RunningOptions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class SolveMailboxInconsistenciesRunningOptionsDTO {
public static SolveMailboxInconsistenciesRunningOptionsDTO asDTO(RunningOptions domainObject) {
return new SolveMailboxInconsistenciesRunningOptionsDTO(
Optional.of(domainObject.getMaxIterations()),
Optional.of(domainObject.isAutoMerge()));
}

private final Optional<Integer> maxIterations;
private final Optional<Boolean> autoMerge;

@JsonCreator
public SolveMailboxInconsistenciesRunningOptionsDTO(@JsonProperty("maxIterations") Optional<Integer> maxIterations,
@JsonProperty("autoMerge") Optional<Boolean> autoMerge) {
this.maxIterations = maxIterations;
this.autoMerge = autoMerge;
}

public Optional<Integer> getMaxIterations() {
return maxIterations;
}

public Optional<Boolean> getAutoMerge() {
return autoMerge;
}

public RunningOptions asDomainObject() {
return new RunningOptions(
maxIterations.orElse(RunningOptions.DEFAULT_MAX_ITERATIONS),
autoMerge.orElse(RunningOptions.DEFAULT_AUTO_MERGE));
}
}
Loading