diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchronizationModeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchronizationModeMessage.java deleted file mode 100644 index 3769c41d049a8..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchronizationModeMessage.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.Order; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.jetbrains.annotations.Nullable; - -/** */ -public class CacheWriteSynchronizationModeMessage implements Message { - /** Type code. */ - public static final short TYPE_CODE = 503; - - /** Cache write synchronization mode value. */ - @Nullable private CacheWriteSynchronizationMode cacheWriteSyncMode; - - /** Code of cache write synchronization mode. */ - @Order(0) - private byte code = -1; - - /** Constructor. */ - public CacheWriteSynchronizationModeMessage() { - // No-op. - } - - /** Constructor. */ - public CacheWriteSynchronizationModeMessage(@Nullable CacheWriteSynchronizationMode mode) { - cacheWriteSyncMode = mode; - code = encode(mode); - } - - /** @param mode Cache write synchronization mode to encode. */ - private static byte encode(@Nullable CacheWriteSynchronizationMode mode) { - if (mode == null) - return -1; - - switch (mode) { - case FULL_SYNC: return 0; - case FULL_ASYNC: return 1; - case PRIMARY_SYNC: return 2; - } - - throw new IllegalArgumentException("Unknown cache write synchronization mode: " + mode); - } - - /** @param code Code of cache write synchronization mode to decode. */ - @Nullable private static CacheWriteSynchronizationMode decode(short code) { - switch (code) { - case -1: return null; - case 0: return CacheWriteSynchronizationMode.FULL_SYNC; - case 1: return CacheWriteSynchronizationMode.FULL_ASYNC; - case 2: return CacheWriteSynchronizationMode.PRIMARY_SYNC; - } - - throw new IllegalArgumentException("Unknown cache write synchronization mode code: " + code); - } - - /** @param code Code of cache write synchronization mode. */ - public void code(byte code) { - this.code = code; - cacheWriteSyncMode = decode(code); - } - - /** @return Code of cache write synchronization mode. */ - public byte code() { - return code; - } - - /** @return Cache write synchronization mode value. */ - public CacheWriteSynchronizationMode value() { - return cacheWriteSyncMode; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return TYPE_CODE; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridCacheOperationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridCacheOperationMessage.java deleted file mode 100644 index e187c6549190a..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridCacheOperationMessage.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.internal.managers.communication; - -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.jetbrains.annotations.Nullable; - -/** */ -public class GridCacheOperationMessage implements Message { - /** Type code. */ - public static final short TYPE_CODE = 504; - - /** Cache oparation. */ - @Nullable private GridCacheOperation cacheOperation; - - /** Cache oparation code. */ - @Order(0) - private byte code = -1; - - /** Constructor. */ - public GridCacheOperationMessage() { - // No-op. - } - - /** Constructor. */ - public GridCacheOperationMessage(@Nullable GridCacheOperation cacheOperation) { - this.cacheOperation = cacheOperation; - code = encode(cacheOperation); - } - - /** @param operation Cache operation to encode. */ - private static byte encode(@Nullable GridCacheOperation operation) { - if (operation == null) - return -1; - - switch (operation) { - case READ: return 0; - case CREATE: return 1; - case UPDATE: return 2; - case DELETE: return 3; - case TRANSFORM: return 4; - case RELOAD: return 5; - case NOOP: return 6; - } - - throw new IllegalArgumentException("Unknown cache operation: " + operation); - } - - /** @param code Cache operation code to dencode to a cache operation value. */ - @Nullable private static GridCacheOperation decode(byte code) { - switch (code) { - case -1: return null; - case 0: return GridCacheOperation.READ; - case 1: return GridCacheOperation.CREATE; - case 2: return GridCacheOperation.UPDATE; - case 3: return GridCacheOperation.DELETE; - case 4: return GridCacheOperation.TRANSFORM; - case 5: return GridCacheOperation.RELOAD; - case 6: return GridCacheOperation.NOOP; - } - - throw new IllegalArgumentException("Unknown cache operation code: " + code); - } - - /** @code Cache operation code. */ - public void code(byte code) { - this.code = code; - cacheOperation = decode(code); - } - - /** @return Cache operation code. */ - public byte code() { - return code; - } - - /** @return Cache operation value. */ - @Nullable public GridCacheOperation value() { - return cacheOperation; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return TYPE_CODE; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index e607d42eb0d7b..6f82ec8402394 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -46,14 +46,12 @@ import org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer; import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; -import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.ExchangeInfoSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; import org.apache.ignite.internal.codegen.GridCacheEntryInfoSerializer; -import org.apache.ignite.internal.codegen.GridCacheOperationMessageSerializer; import org.apache.ignite.internal.codegen.GridCacheQueryResponseSerializer; import org.apache.ignite.internal.codegen.GridCacheReturnSerializer; import org.apache.ignite.internal.codegen.GridCacheSqlQuerySerializer; @@ -99,6 +97,7 @@ import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer; import org.apache.ignite.internal.codegen.GridJobSiblingsResponseSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer; +import org.apache.ignite.internal.codegen.GridNearAtomicFullUpdateRequestSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateInvokeRequestSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateRequestSerializer; @@ -393,7 +392,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new, new GridDhtAtomicDeferredUpdateResponseSerializer()); factory.register((short)38, GridDhtAtomicUpdateRequest::new, new GridDhtAtomicUpdateRequestSerializer()); factory.register((short)39, GridDhtAtomicUpdateResponse::new, new GridDhtAtomicUpdateResponseSerializer()); - factory.register((short)40, GridNearAtomicFullUpdateRequest::new); + factory.register((short)40, GridNearAtomicFullUpdateRequest::new, new GridNearAtomicFullUpdateRequestSerializer()); factory.register((short)41, GridNearAtomicUpdateResponse::new, new GridNearAtomicUpdateResponseSerializer()); factory.register((short)42, GridDhtForceKeysRequest::new, new GridDhtForceKeysRequestSerializer()); factory.register((short)43, GridDhtForceKeysResponse::new, new GridDhtForceKeysResponseSerializer()); @@ -502,9 +501,6 @@ public class GridIoMessageFactory implements MessageFactoryProvider { new CachePartitionPartialCountersMapSerializer()); factory.register(IgniteDhtDemandedPartitionsMap.TYPE_CODE, IgniteDhtDemandedPartitionsMap::new, new IgniteDhtDemandedPartitionsMapSerializer()); - factory.register(CacheWriteSynchronizationModeMessage.TYPE_CODE, CacheWriteSynchronizationModeMessage::new, - new CacheWriteSynchronizationModeMessageSerializer()); - factory.register(GridCacheOperationMessage.TYPE_CODE, GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer()); factory.register(BinaryMetadataVersionInfo.TYPE_CODE, BinaryMetadataVersionInfo::new, new BinaryMetadataVersionInfoSerializer()); factory.register(CachePartitionFullCountersMap.TYPE_CODE, CachePartitionFullCountersMap::new, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index cb80feb7e2e96..2f4159640ab68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; @@ -25,8 +24,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.communication.CacheWriteSynchronizationModeMessage; -import org.apache.ignite.internal.managers.communication.GridCacheOperationMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -38,8 +35,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -50,7 +45,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); - /** . */ + /** */ private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01; /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ @@ -88,13 +83,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes @Order(value = 5, method = "topologyVersion") protected AffinityTopologyVersion topVer; - /** Cache operation wrapper message. */ - @Order(value = 6, method = "cacheOperationMessage") - protected GridCacheOperationMessage opMsg; + /** Cache update operation. */ + @Order(value = 6, method = "operation") + protected GridCacheOperation op; - /** Write synchronization mode wrapper message. */ - @Order(value = 7, method = "writeSynchronizationModeMessage") - protected CacheWriteSynchronizationModeMessage syncModeMsg; + /** Write synchronization mode. */ + @Order(value = 7, method = "writeSynchronizationMode") + protected CacheWriteSynchronizationMode syncMode; /** Task name hash. */ @Order(8) @@ -143,8 +138,8 @@ protected GridNearAtomicAbstractUpdateRequest( this.nodeId = nodeId; this.futId = futId; this.topVer = topVer; - this.opMsg = new GridCacheOperationMessage(op); - this.syncModeMsg = new CacheWriteSynchronizationModeMessage(syncMode); + this.op = op; + this.syncMode = syncMode; this.taskNameHash = taskNameHash; this.flags = flags; this.addDepInfo = addDepInfo; @@ -241,7 +236,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { } /** {@inheritDoc} */ - @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) { + @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) { return ctx.atomicMessageLogger(); } @@ -270,9 +265,9 @@ void needPrimaryResponse(boolean needRes) { * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode. */ boolean fullSync() { - assert syncModeMsg != null && writeSynchronizationMode() != null; + assert syncMode != null; - return writeSynchronizationMode() == CacheWriteSynchronizationMode.FULL_SYNC; + return syncMode == CacheWriteSynchronizationMode.FULL_SYNC; } /** @@ -304,15 +299,29 @@ public void flags(short flags) { } /** - * @return Update opreation. + * @return Cache update operation. + */ + public GridCacheOperation operation() { + return op; + } + + /** + * @param op Cache update operation. */ - @Nullable public GridCacheOperation operation() { - return opMsg.value(); + public void operation(GridCacheOperation op) { + this.op = op; } - /** @return Cache operatrion. */ - @Nullable public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncModeMsg.value(); + /** @return Write synchronization mode. */ + public CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } + + /** + * @param syncMode Write synchronization mode. + */ + public void writeSynchronizationMode(CacheWriteSynchronizationMode syncMode) { + this.syncMode = syncMode; } /** @@ -336,28 +345,6 @@ public void futureId(long futId) { this.futId = futId; } - /** @return The cache operation wrapper message. */ - public GridCacheOperationMessage cacheOperationMessage() { - return opMsg; - } - - /** Sets the cache operation wrapper message. */ - public void cacheOperationMessage(GridCacheOperationMessage cacheOpMsg) { - this.opMsg = cacheOpMsg; - } - - /** - * @return The write mode synchronization wrapper message. - */ - public final CacheWriteSynchronizationModeMessage writeSynchronizationModeMessage() { - return syncModeMsg; - } - - /** Sets the write mode synchronization wrapper message */ - public void writeSynchronizationModeMessage(CacheWriteSynchronizationModeMessage writeSyncModeMsg) { - this.syncModeMsg = writeSyncModeMsg; - } - /** * @param res Response. * @return {@code True} if current response was {@code null}. @@ -376,7 +363,7 @@ public boolean onResponse(GridNearAtomicUpdateResponse res) { * */ void resetResponse() { - this.res = null; + res = null; } /** @@ -400,13 +387,6 @@ final boolean topologyLocked() { return isFlag(TOP_LOCKED_FLAG_MASK); } - /** - * @param val {@code True} if topology is locked on near node. - */ - private void topologyLocked(boolean val) { - setFlag(val, TOP_LOCKED_FLAG_MASK); - } - /** * @return Return value flag. */ @@ -414,13 +394,6 @@ public final boolean returnValue() { return isFlag(RET_VAL_FLAG_MASK); } - /** - * @param val Return value flag. - */ - public final void returnValue(boolean val) { - setFlag(val, RET_VAL_FLAG_MASK); - } - /** * @return Skip write-through to a persistent storage. */ @@ -433,13 +406,6 @@ public final boolean skipReadThrough() { return isFlag(SKIP_READ_THROUGH_FLAG_MASK); } - /** - * @param val Skip store flag. - */ - public void skipStore(boolean val) { - setFlag(val, SKIP_STORE_FLAG_MASK); - } - /** * @return Keep binary flag. */ @@ -447,13 +413,6 @@ public final boolean keepBinary() { return isFlag(KEEP_BINARY_FLAG_MASK); } - /** - * @param val Keep binary flag. - */ - public void keepBinary(boolean val) { - setFlag(val, KEEP_BINARY_FLAG_MASK); - } - /** * @return Recovery flag. */ @@ -461,13 +420,6 @@ public final boolean recovery() { return isFlag(RECOVERY_FLAG_MASK); } - /** - * @param val Recovery flag. - */ - public void recovery(boolean val) { - setFlag(val, RECOVERY_FLAG_MASK); - } - /** * Sets flag mask. * @@ -585,125 +537,6 @@ abstract void addUpdateEntry(KeyCacheObject key, */ public abstract KeyCacheObject key(int idx); - // TODO: remove after IGNITE-26599, IGNITE-26577 - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 4: - if (!writer.writeShort(flags)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeLong(futId)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeMessage(opMsg)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeMessage(syncModeMsg)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeInt(taskNameHash)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeAffinityTopologyVersion(topVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - // TODO: remove after IGNITE-26599, IGNITE-26577 - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 4: - flags = reader.readShort(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - futId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - opMsg = reader.readMessage(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - syncModeMsg = reader.readMessage(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - taskNameHash = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - topVer = reader.readAffinityTopologyVersion(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public String toString() { StringBuilder flags = new StringBuilder(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 5105db3b9e3be..a80d8d9961b33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -26,8 +25,7 @@ import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -43,9 +41,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -58,51 +53,52 @@ */ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdateRequest { /** Keys to update. */ + @Order(10) @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) private List keys; /** Values to update. */ - @GridDirectCollection(CacheObject.class) + @Order(value = 11, method = "valuesToUpdate") private List vals; /** Entry processors. */ - @GridDirectTransient private List> entryProcessors; /** Entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List entryProcessorsBytes; + @Order(12) + private @Nullable List entryProcessorsBytes; /** Conflict versions. */ - @GridDirectCollection(GridCacheVersion.class) - private List conflictVers; + @Order(value = 13, method = "conflictVersions") + private @Nullable List conflictVers; /** Conflict TTLs. */ + @Order(14) private GridLongList conflictTtls; /** Conflict expire times. */ + @Order(15) private GridLongList conflictExpireTimes; /** Optional arguments for entry processor. */ - @GridDirectTransient - private Object[] invokeArgs; + private @Nullable Object[] invokeArgs; /** Entry processor arguments bytes. */ - private byte[][] invokeArgsBytes; + @Order(value = 16, method = "invokeArgumentsBytes") + private @Nullable List invokeArgsBytes; /** Expiry policy. */ - @GridDirectTransient - private ExpiryPolicy expiryPlc; + private @Nullable ExpiryPolicy expiryPlc; /** Expiry policy bytes. */ - private byte[] expiryPlcBytes; + @Order(value = 17, method = "expiryPolicyBytes") + private @Nullable byte[] expiryPlcBytes; /** Filter. */ - private CacheEntryPredicate[] filter; + @Order(18) + private @Nullable CacheEntryPredicate[] filter; /** Maximum possible size of inner collections. */ - @GridDirectTransient private int initSize; /** @@ -158,7 +154,7 @@ public GridNearAtomicFullUpdateRequest() { this.invokeArgs = invokeArgs; this.filter = filter; - // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries + // By default, ArrayList expands to array of 10 elements on first add. We cannot guess how many entries // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys // participate in request. As such, we know upper bound of all collections in request. If this bound is lower // than 10, we use it. @@ -242,6 +238,13 @@ else if (conflictVers != null) return keys; } + /** + * @param keys Keys to update. + */ + public void keys(List keys) { + this.keys = keys; + } + /** {@inheritDoc} */ @Override public int size() { assert keys != null; @@ -286,6 +289,13 @@ else if (conflictVers != null) return conflictVers; } + /** + * @param conflictVers Conflict versions. + */ + public void conflictVersions(@Nullable List conflictVers) { + this.conflictVers = conflictVers; + } + /** {@inheritDoc} */ @Override @Nullable public GridCacheVersion conflictVersion(int idx) { if (conflictVers != null) { @@ -329,16 +339,23 @@ else if (conflictVers != null) return filter; } + /** + * @param filter Filter. + */ + public void filter(@Nullable CacheEntryPredicate[] filter) { + this.filter = filter; + } + /** {@inheritDoc} */ @Override public ExpiryPolicy expiry() { return expiryPlc; } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - GridCacheContext cctx = ctx.cacheContext(cacheId); + GridCacheContext cctx = ctx.cacheContext(cacheId); if (expiryPlc != null && expiryPlcBytes == null) expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); @@ -368,18 +385,18 @@ else if (conflictVers != null) if (entryProcessorsBytes == null) entryProcessorsBytes = marshalCollection(entryProcessors, cctx); - if (invokeArgsBytes == null) - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + if (!F.isEmpty(invokeArgs) && invokeArgsBytes == null) + invokeArgsBytes = Arrays.asList(marshalInvokeArguments(invokeArgs, cctx)); } else prepareMarshalCacheObjects(vals, cctx); } /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - GridCacheContext cctx = ctx.cacheContext(cacheId); + GridCacheContext cctx = ctx.cacheContext(cacheId); if (expiryPlcBytes != null && expiryPlc == null) expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); @@ -397,8 +414,8 @@ else if (conflictVers != null) if (entryProcessors == null) entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); - if (invokeArgs == null) - invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); + if (invokeArgsBytes != null && invokeArgs == null) + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes.toArray(new byte[invokeArgsBytes.size()][]), ctx, ldr); } else finishUnmarshalCacheObjects(vals, cctx, ldr); @@ -411,163 +428,88 @@ else if (conflictVers != null) return keys.get(0).partition(); } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 10: - if (!writer.writeGridLongList(conflictExpireTimes)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeGridLongList(conflictTtls)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeCollection(conflictVers, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 13: - if (!writer.writeCollection(entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 14: - if (!writer.writeByteArray(expiryPlcBytes)) - return false; - - writer.incrementState(); - - case 15: - if (!writer.writeObjectArray(filter, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 16: - if (!writer.writeObjectArray(invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 17: - if (!writer.writeCollection(keys, MessageCollectionItemType.KEY_CACHE_OBJECT)) - return false; - - writer.incrementState(); - - case 18: - if (!writer.writeCollection(vals, MessageCollectionItemType.CACHE_OBJECT)) - return false; - - writer.incrementState(); - - } - - return true; + /** + * @return Values to update. + */ + public List valuesToUpdate() { + return vals; } - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 10: - conflictExpireTimes = reader.readGridLongList(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - conflictTtls = reader.readGridLongList(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - conflictVers = reader.readCollection(MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: - entryProcessorsBytes = reader.readCollection(MessageCollectionItemType.BYTE_ARR); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 14: - expiryPlcBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 15: - filter = reader.readObjectArray(MessageCollectionItemType.MSG, CacheEntryPredicate.class); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 16: - invokeArgsBytes = reader.readObjectArray(MessageCollectionItemType.BYTE_ARR, byte[].class); + /** + * @param vals Values to update. + */ + public void valuesToUpdate(List vals) { + this.vals = vals; + } - if (!reader.isLastRead()) - return false; + /** + * @return Entry processors bytes. + */ + public @Nullable List entryProcessorsBytes() { + return entryProcessorsBytes; + } - reader.incrementState(); + /** + * @param entryProcessorsBytes Entry processors bytes. + */ + public void entryProcessorsBytes(@Nullable List entryProcessorsBytes) { + this.entryProcessorsBytes = entryProcessorsBytes; + } - case 17: - keys = reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT); + /** + * @return Conflict TTLs. + */ + public GridLongList conflictTtls() { + return conflictTtls; + } - if (!reader.isLastRead()) - return false; + /** + * @param conflictTtls Conflict TTLs. + */ + public void conflictTtls(GridLongList conflictTtls) { + this.conflictTtls = conflictTtls; + } - reader.incrementState(); + /** + * @return Conflict expire times. + */ + public GridLongList conflictExpireTimes() { + return conflictExpireTimes; + } - case 18: - vals = reader.readCollection(MessageCollectionItemType.CACHE_OBJECT); + /** + * @param conflictExpireTimes Conflict expire times. + */ + public void conflictExpireTimes(GridLongList conflictExpireTimes) { + this.conflictExpireTimes = conflictExpireTimes; + } - if (!reader.isLastRead()) - return false; + /** + * @return Entry processor arguments bytes. + */ + public @Nullable List invokeArgumentsBytes() { + return invokeArgsBytes; + } - reader.incrementState(); + /** + * @param invokeArgsBytes Entry processor arguments bytes. + */ + public void invokeArgumentsBytes(@Nullable List invokeArgsBytes) { + this.invokeArgsBytes = invokeArgsBytes; + } - } + /** + * @return Expiry policy bytes. + */ + public @Nullable byte[] expiryPolicyBytes() { + return expiryPlcBytes; + } - return true; + /** + * @param expiryPlcBytes Expiry policy bytes. + */ + public void expiryPolicyBytes(@Nullable byte[] expiryPlcBytes) { + this.expiryPlcBytes = expiryPlcBytes; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchroizationModeMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchroizationModeMessageTest.java deleted file mode 100644 index 50bd6370c6347..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchroizationModeMessageTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.util.typedef.F; -import org.junit.Test; - -import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -/** */ -public class CacheWriteSynchroizationModeMessageTest { - /** */ - @Test - public void testCacheWriteSynchroizationCode() { - assertEquals(-1, new CacheWriteSynchronizationModeMessage(null).code()); - assertEquals(0, new CacheWriteSynchronizationModeMessage(CacheWriteSynchronizationMode.FULL_SYNC).code()); - assertEquals(1, new CacheWriteSynchronizationModeMessage(CacheWriteSynchronizationMode.FULL_ASYNC).code()); - assertEquals(2, new CacheWriteSynchronizationModeMessage(CacheWriteSynchronizationMode.PRIMARY_SYNC).code()); - - for (CacheWriteSynchronizationMode op : CacheWriteSynchronizationMode.values()) { - assertTrue(new CacheWriteSynchronizationModeMessage(op).code() >= 0); - assertTrue(new CacheWriteSynchronizationModeMessage(op).code() < 3); - } - } - - /** */ - @Test - public void testCacheWriteSynchronizationFromCode() { - CacheWriteSynchronizationModeMessage msg = new CacheWriteSynchronizationModeMessage(null); - - msg.code((byte)-1); - assertNull(msg.value()); - - msg.code((byte)0); - assertSame(CacheWriteSynchronizationMode.FULL_SYNC, msg.value()); - - msg.code((byte)1); - assertSame(CacheWriteSynchronizationMode.FULL_ASYNC, msg.value()); - - msg.code((byte)2); - assertSame(CacheWriteSynchronizationMode.PRIMARY_SYNC, msg.value()); - - Throwable t = assertThrowsWithCause(() -> msg.code((byte)3), IllegalArgumentException.class); - assertEquals("Unknown cache write synchronization mode code: 3", t.getMessage()); - } - - /** */ - @Test - public void testConversionConsistency() { - for (CacheWriteSynchronizationMode op : F.concat(CacheWriteSynchronizationMode.values(), (CacheWriteSynchronizationMode)null)) { - CacheWriteSynchronizationModeMessage msg = new CacheWriteSynchronizationModeMessage(op); - - assertEquals(op, msg.value()); - - CacheWriteSynchronizationModeMessage newMsg = new CacheWriteSynchronizationModeMessage(); - newMsg.code(msg.code()); - - assertEquals(msg.value(), newMsg.value()); - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCacheOperationModeMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCacheOperationModeMessageTest.java deleted file mode 100644 index c120a692e59d6..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCacheOperationModeMessageTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.util.typedef.F; -import org.junit.Test; - -import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -/** */ -public class GridCacheOperationModeMessageTest { - /** */ - @Test - public void testCacheOperationModeCode() { - assertEquals(-1, new GridCacheOperationMessage(null).code()); - assertEquals(0, new GridCacheOperationMessage(GridCacheOperation.READ).code()); - assertEquals(1, new GridCacheOperationMessage(GridCacheOperation.CREATE).code()); - assertEquals(2, new GridCacheOperationMessage(GridCacheOperation.UPDATE).code()); - assertEquals(3, new GridCacheOperationMessage(GridCacheOperation.DELETE).code()); - assertEquals(4, new GridCacheOperationMessage(GridCacheOperation.TRANSFORM).code()); - assertEquals(5, new GridCacheOperationMessage(GridCacheOperation.RELOAD).code()); - assertEquals(6, new GridCacheOperationMessage(GridCacheOperation.NOOP).code()); - - for (GridCacheOperation op : GridCacheOperation.values()) { - assertTrue(new GridCacheOperationMessage(op).code() >= 0); - assertTrue(new GridCacheOperationMessage(op).code() < 7); - } - } - - /** */ - @Test - public void testCacheOperationModeFromCode() { - GridCacheOperationMessage msg = new GridCacheOperationMessage(null); - - msg.code((byte)-1); - assertNull(msg.value()); - - msg.code((byte)0); - assertSame(GridCacheOperation.READ, msg.value()); - - msg.code((byte)1); - assertSame(GridCacheOperation.CREATE, msg.value()); - - msg.code((byte)2); - assertSame(GridCacheOperation.UPDATE, msg.value()); - - msg.code((byte)3); - assertSame(GridCacheOperation.DELETE, msg.value()); - - msg.code((byte)4); - assertSame(GridCacheOperation.TRANSFORM, msg.value()); - - msg.code((byte)5); - assertSame(GridCacheOperation.RELOAD, msg.value()); - - msg.code((byte)6); - assertSame(GridCacheOperation.NOOP, msg.value()); - - Throwable t = assertThrowsWithCause(() -> msg.code((byte)7), IllegalArgumentException.class); - assertEquals("Unknown cache operation code: 7", t.getMessage()); - } - - /** */ - @Test - public void testConversionConsistency() { - for (GridCacheOperation op : F.concat(GridCacheOperation.values(), (GridCacheOperation)null)) { - GridCacheOperationMessage msg = new GridCacheOperationMessage(op); - - assertEquals(op, msg.value()); - - GridCacheOperationMessage newMsg = new GridCacheOperationMessage(); - newMsg.code(msg.code()); - - assertEquals(msg.value(), newMsg.value()); - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 303b2431d78aa..fbd490fb6b174 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -38,10 +38,8 @@ import org.apache.ignite.internal.TransactionsMXBeanImplTest; import org.apache.ignite.internal.codegen.MessageProcessorTest; import org.apache.ignite.internal.managers.communication.CacheEntryPredicateAdapterMessageTest; -import org.apache.ignite.internal.managers.communication.CacheWriteSynchroizationModeMessageTest; import org.apache.ignite.internal.managers.communication.DefaultEnumMapperTest; import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest; -import org.apache.ignite.internal.managers.communication.GridCacheOperationModeMessageTest; import org.apache.ignite.internal.managers.communication.IndexKeyTypeMessageTest; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2Test; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2TestNoOptimizations; @@ -151,8 +149,6 @@ MessageProcessorTest.class, ErrorMessageSelfTest.class, - GridCacheOperationModeMessageTest.class, - CacheWriteSynchroizationModeMessageTest.class, CacheEntryPredicateAdapterMessageTest.class, DefaultEnumMapperTest.class, IndexKeyTypeMessageTest.class