diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index d7ed2b5124613..a914465294d67 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -34,6 +34,7 @@ import org.apache.ignite.util.GridCommandHandlerPropertiesTest; import org.apache.ignite.util.GridCommandHandlerScheduleIndexRebuildTest; import org.apache.ignite.util.GridCommandHandlerTracingConfigurationTest; +import org.apache.ignite.util.IdleVerifyCheckWithWriteThroughTest; import org.apache.ignite.util.IdleVerifyDumpTest; import org.apache.ignite.util.MetricCommandTest; import org.apache.ignite.util.PerformanceStatisticsCommandTest; @@ -77,7 +78,8 @@ SecurityCommandHandlerPermissionsTest.class, - IdleVerifyDumpTest.class + IdleVerifyDumpTest.class, + IdleVerifyCheckWithWriteThroughTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java new file mode 100644 index 0000000000000..de8f3e35ff066 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; + +/** */ +public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest { + /** Node kill trigger. */ + private static CountDownLatch nodeKillLatch; + + /** */ + @Parameterized.Parameter(1) + public Boolean withPersistence; + + /** */ + @Parameterized.Parameter(2) + public Boolean multiCache; + + /** */ + private static final String CORRECT_VERIFY_MSG = "The check procedure has finished, no conflicts have been found."; + + /** */ + private static final String WITHOUT_WRITE_THROUGH_CACHE = "withoutWriteThrough"; + + /** */ + @Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}, multiCache={2}") + public static Collection parameters() { + return List.of( + new Object[] {CLI_CMD_HND, false, false}, + new Object[] {CLI_CMD_HND, false, true}, + new Object[] {CLI_CMD_HND, true, false}, + new Object[] {CLI_CMD_HND, true, true} + ); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + persistenceEnable(withPersistence); + + if (withPersistence) + cleanPersistenceDir(); + + nodeKillLatch = new CountDownLatch(1); + MapCacheStore.salvagedLatch = new CountDownLatch(1); + MapCacheStore.txCoordStoreLatch = new CountDownLatch(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try { + for (Ignite node : G.allGrids()) { + Collection txs = ((IgniteKernal)node).context().cache().context().tm().activeTransactions(); + + assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty()); + } + } + finally { + stopAllGrids(); + + super.afterTest(); + } + } + + /** {@inheritDoc} */ + @Override protected boolean persistenceEnable() { + return withPersistence; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCommunicationSpi(new TestRecordingCommunicationSpi()); + } + + /** */ + @Test + public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exception { + // sequential start is important here + IgniteEx nodeCoord = startGrid(0); + // near node + IgniteEx nodePrimary = startGrid(1); + // backup node + IgniteEx nodeBackup = startGrid(2); + + int firstVal = 0; + int secondVal = 1; + + nodeCoord.cluster().state(ClusterState.ACTIVE); + + CacheConfiguration ccfgWithWriteThrough = createCache(DEFAULT_CACHE_NAME, true); + IgniteCache cache = nodeCoord.createCache(ccfgWithWriteThrough); + + IgniteCache cacheWithoutWriteThrough = null; + + if (multiCache) { + CacheConfiguration ccfgWithoutWriteThrough = createCache(WITHOUT_WRITE_THROUGH_CACHE, false); + cacheWithoutWriteThrough = nodeCoord.createCache(ccfgWithoutWriteThrough); + } + + Integer primaryKey = primaryKey(nodePrimary.cache(DEFAULT_CACHE_NAME)); + + try (Transaction tx = nodeCoord.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { + cache.put(primaryKey, firstVal); + + if (multiCache) { + Integer primaryKeyWithoutWriteThrough = primaryKey(nodePrimary.cache(WITHOUT_WRITE_THROUGH_CACHE)); + cacheWithoutWriteThrough.put(primaryKeyWithoutWriteThrough, firstVal); + } + + tx.commit(); + } + + sqlVisibilityCheck(List.of(nodeCoord, nodeBackup), primaryKey, firstVal); + + nodeCoord.cluster().state(ClusterState.INACTIVE); + + GridMessageListener lsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof GridNearTxPrepareResponse) { + Collection txs = nodeBackup.context().cache().context().tm().activeTransactions(); + + assertEquals(1, txs.size()); + assertFalse(txs.iterator().next().local()); + + nodeKillLatch.countDown(); + + try { + assertTrue(waitForCondition(() -> + !nodeBackup.context().cache().context().tm().hackMap.isEmpty(), 5_000)); + } + catch (IgniteInterruptedCheckedException e) { + throw new RuntimeException(e); + } + } + } + }; + + nodeCoord.context().io().removeMessageListener(GridTopic.TOPIC_CACHE); // Remove old cache listener. + nodeCoord.context().io().addMessageListener(GridTopic.TOPIC_CACHE, lsnr); // Register as first listener. + nodeCoord.context().cache().context().io().start0(); // Register cache listener again. + + nodeCoord.cluster().state(ClusterState.ACTIVE); + + nodeCoord.context().event().addDiscoveryEventListener(new BeforeRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); + + IgniteInternalFuture stopFut = GridTestUtils.runAsync(() -> { + nodeKillLatch.await(); + nodePrimary.close(); + }); + + injectTestSystemOut(); + + try (Transaction tx = nodeCoord.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { + cache.put(primaryKey, secondVal); + + if (multiCache) { + Integer primaryKeyWithoutWriteThrough = primaryKey(nodePrimary.cache(WITHOUT_WRITE_THROUGH_CACHE)); + cacheWithoutWriteThrough.put(primaryKeyWithoutWriteThrough, secondVal); + } + + tx.commit(); + } + catch (Throwable th) { + fail("Unexpected exception: " + th); + } + + stopFut.get(getTestTimeout()); + + awaitPartitionMapExchange(); + + sqlVisibilityCheck(List.of(nodeCoord, nodeBackup), primaryKey, secondVal); + + int cacheSize = cache.size(); + assertEquals(1, cacheSize); + + int locSize = -1; + Object peeked = null; + + for (Ignite g : G.allGrids()) { + final Affinity aff = affinity(g.cache(DEFAULT_CACHE_NAME)); + + boolean primary = aff.isPrimary(g.cluster().localNode(), primaryKey); + + Object peeked0; + + if (primary) + peeked0 = g.cache(DEFAULT_CACHE_NAME).localPeek(primaryKey, CachePeekMode.PRIMARY); + else + peeked0 = g.cache(DEFAULT_CACHE_NAME).localPeek(primaryKey, CachePeekMode.BACKUP); + + if (peeked == null) + peeked = peeked0; + else { + assertEquals(peeked, peeked0); + assertNotNull(peeked); + } + + int locSize0; + + if (primary) + locSize0 = g.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY); + else + locSize0 = g.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.BACKUP); + + if (locSize == -1) + locSize = locSize0; + else { + assertEquals(locSize, locSize0); + } + + assertNotNull("grid instance: " + g.name(), g.cache(DEFAULT_CACHE_NAME).get(primaryKey)); + } + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + + String out = testOut.toString(); + + // partVerHash can be different + if (withPersistence) { + Assert.assertThat(out, anyOf(is(containsString("updateCntr=[lwm=2, missed=[], hwm=2], " + + "partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + Assert.assertThat(out, anyOf(is(containsString("updateCntr=[lwm=2, missed=[], hwm=2], " + + "partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + } + else { + Assert.assertThat(out, anyOf(is(containsString("consistentId=gridCommandHandlerTest0, " + + "updateCntr=1, partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + Assert.assertThat(out, anyOf(is(containsString("consistentId=gridCommandHandlerTest2, " + + "updateCntr=1, partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + } + testOut.reset(); + + if (withPersistence) { + stopAllGrids(); + startGridsMultiThreaded(3); + + awaitPartitionMapExchange(true, true, null); + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + out = testOut.toString(); + + Pattern regexCorrectCheck = Pattern.compile(CORRECT_VERIFY_MSG); + boolean correctOut = regexCorrectCheck.matcher(out).find(); + + // partVerHash are different, thus only regex check here + String regexCheck = "Partition instances: \\[PartitionHashRecord" + + ".*?consistentId=%s, updateCntr=\\[lwm=2, missed=\\[\\], hwm=2\\], partitionState=OWNING, size=1"; + Pattern part0Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest0")); + Pattern part1Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest1")); + Pattern part2Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest2")); + + boolean matches = + part0Pattern.matcher(out).find() && + part1Pattern.matcher(out).find() && + part2Pattern.matcher(out).find(); + + assertTrue(out, matches || correctOut); + } + } + + /** */ + private void sqlVisibilityCheck(List nodes, int keyToCheck, int referal) { + for (Ignite node : nodes) { + Object ret = node.compute(node.cluster().forLocal()).call(new IgniteCallable<>() { + /** */ + @IgniteInstanceResource + private Ignite instance; + + /** */ + @Override public Integer call() { + String selectSql = "SELECT VAL FROM " + DEFAULT_CACHE_NAME + " WHERE ID=" + keyToCheck; + + List> res = instance.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery(selectSql)).getAll(); + + return (int)res.get(0).get(0); + } + }); + + assertEquals("Unexpected result on node: " + node.name(), referal, ret); + } + } + + /** */ + private CacheConfiguration createCache(String cacheName, boolean writeThrough) { + CacheConfiguration ccfg = new CacheConfiguration<>(cacheName); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setQueryEntities(List.of(queryEntity(cacheName))); + + if (writeThrough) { + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(MapCacheStore::new); + } + + return ccfg; + } + + /** */ + private QueryEntity queryEntity(String cacheName) { + var entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Integer.class.getName()); + entity.addQueryField("ID", Integer.class.getName(), null); + entity.addQueryField("VAL", Integer.class.getName(), null); + entity.setKeyFieldName("ID"); + entity.setValueFieldName("VAL"); + entity.setTableName(cacheName); + + return entity; + } + + /** */ + public static class MapCacheStore extends CacheStoreAdapter { + /** Store map. */ + private final Map map = new ConcurrentHashMap<>(); + + /** */ + private static CountDownLatch salvagedLatch; + + /** */ + private static CountDownLatch txCoordStoreLatch; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + for (Map.Entry e : map.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + Object val = map.get(key); + + if (salvagedLatch != null) + salvagedLatch.countDown(); + + return val; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry e) { + map.put(e.getKey(), e.getValue()); + + if (txCoordStoreLatch != null) + txCoordStoreLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + map.remove(key); + } + } + + /** */ + private static class BeforeRecoveryListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + U.awaitQuiet(MapCacheStore.txCoordStoreLatch); + } + + /** {@inheritDoc} */ + @Override public int order() { + return -1; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 258bf5211cf95..f3f3d8063065a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -85,6 +85,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH_WT; import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx.addConflictVersion; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; @@ -529,6 +530,14 @@ private void commitIfLocked() throws IgniteCheckedException { while (true) { try { + boolean masterNodeInvolved = masterNodeIds().contains(cctx.localNodeId()); + if (isSystemInvalidate() && !masterNodeInvolved + && finalizationStatus() == RECOVERY_FINISH_WT) { + cctx.tm().hackMap.put(nearXidVersion(), this); + + break; + } + GridCacheEntryEx cached = txEntry.cached(); DataEntry dataEntry = null; @@ -815,9 +824,17 @@ else if (op == READ) { } } - cctx.tm().commitTx(this); + boolean masterNodeInvolved = masterNodeIds().contains(cctx.localNodeId()); - state(COMMITTED); + if (isSystemInvalidate() && storeWriteThrough() && + !masterNodeInvolved && finalizationStatus() == RECOVERY_FINISH_WT) { + systemInvalidate(false); + COMMIT_ALLOWED_UPD.compareAndSet(this, 1, 0); + } + else { + cctx.tm().commitTx(this); + state(COMMITTED); + } } } } @@ -827,8 +844,11 @@ else if (op == READ) { if (optimistic()) state(PREPARED); - if (!state(COMMITTING)) { - TransactionState state = state(); + TransactionState state = state(); + + // Possible GridDhtTxFinishRequest with checkCommited flag + if (!state(COMMITTING) && state != COMMITTING) { + state = state(); // If other thread is doing commit, then no-op. if (state == COMMITTING || state == COMMITTED) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index e60041e34ed06..c2ad54b41b910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -226,12 +226,7 @@ public GridDhtTxRemote( /** {@inheritDoc} */ @Override public Collection masterNodeIds() { - Collection res = new ArrayList<>(2); - - res.add(nearNodeId); - res.add(nodeId); - - return res; + return List.of(nearNodeId, nodeId); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index adb13f44816fa..18fcce947b566 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -914,6 +914,11 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp lsnr.onSessionStart(locSes); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); + + throw e; + } catch (Exception e) { throw new IgniteCheckedException("Failed to start store session: " + e, e); } @@ -934,6 +939,14 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws store.sessionEnd(!threwEx); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); + + if (!threwEx) + throw U.cast(e); + else + throw e; + } catch (Exception e) { if (!threwEx) throw U.cast(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 92764327fa9d4..a8808fd2894d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -56,7 +56,10 @@ public enum FinalizationStatus { USER_FINISH, /** Transaction is being finalized by recovery procedure. */ - RECOVERY_FINISH + RECOVERY_FINISH, + + /** */ + RECOVERY_FINISH_WT } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 193eb2a4e8ae0..f60d9f59f9db0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -604,6 +604,7 @@ protected void uncommit() { break; + case RECOVERY_FINISH_WT: case RECOVERY_FINISH: res = FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, status) || finalizing == status; @@ -1608,6 +1609,7 @@ protected IgniteBiTuple applyTransformClosures( assert cacheCtx != null; + // TODO: write description if (isSystemInvalidate()) return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 9eb5115d5354b..19bcca50dff66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -108,7 +108,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Owning transaction. */ @GridToStringExclude @GridDirectTransient - public IgniteInternalTx tx; + @Nullable public IgniteInternalTx tx; /** Cache key. */ @GridToStringExclude diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 10f38a3d105e1..23ff733d06784 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1351,6 +1351,8 @@ private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishR assert nodeId != null; assert req != null; + GridDhtTxRemote dhtTx = null; + if (req.checkCommitted()) { boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version()); @@ -1362,14 +1364,20 @@ private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishR fut.listen(() -> sendReply(nodeId, req, true, null)); } - return; + dhtTx = (GridDhtTxRemote)ctx.tm().hackMap.get(req.version()); + + if (dhtTx == null) + return; } // Always add version to rollback history to prevent races with rollbacks. if (!req.commit()) ctx.tm().addRolledbackTx(null, req.version()); - GridDhtTxRemote dhtTx = ctx.tm().tx(req.version()); + dhtTx = ctx.tm().tx(req.version()); // NOT GOOD NEED DIFFERENT APPROACH !!! + if (dhtTx == null) + dhtTx = (GridDhtTxRemote)ctx.tm().hackMap.get(req.version()); + GridNearTxRemote nearTx = ctx.tm().nearTx(req.version()); IgniteInternalTx anyTx = U.firstNotNull(dhtTx, nearTx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 9990cb67a0f95..e5e46a093b3b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -140,6 +140,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH_WT; import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH; import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; @@ -239,6 +240,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Per-ID map for near transactions. */ private final ConcurrentMap nearIdMap = newMap(); + /** */ + public final ConcurrentMap hackMap = newMap(); + /** Deadlock detection futures. */ private final ConcurrentMap deadlockDetectFuts = new ConcurrentHashMap<>(); @@ -3137,11 +3141,14 @@ private TxRecoveryInitRunnable(ClusterNode node) { ", failedNodeId=" + evtNodeId + ']'); for (final IgniteInternalTx tx : activeTransactions()) { - if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) - || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { + if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) || + (tx.storeWriteThrough() && tx.eventNodeId().equals(evtNodeId))) { // Invalidate transactions. salvageTx(tx, RECOVERY_FINISH); } + else if ((tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { + salvageTx(tx, RECOVERY_FINISH_WT); + } else { // Check prepare only if originating node ID failed. Otherwise, parent node will finish this tx. if (tx.originatingNodeId().equals(evtNodeId)) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java index 7d05a9bde65ad..9ab1c418331a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java @@ -282,7 +282,7 @@ private void multithreaded(CacheWriteSynchronizationMode syncMode, ignite.destroyCache(DEFAULT_CACHE_NAME); if (restartFut != null) - restartFut.get(); + restartFut.get(getTestTimeout()); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java index 937b51cd4dee7..67de2b5bfe5dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java @@ -379,7 +379,7 @@ private void txWithStore(final TransactionConcurrency concurrency, boolean write final IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { log.info("Start put"); clientCache.put(key, 2);