From c6b9719b042d279de907d8d8e8cf74b5cfb48df1 Mon Sep 17 00:00:00 2001 From: Maksim Timonin Date: Tue, 27 Jan 2026 13:42:49 +0300 Subject: [PATCH] IGNITE-27666 Fix missed SessionContext in CacheInterceptor for jdbc calls --- .../query/calcite/exec/rel/ModifyNode.java | 7 + ...JdbcSetClientInfoCacheInterceptorTest.java | 144 ++++++++++++++++++ .../calcite/jdbc/JdbcSetClientInfoTest.java | 43 +++++- .../ignite/testsuites/JdbcTestSuite.java | 2 + .../processors/cache/GridCacheProxyImpl.java | 23 +++ .../processors/odbc/ClientTxSupport.java | 7 +- .../odbc/jdbc/JdbcRequestHandler.java | 3 +- .../client/tx/ClientTxStartRequest.java | 3 +- 8 files changed, 226 insertions(+), 6 deletions(-) create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoCacheInterceptorTest.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java index b84f5fe0b2e4f..650a775514d2e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java @@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.TableModify; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.cache.context.SessionContextImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheProxyImpl; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; @@ -218,6 +219,12 @@ private void invokeOutsideTransaction( List tuples, GridCacheProxyImpl cache ) throws IgniteCheckedException { + SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class); + Map sesAttrs = sesCtx == null ? null : sesCtx.attributes(); + + if (sesAttrs != null) + cache = cache.withApplicationAttributes(sesAttrs); + Map> map = invokeMap(tuples); Map> res = cacheForDML(cache).invokeAll(map); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoCacheInterceptorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoCacheInterceptorTest.java new file mode 100644 index 0000000000000..43bddd472c965 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoCacheInterceptorTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Collection; +import java.util.List; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheInterceptorAdapter; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.SqlConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.SessionContextProviderResource; +import org.apache.ignite.session.SessionContextProvider; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assume.assumeFalse; + +/** */ +@RunWith(Parameterized.class) +public class JdbcSetClientInfoCacheInterceptorTest extends GridCommonAbstractTest { + /** */ + private static final String SESSION_ID = "sessionId"; + + /** */ + private static final String URL = "jdbc:ignite:thin://127.0.0.1"; + + /** */ + @Parameterized.Parameter + public boolean runInTx; + + /** */ + @Parameterized.Parameter(1) + public CacheAtomicityMode cacheMode; + + /** */ + @Parameterized.Parameters(name = "runInTx={0}, mode={1}") + public static Collection data() { + return F.asList( + new Object[] { false, CacheAtomicityMode.TRANSACTIONAL }, + new Object[] { false, CacheAtomicityMode.ATOMIC }, + new Object[] { true, CacheAtomicityMode.TRANSACTIONAL }, + new Object[] { true, CacheAtomicityMode.ATOMIC } + ); + } + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSqlConfiguration(new SqlConfiguration() + .setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration().setDefault(true))); + + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(runInTx); + + QueryEntity entity = new QueryEntity() + .setTableName("MYTABLE") + .setKeyType(Integer.class.getName()) + .setValueType(String.class.getName()) + .addQueryField("id", Integer.class.getName(), null) + .addQueryField("sessionId", String.class.getName(), null) + .setKeyFieldName("id") + .setValueFieldName("sessionId"); + + cfg.setCacheConfiguration(new CacheConfiguration() + .setAtomicityMode(cacheMode) + .setName(DEFAULT_CACHE_NAME) + .setSqlSchema("PUBLIC") + .setQueryEntities(List.of(entity)) + .setInterceptor(new SessionContextCacheInterceptor())); + + return cfg; + } + + /** */ + @Test + public void testInterceptInsert() throws Exception { + assumeFalse(runInTx && cacheMode == CacheAtomicityMode.ATOMIC); + + try (Ignite ignore = startGrid(); Connection conn = DriverManager.getConnection(URL)) { + conn.setClientInfo(SESSION_ID, "42"); + + try (Statement s = conn.createStatement()) { + assertEquals(1, s.executeUpdate("insert into PUBLIC.MYTABLE(id, sessionId) values (0, 1);")); + } + + try (Statement s = conn.createStatement()) { + assertTrue(s.execute("select id, sessionId from PUBLIC.MYTABLE;")); + + ResultSet rs = s.getResultSet(); + assertTrue(rs.next()); + + assertEquals(0, rs.getInt("id")); + assertEquals("42", rs.getString("sessionId")); + } + } + } + + /** */ + public static class SessionContextCacheInterceptor extends CacheInterceptorAdapter { + /** */ + @SessionContextProviderResource + private SessionContextProvider sessionCtxProv; + + /** */ + @Override public @Nullable String onBeforePut(Cache.Entry entry, String newVal) { + return sessionCtxProv.getSessionContext().getAttribute(SESSION_ID); + } + } + + /** */ + private List> query(IgniteEx ign, String sql, Object... args) { + return ign.context().query().querySqlFields(new SqlFieldsQuery(sql).setArgs(args), false).getAll(); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoTest.java index 89e36df4f89ed..149bd4011ff0a 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcSetClientInfoTest.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cache.query.annotations.QuerySqlTableFunction; @@ -41,8 +43,13 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assume.assumeFalse; /** */ +@RunWith(Parameterized.class) public class JdbcSetClientInfoTest extends GridCommonAbstractTest { /** */ private static final String SESSION_ID = "sessionId"; @@ -50,6 +57,25 @@ public class JdbcSetClientInfoTest extends GridCommonAbstractTest { /** */ private static final String URL = "jdbc:ignite:thin://127.0.0.1"; + /** */ + @Parameterized.Parameter + public boolean runInTx; + + /** */ + @Parameterized.Parameter(1) + public CacheAtomicityMode cacheMode; + + /** */ + @Parameterized.Parameters(name = "runInTx={0}, mode={1}") + public static Collection data() { + return F.asList( + new Object[] { false, CacheAtomicityMode.TRANSACTIONAL }, + new Object[] { false, CacheAtomicityMode.ATOMIC }, + new Object[] { true, CacheAtomicityMode.TRANSACTIONAL }, + new Object[] { true, CacheAtomicityMode.ATOMIC } + ); + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(instanceName); @@ -57,9 +83,22 @@ public class JdbcSetClientInfoTest extends GridCommonAbstractTest { cfg.setSqlConfiguration(new SqlConfiguration() .setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration().setDefault(true))); + cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(runInTx); + + QueryEntity entity = new QueryEntity() + .setTableName("MYTABLE") + .setKeyType(Integer.class.getName()) + .setValueType(String.class.getName()) + .addQueryField("id", Integer.class.getName(), null) + .addQueryField("sessionId", String.class.getName(), null) + .setKeyFieldName("id") + .setValueFieldName("sessionId"); + cfg.setCacheConfiguration(new CacheConfiguration<>() .setName(DEFAULT_CACHE_NAME) + .setAtomicityMode(cacheMode) .setSqlSchema("PUBLIC") + .setQueryEntities(List.of(entity)) .setSqlFunctionClasses(SessionContextFunctions.class)); return cfg; @@ -67,9 +106,9 @@ public class JdbcSetClientInfoTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - IgniteEx ign = startGrids(3); + assumeFalse(runInTx && cacheMode == CacheAtomicityMode.ATOMIC); - query(ign, "create table PUBLIC.MYTABLE(id int primary key, sessionId varchar);"); + startGrids(3); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/JdbcTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/JdbcTestSuite.java index 1e9a8c4f2e4df..0ca27232c9dd1 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/JdbcTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/JdbcTestSuite.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcCrossEngineTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcLocalFlagTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest; +import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcSetClientInfoCacheInterceptorTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcSetClientInfoTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcThinTransactionalSelfTest; import org.junit.runner.RunWith; @@ -35,6 +36,7 @@ JdbcCrossEngineTest.class, JdbcThinTransactionalSelfTest.class, JdbcSetClientInfoTest.class, + JdbcSetClientInfoCacheInterceptorTest.class, JdbcConnectionEnabledPropertyTest.class, JdbcLocalFlagTest.class, }) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index bf22e0aa722a5..4c51863f3936c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -302,6 +302,29 @@ public IgniteInternalCache delegate() { } } + /** @return New internal cache instance based on this one, but with application attributes. */ + public GridCacheProxyImpl withApplicationAttributes(Map attrs) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return new GridCacheProxyImpl<>(ctx, delegate, + opCtx != null ? opCtx.setApplicationAttributes(attrs) : + new CacheOperationContext( + false, + true, + false, + null, + false, + null, + false, + null, + attrs)); + } + finally { + gate.leave(prev); + } + } + /** {@inheritDoc} */ @Override public GridCacheProxyImpl keepBinary() { if (opCtx != null && opCtx.isKeepBinary()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientTxSupport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientTxSupport.java index 79d8e2228d356..2b15ba742dcce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientTxSupport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientTxSupport.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.odbc; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; @@ -36,6 +37,7 @@ public interface ClientTxSupport { * @param isolation Transaction isolation. * @param timeout Transaction timeout. * @param lb Transaction label. + * @param appAttrs Application attributes. * @return Transaction id. */ default int startClientTransaction( @@ -43,7 +45,8 @@ default int startClientTransaction( TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, - String lb + String lb, + Map appAttrs ) { GridNearTxLocal tx; @@ -60,7 +63,7 @@ default int startClientTransaction( true, 0, lb, - null + appAttrs ); } finally { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 3a4a8091183b9..243c831d451e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -855,7 +855,8 @@ private int txId(int txId) { cliCtx.concurrency(), cliCtx.isolation(), cliCtx.transactionTimeout(), - cliCtx.transactionLabel() + cliCtx.transactionLabel(), + cliCtx.applicationAttributes() ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxStartRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxStartRequest.java index d32cc626f9ae6..a13d2a59e880e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxStartRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxStartRequest.java @@ -60,7 +60,8 @@ public ClientTxStartRequest(BinaryRawReader reader) { /** {@inheritDoc} */ @Override public ClientResponse process(ClientConnectionContext ctx) { - return new ClientIntResponse(requestId(), startClientTransaction(ctx, concurrency, isolation, timeout, lb)); + // TODO IGNITE-23721: support application attributes for thin client. + return new ClientIntResponse(requestId(), startClientTransaction(ctx, concurrency, isolation, timeout, lb, null)); } /** {@inheritDoc} */