Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.ignite.internal.codegen.GridDistributedTxFinishResponseSerializer;
import org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSerializer;
import org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerializer;
import org.apache.ignite.internal.codegen.GridEventStorageMessageSerializer;
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
Expand Down Expand Up @@ -367,7 +368,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)10, GridDeploymentInfoBean::new);
factory.register((short)11, GridDeploymentRequest::new);
factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer());
factory.register((short)13, GridEventStorageMessage::new);
factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageSerializer());
factory.register((short)16, GridCacheTxRecoveryRequest::new, new GridCacheTxRecoveryRequestSerializer());
factory.register((short)17, GridCacheTxRecoveryResponse::new, new GridCacheTxRecoveryResponseSerializer());
factory.register((short)18, IndexQueryResultMeta::new, new IndexQueryResultMetaSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,13 +1024,7 @@ private <T extends Event> List<T> query(IgnitePredicate<T> p, Collection<? exten
GridEventStorageMessage res = (GridEventStorageMessage)msg;

try {
if (res.eventsBytes() != null)
res.events(U.<Collection<Event>>unmarshal(marsh, res.eventsBytes(),
U.resolveClassLoader(ctx.config())));

if (res.exceptionBytes() != null)
res.exception(U.<Throwable>unmarshal(marsh, res.exceptionBytes(),
U.resolveClassLoader(ctx.config())));
res.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal events query response: " + msg, e);
Expand Down Expand Up @@ -1066,17 +1060,14 @@ private <T extends Event> List<T> query(IgnitePredicate<T> p, Collection<? exten

ioMgr.addMessageListener(resTopic, resLsnr);

byte[] serFilter = U.marshal(marsh, p);

GridDeployment dep = ctx.deploy().deploy(p.getClass(), U.detectClassLoader(p.getClass()));

if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to deploy event filter: " + p);

GridEventStorageMessage msg = new GridEventStorageMessage(
resTopic,
serFilter,
p.getClass().getName(),
p,
dep.classLoaderId(),
dep.deployMode(),
dep.userVersion(),
Expand Down Expand Up @@ -1154,7 +1145,7 @@ private void sendMessage(Collection<? extends ClusterNode> nodes, GridTopic topi
ctx.io().sendToGridTopic(locNode, topic, msg, plc);

if (!rmtNodes.isEmpty()) {
msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
msg.prepareMarshal(marsh);

ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
}
Expand Down Expand Up @@ -1209,9 +1200,6 @@ private class RequestListener implements GridMessageListener {
Collection<Event> evts;

try {
if (req.responseTopicBytes() != null)
req.responseTopic(U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(ctx.config())));

GridDeployment dep = ctx.deploy().getGlobalDeployment(
req.deploymentMode(),
req.filterClassName(),
Expand All @@ -1226,7 +1214,9 @@ private class RequestListener implements GridMessageListener {
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for event filter " +
"(is peer class loading turned on?): " + req);

filter = U.unmarshal(marsh, req.filter(), U.resolveClassLoader(dep.classLoader(), ctx.config()));
req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(), ctx.config()));

filter = (IgnitePredicate<Event>)req.filter();

// Resource injection.
ctx.resource().inject(dep, dep.deployedClass(req.filterClassName()).get1(), filter);
Expand Down Expand Up @@ -1260,10 +1250,8 @@ private class RequestListener implements GridMessageListener {
if (log.isDebugEnabled())
log.debug("Sending event query response to node [nodeId=" + nodeId + "res=" + res + ']');

if (!ctx.localNodeId().equals(nodeId)) {
res.eventsBytes(U.marshal(marsh, res.events()));
res.exceptionBytes(U.marshal(marsh, res.exception()));
}
if (!ctx.localNodeId().equals(nodeId))
res.prepareMarshal(marsh);

ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
}
Expand Down
Loading
Loading