From 595c395f0d03a2fe0cbb7fbaad9064a5f6f7545c Mon Sep 17 00:00:00 2001 From: zstan Date: Wed, 28 Jan 2026 08:47:53 +0300 Subject: [PATCH 1/5] IGNITE-27678 Same partitions on different nodes can hold different updates if writeThrough is enabled --- .../IgniteControlUtilityTestSuite2.java | 4 +- .../IdleVerifyCheckWithWriteThroughTest.java | 278 ++++++++++++++++++ .../distributed/dht/GridDhtTxRemote.java | 9 +- .../store/GridCacheStoreManagerAdapter.java | 22 ++ 4 files changed, 305 insertions(+), 8 deletions(-) create mode 100644 modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index d7ed2b5124613..a914465294d67 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -34,6 +34,7 @@ import org.apache.ignite.util.GridCommandHandlerPropertiesTest; import org.apache.ignite.util.GridCommandHandlerScheduleIndexRebuildTest; import org.apache.ignite.util.GridCommandHandlerTracingConfigurationTest; +import org.apache.ignite.util.IdleVerifyCheckWithWriteThroughTest; import org.apache.ignite.util.IdleVerifyDumpTest; import org.apache.ignite.util.MetricCommandTest; import org.apache.ignite.util.PerformanceStatisticsCommandTest; @@ -77,7 +78,8 @@ SecurityCommandHandlerPermissionsTest.class, - IdleVerifyDumpTest.class + IdleVerifyDumpTest.class, + IdleVerifyCheckWithWriteThroughTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java new file mode 100644 index 0000000000000..a19503041ed4f --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreSession; +import org.apache.ignite.cache.store.CacheStoreSessionListener; +import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** */ +public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest { + /** */ + private AtomicReference err; + + /** Node kill trigger. */ + private static CountDownLatch nodeKill; + + /** Tx message flag. */ + private static volatile boolean finalTxMsgPassed; + + /** Session method flag. */ + private static AtomicBoolean sessionTriggered = new AtomicBoolean(); + + /** Storage exception message. */ + private static final String storageExceptionMessage = "Internal storage exception raised"; + + /** */ + @Parameterized.Parameter(1) + public Boolean withPersistence; + + /** */ + @Parameterized.Parameter(2) + public static Boolean failOnSessionStart; + + /** */ + @Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}") + public static Collection parameters() { + return List.of( + new Object[] {CLI_CMD_HND, false, false}, + new Object[] {CLI_CMD_HND, false, true}, + new Object[] {CLI_CMD_HND, true, false}, + new Object[] {CLI_CMD_HND, true, true} + ); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + persistenceEnable(withPersistence); + + if (withPersistence) + cleanPersistenceDir(); + + err = new AtomicReference<>(); + + nodeKill = new CountDownLatch(1); + sessionTriggered = new AtomicBoolean(); + finalTxMsgPassed = false; + } + + /** {@inheritDoc} */ + @Override protected boolean persistenceEnable() { + return false; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCommunicationSpi(new TestRecordingCommunicationSpi()) + .setFailureHandler(new AbstractFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + err.compareAndSet(null, failureCtx.error()); + + return false; + } + }); + } + + /** Test scenario: + *
    + *
  • Start 3 node [node0, node1, node2].
  • + *
  • Initialize put operation into transactional cache where [node1] holds primary partition for such insertion.
  • + *
  • Kill [node1] right after tx PREPARE stage is completed (it triggers tx recovery procedure.
  • + *
+ * + * @see IgniteTxManager#salvageTx(IgniteInternalTx) + */ + @Test + public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exception { + // sequential start is important here + startGrid(0); + startGrid(1); + startGrid(2); + + injectTestSystemOut(); + + int gridToStop = 1; + + IgniteEx instanceToStop = grid(gridToStop); + instanceToStop.cluster().state(ClusterState.ACTIVE); + + TestRecordingCommunicationSpi commSpi = + (TestRecordingCommunicationSpi)instanceToStop.configuration().getCommunicationSpi(); + commSpi.record(GridDhtTxFinishRequest.class); + + commSpi.blockMessages((node, msg) -> { + boolean ret = msg instanceof GridDhtTxFinishRequest; + + if (ret) { + nodeKill.countDown(); + finalTxMsgPassed = true; + } + + return ret; + }); + + MapCacheStoreStrategy strategy = new MapCacheStoreStrategy(); + strategy.resetStore(); + Factory> storeFactory = strategy.getStoreFactory(); + CacheConfiguration ccfg = new CacheConfiguration<>("cache"); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(storeFactory); + ccfg.setCacheStoreSessionListenerFactories(new TestCacheStoreFactory()); + + IgniteCache cache = instanceToStop.createCache(ccfg); + + awaitPartitionMapExchange(); + + IgniteInternalFuture stopFut = GridTestUtils.runAsync(() -> { + nodeKill.await(); + stopGrid(gridToStop); + }); + + // primary key for [node1] + Integer primaryKey = primaryKey(cache); + + //noinspection EmptyCatchBlock + try (Transaction tx = instanceToStop.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { + cache.put(primaryKey, new Object()); + tx.commit(); + } + catch (Throwable th) { + // No op + } + + stopFut.get(getTestTimeout()); + awaitPartitionMapExchange(); + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + + String out = testOut.toString(); + + assertContains(log, out, "The check procedure has failed"); + // Update counters are equal but size is different + if (withPersistence) { + assertContains(log, out, "updateCntr=[lwm=0, missed=[], hwm=0], partitionState=OWNING, size=0"); + assertContains(log, out, "updateCntr=[lwm=1, missed=[], hwm=1], partitionState=OWNING, size=1"); + } + else { + assertContains(log, out, "updateCntr=1, partitionState=OWNING, size=0"); + assertContains(log, out, "updateCntr=1, partitionState=OWNING, size=1"); + } + testOut.reset(); + + assertNotNull(err.get()); + assertThat(err.get().getMessage(), is(containsString(storageExceptionMessage))); + + if (withPersistence) { + stopAllGrids(); + startGridsMultiThreaded(3); + + awaitPartitionMapExchange(true, true, null); + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + out = testOut.toString(); + // partVerHash are different, thus only regex check here + Pattern primaryPattern = Pattern.compile("Partition instances: " + + "\\[PartitionHashRecord" + + ".*?hwm=1\\], partitionState=OWNING, size=1" + + ".*?hwm=1\\], partitionState=OWNING, size=1" + + ".*?hwm=1\\], partitionState=OWNING, size=1"); + + boolean matches = primaryPattern.matcher(out).find(); + assertTrue(matches); + } + } + + /** */ + private static class TestCacheStoreFactory implements Factory { + /** {@inheritDoc} */ + @Override public CacheStoreSessionListener create() { + return new TestCacheJdbcStoreSessionListener(); + } + } + + /** */ + private static class TestCacheJdbcStoreSessionListener extends CacheJdbcStoreSessionListener { + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + // No op. + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + // Originally connection need to be initialized here. + if (failOnSessionStart) { + if (finalTxMsgPassed && sessionTriggered.compareAndSet(false, true)) + throw new CacheWriterException(storageExceptionMessage); + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + if (!failOnSessionStart) { + if (finalTxMsgPassed && sessionTriggered.compareAndSet(false, true)) + throw new CacheWriterException(storageExceptionMessage); + } + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index e60041e34ed06..f1b7e87dcbd1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -226,12 +226,7 @@ public GridDhtTxRemote( /** {@inheritDoc} */ @Override public Collection masterNodeIds() { - Collection res = new ArrayList<>(2); - - res.add(nearNodeId); - res.add(nodeId); - - return res; + return nearNodeId != nodeId ? List.of(nearNodeId, nodeId) : List.of(nearNodeId); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index adb13f44816fa..f00bb1238e7a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -28,6 +28,7 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; @@ -38,6 +39,8 @@ import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -117,12 +120,17 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** Always keep binary. */ protected boolean alwaysKeepBinary; + /** Failure handler reaction. */ + private Consumer failureHandlerAction; + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { GridKernalContext ctx = igniteContext(); CacheConfiguration cfg = cacheConfiguration(); + failureHandlerAction = e -> ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + writeThrough = cfg.isWriteThrough(); readThrough = cfg.isReadThrough(); @@ -914,6 +922,13 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp lsnr.onSessionStart(locSes); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during notify SessionListeners: ", e); + + failureHandlerAction.accept(e); + + throw e; + } catch (Exception e) { throw new IgniteCheckedException("Failed to start store session: " + e, e); } @@ -934,6 +949,13 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws store.sessionEnd(!threwEx); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during sessionEnd: ", e); + + failureHandlerAction.accept(e); + + throw e; + } catch (Exception e) { if (!threwEx) throw U.cast(e); From 355f365b2b29cee73bbcd9e2dd194da94309f389 Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 29 Jan 2026 10:28:19 +0300 Subject: [PATCH 2/5] exception logic as for main stream --- .../processors/cache/store/GridCacheStoreManagerAdapter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index f00bb1238e7a9..4e561cb65a035 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -954,7 +954,10 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws failureHandlerAction.accept(e); - throw e; + if (!threwEx) + throw U.cast(e); + else + throw e; } catch (Exception e) { if (!threwEx) From 9b4edaee7c2bc719dd5fa0a0336c08bbfc45c56c Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 30 Jan 2026 10:36:09 +0300 Subject: [PATCH 3/5] minor --- .../apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java index a19503041ed4f..b508103a0386b 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -115,7 +115,7 @@ public static Collection parameters() { /** {@inheritDoc} */ @Override protected boolean persistenceEnable() { - return false; + return withPersistence; } /** {@inheritDoc} */ From 767e485072ccd333b841f21669453e4735227ab5 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 2 Feb 2026 11:59:42 +0300 Subject: [PATCH 4/5] fix after review --- .../IdleVerifyCheckWithWriteThroughTest.java | 23 ++++++++----------- .../distributed/dht/GridDhtTxRemote.java | 2 +- .../store/GridCacheStoreManagerAdapter.java | 14 +---------- 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java index b508103a0386b..2fb52277257b6 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -61,11 +61,11 @@ /** */ public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest { - /** */ - private AtomicReference err; + /** */ + private AtomicReference err = new AtomicReference<>(); /** Node kill trigger. */ - private static CountDownLatch nodeKill; + private static CountDownLatch nodeKillLatch; /** Tx message flag. */ private static volatile boolean finalTxMsgPassed; @@ -85,7 +85,7 @@ public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClust public static Boolean failOnSessionStart; /** */ - @Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}") + @Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}, failOnSessionStart={2}") public static Collection parameters() { return List.of( new Object[] {CLI_CMD_HND, false, false}, @@ -106,9 +106,7 @@ public static Collection parameters() { if (withPersistence) cleanPersistenceDir(); - err = new AtomicReference<>(); - - nodeKill = new CountDownLatch(1); + nodeKillLatch = new CountDownLatch(1); sessionTriggered = new AtomicBoolean(); finalTxMsgPassed = false; } @@ -133,9 +131,9 @@ public static Collection parameters() { /** Test scenario: *
    - *
  • Start 3 node [node0, node1, node2].
  • + *
  • Start 3 nodes [node0, node1, node2].
  • *
  • Initialize put operation into transactional cache where [node1] holds primary partition for such insertion.
  • - *
  • Kill [node1] right after tx PREPARE stage is completed (it triggers tx recovery procedure.
  • + *
  • Kill [node1] right after tx PREPARE stage is completed (it triggers tx recovery procedure).
  • *
* * @see IgniteTxManager#salvageTx(IgniteInternalTx) @@ -162,7 +160,7 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc boolean ret = msg instanceof GridDhtTxFinishRequest; if (ret) { - nodeKill.countDown(); + nodeKillLatch.countDown(); finalTxMsgPassed = true; } @@ -170,7 +168,6 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc }); MapCacheStoreStrategy strategy = new MapCacheStoreStrategy(); - strategy.resetStore(); Factory> storeFactory = strategy.getStoreFactory(); CacheConfiguration ccfg = new CacheConfiguration<>("cache"); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); @@ -185,7 +182,7 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc awaitPartitionMapExchange(); IgniteInternalFuture stopFut = GridTestUtils.runAsync(() -> { - nodeKill.await(); + nodeKillLatch.await(); stopGrid(gridToStop); }); @@ -221,7 +218,7 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc testOut.reset(); assertNotNull(err.get()); - assertThat(err.get().getMessage(), is(containsString(storageExceptionMessage))); + assertThat(err.get().getMessage(), is(containsString("Committing a transaction has produced runtime exception"))); if (withPersistence) { stopAllGrids(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index f1b7e87dcbd1c..c2ad54b41b910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -226,7 +226,7 @@ public GridDhtTxRemote( /** {@inheritDoc} */ @Override public Collection masterNodeIds() { - return nearNodeId != nodeId ? List.of(nearNodeId, nodeId) : List.of(nearNodeId); + return List.of(nearNodeId, nodeId); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 4e561cb65a035..8f95792c3eefa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -28,7 +28,6 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; -import java.util.function.Consumer; import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; @@ -39,8 +38,6 @@ import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -120,17 +117,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** Always keep binary. */ protected boolean alwaysKeepBinary; - /** Failure handler reaction. */ - private Consumer failureHandlerAction; - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { GridKernalContext ctx = igniteContext(); CacheConfiguration cfg = cacheConfiguration(); - failureHandlerAction = e -> ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); - writeThrough = cfg.isWriteThrough(); readThrough = cfg.isReadThrough(); @@ -923,9 +915,7 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp } } catch (RuntimeException e) { - U.error(log, "Exception raised during notify SessionListeners: ", e); - - failureHandlerAction.accept(e); + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); throw e; } @@ -952,8 +942,6 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws catch (RuntimeException e) { U.error(log, "Exception raised during sessionEnd: ", e); - failureHandlerAction.accept(e); - if (!threwEx) throw U.cast(e); else From 4f20b0b42ca1ab8a4955b68e2595c5b3f0ac6d22 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 2 Feb 2026 12:44:05 +0300 Subject: [PATCH 5/5] err msg changed --- .../processors/cache/store/GridCacheStoreManagerAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 8f95792c3eefa..18fcce947b566 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -940,7 +940,7 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws } } catch (RuntimeException e) { - U.error(log, "Exception raised during sessionEnd: ", e); + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); if (!threwEx) throw U.cast(e);