From f93c6ffdb3e9a5d46f1f94481ced97639bff865f Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 4 Feb 2026 15:33:27 +0500 Subject: [PATCH 1/3] IGNITE-27735 Use MessageSerializer for GridEventStorageMessage minimal changes --- .../communication/GridIoMessageFactory.java | 3 +- .../eventstorage/GridEventStorageMessage.java | 238 +++++------------- 2 files changed, 60 insertions(+), 181 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index e607d42eb0d7b..6436a2852cfc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSerializer; import org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer; import org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer; +import org.apache.ignite.internal.codegen.GridEventStorageMessageSerializer; import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer; import org.apache.ignite.internal.codegen.GridJobExecuteRequestSerializer; import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer; @@ -368,7 +369,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)10, GridDeploymentInfoBean::new); factory.register((short)11, GridDeploymentRequest::new); factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer()); - factory.register((short)13, GridEventStorageMessage::new); + factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageSerializer()); factory.register((short)16, GridCacheTxRecoveryRequest::new, new GridCacheTxRecoveryRequestSerializer()); factory.register((short)17, GridCacheTxRecoveryResponse::new, new GridCacheTxRecoveryResponseSerializer()); factory.register((short)18, IndexQueryResultMeta::new, new IndexQueryResultMetaSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java index 937f4381a9d82..08f888136bb5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java @@ -17,22 +17,17 @@ package org.apache.ignite.internal.managers.eventstorage; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.UUID; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.events.Event; -import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -40,44 +35,49 @@ */ public class GridEventStorageMessage implements Message { /** */ - @GridDirectTransient private Object resTopic; /** */ + @Order(value = 0, method = "responseTopicBytes") private byte[] resTopicBytes; /** */ + @Order(1) private byte[] filter; /** */ - @GridDirectTransient private Collection evts; /** */ + @Order(value = 2, method = "eventsBytes") private byte[] evtsBytes; /** */ - @GridDirectTransient private Throwable ex; /** */ + @Order(value = 3, method = "exceptionBytes") private byte[] exBytes; /** */ + @Order(value = 4, method = "classLoaderId") private IgniteUuid clsLdrId; /** */ + @Order(value = 5, method = "deploymentMode") private DeploymentMode depMode; /** */ + @Order(value = 6, method = "filterClassName") private String filterClsName; /** */ + @Order(value = 7, method = "userVersion") private String userVer; /** Node class loader participants. */ @GridToStringInclude - @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + @Order(value = 8, method = "loaderParticipants") private Map ldrParties; /** */ @@ -147,24 +147,31 @@ void responseTopic(Object resTopic) { /** * @return Serialized response topic. */ - byte[] responseTopicBytes() { + public byte[] responseTopicBytes() { return resTopicBytes; } /** * @param resTopicBytes Serialized response topic. */ - void responseTopicBytes(byte[] resTopicBytes) { + public void responseTopicBytes(byte[] resTopicBytes) { this.resTopicBytes = resTopicBytes; } /** * @return Filter. */ - byte[] filter() { + public byte[] filter() { return filter; } + /** + * @param filter Filter. + */ + public void filter(byte[] filter) { + this.filter = filter; + } + /** * @return Events. */ @@ -182,56 +189,84 @@ void events(@Nullable Collection evts) { /** * @return Serialized events. */ - byte[] eventsBytes() { + public byte[] eventsBytes() { return evtsBytes; } /** * @param evtsBytes Serialized events. */ - void eventsBytes(byte[] evtsBytes) { + public void eventsBytes(byte[] evtsBytes) { this.evtsBytes = evtsBytes; } /** * @return the Class loader ID. */ - IgniteUuid classLoaderId() { + public IgniteUuid classLoaderId() { return clsLdrId; } + /** + * @param clsLdrId the Class loader ID. + */ + public void classLoaderId(IgniteUuid clsLdrId) { + this.clsLdrId = clsLdrId; + } + /** * @return Deployment mode. */ - DeploymentMode deploymentMode() { + public DeploymentMode deploymentMode() { return depMode; } + /** + * @param depMode Deployment mode. + */ + public void deploymentMode(DeploymentMode depMode) { + this.depMode = depMode; + } + /** * @return Filter class name. */ - String filterClassName() { + public String filterClassName() { return filterClsName; } + /** + * @param filterClsName Filter class name. + */ + public void filterClassName(String filterClsName) { + this.filterClsName = filterClsName; + } + /** * @return User version. */ - String userVersion() { + public String userVersion() { return userVer; } + /** + * @param userVer User version. + */ + public void userVersion(String userVer) { + this.userVer = userVer; + } + /** * @return Node class loader participant map. */ - @Nullable Map loaderParticipants() { + public @Nullable Map loaderParticipants() { return ldrParties != null ? Collections.unmodifiableMap(ldrParties) : null; } /** * @param ldrParties Node class loader participant map. */ - void loaderParticipants(Map ldrParties) { + public void loaderParticipants(@Nullable Map ldrParties) { this.ldrParties = ldrParties; } @@ -252,174 +287,17 @@ void exception(Throwable ex) { /** * @return Serialized exception. */ - byte[] exceptionBytes() { + public byte[] exceptionBytes() { return exBytes; } /** * @param exBytes Serialized exception. */ - void exceptionBytes(byte[] exBytes) { + public void exceptionBytes(byte[] exBytes) { this.exBytes = exBytes; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeIgniteUuid(clsLdrId)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeByte(depMode != null ? (byte)depMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeByteArray(evtsBytes)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeByteArray(exBytes)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeByteArray(filter)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeString(filterClsName)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeMap(ldrParties, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeByteArray(resTopicBytes)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeString(userVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - clsLdrId = reader.readIgniteUuid(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - byte depModeOrd; - - depModeOrd = reader.readByte(); - - if (!reader.isLastRead()) - return false; - - depMode = DeploymentMode.fromOrdinal(depModeOrd); - - reader.incrementState(); - - case 2: - evtsBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 3: - exBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - filter = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - filterClsName = reader.readString(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - ldrParties = reader.readMap(MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - resTopicBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - userVer = reader.readString(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public short directType() { return 13; From e1a26881be31730a20243f81efd0737b3845bac4 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Fri, 6 Feb 2026 11:45:50 +0500 Subject: [PATCH 2/3] use ErrorMessage --- .../eventstorage/GridEventStorageManager.java | 8 +--- .../eventstorage/GridEventStorageMessage.java | 37 ++++++++----------- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index afedf59bf3f6e..1f77c9add4ddb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1027,10 +1027,6 @@ private List query(IgnitePredicate p, Collection>unmarshal(marsh, res.eventsBytes(), U.resolveClassLoader(ctx.config()))); - - if (res.exceptionBytes() != null) - res.exception(U.unmarshal(marsh, res.exceptionBytes(), - U.resolveClassLoader(ctx.config()))); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal events query response: " + msg, e); @@ -1260,10 +1256,8 @@ private class RequestListener implements GridMessageListener { if (log.isDebugEnabled()) log.debug("Sending event query response to node [nodeId=" + nodeId + "res=" + res + ']'); - if (!ctx.localNodeId().equals(nodeId)) { + if (!ctx.localNodeId().equals(nodeId)) res.eventsBytes(U.marshal(marsh, res.events())); - res.exceptionBytes(U.marshal(marsh, res.exception())); - } ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java index 08f888136bb5b..053724e98a814 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.events.Event; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -53,11 +54,8 @@ public class GridEventStorageMessage implements Message { private byte[] evtsBytes; /** */ - private Throwable ex; - - /** */ - @Order(value = 3, method = "exceptionBytes") - private byte[] exBytes; + @Order(value = 3, method = "errorMessage") + private ErrorMessage errMsg; /** */ @Order(value = 4, method = "classLoaderId") @@ -111,7 +109,7 @@ public GridEventStorageMessage() { this.ldrParties = ldrParties; evts = null; - ex = null; + errMsg = null; } /** @@ -120,7 +118,9 @@ public GridEventStorageMessage() { */ GridEventStorageMessage(Collection evts, Throwable ex) { this.evts = evts; - this.ex = ex; + + if (ex != null) + errMsg = new ErrorMessage(ex); resTopic = null; filter = null; @@ -273,29 +273,22 @@ public void loaderParticipants(@Nullable Map ldrParties) { /** * @return Exception. */ - Throwable exception() { - return ex; - } - - /** - * @param ex Exception. - */ - void exception(Throwable ex) { - this.ex = ex; + @Nullable Throwable exception() { + return ErrorMessage.error(errMsg); } /** - * @return Serialized exception. + * @return Error message. */ - public byte[] exceptionBytes() { - return exBytes; + public @Nullable ErrorMessage errorMessage() { + return errMsg; } /** - * @param exBytes Serialized exception. + * @param errMsg Error message. */ - public void exceptionBytes(byte[] exBytes) { - this.exBytes = exBytes; + public void errorMessage(@Nullable ErrorMessage errMsg) { + this.errMsg = errMsg; } /** {@inheritDoc} */ From ef733c5e994255a2786cb2482d9bd849ee39ee16 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 10 Feb 2026 13:53:15 +0500 Subject: [PATCH 3/3] refactoring --- .../eventstorage/GridEventStorageManager.java | 20 ++--- .../eventstorage/GridEventStorageMessage.java | 85 +++++++++++++------ 2 files changed, 68 insertions(+), 37 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 1f77c9add4ddb..6783bb139955f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1024,9 +1024,7 @@ private List query(IgnitePredicate p, Collection>unmarshal(marsh, res.eventsBytes(), - U.resolveClassLoader(ctx.config()))); + res.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), null); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal events query response: " + msg, e); @@ -1062,8 +1060,6 @@ private List query(IgnitePredicate p, Collection List query(IgnitePredicate p, Collection nodes, GridTopic topi ctx.io().sendToGridTopic(locNode, topic, msg, plc); if (!rmtNodes.isEmpty()) { - msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic())); + msg.prepareMarshal(marsh); ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc); } @@ -1205,9 +1200,6 @@ private class RequestListener implements GridMessageListener { Collection evts; try { - if (req.responseTopicBytes() != null) - req.responseTopic(U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(ctx.config()))); - GridDeployment dep = ctx.deploy().getGlobalDeployment( req.deploymentMode(), req.filterClassName(), @@ -1222,7 +1214,9 @@ private class RequestListener implements GridMessageListener { throw new IgniteDeploymentCheckedException("Failed to obtain deployment for event filter " + "(is peer class loading turned on?): " + req); - filter = U.unmarshal(marsh, req.filter(), U.resolveClassLoader(dep.classLoader(), ctx.config())); + req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(), ctx.config())); + + filter = (IgnitePredicate)req.filter(); // Resource injection. ctx.resource().inject(dep, dep.deployedClass(req.filterClassName()).get1(), filter); @@ -1257,7 +1251,7 @@ private class RequestListener implements GridMessageListener { log.debug("Sending event query response to node [nodeId=" + nodeId + "res=" + res + ']'); if (!ctx.localNodeId().equals(nodeId)) - res.eventsBytes(U.marshal(marsh, res.events())); + res.prepareMarshal(marsh); ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java index 053724e98a814..6bcf91275e769 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java @@ -21,13 +21,17 @@ import java.util.Collections; import java.util.Map; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.events.Event; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -43,8 +47,11 @@ public class GridEventStorageMessage implements Message { private byte[] resTopicBytes; /** */ - @Order(1) - private byte[] filter; + private IgnitePredicate filter; + + /** */ + @Order(value = 1, method = "filterBytes") + private byte[] filterBytes; /** */ private Collection evts; @@ -86,7 +93,6 @@ public GridEventStorageMessage() { /** * @param resTopic Response topic, * @param filter Query filter. - * @param filterClsName Filter class name. * @param clsLdrId Class loader ID. * @param depMode Deployment mode. * @param userVer User version. @@ -94,15 +100,14 @@ public GridEventStorageMessage() { */ GridEventStorageMessage( Object resTopic, - byte[] filter, - String filterClsName, + IgnitePredicate filter, IgniteUuid clsLdrId, DeploymentMode depMode, String userVer, Map ldrParties) { this.resTopic = resTopic; this.filter = filter; - this.filterClsName = filterClsName; + filterClsName = filter.getClass().getName(); this.depMode = depMode; this.clsLdrId = clsLdrId; this.userVer = userVer; @@ -137,13 +142,6 @@ Object responseTopic() { return resTopic; } - /** - * @param resTopic Response topic. - */ - void responseTopic(Object resTopic) { - this.resTopic = resTopic; - } - /** * @return Serialized response topic. */ @@ -161,29 +159,29 @@ public void responseTopicBytes(byte[] resTopicBytes) { /** * @return Filter. */ - public byte[] filter() { - return filter; + public byte[] filterBytes() { + return filterBytes; } /** - * @param filter Filter. + * @param filterBytes Filter bytes. */ - public void filter(byte[] filter) { - this.filter = filter; + public void filterBytes(byte[] filterBytes) { + this.filterBytes = filterBytes; } /** - * @return Events. + * @return Filter. */ - @Nullable Collection events() { - return evts != null ? Collections.unmodifiableCollection(evts) : null; + public IgnitePredicate filter() { + return filter; } /** - * @param evts Events. + * @return Events. */ - void events(@Nullable Collection evts) { - this.evts = evts; + @Nullable Collection events() { + return evts != null ? Collections.unmodifiableCollection(evts) : null; } /** @@ -291,6 +289,45 @@ public void errorMessage(@Nullable ErrorMessage errMsg) { this.errMsg = errMsg; } + /** + * @param marsh Marshaller. + */ + public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (resTopic != null && resTopicBytes == null) + resTopicBytes = U.marshal(marsh, resTopic); + + if (filter != null && filterBytes == null) + filterBytes = U.marshal(marsh, filter); + + if (evts != null && evtsBytes == null) + evtsBytes = U.marshal(marsh, evts); + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @param filterClsLdr Class loader for filter. + */ + public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader filterClsLdr) throws IgniteCheckedException { + if (resTopicBytes != null && resTopic == null) { + resTopic = U.unmarshal(marsh, resTopicBytes, ldr); + + resTopicBytes = null; + } + + if (filterBytes != null && filter == null && filterClsLdr != null) { + filter = U.unmarshal(marsh, filterBytes, filterClsLdr); + + filterBytes = null; + } + + if (evtsBytes != null && evts == null) { + evts = U.unmarshal(marsh, evtsBytes, ldr); + + evtsBytes = null; + } + } + /** {@inheritDoc} */ @Override public short directType() { return 13;