Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEPENDENCY-LICENSES
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ List of third-party dependencies grouped by their license type.

Eclipse Public License (EPL) 1.0, GNU Lesser General Public License Version 2.1, February 1999

* JGraphT - Core (org.jgrapht:jgrapht-core:0.9.0 - http://www.jgrapht.org/jgrapht-core)
* JGraphT - Core (org.jgrapht:jgrapht-core:1.5.3 - http://www.jgrapht.org/jgrapht-core)

Eclipse Public License 2.0, GNU General Public License, version 2 with the GNU Classpath Exception

Expand Down
2 changes: 1 addition & 1 deletion LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ The license texts of these dependencies can be found in the licenses directory.

Eclipse Public License (EPL) 1.0, GNU Lesser General Public License Version 2.1, February 1999

* JGraphT - Core (org.jgrapht:jgrapht-core:0.9.0 - http://www.jgrapht.org/jgrapht-core)
* JGraphT - Core (org.jgrapht:jgrapht-core:1.5.3 - http://www.jgrapht.org/jgrapht-core)

Eclipse Public License v. 2.0, GNU General Public License, version 2 with the GNU Classpath Exception

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
<httpclient.version>4.5.14</httpclient.version>
<httpclient.core.version>4.4.16</httpclient.core.version>
<jctools.version>4.0.6</jctools.version>
<jgrapht.version>0.9.0</jgrapht.version>
<jgrapht.version>1.5.3</jgrapht.version>
<guava.version>33.6.0-jre</guava.version>
<auto-service.version>1.1.1</auto-service.version>
<netty-tcnative.version>2.0.74.Final</netty-tcnative.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.List;
import java.util.Map;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -28,7 +28,7 @@
class ProcessorBolt extends BaseRichBolt implements StreamBolt {
private final ProcessorBoltDelegate delegate;

ProcessorBolt(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
ProcessorBolt(String id, Graph<Node, Edge> graph, List<ProcessorNode> nodes) {
delegate = new ProcessorBoltDelegate(id, graph, nodes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.storm.shade.com.google.common.collect.HashBasedTable;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.com.google.common.collect.Table;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DirectedSubgraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.shade.org.jgrapht.graph.AsSubgraph;
import org.apache.storm.shade.org.jgrapht.traverse.TopologicalOrderIterator;
import org.apache.storm.streams.processors.ChainedProcessorContext;
import org.apache.storm.streams.processors.EmittingProcessorContext;
Expand All @@ -45,7 +45,7 @@
class ProcessorBoltDelegate implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(ProcessorBoltDelegate.class);
private final String id;
private final DirectedGraph<Node, Edge> graph;
private final Graph<Node, Edge> graph;
private final List<ProcessorNode> nodes;
private final List<ProcessorNode> outgoingProcessors = new ArrayList<>();
private final Set<EmittingProcessorContext> emittingProcessorContexts = new HashSet<>();
Expand All @@ -57,7 +57,7 @@ class ProcessorBoltDelegate implements Serializable {
private Multimap<String, ProcessorNode> streamToInitialProcessors;
private String timestampField;

ProcessorBoltDelegate(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
ProcessorBoltDelegate(String id, Graph<Node, Edge> graph, List<ProcessorNode> nodes) {
this.id = id;
this.graph = graph;
this.nodes = new ArrayList<>(nodes);
Expand All @@ -79,7 +79,7 @@ void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollec
this.topoConf = topoConf;
topologyContext = context;
outputCollector = collector;
DirectedSubgraph<Node, Edge> subgraph = new DirectedSubgraph<>(graph, new HashSet<>(nodes), null);
AsSubgraph<Node, Edge> subgraph = new AsSubgraph<>(graph, new HashSet<>(nodes), null);
TopologicalOrderIterator<Node, Edge> it = new TopologicalOrderIterator<>(subgraph);
while (it.hasNext()) {
Node node = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.streams.processors.StatefulProcessor;
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
Expand All @@ -35,7 +35,7 @@ class StatefulProcessorBolt<K, V> extends BaseStatefulBolt<KeyValueState<K, V>>
// can be UpdateStateByKey or StateQuery processors
private final Set<StatefulProcessor<K, V>> statefulProcessors;

StatefulProcessorBolt(String boltId, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
StatefulProcessorBolt(String boltId, Graph<Node, Edge> graph, List<ProcessorNode> nodes) {
delegate = new ProcessorBoltDelegate(boltId, graph, nodes);
statefulProcessors = getStatefulProcessors(nodes);
}
Expand Down
14 changes: 6 additions & 8 deletions storm-client/src/jvm/org/apache/storm/streams/StreamBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.storm.annotation.InterfaceStability;
Expand Down Expand Up @@ -68,7 +67,7 @@ public class StreamBuilder {
* Creates a new {@link StreamBuilder}.
*/
public StreamBuilder() {
graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory());
graph = new DefaultDirectedGraph<>(null, null, false);
}

/**
Expand Down Expand Up @@ -158,7 +157,7 @@ public StormTopology build() {
nodeGroupingInfo.clear();
windowInfo.clear();
curGroup.clear();
TopologicalOrderIterator<Node, Edge> iterator = new TopologicalOrderIterator<>(graph, queue());
TopologicalOrderIterator<Node, Edge> iterator = new TopologicalOrderIterator<>(graph, priorityComparator());
TopologyBuilder topologyBuilder = new TopologyBuilder();
while (iterator.hasNext()) {
Node node = iterator.next();
Expand Down Expand Up @@ -196,7 +195,7 @@ Node addNode(Node parent, Node child, String parentStreamId) {

Node addNode(Node parent, Node child, String parentStreamId, int parallelism) {
graph.addVertex(child);
graph.addEdge(parent, child);
graph.addEdge(parent, child, new Edge(parent, child));
child.setParallelism(parallelism);
if (parent instanceof WindowNode || parent instanceof PartitionNode) {
child.addParentStream(parentNode(parent), parentStreamId);
Expand Down Expand Up @@ -236,9 +235,8 @@ Node insert(Node parent, Node child) {
return newChild;
}

private PriorityQueue<Node> queue() {
// min-heap
return new PriorityQueue<>(new Comparator<Node>() {
private Comparator<Node> priorityComparator() {
return new Comparator<Node>() {
/*
* Nodes in the descending order of priority.
* ProcessorNode has higher priority than partition and window nodes
Expand Down Expand Up @@ -279,7 +277,7 @@ private int getPriority(Node node) {
}
return Integer.MAX_VALUE;
}
});
};
}

private void handleProcessorNode(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
Expand Down
6 changes: 3 additions & 3 deletions storm-client/src/jvm/org/apache/storm/streams/StreamUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import java.util.ArrayList;
import java.util.List;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.tuple.Fields;

public class StreamUtil {
@SuppressWarnings("unchecked")
public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node) {
public static <T> List<T> getParents(Graph<Node, Edge> graph, Node node) {
List<Edge> incoming = new ArrayList<>(graph.incomingEdgesOf(node));
List<T> ret = new ArrayList<>();
for (Edge e : incoming) {
Expand All @@ -31,7 +31,7 @@ public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node)
}

@SuppressWarnings("unchecked")
public static <T> List<T> getChildren(DirectedGraph<Node, Edge> graph, Node node) {
public static <T> List<T> getChildren(Graph<Node, Edge> graph, Node node) {
List<Edge> outgoing = new ArrayList<>(graph.outgoingEdgesOf(node));
List<T> ret = new ArrayList<>();
for (Edge e : outgoing) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.List;
import java.util.Map;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.streams.windowing.SlidingWindows;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.streams.windowing.Window;
Expand All @@ -39,7 +39,7 @@ class WindowedProcessorBolt extends BaseWindowedBolt implements StreamBolt {
private final ProcessorBoltDelegate delegate;
private final Window<?, ?> window;

WindowedProcessorBolt(String id, DirectedGraph<Node, Edge> graph,
WindowedProcessorBolt(String id, Graph<Node, Edge> graph,
List<ProcessorNode> nodes,
Window<?, ?> window) {
delegate = new ProcessorBoltDelegate(id, graph, nodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.UndirectedGraph;
import org.apache.storm.shade.org.jgrapht.alg.ConnectivityInspector;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.shade.org.jgrapht.alg.connectivity.ConnectivityInspector;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.Pseudograph;
import org.apache.storm.topology.BoltDeclarer;
Expand Down Expand Up @@ -103,7 +102,7 @@ public class TridentTopology {
Map<String, Number> masterCoordResources = new HashMap<>();

public TridentTopology() {
this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
this(new DefaultDirectedGraph<Node, IndexedEdge>(null, new ErrorEdgeFactory(), false),
new LinkedHashMap<String, List<Node>>(),
new UniqueIdGen());
}
Expand Down Expand Up @@ -333,9 +332,9 @@ private static Set<String> committerBatches(Group g, Map<Node, String> batchGrou
return ret;
}

private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper,
private static Map<Group, Integer> getGroupParallelisms(Graph<Node, IndexedEdge> graph, GraphGrouper grouper,
Collection<Group> groups) {
UndirectedGraph<Group, Object> equivs = new Pseudograph<>(Object.class);
Graph<Group, Object> equivs = new Pseudograph<>(Object.class);
for (Group g : groups) {
equivs.addVertex(g);
}
Expand Down Expand Up @@ -440,7 +439,7 @@ private static boolean isIdentityPartition(PartitionNode n) {
return false;
}

private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
private static void addEdge(Graph g, Object source, Object target, int index) {
g.addEdge(source, target, new IndexedEdge(source, target, index));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.trident.planner.Node;
import org.apache.storm.trident.util.IndexedEdge;

public class GraphGrouper {
final DirectedGraph<Node, IndexedEdge> graph;
final Graph<Node, IndexedEdge> graph;
final Set<Group> currGroups;
final Map<Node, Group> groupIndex = new HashMap<>();

public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
public GraphGrouper(Graph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
this.graph = graph;
this.currGroups = new LinkedHashSet<>(initialGroups);
reindex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@
import java.util.Set;
import java.util.UUID;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.trident.planner.Node;
import org.apache.storm.trident.util.IndexedEdge;
import org.apache.storm.trident.util.TridentUtils;

public class Group {
public final Set<Node> nodes = new HashSet<>();
private final DirectedGraph<Node, IndexedEdge> graph;
private final Graph<Node, IndexedEdge> graph;
private final String id = UUID.randomUUID().toString();

public Group(DirectedGraph graph, List<Node> nodes) {
public Group(Graph graph, List<Node> nodes) {
this.graph = graph;
this.nodes.addAll(nodes);
}

public Group(DirectedGraph graph, Node n) {
public Group(Graph graph, Node n) {
this(graph, Arrays.asList(n));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import java.util.Set;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.Graph;
import org.apache.storm.shade.org.jgrapht.graph.AsSubgraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DirectedSubgraph;
import org.apache.storm.shade.org.jgrapht.traverse.TopologicalOrderIterator;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -45,7 +45,7 @@
public class SubtopologyBolt implements ITridentBatchBolt {
private static final long serialVersionUID = 1475508603138688412L;
@SuppressWarnings("rawtypes")
final DirectedGraph<Node, IndexedEdge> graph;
final Graph<Node, IndexedEdge> graph;
final Set<Node> nodes;
final Map<String, InitialReceiver> roots = new HashMap<>();
final Map<Node, Factory> outputFactories = new HashMap<>();
Expand All @@ -56,7 +56,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
@SuppressWarnings({ "unchecked", "rawtypes" })
public SubtopologyBolt(DefaultDirectedGraph<Node, IndexedEdge> graph, Set<Node> nodes, Map<Node, String> batchGroups) {
this.nodes = nodes;
this.graph = (DirectedGraph<Node, IndexedEdge>) graph.clone();
this.graph = (Graph<Node, IndexedEdge>) graph.clone();
this.batchGroups = copyAndOnlyKeep(batchGroups, nodes);

//Remove the unneeded entries from the graph
Expand Down Expand Up @@ -95,8 +95,8 @@ public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutp
context.setTaskData(n.stateInfo.id, s);
}
}
DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(graph, nodes, null);
TopologicalOrderIterator<Node, ?> it = new TopologicalOrderIterator<>(subgraph);
AsSubgraph<Node, IndexedEdge> subgraph = new AsSubgraph<>(graph, nodes, null);
TopologicalOrderIterator<Node, IndexedEdge> it = new TopologicalOrderIterator<>(subgraph);
int stateIndex = 0;
while (it.hasNext()) {
Node n = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
package org.apache.storm.trident.util;

import java.io.Serializable;
import org.apache.storm.shade.org.jgrapht.EdgeFactory;
import java.util.function.Supplier;

public class ErrorEdgeFactory implements EdgeFactory, Serializable {
public class ErrorEdgeFactory implements Supplier<IndexedEdge>, Serializable {
@Override
public Object createEdge(Object v, Object v1) {
public IndexedEdge get() {
throw new RuntimeException("Edges should be made explicitly");
}
}
Loading
Loading