Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>2.1.3-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>socket.io-client</name>
<description>Socket.IO Client Library for Java</description>
Expand Down Expand Up @@ -102,20 +102,6 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<compilerArgs>
<arg>-Xlint:unchecked</arg>
</compilerArgs>
<showWarnings>true</showWarnings>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down Expand Up @@ -207,7 +193,7 @@
<workingDirectory>./src/test/resources</workingDirectory>
<executable>npm</executable>
<arguments>
<argument>install</argument>
<argument>install</argument>
</arguments>
</configuration>
</execution>
Expand All @@ -223,6 +209,44 @@
<artifactId>maven-site-plugin</artifactId>
<version>3.9.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<id>default-compile</id>
<phase>none</phase>
</execution>
<execution>
<id>default-testCompile</id>
<phase>none</phase>
</execution>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>testCompile</id>
<phase>test-compile</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>1.7</source>
<target>1.7</target>
<compilerArgs>
<arg>-Xlint:unchecked</arg>
</compilerArgs>
<showWarnings>true</showWarnings>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
</plugins>
</build>
</project>
7 changes: 4 additions & 3 deletions src/main/java/io/socket/client/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.socket.thread.EventThread;
import okhttp3.Call;
import okhttp3.WebSocket;
import org.json.JSONObject;

import java.net.URI;
import java.util.*;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class Manager extends Emitter {
private final List<Packet> packetBuffer = new ArrayList<>();
private final Queue<On.Handle> subs = new LinkedList<>();;
private final Options opts;
/*package*/ io.socket.engineio.client.Socket engine;
public io.socket.engineio.client.Socket engine;
private final Parser.Encoder encoder;
private final Parser.Decoder decoder;

Expand Down Expand Up @@ -445,7 +446,7 @@ private void cleanup() {
this.decoder.destroy();
}

/*package*/ void close() {
public void close() {
logger.fine("disconnect");
this.skipReconnect = true;
this.reconnecting = false;
Expand Down Expand Up @@ -562,7 +563,7 @@ public static class Options extends io.socket.engineio.client.Socket.Options {
public double randomizationFactor;
public Parser.Encoder encoder;
public Parser.Decoder decoder;
public Map<String, String> auth;
public JSONObject auth;

/**
* Connection timeout (ms). Set -1 to disable.
Expand Down
88 changes: 69 additions & 19 deletions src/main/java/io/socket/client/Socket.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ public class Socket extends Emitter {
private int ids;
private final String nsp;
private final Manager io;
private final Map<String, String> auth;
private final JSONObject auth;

private String _pid = null;
private String _lastOffset = null;
private boolean recovered = false;

private final Map<Integer, Ack> acks = new ConcurrentHashMap<>();
private Queue<On.Handle> subs;
private final Queue<List<Object>> receiveBuffer = new ConcurrentLinkedQueue<>();
private final Queue<Packet<JSONArray>> receiveBuffer = new ConcurrentLinkedQueue<>();
private final Queue<Packet<JSONArray>> sendBuffer = new ConcurrentLinkedQueue<>();

private final ConcurrentLinkedQueue<Listener> onAnyIncomingListeners = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -156,7 +161,7 @@ public void run() {
* Emits an event. When you pass {@link Ack} at the last argument, then the acknowledge is done.
*
* @param event an event name.
* @param args data to send.
* @param args data to send.
* @return a reference to this object.
*/
@Override
Expand All @@ -183,7 +188,7 @@ public void run() {
ack = null;
}

emit(event, _args, ack);
Socket.this.emit(event, _args, ack);
}
});
return this;
Expand All @@ -193,8 +198,8 @@ public void run() {
* Emits an event with an acknowledge.
*
* @param event an event name
* @param args data to send.
* @param ack the acknowledgement to be called
* @param args data to send.
* @param ack the acknowledgement to be called
* @return a reference to this object.
*/
public Emitter emit(final String event, final Object[] args, final Ack ack) {
Expand Down Expand Up @@ -268,8 +273,23 @@ private void packet(Packet packet) {
private void onopen() {
logger.fine("transport is open - connecting");

if (this.auth != null) {
this.packet(new Packet<>(Parser.CONNECT, new JSONObject(this.auth)));
JSONObject auth = this.auth;

if (_pid != null && _lastOffset != null) {
// Reconnection - add recovery data
try {
if (auth == null) {
auth = new JSONObject();
}
auth.put("pid", _pid);
auth.put("offset", String.valueOf(_lastOffset));
} catch (JSONException e) {
logger.warning(e.getMessage());
}
}

if (auth != null) {
this.packet(new Packet<>(Parser.CONNECT, auth));
} else {
this.packet(new Packet<>(Parser.CONNECT));
}
Expand Down Expand Up @@ -304,11 +324,19 @@ private void onpacket(Packet<?> packet) {

switch (packet.type) {
case Parser.CONNECT: {
if (packet.data instanceof JSONObject && ((JSONObject) packet.data).has("sid")) {
if (packet.data instanceof JSONObject) {
JSONObject data = (JSONObject) packet.data;
try {
this.onconnect(((JSONObject) packet.data).getString("sid"));
return;
} catch (JSONException e) {}
if (data.has("pid") && data.has("sid")) {
this.onconnect(data.getString("sid"), data.getString("pid"));
return;
} else if (data.has("sid")) {
this.onconnect(data.getString("sid"));
return;
}
} catch (JSONException e) {
logger.warning(e.getMessage());
}
} else {
super.emit(EVENT_CONNECT_ERROR, new SocketIOException("It seems you are trying to reach a Socket.IO server in v2.x with a v3.x client, which is not possible"));
}
Expand Down Expand Up @@ -363,8 +391,12 @@ private void onevent(Packet<JSONArray> packet) {
}
String event = args.remove(0).toString();
super.emit(event, args.toArray());
// Update offset if present
if (this._pid != null && args.get(args.size() - 1) instanceof String) {
_lastOffset = (String) args.get(args.size() - 1);
}
} else {
this.receiveBuffer.add(args);
this.receiveBuffer.add(packet);
}
}

Expand Down Expand Up @@ -411,18 +443,27 @@ private void onack(Packet<JSONArray> packet) {
}
}

private void onconnect(String id) {
private void onconnect(String sid) {
this.connected = true;
this.id = sid;
this.recovered = false;
this.emitBuffered();
super.emit(EVENT_CONNECT);
}

private void onconnect(String sid, String pid) {
this.connected = true;
this.id = id;
this.id = sid;
this.recovered = pid != null && pid.equals(_pid);
this._pid = pid;
this.emitBuffered();
super.emit(EVENT_CONNECT);
}

private void emitBuffered() {
List<Object> data;
Packet<JSONArray> data;
while ((data = this.receiveBuffer.poll()) != null) {
String event = (String)data.get(0);
super.emit(event, data.toArray());
onevent(data);
}
this.receiveBuffer.clear();

Expand All @@ -437,6 +478,7 @@ private void ondisconnect() {
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("server disconnect (%s)", this.nsp));
}
recovered = false;
this.destroy();
this.onclose("io server disconnect");
}
Expand Down Expand Up @@ -498,7 +540,7 @@ public boolean connected() {

/**
* A property on the socket instance that is equal to the underlying engine.io socket id.
*
* <p>
* The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
*
* @return a socket id
Expand Down Expand Up @@ -566,5 +608,13 @@ public Socket offAnyOutgoing(Listener fn) {
}
return this;
}

public String getLastOffset() {
return this._lastOffset;
}

public boolean isRecovered() {
return recovered;
}
}

4 changes: 3 additions & 1 deletion src/main/java/io/socket/client/SocketOptionBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.socket.client;

import org.json.JSONObject;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -174,7 +176,7 @@ public SocketOptionBuilder setPath(String path) {
return this;
}

public SocketOptionBuilder setAuth(Map<String, String> auth) {
public SocketOptionBuilder setAuth(JSONObject auth) {
this.options.auth = auth;
return this;
}
Expand Down
Loading