Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions .build/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,9 @@
</module>

<module name="SuppressWithNearbyCommentFilter">
<property name="commentFormat" value="checkstyle: permit this invocation"/>
<property name="idFormat" value="blockPathToFile"/>
<property name="influenceFormat" value="0"/>
</module>

<module name="SuppressWithNearbyCommentFilter">
<property name="commentFormat" value="checkstyle: permit this invocation"/>
<property name="idFormat" value="blockToCases"/>
<property name="influenceFormat" value="0"/>
<property name="commentFormat" value="checkstyle: permit this invocation"/>
<property name="idFormat" value="blockInstantNow|blockPathToFile|blockToCases"/>
<property name="influenceFormat" value="0"/>
</module>

<module name="RegexpSinglelineJava">
Expand Down
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/belliottsmith/cassandra-accord.git
branch = tail-latency
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
6.0-alpha2
* Accord: Tail Latency Improvements (CASSANDRA-21361)
* Artificial Latency Injection (CASSANDRA-17024)
* Accord: Clean Shutdown/Restart, Rebootstrap, et al (CASSANDRA-21355)
* Reduce memory allocations in SelectStatement.getQuery (CASSANDRA-21351)
* Avoid allocation by getFunctions in SelectStatement.authorize (CASSANDRA-21347)
* Avoid unit conversion in DatabaseDescriptor.getMaxValueSize() for every deserializing Cell (CASSANDRA-21295)
Expand Down
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 149 files
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public class ExecutorLocals implements WithResources, Closeable
{
private static final ExecutorLocals none = new ExecutorLocals(null, null);
private static final ExecutorLocals none = new ExecutorLocals(null, null, false);
private static final FastThreadLocal<ExecutorLocals> locals = new FastThreadLocal<ExecutorLocals>()
{
@Override
Expand All @@ -45,20 +45,23 @@ protected ExecutorLocals initialValue()

public static class Impl
{
protected static void set(TraceState traceState, ClientWarn.State clientWarnState)
@SuppressWarnings("resource")
protected static void set(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency)
{
if (traceState == null && clientWarnState == null) locals.set(none);
else locals.set(new ExecutorLocals(traceState, clientWarnState));
if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) locals.set(none);
else locals.set(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
}
}

public final TraceState traceState;
public final ClientWarn.State clientWarnState;
public final boolean eligibleForArtificialLatency;

protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState)
protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency)
{
this.traceState = traceState;
this.clientWarnState = clientWarnState;
this.eligibleForArtificialLatency = eligibleForArtificialLatency;
}

/**
Expand All @@ -82,7 +85,7 @@ public static WithResources propagate()
public static ExecutorLocals create(TraceState traceState)
{
ExecutorLocals current = locals.get();
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState);
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.eligibleForArtificialLatency);
}

public static void clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,28 @@

import com.fasterxml.jackson.annotation.JsonIgnore;

import accord.api.ProtocolModifiers.CleanCfkBefore;
import accord.api.ProtocolModifiers.CoordinatorBacklogExecution;
import accord.api.ProtocolModifiers.FastExecution;
import accord.api.ProtocolModifiers.ReplicaExecution;
import accord.api.ProtocolModifiers.SendStableMessages;
import accord.primitives.TxnId;
import accord.utils.Invariants;

import org.apache.cassandra.journal.Params;
import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.service.consensus.TransactionalMode;

import static org.apache.cassandra.config.AccordSpec.CatchupMode.NORMAL;
import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_POOL_PER_SHARD;
import static org.apache.cassandra.config.AccordSpec.QueueSubmissionModel.SYNC;
import static org.apache.cassandra.config.AccordSpec.RangeIndexMode.in_memory;
import static org.apache.cassandra.config.AccordConfig.CatchupMode.NORMAL;
import static org.apache.cassandra.config.AccordConfig.QueuePriorityModel.HLC_FIFO;
import static org.apache.cassandra.config.AccordConfig.QueueShardModel.THREAD_POOL_PER_SHARD;
import static org.apache.cassandra.config.AccordConfig.QueueSubmissionModel.SYNC;
import static org.apache.cassandra.config.AccordConfig.RangeIndexMode.in_memory;

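// TODO (expected): class was renamed from AccordSpec to AccordConfig; confirm whether a further rename to AccordConf is still wanted, otherwise drop this TODO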
public class AccordSpec
public class AccordConfig
{
public volatile boolean enabled = false;

// TODO (expected): move to JournalConfig
public volatile String journal_directory;

/**
Expand Down Expand Up @@ -107,6 +112,37 @@ public enum QueueSubmissionModel
EXEC_ST
}

/**
* Policy for ordering work that has been queued for execution.
* The {@code queue_priority_model} setting selects one of these values.
*/
public enum QueuePriorityModel
{
/**
* All work is queued on a first-come, first-served basis.
* Under overload this can lead to more rapid degradation, as later phases of the state machine are delayed
* by the arrival of new work.
*/
FIFO,

/**
* If the work has an associated TxnId, prioritise it by the HLC of that TxnId; otherwise fall back to FIFO.
*/
HLC_FIFO,

/**
* Prioritise Apply, Stable, Commit, Accept, and PreAccept messages, in that order.
* Within a given message type, prioritise by HLC.
* Other messages are mixed in with PreAccept messages, but are ordered using the next counter rather than the HLC of the TxnId.
* Note: this can have some performance edge cases for contended keys, as Stable messages for later commands may be
* processed before earlier Accept/Commit messages, which may delay execution.
*/
PHASE_HLC_FIFO,

/**
* Prioritise Apply, Stable, Commit, Accept, and PreAccept messages from the original coordinator only, in that order.
* Within a given message type, prioritise by HLC.
* Other messages are mixed in with PreAccept messages, but are ordered using the next counter rather than the HLC of the TxnId.
*/
ORIG_PHASE_HLC_FIFO
}

public QueueShardModel queue_shard_model = THREAD_POOL_PER_SHARD;
public QueueSubmissionModel queue_submission_model = SYNC;

Expand All @@ -115,6 +151,15 @@ public enum QueueSubmissionModel
*/
public volatile OptionaldPositiveInt queue_shard_count = OptionaldPositiveInt.UNDEFINED;

public QueuePriorityModel queue_priority_model = HLC_FIFO;

// yield to other executor threads after executing this many tasks in a row, if there are waiting threads and tasks
public int queue_yield_interval = 100;

/**
* If the HLC is older than this, queue by FIFO instead
*/
public DurationSpec.IntMillisecondsBound queue_priority_age_to_fifo = new DurationSpec.IntMillisecondsBound(500);
/**
* The target number of command stores to create per topology shard.
* This determines the amount of execution parallelism possible for a given table/shard on the host.
Expand Down Expand Up @@ -165,6 +210,10 @@ public enum QueueSubmissionModel
public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
public volatile DurationSpec.IntSecondsBound global_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);

public volatile DurationSpec.IntSecondsBound topology_watermark_interval = new DurationSpec.IntSecondsBound(60);
public volatile boolean topology_sync_propagator_enabled_pre_start = false;
public volatile boolean topology_sync_propagator_enabled_post_startup = false;

public enum TransactionalRangeMigration
{
auto, explicit
Expand All @@ -178,11 +227,6 @@ public enum TransactionalRangeMigration
*/
public volatile TransactionalRangeMigration range_migration = TransactionalRangeMigration.auto;

public enum RebootstrapMode
{
full_repair, truncate_and_stream
}

public enum CatchupMode
{
DISABLED,
Expand All @@ -195,15 +239,42 @@ public enum CatchupMode
* default transactional mode for tables created by this node when no transactional mode has been specified in the DDL
*/
public TransactionalMode default_transactional_mode = TransactionalMode.off;
public boolean ephemeralReadEnabled = true;

// ******** PROTOCOL MODIFIERS ***********

public CoordinatorBacklogExecution coordinator_backlog_execution;
public TxnId.FastPath permit_fast_path;
public Boolean permit_track_stable_medium_path;
public Boolean permit_fast_quorum_medium_path;
public Boolean always_inform_durable_single_key;
public ReplicaExecution replica_execution;
public Float replica_execution_distributed_persist_chance;
public FastExecution fast_write_execution;
public FastExecution fast_read_execution;
public CleanCfkBefore clean_cfk_before;
public SendStableMessages send_stable;
/**
* include the least information expected to be necessary in messages -
* this is more efficient but may lead to some additional traffic and latency when earlier messages had not arrived
*/
public Boolean send_minimal;
// note: simulator incompatible (for now)
public Boolean precise_micros;

public boolean ephemeral_reads = true;
public boolean state_cache_listener_jfr_enabled = false;

public float hard_reject_ratio = 0.5f;
public int min_soft_reject_count = 10;
public int max_soft_reject_count = 100;
public int min_soft_reject_count = 100;
public int max_soft_reject_count = 1000;
public DurationSpec.LongMicrosecondsBound soft_reject_age = new DurationSpec.LongMicrosecondsBound("10s");
public DurationSpec.LongMicrosecondsBound soft_reject_cumulative_age = new DurationSpec.LongMicrosecondsBound("60s");


public DurationSpec.IntSecondsBound commands_for_key_prune_delta = new DurationSpec.IntSecondsBound(1);
public int commands_for_key_prune_interval = 64;
public DurationSpec.IntSecondsBound max_conflicts_prune_delta = new DurationSpec.IntSecondsBound(1);

public DurationSpec.IntSecondsBound catchup_on_start_success_latency = new DurationSpec.IntSecondsBound(60);
public DurationSpec.IntSecondsBound catchup_on_start_fail_latency = new DurationSpec.IntSecondsBound(900);
public int catchup_on_start_max_attempts = 5;
Expand All @@ -215,7 +286,7 @@ public enum CatchupMode
public enum RangeIndexMode { in_memory, journal_sai }
public RangeIndexMode range_index_mode = in_memory;

public final JournalSpec journal = new JournalSpec();
public final JournalConfig journal = new JournalConfig();

public enum MixedTimeSourceHandling
{
Expand All @@ -224,7 +295,7 @@ public enum MixedTimeSourceHandling

public volatile MixedTimeSourceHandling mixedTimeSourceHandling = MixedTimeSourceHandling.reject;

public static class JournalSpec implements Params
public static class JournalConfig implements Params
{
public enum ReplayMode
{
Expand Down Expand Up @@ -264,11 +335,17 @@ public enum StopMarkerFailurePolicy
*/
EXIT,

/**
* @deprecated since alpha release, replaced by ALLOW_UNSAFE_STARTUP for consistency with FailurePolicy.ALLOW_UNSAFE_STARTUP
*/
@Deprecated(since="6.0")
UNSAFE_STARTUP,

/**
* If the start marker exceeds the stop marker, proceed with startup anyway, assuming the consensus log has been determined to be complete externally.
* Note this is VERY UNSAFE if you care about isolation guarantees.
*/
UNSAFE_STARTUP,
ALLOW_UNSAFE_STARTUP,

REBOOTSTRAP
}
Expand All @@ -289,7 +366,7 @@ public enum StopMarkerFailurePolicy
public Version version = Version.DOWNGRADE_SAFE_VERSION;
public boolean enable_compaction = true;

public JournalSpec setFlushPeriod(DurationSpec newFlushPeriod)
public JournalConfig setFlushPeriod(DurationSpec newFlushPeriod)
{
flushPeriod = newFlushPeriod;
flushCombinedBlockPeriod = Long.MIN_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public enum CassandraRelevantProperties
{
ACCORD_AGENT_CLASS("cassandra.test.accord.agent"),
ACCORD_ALLOW_TEST_MODES("cassandra.test.accord.allow_test_modes", "false"),
ACCORD_DEBUG_EXECUTION("accord.debug_execution"),
ACCORD_KEY_PARANOIA_COSTFACTOR(Invariants.KEY_PARANOIA_COSTFACTOR),
ACCORD_KEY_PARANOIA_CPU(Invariants.KEY_PARANOIA_CPU),
ACCORD_KEY_PARANOIA_MEMORY(Invariants.KEY_PARANOIA_MEMORY),
Expand All @@ -62,6 +63,10 @@ public enum CassandraRelevantProperties
ALLOW_UNSAFE_REPLACE("cassandra.allow_unsafe_replace"),
ALLOW_UNSAFE_TRANSIENT_CHANGES("cassandra.allow_unsafe_transient_changes"),
APPROXIMATE_TIME_PRECISION_MS("cassandra.approximate_time_precision_ms", "2"),
ARTIFICIAL_LATENCIES("cassandra.artificial_latencies"),
ARTIFICIAL_LATENCIES_UNSAFE("cassandra.artificial_latencies_unsafe"),
ARTIFICIAL_LATENCY_LIMIT("cassandra.artificial_latency_limit", "200ms"),
ARTIFICIAL_LATENCY_VERBS("cassandra.artificial_latency_verbs"),
ASYNC_PROFILER_ENABLED("cassandra.async_profiler.enabled", "false"),
ASYNC_PROFILER_UNSAFE_MODE("cassandra.async_profiler.unsafe_mode", "false"),
/** 2 ** GENSALT_LOG2_ROUNDS rounds of hashing will be performed. */
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ public static Set<String> splitCommaDelimited(String src)
@Replaces(oldName = "cas_contention_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true)
public volatile DurationSpec.LongMillisecondsBound cas_contention_timeout = new DurationSpec.LongMillisecondsBound("1800ms");

public volatile DurationSpec.LongMillisecondsBound accord_preaccept_timeout = new DurationSpec.LongMillisecondsBound("1s");

@Replaces(oldName = "truncate_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true)
public volatile DurationSpec.LongMillisecondsBound truncate_request_timeout = new DurationSpec.LongMillisecondsBound("60000ms");

Expand Down Expand Up @@ -1238,7 +1236,7 @@ public enum PaxosOnLinearizabilityViolation
*/
public ParameterizedClass default_compaction = null;

public final AccordSpec accord = new AccordSpec();
public final AccordConfig accord = new AccordConfig();

public static Supplier<Config> getOverrideLoadConfig()
{
Expand Down
Loading