From cbe785f9ffa1ac68d846c0adbebf2169f65bd993 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Wed, 3 Dec 2025 15:27:36 +0100 Subject: [PATCH 01/11] feat: add connection state recovery and flexible auth - Implement Socket.IO v4 connection state recovery * Track PID and offset for session restoration * Add isRecovered() and getLastOffset() methods * Replay missed events after reconnect - Change auth from Map to JSONObject for flexibility - Upgrade test server from Socket.IO v3.0.4 to v4.8.1 * Fix tests for v4 offset parameter * Fix namespace emission bugs BREAKING CHANGE: auth is now JSONObject instead of Map --- pom.xml | 56 ++- src/main/java/io/socket/client/Manager.java | 3 +- src/main/java/io/socket/client/Socket.java | 280 ++++++++------- .../io/socket/client/SocketOptionBuilder.java | 4 +- .../java/io/socket/client/Connection.java | 2 +- .../java/io/socket/client/ConnectionTest.java | 157 +++++---- .../socket/client/ServerConnectionTest.java | 104 +++--- .../java/io/socket/client/SocketTest.java | 33 +- src/test/java/io/socket/parser/Helpers.java | 2 +- src/test/resources/package-lock.json | 325 +++++++++++------- src/test/resources/package.json | 2 +- src/test/resources/server.js | 36 +- 12 files changed, 591 insertions(+), 413 deletions(-) diff --git a/pom.xml b/pom.xml index 4636cb1b..2e35df48 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 io.socket socket.io-client - 2.1.3-SNAPSHOT + 2.1.52-SNAPSHOT jar socket.io-client Socket.IO Client Library for Java @@ -102,20 +102,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - 3.5.1 - - 1.7 - 1.7 - - -Xlint:unchecked - - true - true - - org.apache.maven.plugins maven-surefire-plugin @@ -207,7 +193,7 @@ ./src/test/resources npm - install + install @@ -223,6 +209,44 @@ maven-site-plugin 3.9.1 + + org.apache.maven.plugins + maven-compiler-plugin + 3.5.1 + + + default-compile + none + + + default-testCompile + none + + + compile + compile + + compile + + + + testCompile + test-compile + + testCompile + + + + + 1.8 + 1.8 + + -Xlint:unchecked + + true + true + + diff --git a/src/main/java/io/socket/client/Manager.java b/src/main/java/io/socket/client/Manager.java index 04198998..a98882b7 100644 --- a/src/main/java/io/socket/client/Manager.java +++ b/src/main/java/io/socket/client/Manager.java @@ -9,6 +9,7 @@ import io.socket.thread.EventThread; import okhttp3.Call; import okhttp3.WebSocket; +import org.json.JSONObject; import java.net.URI; import java.util.*; @@ -562,7 +563,7 @@ public static class Options extends io.socket.engineio.client.Socket.Options { public double randomizationFactor; public Parser.Encoder encoder; public Parser.Decoder decoder; - public Map auth; + public JSONObject auth; /** * Connection timeout (ms). Set -1 to disable. diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index 2227a30d..cf336315 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -4,6 +4,7 @@ import io.socket.parser.Packet; import io.socket.parser.Parser; import io.socket.thread.EventThread; +import org.jetbrains.annotations.Nullable; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -59,10 +60,15 @@ public class Socket extends Emitter { private int ids; private final String nsp; private final Manager io; - private final Map auth; + private final JSONObject auth; + + private String _pid = null; + private String _lastOffset = null; + private boolean recovered = false; + private final Map acks = new ConcurrentHashMap<>(); private Queue subs; - private final Queue> receiveBuffer = new ConcurrentLinkedQueue<>(); + private final Queue> receiveBuffer = new ConcurrentLinkedQueue<>(); private final Queue> sendBuffer = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue onAnyIncomingListeners = new ConcurrentLinkedQueue<>(); @@ -79,31 +85,19 @@ private void subEvents() { final Manager io = Socket.this.io; Socket.this.subs = new LinkedList() {{ - add(On.on(io, Manager.EVENT_OPEN, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onopen(); - } + add(On.on(io, Manager.EVENT_OPEN, args -> { + Socket.this.onopen(); })); - add(On.on(io, Manager.EVENT_PACKET, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onpacket((Packet) args[0]); - } + add(On.on(io, Manager.EVENT_PACKET, args -> { + Socket.this.onpacket((Packet) args[0]); })); - add(On.on(io, Manager.EVENT_ERROR, new Listener() { - @Override - public void call(Object... args) { - if (!Socket.this.connected) { - Socket.super.emit(EVENT_CONNECT_ERROR, args[0]); - } + add(On.on(io, Manager.EVENT_ERROR, args -> { + if (!Socket.this.connected) { + Socket.super.emit(EVENT_CONNECT_ERROR, args[0]); } })); - add(On.on(io, Manager.EVENT_CLOSE, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onclose(args.length > 0 ? (String) args[0] : null); - } + add(On.on(io, Manager.EVENT_CLOSE, args -> { + Socket.this.onclose(args.length > 0 ? (String) args[0] : null); })); }}; } @@ -116,15 +110,12 @@ public boolean isActive() { * Connects the socket. */ public Socket open() { - EventThread.exec(new Runnable() { - @Override - public void run() { - if (Socket.this.connected || Socket.this.io.isReconnecting()) return; + EventThread.exec(() -> { + if (Socket.this.connected || Socket.this.io.isReconnecting()) return; - Socket.this.subEvents(); - Socket.this.io.open(); // ensure open - if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); - } + Socket.this.subEvents(); + Socket.this.io.open(); // ensure open + if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); }); return this; } @@ -143,12 +134,7 @@ public Socket connect() { * @return a reference to this object. */ public Socket send(final Object... args) { - EventThread.exec(new Runnable() { - @Override - public void run() { - Socket.this.emit(EVENT_MESSAGE, args); - } - }); + EventThread.exec(() -> Socket.this.emit(EVENT_MESSAGE, args)); return this; } @@ -156,7 +142,7 @@ public void run() { * Emits an event. When you pass {@link Ack} at the last argument, then the acknowledge is done. * * @param event an event name. - * @param args data to send. + * @param args data to send. * @return a reference to this object. */ @Override @@ -165,26 +151,23 @@ public Emitter emit(final String event, final Object... args) { throw new RuntimeException("'" + event + "' is a reserved event name"); } - EventThread.exec(new Runnable() { - @Override - public void run() { - Ack ack; - Object[] _args; - int lastIndex = args.length - 1; - - if (args.length > 0 && args[lastIndex] instanceof Ack) { - _args = new Object[lastIndex]; - for (int i = 0; i < lastIndex; i++) { - _args[i] = args[i]; - } - ack = (Ack) args[lastIndex]; - } else { - _args = args; - ack = null; - } + EventThread.exec(() -> { + Ack ack; + Object[] _args; + int lastIndex = args.length - 1; - emit(event, _args, ack); + if (args.length > 0 && args[lastIndex] instanceof Ack) { + _args = new Object[lastIndex]; + for (int i = 0; i < lastIndex; i++) { + _args[i] = args[i]; + } + ack = (Ack) args[lastIndex]; + } else { + _args = args; + ack = null; } + + emit(event, _args, ack); }); return this; } @@ -193,60 +176,57 @@ public void run() { * Emits an event with an acknowledge. * * @param event an event name - * @param args data to send. - * @param ack the acknowledgement to be called + * @param args data to send. + * @param ack the acknowledgement to be called * @return a reference to this object. */ public Emitter emit(final String event, final Object[] args, final Ack ack) { - EventThread.exec(new Runnable() { - @Override - public void run() { - JSONArray jsonArgs = new JSONArray(); - jsonArgs.put(event); + EventThread.exec(() -> { + JSONArray jsonArgs = new JSONArray(); + jsonArgs.put(event); - if (args != null) { - for (Object arg : args) { - jsonArgs.put(arg); - } + if (args != null) { + for (Object arg : args) { + jsonArgs.put(arg); } + } - Packet packet = new Packet<>(Parser.EVENT, jsonArgs); + Packet packet = new Packet<>(Parser.EVENT, jsonArgs); - if (ack != null) { - final int ackId = Socket.this.ids; + if (ack != null) { + final int ackId = Socket.this.ids; - logger.fine(String.format("emitting packet with ack id %d", ackId)); + logger.fine(String.format("emitting packet with ack id %d", ackId)); - if (ack instanceof AckWithTimeout) { - final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack; - ackWithTimeout.schedule(new TimerTask() { - @Override - public void run() { - // remove the ack from the map (to prevent an actual acknowledgement) - acks.remove(ackId); + if (ack instanceof AckWithTimeout) { + final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack; + ackWithTimeout.schedule(new TimerTask() { + @Override + public void run() { + // remove the ack from the map (to prevent an actual acknowledgement) + acks.remove(ackId); - // remove the packet from the buffer (if applicable) - Iterator> iterator = sendBuffer.iterator(); - while (iterator.hasNext()) { - if (iterator.next().id == ackId) { - iterator.remove(); - } + // remove the packet from the buffer (if applicable) + Iterator> iterator = sendBuffer.iterator(); + while (iterator.hasNext()) { + if (iterator.next().id == ackId) { + iterator.remove(); } - - ackWithTimeout.onTimeout(); } - }); - } - Socket.this.acks.put(ackId, ack); - packet.id = ids++; + ackWithTimeout.onTimeout(); + } + }); } - if (Socket.this.connected) { - Socket.this.packet(packet); - } else { - Socket.this.sendBuffer.add(packet); - } + Socket.this.acks.put(ackId, ack); + packet.id = ids++; + } + + if (Socket.this.connected) { + Socket.this.packet(packet); + } else { + Socket.this.sendBuffer.add(packet); } }); return this; @@ -268,8 +248,23 @@ private void packet(Packet packet) { private void onopen() { logger.fine("transport is open - connecting"); - if (this.auth != null) { - this.packet(new Packet<>(Parser.CONNECT, new JSONObject(this.auth))); + JSONObject auth = this.auth; + + if (_pid != null && _lastOffset != null) { + // Reconnection - add recovery data + try { + if (auth == null) { + auth = new JSONObject(); + } + auth.put("pid", _pid); + auth.put("offset", String.valueOf(_lastOffset)); + } catch (JSONException e) { + logger.warning(e.getMessage()); + } + } + + if (auth != null) { + this.packet(new Packet<>(Parser.CONNECT, auth)); } else { this.packet(new Packet<>(Parser.CONNECT)); } @@ -304,11 +299,19 @@ private void onpacket(Packet packet) { switch (packet.type) { case Parser.CONNECT: { - if (packet.data instanceof JSONObject && ((JSONObject) packet.data).has("sid")) { + if (packet.data instanceof JSONObject) { + JSONObject data = (JSONObject) packet.data; try { - this.onconnect(((JSONObject) packet.data).getString("sid")); - return; - } catch (JSONException e) {} + if (data.has("pid") && data.has("sid")) { + this.onconnect(data.getString("sid"), data.getString("pid")); + return; + } else if (data.has("sid")) { + this.onconnect(data.getString("sid")); + return; + } + } catch (JSONException e) { + logger.warning(e.getMessage()); + } } else { super.emit(EVENT_CONNECT_ERROR, new SocketIOException("It seems you are trying to reach a Socket.IO server in v2.x with a v3.x client, which is not possible")); } @@ -363,38 +366,34 @@ private void onevent(Packet packet) { } String event = args.remove(0).toString(); super.emit(event, args.toArray()); + // Update offset if present + if (this._pid != null && args.get(args.size() - 1) instanceof String) { + _lastOffset = (String) args.get(args.size() - 1); + } } else { - this.receiveBuffer.add(args); + this.receiveBuffer.add(packet); } } private Ack ack(final int id) { final Socket self = this; - final boolean[] sent = new boolean[] {false}; - return new Ack() { - @Override - public void call(final Object... args) { - EventThread.exec(new Runnable() { - @Override - public void run() { - if (sent[0]) return; - sent[0] = true; - if (logger.isLoggable(Level.FINE)) { - logger.fine(String.format("sending ack %s", args.length != 0 ? args : null)); - } - - JSONArray jsonArgs = new JSONArray(); - for (Object arg : args) { - jsonArgs.put(arg); - } + final boolean[] sent = new boolean[]{false}; + return args -> EventThread.exec(() -> { + if (sent[0]) return; + sent[0] = true; + if (logger.isLoggable(Level.FINE)) { + logger.fine(String.format("sending ack %s", args.length != 0 ? args : null)); + } - Packet packet = new Packet<>(Parser.ACK, jsonArgs); - packet.id = id; - self.packet(packet); - } - }); + JSONArray jsonArgs = new JSONArray(); + for (Object arg : args) { + jsonArgs.put(arg); } - }; + + Packet packet = new Packet<>(Parser.ACK, jsonArgs); + packet.id = id; + self.packet(packet); + }); } private void onack(Packet packet) { @@ -411,18 +410,27 @@ private void onack(Packet packet) { } } - private void onconnect(String id) { + private void onconnect(String sid) { + this.connected = true; + this.id = sid; + this.recovered = false; + this.emitBuffered(); + super.emit(EVENT_CONNECT); + } + + private void onconnect(String sid, String pid) { this.connected = true; - this.id = id; + this.id = sid; + this.recovered = pid != null && pid.equals(_pid); + this._pid = pid; this.emitBuffered(); super.emit(EVENT_CONNECT); } private void emitBuffered() { - List data; + Packet data; while ((data = this.receiveBuffer.poll()) != null) { - String event = (String)data.get(0); - super.emit(event, data.toArray()); + onevent(data); } this.receiveBuffer.clear(); @@ -437,6 +445,7 @@ private void ondisconnect() { if (logger.isLoggable(Level.FINE)) { logger.fine(String.format("server disconnect (%s)", this.nsp)); } + recovered = false; this.destroy(); this.onclose("io server disconnect"); } @@ -498,7 +507,7 @@ public boolean connected() { /** * A property on the socket instance that is equal to the underlying engine.io socket id. - * + *

* The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects. * * @return a socket id @@ -566,5 +575,14 @@ public Socket offAnyOutgoing(Listener fn) { } return this; } + + @Nullable + public String getLastOffset() { + return this._lastOffset; + } + + public boolean isRecovered() { + return recovered; + } } diff --git a/src/main/java/io/socket/client/SocketOptionBuilder.java b/src/main/java/io/socket/client/SocketOptionBuilder.java index ef24bf83..7a061241 100644 --- a/src/main/java/io/socket/client/SocketOptionBuilder.java +++ b/src/main/java/io/socket/client/SocketOptionBuilder.java @@ -1,5 +1,7 @@ package io.socket.client; +import org.json.JSONObject; + import java.util.List; import java.util.Map; @@ -174,7 +176,7 @@ public SocketOptionBuilder setPath(String path) { return this; } - public SocketOptionBuilder setAuth(Map auth) { + public SocketOptionBuilder setAuth(JSONObject auth) { this.options.auth = auth; return this; } diff --git a/src/test/java/io/socket/client/Connection.java b/src/test/java/io/socket/client/Connection.java index 9f3a533e..7d7fd4ca 100644 --- a/src/test/java/io/socket/client/Connection.java +++ b/src/test/java/io/socket/client/Connection.java @@ -16,7 +16,7 @@ public abstract class Connection { private static final Logger logger = Logger.getLogger(Connection.class.getName()); - final static int TIMEOUT = 7000; + final static int TIMEOUT = 7_000; final static int PORT = 3000; private Process serverProcess; diff --git a/src/test/java/io/socket/client/ConnectionTest.java b/src/test/java/io/socket/client/ConnectionTest.java index aad9f4c4..f99d3796 100644 --- a/src/test/java/io/socket/client/ConnectionTest.java +++ b/src/test/java/io/socket/client/ConnectionTest.java @@ -16,12 +16,13 @@ import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.*; @RunWith(JUnit4.class) public class ConnectionTest extends Connection { @@ -72,38 +73,28 @@ public void startTwoConnectionsWithSamePathAndDifferentQuerystrings() throws Int @Test(timeout = TIMEOUT) public void workWithAcks() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); - socket = client(); - socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { - @Override - public void call(Object... objects) { - socket.emit("callAck"); - socket.on("ack", new Emitter.Listener() { - @Override - public void call(Object... args) { - Ack fn = (Ack) args[0]; - JSONObject data = new JSONObject(); - try { - data.put("test", true); - } catch (JSONException e) { - throw new AssertionError(e); - } - fn.call(5, data); - } - }); - socket.on("ackBack", new Emitter.Listener() { - @Override - public void call(Object... args) { - JSONObject data = (JSONObject)args[1]; - try { - if ((Integer)args[0] == 5 && data.getBoolean("test")) { - values.offer("done"); - } - } catch (JSONException e) { - throw new AssertionError(e); - } + socket = client();socket.on(Socket.EVENT_CONNECT, objects -> { + socket.on("ack", args -> { + Ack fn = (Ack) args[0]; + JSONObject data = new JSONObject(); + try { + data.put("test", true); + } catch (JSONException e) { + throw new AssertionError(e); + } + fn.call(5, data); + }); + socket.on("ackBack", args -> { + JSONObject data = (JSONObject)args[1]; + try { + if ((Integer)args[0] == 5 && data.getBoolean("test")) { + values.offer("done"); } - }); - } + } catch (JSONException e) { + throw new AssertionError(e); + } + }); + socket.emit("callAck"); }); socket.connect(); values.take(); @@ -302,12 +293,9 @@ public void call(Object... args) { public void reconnectByDefault() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.io().on(Manager.EVENT_RECONNECT, new Emitter.Listener() { - @Override - public void call(Object... objects) { - socket.close(); - values.offer("done"); - } + socket.io().on(Manager.EVENT_RECONNECT, objects -> { + socket.close(); + values.offer("done"); }); socket.open(); new Timer().schedule(new TimerTask() { @@ -349,29 +337,20 @@ public void call(Object... args) { public void reconnectAutomaticallyAfterReconnectingManually() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { - @Override - public void call(Object... args) { + socket.once(Socket.EVENT_CONNECT, args -> { + socket.disconnect(); + }).once(Socket.EVENT_DISCONNECT, args -> { + socket.io().on(Manager.EVENT_RECONNECT, args1 -> { socket.disconnect(); - } - }).once(Socket.EVENT_DISCONNECT, new Emitter.Listener() { - @Override - public void call(Object... args) { - socket.io().on(Manager.EVENT_RECONNECT, new Emitter.Listener() { - @Override - public void call(Object... args) { - socket.disconnect(); - values.offer("done"); - } - }); - socket.connect(); - new Timer().schedule(new TimerTask() { - @Override - public void run() { - socket.io().engine.close(); - } - }, 500); - } + values.offer("done"); + }); + socket.connect(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + socket.io().engine.close(); + } + }, 500); }); socket.connect(); values.take(); @@ -921,4 +900,62 @@ public void call(Object... args) { assertThat((String)values.take(), is("please arrive second")); socket.close(); } + + @Test(timeout = TIMEOUT) + public void shouldReceiveBufferedEventAfterReconnect() throws InterruptedException { + final BlockingQueue events = new LinkedBlockingQueue<>(); + + socket = client(); + socket.on(Socket.EVENT_CONNECT, args -> { + if (!socket.isRecovered()) { + events.offer("first-connect"); + // Tell server to start buffering scenario + socket.emit("startBufferTest"); + + // Disconnect engine after 1 second + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + socket.io().engine.close(); + } + }, 1000); + } else { + // Reconnected with recovery + events.offer("reconnected"); + } + }); + + socket.on(Socket.EVENT_DISCONNECT, args -> { + events.offer("disconnected"); + // Reconnect after 2 seconds + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + socket.connect(); + } + }, 2000); + }); + + socket.on("message", args -> { + if (args[0] instanceof JSONObject) { + JSONObject data = (JSONObject) args[0]; + String text = data.optString("text"); + System.out.println("Received message: " + text); + events.offer(text); + } + }); + + socket.connect(); + + // Verify sequence + assertEquals("first-connect", events.poll(TIMEOUT, TimeUnit.MILLISECONDS)); + assertEquals("disconnected", events.poll(TIMEOUT, TimeUnit.MILLISECONDS)); + assertEquals("buffered-message", events.poll(TIMEOUT, TimeUnit.MILLISECONDS)); + assertEquals("reconnected", events.poll(TIMEOUT, TimeUnit.MILLISECONDS)); + + assertTrue("Should have recovered session", socket.isRecovered()); + assertNotNull("Should have tracked offset", socket.getLastOffset()); + } } diff --git a/src/test/java/io/socket/client/ServerConnectionTest.java b/src/test/java/io/socket/client/ServerConnectionTest.java index c2e9354e..ea22c322 100644 --- a/src/test/java/io/socket/client/ServerConnectionTest.java +++ b/src/test/java/io/socket/client/ServerConnectionTest.java @@ -43,8 +43,8 @@ public void call(Object... args) { }); socket.connect(); - assertThat(((Object[])values.take()).length, is(0)); - Object[] args = (Object[] )values.take(); + assertThat(((Object[]) values.take()).length, is(0)); + Object[] args = (Object[]) values.take(); assertThat(args.length, is(1)); assertThat(args[0], is(instanceOf(String.class))); } @@ -67,8 +67,8 @@ public void call(Object... args) { }); socket.connect(); - assertThat((Object[])values.take(), is(new Object[] {"hello client"})); - assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"})); + assertThat(stripOffset((Object[]) values.take()), is(new Object[]{"hello client"})); + assertThat(stripOffset((Object[]) values.take()), is(new Object[]{"foo", "bar"})); socket.disconnect(); } @@ -93,11 +93,12 @@ public void call(Object... args) { }); socket.connect(); - Object[] args = (Object[])values.take(); + Object[] args = (Object[]) values.take(); + args = stripOffset(args); assertThat(args.length, is(3)); assertThat(args[0].toString(), is(obj.toString())); assertThat(args[1], is(nullValue())); - assertThat((String)args[2], is("bar")); + assertThat((String) args[2], is("bar")); socket.disconnect(); } @@ -112,7 +113,7 @@ public void ack() throws Exception { socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... objects) { - socket.emit("ack", new Object[] {obj, "bar"}, new Ack() { + socket.emit("ack", new Object[]{obj, "bar"}, new Ack() { @Override public void call(Object... args) { values.offer(args); @@ -122,10 +123,10 @@ public void call(Object... args) { }); socket.connect(); - Object[] args = (Object[])values.take(); + Object[] args = (Object[]) values.take(); assertThat(args.length, is(2)); assertThat(args[0].toString(), is(obj.toString())); - assertThat((String)args[1], is("bar")); + assertThat((String) args[1], is("bar")); socket.disconnect(); } @@ -147,7 +148,7 @@ public void call(Object... args) { }); socket.connect(); - assertThat((Integer)values.take(), is(0)); + assertThat((Integer) values.take(), is(0)); socket.disconnect(); } @@ -163,7 +164,7 @@ public void call(Object... objects) { @Override public void call(Object... args) { values.offer(args); - Ack ack = (Ack)args[0]; + Ack ack = (Ack) args[0]; ack.call(); } }).on("ackBack", new Emitter.Listener() { @@ -178,10 +179,12 @@ public void call(Object... args) { }); socket.connect(); - Object[] args = (Object[])values.take(); + Object[] args = (Object[]) values.take(); + args = stripOffset(args); assertThat(args.length, is(1)); assertThat(args[0], is(instanceOf(Ack.class))); - args = (Object[])values.take(); + args = (Object[]) values.take(); + args = stripOffset(args); assertThat(args.length, is(0)); socket.disconnect(); } @@ -212,30 +215,22 @@ public void broadcast() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { - @Override - public void call(Object... objects) { - socket2 = client(); - - socket2.on(Socket.EVENT_CONNECT, new Emitter.Listener() { - @Override - public void call(Object... objects) { - socket2.emit("broadcast", "hi"); - } - }); - socket2.connect(); - } - }).on("broadcastBack", new Emitter.Listener() { - @Override - public void call(Object... args) { - values.offer(args); - } + socket.on(Socket.EVENT_CONNECT, objects -> { + socket2 = client(); + + socket2.on(Socket.EVENT_CONNECT, objects1 -> { + socket2.emit("broadcast", "hi"); + }); + socket2.connect(); + }).on("broadcastBack", args -> { + values.offer(args); }); socket.connect(); - Object[] args = (Object[])values.take(); + Object[] args = (Object[]) values.take(); + args = stripOffset(args); assertThat(args.length, is(1)); - assertThat((String)args[0], is("hi")); + assertThat((String) args[0], is("hi")); socket.disconnect(); socket2.disconnect(); } @@ -245,22 +240,13 @@ public void room() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { - @Override - public void call(Object... objects) { - socket.emit("room", "hi"); - } - }).on("roomBack", new Emitter.Listener() { - @Override - public void call(Object... args) { - values.offer(args); - } - }); + socket.on(Socket.EVENT_CONNECT, objects -> socket.emit("room", "hi")).on("roomBack", args -> values.offer(args)); socket.connect(); - Object[] args = (Object[])values.take(); + Object[] args = (Object[]) values.take(); + args = stripOffset(args); assertThat(args.length, is(1)); - assertThat((String)args[0], is("hi")); + assertThat((String) args[0], is("hi")); socket.disconnect(); } @@ -269,24 +255,24 @@ public void pollingHeaders() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); IO.Options opts = createOptions(); - opts.transports = new String[] {Polling.NAME}; + opts.transports = new String[]{Polling.NAME}; socket = client(opts); socket.io().on(Manager.EVENT_TRANSPORT, new Emitter.Listener() { @Override public void call(Object... args) { - Transport transport = (Transport)args[0]; + Transport transport = (Transport) args[0]; transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() { @Override public void call(Object... args) { @SuppressWarnings("unchecked") - Map> headers = (Map>)args[0]; + Map> headers = (Map>) args[0]; headers.put("X-SocketIO", Arrays.asList("hi")); } }).on(Transport.EVENT_RESPONSE_HEADERS, new Emitter.Listener() { @Override public void call(Object... args) { @SuppressWarnings("unchecked") - Map> headers = (Map>)args[0]; + Map> headers = (Map>) args[0]; List value = headers.get("X-SocketIO"); values.offer(value != null ? value.get(0) : ""); } @@ -295,7 +281,7 @@ public void call(Object... args) { }); socket.open(); - assertThat((String)values.take(), is("hi")); + assertThat((String) values.take(), is("hi")); socket.close(); } @@ -304,24 +290,24 @@ public void websocketHandshakeHeaders() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); IO.Options opts = createOptions(); - opts.transports = new String[] {WebSocket.NAME}; + opts.transports = new String[]{WebSocket.NAME}; socket = client(opts); socket.io().on(Manager.EVENT_TRANSPORT, new Emitter.Listener() { @Override public void call(Object... args) { - Transport transport = (Transport)args[0]; + Transport transport = (Transport) args[0]; transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() { @Override public void call(Object... args) { @SuppressWarnings("unchecked") - Map> headers = (Map>)args[0]; + Map> headers = (Map>) args[0]; headers.put("X-SocketIO", Arrays.asList("hi")); } }).on(Transport.EVENT_RESPONSE_HEADERS, new Emitter.Listener() { @Override public void call(Object... args) { @SuppressWarnings("unchecked") - Map> headers = (Map>)args[0]; + Map> headers = (Map>) args[0]; List value = headers.get("X-SocketIO"); values.offer(value != null ? value.get(0) : ""); } @@ -330,7 +316,7 @@ public void call(Object... args) { }); socket.open(); - assertThat((String)values.take(), is("hi")); + assertThat((String) values.take(), is("hi")); socket.close(); } @@ -351,6 +337,10 @@ public void call(Object... args) { } }); socket.connect(); - assertThat((String)values.take(), is("disconnected")); + assertThat((String) values.take(), is("disconnected")); + } + + private Object[] stripOffset(Object[] args) { + return args.length > 0 && args[args.length - 1] instanceof String ? Arrays.copyOf(args, args.length - 1) : args; } } diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index 3db641db..5e3847f4 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -201,7 +201,9 @@ public void shouldAcceptAnAuthOption() throws InterruptedException, JSONExceptio final BlockingQueue values = new LinkedBlockingQueue<>(); IO.Options opts = new IO.Options(); - opts.auth = singletonMap("token", "abcd"); + JSONObject auth = new JSONObject(); + auth.put("token", "abcd"); + opts.auth = auth; socket = client("/abc", opts); socket.on("handshake", new Emitter.Listener() { @Override @@ -402,20 +404,14 @@ public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedExc socket = client(); - socket.on("message", new Emitter.Listener() { - @Override - public void call(Object... args) { - socket.emit("echo", 1, "2", new byte[] { 3 }); + socket.on("message", args -> { + socket.emit("echo", 1, "2", new byte[] { 3 }); - socket.onAnyIncoming(new Emitter.Listener() { - @Override - public void call(Object... args) { - for (Object arg : args) { - values.offer(arg); - } - } - }); - } + socket.onAnyIncoming(args1 -> { + for (Object arg : args1) { + values.offer(arg); + } + }); }); socket.connect(); @@ -434,12 +430,9 @@ public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedExc socket.emit("echo", 1, "2", new byte[] { 3 }); - socket.onAnyOutgoing(new Emitter.Listener() { - @Override - public void call(Object... args) { - for (Object arg : args) { - values.offer(arg); - } + socket.onAnyOutgoing(args -> { + for (Object arg : args) { + values.offer(arg); } }); diff --git a/src/test/java/io/socket/parser/Helpers.java b/src/test/java/io/socket/parser/Helpers.java index ba90e807..596d98fe 100644 --- a/src/test/java/io/socket/parser/Helpers.java +++ b/src/test/java/io/socket/parser/Helpers.java @@ -11,7 +11,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -@RunWith(JUnit4.class) +//@RunWith(JUnit4.class) public class Helpers { private static Parser.Encoder encoder = new IOParser.Encoder(); diff --git a/src/test/resources/package-lock.json b/src/test/resources/package-lock.json index 929d9591..ee4f3eb9 100644 --- a/src/test/resources/package-lock.json +++ b/src/test/resources/package-lock.json @@ -1,163 +1,248 @@ { + "name": "resources", + "lockfileVersion": 3, "requires": true, - "lockfileVersion": 1, - "dependencies": { - "@types/component-emitter": { - "version": "1.2.10", - "resolved": "https://registry.npmjs.org/@types/component-emitter/-/component-emitter-1.2.10.tgz", - "integrity": "sha512-bsjleuRKWmGqajMerkzox19aGbscQX5rmmvvXl3wlIp5gMG1HgkiwPxsN5p070fBDKTNSPgojVbuY1+HWMbFhg==" - }, - "@types/cookie": { - "version": "0.4.0", - "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.0.tgz", - "integrity": "sha512-y7mImlc/rNkvCRmg8gC3/lj87S7pTUIJ6QGjwHR9WQJcFs+ZMTOaoPrkdFA/YdbuqVEmEbb5RdhVxMkAcgOnpg==" - }, - "@types/cors": { - "version": "2.8.9", - "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.9.tgz", - "integrity": "sha512-zurD1ibz21BRlAOIKP8yhrxlqKx6L9VCwkB5kMiP6nZAhoF5MvC7qS1qPA7nRcr1GJolfkQC7/EAL4hdYejLtg==" - }, - "@types/node": { - "version": "14.14.12", - "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.12.tgz", - "integrity": "sha512-ASH8OPHMNlkdjrEdmoILmzFfsJICvhBsFfAum4aKZ/9U4B6M6tTmTPh+f3ttWdD74CEGV5XvXWkbyfSdXaTd7g==" - }, - "accepts": { - "version": "1.3.7", - "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz", - "integrity": "sha512-Il80Qs2WjYlJIBNzNkK6KYqlVMTbZLXgHx2oT0pU/fjRHyEp+PEfEPY0R3WCwAGVOtauxh1hOxNgIf5bv7dQpA==", - "requires": { - "mime-types": "~2.1.24", - "negotiator": "0.6.2" - } - }, - "base64-arraybuffer": { - "version": "0.1.4", - "resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-0.1.4.tgz", - "integrity": "sha1-mBjHngWbE1X5fgQooBfIOOkLqBI=" - }, - "base64id": { + "packages": { + "": { + "dependencies": { + "socket.io": "^4.8.1" + } + }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "license": "MIT" + }, + "node_modules/@types/cors": { + "version": "2.8.19", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.19.tgz", + "integrity": "sha512-mFNylyeyqN93lfe/9CSxOGREz8cpzAhH+E93xJ4xWQf62V8sQ/24reV2nyzUWM6H6Xji+GGHpkbLe7pVoUEskg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/@types/node": { + "version": "24.10.1", + "resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.1.tgz", + "integrity": "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==", + "license": "MIT", + "dependencies": { + "undici-types": "~7.16.0" + } + }, + "node_modules/accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "license": "MIT", + "dependencies": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + }, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/base64id": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", - "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==" - }, - "component-emitter": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.0.tgz", - "integrity": "sha512-Rd3se6QB+sO1TwqZjscQrurpEPIfO0/yYnSin6Q/rD3mOutHvUrCAhJub3r90uNb+SESBuE0QYoB90YdfatsRg==" + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "license": "MIT", + "engines": { + "node": "^4.5.0 || >= 5.9" + } }, - "cookie": { - "version": "0.4.1", - "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.1.tgz", - "integrity": "sha512-ZwrFkGJxUR3EIoXtO+yVE69Eb7KlixbaeAWfBQB9vVsNn/o+Yw69gBWSSDK825hQNdN+wF8zELf3dFNl/kxkUA==" + "node_modules/cookie": { + "version": "0.7.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.2.tgz", + "integrity": "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } }, - "cors": { + "node_modules/cors": { "version": "2.8.5", "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", - "requires": { + "license": "MIT", + "dependencies": { "object-assign": "^4", "vary": "^1" + }, + "engines": { + "node": ">= 0.10" } }, - "debug": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", - "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", - "requires": { - "ms": "^2.1.1" + "node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } } }, - "engine.io": { - "version": "4.0.5", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-4.0.5.tgz", - "integrity": "sha512-Ri+whTNr2PKklxQkfbGjwEo+kCBUM4Qxk4wtLqLrhH+b1up2NFL9g9pjYWiCV/oazwB0rArnvF/ZmZN2ab5Hpg==", - "requires": { + "node_modules/engine.io": { + "version": "6.6.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.6.4.tgz", + "integrity": "sha512-ZCkIjSYNDyGn0R6ewHDtXgns/Zre/NT6Agvq1/WobF7JXgFff4SeDroKiCO3fNJreU9YG429Sc81o4w5ok/W5g==", + "license": "MIT", + "dependencies": { + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", "accepts": "~1.3.4", "base64id": "2.0.0", - "cookie": "~0.4.1", + "cookie": "~0.7.2", "cors": "~2.8.5", - "debug": "~4.1.0", - "engine.io-parser": "~4.0.0", - "ws": "^7.1.2" + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1" + }, + "engines": { + "node": ">=10.2.0" } }, - "engine.io-parser": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-4.0.2.tgz", - "integrity": "sha512-sHfEQv6nmtJrq6TKuIz5kyEKH/qSdK56H/A+7DnAuUPWosnIZAS2NHNcPLmyjtY3cGS/MqJdZbUjW97JU72iYg==", - "requires": { - "base64-arraybuffer": "0.1.4" + "node_modules/engine.io-parser": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", + "integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" } }, - "mime-db": { - "version": "1.44.0", - "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.44.0.tgz", - "integrity": "sha512-/NOTfLrsPBVeH7YtFPgsVWveuL+4SjjYxaQ1xtM1KMFj7HdxlBlxeyNLzhyJVx7r4rZGJAZ/6lkKCitSc/Nmpg==" + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } }, - "mime-types": { - "version": "2.1.27", - "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.27.tgz", - "integrity": "sha512-JIhqnCasI9yD+SsmkquHBxTSEuZdQX5BuQnS2Vc7puQQQ+8yiP5AY5uWhpdv4YL4VM5c6iliiYWPgJ/nJQLp7w==", - "requires": { - "mime-db": "1.44.0" + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "license": "MIT", + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" } }, - "ms": { + "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", - "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" - }, - "negotiator": { - "version": "0.6.2", - "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.2.tgz", - "integrity": "sha512-hZXc7K2e+PgeI1eDBe/10Ard4ekbfrrqG8Ep+8Jmf4JID2bNg7NvCPOZN+kfF574pFQI7mum2AUqDidoKqcTOw==" + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, + "node_modules/negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } }, - "object-assign": { + "node_modules/object-assign": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", - "integrity": "sha1-IQmtx5ZYh8/AXLvUQsrIv7s2CGM=" - }, - "socket.io": { - "version": "3.0.4", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-3.0.4.tgz", - "integrity": "sha512-Vj1jUoO75WGc9txWd311ZJJqS9Dr8QtNJJ7gk2r7dcM/yGe9sit7qOijQl3GAwhpBOz/W8CwkD7R6yob07nLbA==", - "requires": { - "@types/cookie": "^0.4.0", - "@types/cors": "^2.8.8", - "@types/node": "^14.14.7", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/socket.io": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.8.1.tgz", + "integrity": "sha512-oZ7iUCxph8WYRHHcjBEc9unw3adt5CmSNlppj/5Q4k2RIrhl8Z5yY2Xr4j9zj0+wzVZ0bxmYoGSzKJnRl6A4yg==", + "license": "MIT", + "dependencies": { "accepts": "~1.3.4", "base64id": "~2.0.0", - "debug": "~4.1.0", - "engine.io": "~4.0.0", - "socket.io-adapter": "~2.0.3", - "socket.io-parser": "~4.0.1" + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.6.0", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" } }, - "socket.io-adapter": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.0.3.tgz", - "integrity": "sha512-2wo4EXgxOGSFueqvHAdnmi5JLZzWqMArjuP4nqC26AtLh5PoCPsaRbRdah2xhcwTAMooZfjYiNVNkkmmSMaxOQ==" + "node_modules/socket.io-adapter": { + "version": "2.5.5", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz", + "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==", + "license": "MIT", + "dependencies": { + "debug": "~4.3.4", + "ws": "~8.17.1" + } }, - "socket.io-parser": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.0.2.tgz", - "integrity": "sha512-Bs3IYHDivwf+bAAuW/8xwJgIiBNtlvnjYRc4PbXgniLmcP1BrakBoq/QhO24rgtgW7VZ7uAaswRGxutUnlAK7g==", - "requires": { - "@types/component-emitter": "^1.2.10", - "component-emitter": "~1.3.0", - "debug": "~4.1.0" + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" } }, - "vary": { + "node_modules/undici-types": { + "version": "7.16.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz", + "integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==", + "license": "MIT" + }, + "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", - "integrity": "sha1-IpnwLG3tMNSllhsLn3RSShj2NPw=" + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "license": "MIT", + "engines": { + "node": ">= 0.8" + } }, - "ws": { - "version": "7.4.1", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.4.1.tgz", - "integrity": "sha512-pTsP8UAfhy3sk1lSk/O/s4tjD0CRwvMnzvwr4OKGX7ZvqZtUyx4KIJB5JWbkykPoc55tixMGgTNoh3k4FkNGFQ==" + "node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } } } } diff --git a/src/test/resources/package.json b/src/test/resources/package.json index 7685423e..7aa8e921 100644 --- a/src/test/resources/package.json +++ b/src/test/resources/package.json @@ -1,6 +1,6 @@ { "private": true, "dependencies": { - "socket.io": "^3.0.4" + "socket.io": "^4.8.1" } } diff --git a/src/test/resources/server.js b/src/test/resources/server.js index 72b02504..cfad86fb 100644 --- a/src/test/resources/server.js +++ b/src/test/resources/server.js @@ -12,7 +12,11 @@ if (process.env.SSL) { var io = require('socket.io')(server, { pingInterval: 2000, - wsEngine: 'ws' + //wsEngine: 'ws', + connectionStateRecovery: { + maxDisconnectionDuration: 2 * 60 * 1000, + skipMiddlewares: true, + } }); var port = process.env.PORT || 3000; var nsp = process.argv[2] || '/'; @@ -21,8 +25,13 @@ var slice = Array.prototype.slice; const fooNsp = io.of('/foo'); fooNsp.on('connection', (socket) => { - socket.on('room', (...args) => { - fooNsp.to(socket.id).emit.apply(fooNsp, ['roomBack'].concat(args)); + socket.on('broadcast', function(data) { + var args = slice.call(arguments); + fooNsp.emit('broadcastBack', ...args); + }); + + socket.on('room', (arg) => { + fooNsp.to(socket.id).emit("roomBack", arg); }); }); @@ -49,6 +58,15 @@ io.of("/no").use((socket, next) => { }); io.of(nsp).on('connection', function(socket) { + //console.log('=== Handshake Debug ==='); + //console.log('Socket ID:', socket.id); + //console.log('Auth:', socket.handshake.auth); + //console.log('Query:', socket.handshake.query); + //console.log('Headers:', socket.handshake.headers); + //console.log('Full handshake:', JSON.stringify(socket.handshake, null, 2)); + //console.log('Recovered:', socket.recovered); + //console.log('======================='); + socket.send('hello client'); socket.on('message', function() { @@ -91,7 +109,7 @@ io.of(nsp).on('connection', function(socket) { socket.on('broadcast', function(data) { var args = slice.call(arguments); - socket.broadcast.emit.apply(socket, ['broadcastBack'].concat(args)); + io.emit('broadcastBack', ...args); }); socket.on('room', (arg) => { @@ -113,6 +131,16 @@ io.of(nsp).on('connection', function(socket) { socket.on('getHandshake', function(cb) { cb(socket.handshake); }); + + socket.on('startBufferTest', () => { + console.log('Starting buffer test scenario'); + + // Send message after 1.5 seconds (while client will be disconnected) + setTimeout(() => { + console.log('Sending buffered message with offset'); + socket.emit('message', { text: 'buffered-message' }); + }, 1500); + }); }); From 279f9dd549981e58ff6c22ee3a551af1beaab9e0 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Wed, 3 Dec 2025 16:12:20 +0100 Subject: [PATCH 02/11] fix --- src/main/java/io/socket/client/Socket.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index cf336315..b3ce7d50 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -4,7 +4,6 @@ import io.socket.parser.Packet; import io.socket.parser.Parser; import io.socket.thread.EventThread; -import org.jetbrains.annotations.Nullable; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -576,7 +575,6 @@ public Socket offAnyOutgoing(Listener fn) { return this; } - @Nullable public String getLastOffset() { return this._lastOffset; } From 102ab094bc015e3d1ef2b73135933f18725c3ec0 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Wed, 3 Dec 2025 16:55:14 +0100 Subject: [PATCH 03/11] chore: bump version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2e35df48..3d8fb065 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 io.socket socket.io-client - 2.1.52-SNAPSHOT + 2.2.0-SNAPSHOT jar socket.io-client Socket.IO Client Library for Java From bc1946dbe0fa6b36840bf0acbc6a7fbf44a80e95 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Wed, 3 Dec 2025 16:58:27 +0100 Subject: [PATCH 04/11] fix(test): disable recovery on demand for a specific test --- .../java/io/socket/client/ConnectionTest.java | 1 - .../java/io/socket/client/SocketTest.java | 30 ++++++++++++------- src/test/resources/server.js | 8 +++++ 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/test/java/io/socket/client/ConnectionTest.java b/src/test/java/io/socket/client/ConnectionTest.java index f99d3796..2f3dc2bf 100644 --- a/src/test/java/io/socket/client/ConnectionTest.java +++ b/src/test/java/io/socket/client/ConnectionTest.java @@ -942,7 +942,6 @@ public void run() { if (args[0] instanceof JSONObject) { JSONObject data = (JSONObject) args[0]; String text = data.optString("text"); - System.out.println("Received message: " + text); events.offer(text); } }); diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index 5e3847f4..210e5f81 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -114,7 +114,17 @@ public void run() { @Test(timeout = TIMEOUT) public void shouldChangeSocketIdUponReconnection() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); - socket = client(); + + IO.Options opts = createOptions(); + opts.forceNew = true; + try { + JSONObject auth = new JSONObject(); + auth.put("noRecovery", true); + opts.auth = auth; + } catch (JSONException ignored) { + } + + socket = client(opts); socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... objects) { @@ -160,7 +170,7 @@ public void shouldAcceptAQueryStringOnDefaultNamespace() throws InterruptedExcep socket.emit("getHandshake", new Ack() { @Override public void call(Object... args) { - JSONObject handshake = (JSONObject)args[0]; + JSONObject handshake = (JSONObject) args[0]; values.offer(Optional.ofNullable(handshake)); } }); @@ -181,7 +191,7 @@ public void shouldAcceptAQueryString() throws InterruptedException, JSONExceptio socket.on("handshake", new Emitter.Listener() { @Override public void call(Object... args) { - JSONObject handshake = (JSONObject)args[0]; + JSONObject handshake = (JSONObject) args[0]; values.offer(Optional.ofNullable(handshake)); } }); @@ -208,7 +218,7 @@ public void shouldAcceptAnAuthOption() throws InterruptedException, JSONExceptio socket.on("handshake", new Emitter.Listener() { @Override public void call(Object... args) { - JSONObject handshake = (JSONObject)args[0]; + JSONObject handshake = (JSONObject) args[0]; values.offer(Optional.ofNullable(handshake)); } }); @@ -375,7 +385,7 @@ public void shouldNotTimeoutWhenTheServerDoesAcknowledgeTheEvent() throws Interr socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... args) { - socket.emit("ack", 1, "2", new byte[] { 3 }, new AckWithTimeout(200) { + socket.emit("ack", 1, "2", new byte[]{3}, new AckWithTimeout(200) { @Override public void onTimeout() { fail(); @@ -395,7 +405,7 @@ public void onSuccess(Object... args) { assertThat((Integer) values.take(), is(1)); assertThat((String) values.take(), is("2")); - assertThat((byte[]) values.take(), is(new byte[] { 3 })); + assertThat((byte[]) values.take(), is(new byte[]{3})); } @Test(timeout = TIMEOUT) @@ -405,7 +415,7 @@ public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedExc socket = client(); socket.on("message", args -> { - socket.emit("echo", 1, "2", new byte[] { 3 }); + socket.emit("echo", 1, "2", new byte[]{3}); socket.onAnyIncoming(args1 -> { for (Object arg : args1) { @@ -419,7 +429,7 @@ public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedExc assertThat((String) values.take(), is("echoBack")); assertThat((Integer) values.take(), is(1)); assertThat((String) values.take(), is("2")); - assertThat((byte[]) values.take(), is(new byte[] { 3 })); + assertThat((byte[]) values.take(), is(new byte[]{3})); } @Test(timeout = TIMEOUT) @@ -428,7 +438,7 @@ public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedExc socket = client(); - socket.emit("echo", 1, "2", new byte[] { 3 }); + socket.emit("echo", 1, "2", new byte[]{3}); socket.onAnyOutgoing(args -> { for (Object arg : args) { @@ -441,6 +451,6 @@ public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedExc assertThat((String) values.take(), is("echo")); assertThat((Integer) values.take(), is(1)); assertThat((String) values.take(), is("2")); - assertThat((byte[]) values.take(), is(new byte[] { 3 })); + assertThat((byte[]) values.take(), is(new byte[]{3})); } } diff --git a/src/test/resources/server.js b/src/test/resources/server.js index cfad86fb..333c5dfd 100644 --- a/src/test/resources/server.js +++ b/src/test/resources/server.js @@ -22,6 +22,14 @@ var port = process.env.PORT || 3000; var nsp = process.argv[2] || '/'; var slice = Array.prototype.slice; +// Disable recovery on demand +io.use((socket, next) => { + if (socket.handshake.auth?.noRecovery === true) { + socket.handshake.auth._pid = 'invalid-' + Date.now(); + } + next(); +}); + const fooNsp = io.of('/foo'); fooNsp.on('connection', (socket) => { From 639abf36394a9379e23ab130fb2854725d6d59bb Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 16:23:41 +0100 Subject: [PATCH 05/11] fix: tests --- .../java/io/socket/client/Connection.java | 111 ++++++++++++------ .../java/io/socket/client/SocketTest.java | 7 +- .../client/executions/ConnectionFailure.java | 3 +- src/test/resources/server.js | 8 -- src/test/resources/server_no_recovery.js | 31 +++++ 5 files changed, 106 insertions(+), 54 deletions(-) create mode 100644 src/test/resources/server_no_recovery.js diff --git a/src/test/java/io/socket/client/Connection.java b/src/test/java/io/socket/client/Connection.java index 7d7fd4ca..f270a31b 100644 --- a/src/test/java/io/socket/client/Connection.java +++ b/src/test/java/io/socket/client/Connection.java @@ -18,61 +18,88 @@ public abstract class Connection { final static int TIMEOUT = 7_000; final static int PORT = 3000; + final static int NO_RECOVERY_PORT = 3001; private Process serverProcess; + private Process noRecoveryServerProcess; private ExecutorService serverService; - private Future serverOutput; - private Future serverError; + private Future serverOutput; + private Future serverError; + private Future noRecoveryServerOutput; + private Future noRecoveryServerError; @Before public void startServer() throws IOException, InterruptedException { - logger.fine("Starting server ..."); + logger.fine("Starting servers..."); + // Start main server final CountDownLatch latch = new CountDownLatch(1); - serverProcess = Runtime.getRuntime().exec( - String.format("node src/test/resources/server.js %s", nsp()), createEnv()); + serverProcess = startServerProcess("node src/test/resources/server.js %s", PORT); serverService = Executors.newCachedThreadPool(); - serverOutput = serverService.submit(new Runnable() { - @Override - public void run() { - BufferedReader reader = new BufferedReader( - new InputStreamReader(serverProcess.getInputStream())); - String line; - try { - line = reader.readLine(); - latch.countDown(); - do { - logger.fine("SERVER OUT: " + line); - } while ((line = reader.readLine()) != null); - } catch (IOException e) { - logger.warning(e.getMessage()); - } + serverOutput = startServerOutput(serverProcess, "MAIN", latch); + serverError = startServerError(serverProcess, "MAIN"); + + // Start no-recovery server + final CountDownLatch noRecoveryLatch = new CountDownLatch(1); + noRecoveryServerProcess = startServerProcess("node src/test/resources/server_no_recovery.js %s", NO_RECOVERY_PORT); + noRecoveryServerOutput = startServerOutput(noRecoveryServerProcess, "NO_RECOVERY", noRecoveryLatch); + noRecoveryServerError = startServerError(noRecoveryServerProcess, "NO_RECOVERY"); + + // Wait for both servers to start + latch.await(3000, TimeUnit.MILLISECONDS); + noRecoveryLatch.await(3000, TimeUnit.MILLISECONDS); + } + + private Process startServerProcess(String script, int port) throws IOException { + return Runtime.getRuntime().exec(String.format(script, nsp()), createEnv(port)); + } + + private Future startServerOutput(Process process, String serverName, CountDownLatch latch) { + return serverService.submit(() -> { + BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getInputStream())); + String line; + try { + line = reader.readLine(); + latch.countDown(); + do { + logger.fine(serverName + " SERVER OUT: " + line); + } while ((line = reader.readLine()) != null); + } catch (IOException e) { + logger.warning(e.getMessage()); } }); - serverError = serverService.submit(new Runnable() { - @Override - public void run() { - BufferedReader reader = new BufferedReader( - new InputStreamReader(serverProcess.getErrorStream())); - String line; - try { - while ((line = reader.readLine()) != null) { - logger.fine("SERVER ERR: " + line); - } - } catch (IOException e) { - logger.warning(e.getMessage()); + } + + private Future startServerError(Process process, String serverName) { + return serverService.submit(() -> { + BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getErrorStream())); + String line; + try { + while ((line = reader.readLine()) != null) { + logger.fine(serverName + " SERVER ERR: " + line); } + } catch (IOException e) { + logger.warning(e.getMessage()); } }); - latch.await(3000, TimeUnit.MILLISECONDS); } @After public void stopServer() throws InterruptedException { - logger.fine("Stopping server ..."); + logger.fine("Stopping servers..."); + + // Stop main server serverProcess.destroy(); serverOutput.cancel(false); serverError.cancel(false); + + // Stop no-recovery server + noRecoveryServerProcess.destroy(); + noRecoveryServerOutput.cancel(false); + noRecoveryServerError.cancel(false); + serverService.shutdown(); serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); } @@ -90,11 +117,16 @@ Socket client(IO.Options opts) { } Socket client(String path, IO.Options opts) { - return IO.socket(URI.create(uri() + path), opts); + int port = opts.port != -1 ? opts.port : PORT; + return IO.socket(URI.create(uri(port) + path), opts); } URI uri() { - return URI.create("http://localhost:" + PORT); + return uri(PORT); + } + + URI uri(int port) { + return URI.create("http://localhost:" + port); } String nsp() { @@ -108,9 +140,13 @@ IO.Options createOptions() { } String[] createEnv() { + return createEnv(PORT); + } + + String[] createEnv(int port) { Map env = new HashMap<>(System.getenv()); env.put("DEBUG", "socket.io:*"); - env.put("PORT", String.valueOf(PORT)); + env.put("PORT", String.valueOf(port)); String[] _env = new String[env.size()]; int i = 0; for (String key : env.keySet()) { @@ -118,6 +154,5 @@ String[] createEnv() { i++; } return _env; - } } diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index 210e5f81..88bd141f 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -117,12 +117,7 @@ public void shouldChangeSocketIdUponReconnection() throws InterruptedException { IO.Options opts = createOptions(); opts.forceNew = true; - try { - JSONObject auth = new JSONObject(); - auth.put("noRecovery", true); - opts.auth = auth; - } catch (JSONException ignored) { - } + opts.port = Connection.NO_RECOVERY_PORT; socket = client(opts); socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { diff --git a/src/test/java/io/socket/client/executions/ConnectionFailure.java b/src/test/java/io/socket/client/executions/ConnectionFailure.java index a4feb267..e818c3cf 100644 --- a/src/test/java/io/socket/client/executions/ConnectionFailure.java +++ b/src/test/java/io/socket/client/executions/ConnectionFailure.java @@ -10,8 +10,7 @@ public class ConnectionFailure { public static void main(String[] args) throws URISyntaxException { - int port = Integer.parseInt(System.getenv("PORT")); - port++; + int port = 60_000; IO.Options options = new IO.Options(); options.forceNew = true; options.reconnection = false; diff --git a/src/test/resources/server.js b/src/test/resources/server.js index 333c5dfd..cfad86fb 100644 --- a/src/test/resources/server.js +++ b/src/test/resources/server.js @@ -22,14 +22,6 @@ var port = process.env.PORT || 3000; var nsp = process.argv[2] || '/'; var slice = Array.prototype.slice; -// Disable recovery on demand -io.use((socket, next) => { - if (socket.handshake.auth?.noRecovery === true) { - socket.handshake.auth._pid = 'invalid-' + Date.now(); - } - next(); -}); - const fooNsp = io.of('/foo'); fooNsp.on('connection', (socket) => { diff --git a/src/test/resources/server_no_recovery.js b/src/test/resources/server_no_recovery.js new file mode 100644 index 00000000..4262eeb8 --- /dev/null +++ b/src/test/resources/server_no_recovery.js @@ -0,0 +1,31 @@ +var fs = require('fs'); + +var server; +if (process.env.SSL) { + server = require('https').createServer({ + key: fs.readFileSync(__dirname + '/key.pem'), + cert: fs.readFileSync(__dirname + '/cert.pem') + }); +} else { + server = require('http').createServer(); +} + +// Create server without connection state recovery +var io = require('socket.io')(server, { + pingInterval: 2000 +}); + +var port = process.env.PORT || 3001; // Different port to avoid conflicts +var nsp = process.argv[2] || '/'; + +server.listen(port, () => { + console.log(`Test server without recovery running on port ${port}`); +}); + +io.of(nsp).on('connection', (socket) => { + console.log(`New connection: ${socket.id}`); + + socket.on('disconnect', () => { + console.log(`Client disconnected: ${socket.id}`); + }); +}); \ No newline at end of file From f45445f99cac890298743c23480e7d95596bdce1 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 13:34:25 +0100 Subject: [PATCH 06/11] fix: replace lambda by anonymous classes --- pom.xml | 4 +- src/main/java/io/socket/client/Socket.java | 179 ++++++++------ .../java/io/socket/client/ConnectionTest.java | 223 ++++++++++-------- .../socket/client/ServerConnectionTest.java | 39 ++- .../java/io/socket/client/SocketTest.java | 29 ++- 5 files changed, 279 insertions(+), 195 deletions(-) diff --git a/pom.xml b/pom.xml index 3d8fb065..457cb917 100644 --- a/pom.xml +++ b/pom.xml @@ -238,8 +238,8 @@ - 1.8 - 1.8 + 1.7 + 1.7 -Xlint:unchecked diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index b3ce7d50..d398a367 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -84,19 +84,31 @@ private void subEvents() { final Manager io = Socket.this.io; Socket.this.subs = new LinkedList() {{ - add(On.on(io, Manager.EVENT_OPEN, args -> { - Socket.this.onopen(); + add(On.on(io, Manager.EVENT_OPEN, new Listener() { + @Override + public void call(Object... args) { + Socket.this.onopen(); + } })); - add(On.on(io, Manager.EVENT_PACKET, args -> { - Socket.this.onpacket((Packet) args[0]); + add(On.on(io, Manager.EVENT_PACKET, new Listener() { + @Override + public void call(Object... args) { + Socket.this.onpacket((Packet) args[0]); + } })); - add(On.on(io, Manager.EVENT_ERROR, args -> { - if (!Socket.this.connected) { - Socket.super.emit(EVENT_CONNECT_ERROR, args[0]); + add(On.on(io, Manager.EVENT_ERROR, new Listener() { + @Override + public void call(Object... args) { + if (!Socket.this.connected) { + Socket.super.emit(EVENT_CONNECT_ERROR, args[0]); + } } })); - add(On.on(io, Manager.EVENT_CLOSE, args -> { - Socket.this.onclose(args.length > 0 ? (String) args[0] : null); + add(On.on(io, Manager.EVENT_CLOSE, new Listener() { + @Override + public void call(Object... args) { + Socket.this.onclose(args.length > 0 ? (String) args[0] : null); + } })); }}; } @@ -133,7 +145,12 @@ public Socket connect() { * @return a reference to this object. */ public Socket send(final Object... args) { - EventThread.exec(() -> Socket.this.emit(EVENT_MESSAGE, args)); + EventThread.exec(new Runnable() { + @Override + public void run() { + Socket.this.emit(EVENT_MESSAGE, args); + } + }); return this; } @@ -150,23 +167,26 @@ public Emitter emit(final String event, final Object... args) { throw new RuntimeException("'" + event + "' is a reserved event name"); } - EventThread.exec(() -> { - Ack ack; - Object[] _args; - int lastIndex = args.length - 1; - - if (args.length > 0 && args[lastIndex] instanceof Ack) { - _args = new Object[lastIndex]; - for (int i = 0; i < lastIndex; i++) { - _args[i] = args[i]; + EventThread.exec(new Runnable() { + @Override + public void run() { + Ack ack; + Object[] _args; + int lastIndex = args.length - 1; + + if (args.length > 0 && args[lastIndex] instanceof Ack) { + _args = new Object[lastIndex]; + for (int i = 0; i < lastIndex; i++) { + _args[i] = args[i]; + } + ack = (Ack) args[lastIndex]; + } else { + _args = args; + ack = null; } - ack = (Ack) args[lastIndex]; - } else { - _args = args; - ack = null; - } - emit(event, _args, ack); + Socket.this.emit(event, _args, ack); + } }); return this; } @@ -180,52 +200,55 @@ public Emitter emit(final String event, final Object... args) { * @return a reference to this object. */ public Emitter emit(final String event, final Object[] args, final Ack ack) { - EventThread.exec(() -> { - JSONArray jsonArgs = new JSONArray(); - jsonArgs.put(event); + EventThread.exec(new Runnable() { + @Override + public void run() { + JSONArray jsonArgs = new JSONArray(); + jsonArgs.put(event); - if (args != null) { - for (Object arg : args) { - jsonArgs.put(arg); + if (args != null) { + for (Object arg : args) { + jsonArgs.put(arg); + } } - } - Packet packet = new Packet<>(Parser.EVENT, jsonArgs); + Packet packet = new Packet<>(Parser.EVENT, jsonArgs); - if (ack != null) { - final int ackId = Socket.this.ids; + if (ack != null) { + final int ackId = Socket.this.ids; - logger.fine(String.format("emitting packet with ack id %d", ackId)); + logger.fine(String.format("emitting packet with ack id %d", ackId)); - if (ack instanceof AckWithTimeout) { - final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack; - ackWithTimeout.schedule(new TimerTask() { - @Override - public void run() { - // remove the ack from the map (to prevent an actual acknowledgement) - acks.remove(ackId); + if (ack instanceof AckWithTimeout) { + final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack; + ackWithTimeout.schedule(new TimerTask() { + @Override + public void run() { + // remove the ack from the map (to prevent an actual acknowledgement) + acks.remove(ackId); - // remove the packet from the buffer (if applicable) - Iterator> iterator = sendBuffer.iterator(); - while (iterator.hasNext()) { - if (iterator.next().id == ackId) { - iterator.remove(); + // remove the packet from the buffer (if applicable) + Iterator> iterator = sendBuffer.iterator(); + while (iterator.hasNext()) { + if (iterator.next().id == ackId) { + iterator.remove(); + } } + + ackWithTimeout.onTimeout(); } + }); + } - ackWithTimeout.onTimeout(); - } - }); + Socket.this.acks.put(ackId, ack); + packet.id = ids++; } - Socket.this.acks.put(ackId, ack); - packet.id = ids++; - } - - if (Socket.this.connected) { - Socket.this.packet(packet); - } else { - Socket.this.sendBuffer.add(packet); + if (Socket.this.connected) { + Socket.this.packet(packet); + } else { + Socket.this.sendBuffer.add(packet); + } } }); return this; @@ -376,23 +399,31 @@ private void onevent(Packet packet) { private Ack ack(final int id) { final Socket self = this; - final boolean[] sent = new boolean[]{false}; - return args -> EventThread.exec(() -> { - if (sent[0]) return; - sent[0] = true; - if (logger.isLoggable(Level.FINE)) { - logger.fine(String.format("sending ack %s", args.length != 0 ? args : null)); - } + final boolean[] sent = new boolean[] {false}; + return new Ack() { + @Override + public void call(final Object... args) { + EventThread.exec(new Runnable() { + @Override + public void run() { + if (sent[0]) return; + sent[0] = true; + if (logger.isLoggable(Level.FINE)) { + logger.fine(String.format("sending ack %s", args.length != 0 ? args : null)); + } - JSONArray jsonArgs = new JSONArray(); - for (Object arg : args) { - jsonArgs.put(arg); - } + JSONArray jsonArgs = new JSONArray(); + for (Object arg : args) { + jsonArgs.put(arg); + } - Packet packet = new Packet<>(Parser.ACK, jsonArgs); - packet.id = id; - self.packet(packet); - }); + Packet packet = new Packet<>(Parser.ACK, jsonArgs); + packet.id = id; + self.packet(packet); + } + }); + } + }; } private void onack(Packet packet) { diff --git a/src/test/java/io/socket/client/ConnectionTest.java b/src/test/java/io/socket/client/ConnectionTest.java index 2f3dc2bf..ae0dcf26 100644 --- a/src/test/java/io/socket/client/ConnectionTest.java +++ b/src/test/java/io/socket/client/ConnectionTest.java @@ -73,28 +73,32 @@ public void startTwoConnectionsWithSamePathAndDifferentQuerystrings() throws Int @Test(timeout = TIMEOUT) public void workWithAcks() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); - socket = client();socket.on(Socket.EVENT_CONNECT, objects -> { - socket.on("ack", args -> { - Ack fn = (Ack) args[0]; - JSONObject data = new JSONObject(); - try { - data.put("test", true); - } catch (JSONException e) { - throw new AssertionError(e); - } - fn.call(5, data); - }); - socket.on("ackBack", args -> { - JSONObject data = (JSONObject)args[1]; - try { - if ((Integer)args[0] == 5 && data.getBoolean("test")) { - values.offer("done"); + socket = client(); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket.on("ack", args -> { + Ack fn = (Ack) args[0]; + JSONObject data = new JSONObject(); + try { + data.put("test", true); + } catch (JSONException e) { + throw new AssertionError(e); } - } catch (JSONException e) { - throw new AssertionError(e); - } - }); - socket.emit("callAck"); + fn.call(5, data); + }); + socket.on("ackBack", args -> { + JSONObject data = (JSONObject) args[1]; + try { + if ((Integer) args[0] == 5 && data.getBoolean("test")) { + values.offer("done"); + } + } catch (JSONException e) { + throw new AssertionError(e); + } + }); + socket.emit("callAck"); + } }); socket.connect(); values.take(); @@ -147,14 +151,14 @@ public void call(Object... args) { socket.on("ackBack", new Emitter.Listener() { @Override public void call(Object... args) { - byte[] data = (byte[])args[0]; + byte[] data = (byte[]) args[0]; values.offer(data); } }); } }); socket.connect(); - Assert.assertArrayEquals(buf, (byte[])values.take()); + Assert.assertArrayEquals(buf, (byte[]) values.take()); socket.close(); } @@ -171,13 +175,13 @@ public void call(Object... objects) { @Override public void call(Object... args) { - values.offer(args[0]); + values.offer(args[0]); } }); } }); socket.connect(); - Assert.assertArrayEquals(buf, (byte[])values.take()); + Assert.assertArrayEquals(buf, (byte[]) values.take()); socket.close(); } @@ -198,19 +202,19 @@ public void call(Object... args) { } }); socket.connect(); - assertThat((Boolean)values.take(), is(false)); + assertThat((Boolean) values.take(), is(false)); socket.close(); } @Test(timeout = TIMEOUT) public void receiveUTF8MultibyteCharacters() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); - final String[] correct = new String[] { - "てすと", - "Я Б Г Д Ж Й", - "Ä ä Ü ü ß", - "utf8 — string", - "utf8 — string" + final String[] correct = new String[]{ + "てすと", + "Я Б Г Д Ж Й", + "Ä ä Ü ü ß", + "utf8 — string", + "utf8 — string" }; socket = client(); @@ -230,7 +234,7 @@ public void call(Object... args) { }); socket.connect(); for (String expected : correct) { - assertThat((String)values.take(), is(expected)); + assertThat((String) values.take(), is(expected)); } socket.close(); } @@ -293,9 +297,12 @@ public void call(Object... args) { public void reconnectByDefault() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.io().on(Manager.EVENT_RECONNECT, objects -> { - socket.close(); - values.offer("done"); + socket.io().on(Manager.EVENT_RECONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket.close(); + values.offer("done"); + } }); socket.open(); new Timer().schedule(new TimerTask() { @@ -337,20 +344,29 @@ public void call(Object... args) { public void reconnectAutomaticallyAfterReconnectingManually() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.once(Socket.EVENT_CONNECT, args -> { - socket.disconnect(); - }).once(Socket.EVENT_DISCONNECT, args -> { - socket.io().on(Manager.EVENT_RECONNECT, args1 -> { + socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { socket.disconnect(); - values.offer("done"); - }); - socket.connect(); - new Timer().schedule(new TimerTask() { - @Override - public void run() { - socket.io().engine.close(); - } - }, 500); + } + }).once(Socket.EVENT_DISCONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.io().on(Manager.EVENT_RECONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.disconnect(); + values.offer("done"); + } + }); + socket.connect(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + socket.io().engine.close(); + } + }, 500); + } }); socket.connect(); values.take(); @@ -369,7 +385,7 @@ public void attemptReconnectsAfterAFailedReconnect() throws InterruptedException manager.once(Manager.EVENT_RECONNECT_FAILED, new Emitter.Listener() { @Override public void call(Object... args) { - final int[] reconnects = new int[] {0}; + final int[] reconnects = new int[]{0}; Emitter.Listener reconnectCb = new Emitter.Listener() { @Override public void call(Object... args) { @@ -388,7 +404,7 @@ public void call(Object... args) { } }); socket.connect(); - assertThat((Integer)values.take(), is(2)); + assertThat((Integer) values.take(), is(2)); socket.close(); manager.close(); } @@ -405,10 +421,10 @@ public void reconnectDelayShouldIncreaseEveryTime() throws InterruptedException final Manager manager = new Manager(uri(), opts); socket = manager.socket("/timeout"); - final int[] reconnects = new int[] {0}; - final boolean[] increasingDelay = new boolean[] {true}; - final long[] startTime = new long[] {0}; - final long[] prevDelay = new long[] {0}; + final int[] reconnects = new int[]{0}; + final boolean[] increasingDelay = new boolean[]{true}; + final long[] startTime = new long[]{0}; + final long[] prevDelay = new long[]{0}; manager.on(Manager.EVENT_ERROR, new Emitter.Listener() { @Override @@ -469,7 +485,7 @@ public void run() { } }); socket.connect(); - assertThat((Boolean)values.take(), is(true)); + assertThat((Boolean) values.take(), is(true)); } @Test(timeout = TIMEOUT) @@ -610,7 +626,7 @@ public void tryToReconnectTwiceAndFailWithIncorrectAddress() throws InterruptedE opts.reconnectionDelay = 10; final Manager manager = new Manager(URI.create("http://localhost:3940"), opts); socket = manager.socket("/asd"); - final int[] reconnects = new int[] {0}; + final int[] reconnects = new int[]{0}; Emitter.Listener cb = new Emitter.Listener() { @Override public void call(Object... objects) { @@ -628,7 +644,7 @@ public void call(Object... objects) { }); socket.open(); - assertThat((Integer)values.take(), is(2)); + assertThat((Integer) values.take(), is(2)); socket.close(); manager.close(); } @@ -643,7 +659,7 @@ public void tryToReconnectTwiceAndFailWithImmediateTimeout() throws InterruptedE opts.reconnectionDelay = 10; final Manager manager = new Manager(uri(), opts); - final int[] reconnects = new int[] {0}; + final int[] reconnects = new int[]{0}; Emitter.Listener reconnectCb = new Emitter.Listener() { @Override public void call(Object... objects) { @@ -663,7 +679,7 @@ public void call(Object... objects) { socket = manager.socket("/timeout"); socket.open(); - assertThat((Integer)values.take(), is(2)); + assertThat((Integer) values.take(), is(2)); } @Test(timeout = TIMEOUT) @@ -712,7 +728,7 @@ public void fireReconnectEventsOnSocket() throws InterruptedException { final Manager manager = new Manager(uri(), opts); socket = manager.socket("/timeout_socket"); - final int[] reconnects = new int[] {0}; + final int[] reconnects = new int[]{0}; Emitter.Listener reconnectCb = new Emitter.Listener() { @Override public void call(Object... args) { @@ -731,8 +747,8 @@ public void call(Object... objects) { } }); socket.open(); - assertThat((Integer)values.take(), is(reconnects[0])); - assertThat((Integer)values.take(), is(2)); + assertThat((Integer) values.take(), is(reconnects[0])); + assertThat((Integer) values.take(), is(2)); } @Test(timeout = TIMEOUT) @@ -747,7 +763,7 @@ public void fireReconnectingWithAttemptsNumberWhenReconnectingTwice() throws Int final Manager manager = new Manager(uri(), opts); socket = manager.socket("/timeout_socket"); - final int[] reconnects = new int[] {0}; + final int[] reconnects = new int[]{0}; Emitter.Listener reconnectCb = new Emitter.Listener() { @Override public void call(Object... args) { @@ -766,8 +782,8 @@ public void call(Object... objects) { } }); socket.open(); - assertThat((Integer)values.take(), is(reconnects[0])); - assertThat((Integer)values.take(), is(2)); + assertThat((Integer) values.take(), is(reconnects[0])); + assertThat((Integer) values.take(), is(2)); } @Test(timeout = TIMEOUT) @@ -816,7 +832,7 @@ public void call(Object... args) { socket.connect(); Object data = values.take(); assertThat(data, instanceOf(JSONObject.class)); - assertThat(((JSONObject)data).get("date"), instanceOf(String.class)); + assertThat(((JSONObject) data).get("date"), instanceOf(String.class)); socket.close(); } @@ -839,7 +855,7 @@ public void call(Object... args) { } }); socket.open(); - assertThat((byte[])values.take(), is(buf)); + assertThat((byte[]) values.take(), is(buf)); socket.close(); } @@ -869,9 +885,9 @@ public void call(Object... args) { } }); socket.open(); - JSONObject a = (JSONObject)values.take(); + JSONObject a = (JSONObject) values.take(); assertThat(a.getString("hello"), is("lol")); - assertThat((byte[])a.get("message"), is(buf)); + assertThat((byte[]) a.get("message"), is(buf)); assertThat(a.getString("goodbye"), is("gotcha")); socket.close(); } @@ -896,8 +912,8 @@ public void call(Object... args) { } }); socket.open(); - assertThat((byte[])values.take(), is(buf)); - assertThat((String)values.take(), is("please arrive second")); + assertThat((byte[]) values.take(), is(buf)); + assertThat((String) values.take(), is("please arrive second")); socket.close(); } @@ -906,43 +922,52 @@ public void shouldReceiveBufferedEventAfterReconnect() throws InterruptedExcepti final BlockingQueue events = new LinkedBlockingQueue<>(); socket = client(); - socket.on(Socket.EVENT_CONNECT, args -> { - if (!socket.isRecovered()) { - events.offer("first-connect"); - // Tell server to start buffering scenario - socket.emit("startBufferTest"); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (!socket.isRecovered()) { + events.offer("first-connect"); + // Tell server to start buffering scenario + socket.emit("startBufferTest"); + + // Disconnect engine after 1 second + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + socket.io().engine.close(); + } + }, 1000); + } else { + // Reconnected with recovery + events.offer("reconnected"); + } + } + }); - // Disconnect engine after 1 second + socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + events.offer("disconnected"); + // Reconnect after 2 seconds Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { - socket.io().engine.close(); + socket.connect(); } - }, 1000); - } else { - // Reconnected with recovery - events.offer("reconnected"); + }, 2000); } }); - socket.on(Socket.EVENT_DISCONNECT, args -> { - events.offer("disconnected"); - // Reconnect after 2 seconds - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - socket.connect(); + socket.on("message", new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args[0] instanceof JSONObject) { + JSONObject data = (JSONObject) args[0]; + String text = data.optString("text"); + events.offer(text); } - }, 2000); - }); - - socket.on("message", args -> { - if (args[0] instanceof JSONObject) { - JSONObject data = (JSONObject) args[0]; - String text = data.optString("text"); - events.offer(text); } }); diff --git a/src/test/java/io/socket/client/ServerConnectionTest.java b/src/test/java/io/socket/client/ServerConnectionTest.java index ea22c322..59ea3fc5 100644 --- a/src/test/java/io/socket/client/ServerConnectionTest.java +++ b/src/test/java/io/socket/client/ServerConnectionTest.java @@ -215,15 +215,24 @@ public void broadcast() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.on(Socket.EVENT_CONNECT, objects -> { - socket2 = client(); - - socket2.on(Socket.EVENT_CONNECT, objects1 -> { - socket2.emit("broadcast", "hi"); - }); - socket2.connect(); - }).on("broadcastBack", args -> { - values.offer(args); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket2 = client(); + + socket2.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket2.emit("broadcast", "hi"); + } + }); + socket2.connect(); + } + }).on("broadcastBack", new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(args); + } }); socket.connect(); @@ -240,7 +249,17 @@ public void room() throws InterruptedException { final BlockingQueue values = new LinkedBlockingQueue<>(); socket = client(); - socket.on(Socket.EVENT_CONNECT, objects -> socket.emit("room", "hi")).on("roomBack", args -> values.offer(args)); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket.emit("room", "hi"); + } + }).on("roomBack", new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(args); + } + }); socket.connect(); Object[] args = (Object[]) values.take(); diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index 88bd141f..623373e7 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -409,14 +409,20 @@ public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedExc socket = client(); - socket.on("message", args -> { - socket.emit("echo", 1, "2", new byte[]{3}); + socket.on("message", new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.emit("echo", 1, "2", new byte[] { 3 }); - socket.onAnyIncoming(args1 -> { - for (Object arg : args1) { - values.offer(arg); - } - }); + socket.onAnyIncoming(new Emitter.Listener() { + @Override + public void call(Object... args) { + for (Object arg : args) { + values.offer(arg); + } + } + }); + } }); socket.connect(); @@ -435,9 +441,12 @@ public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedExc socket.emit("echo", 1, "2", new byte[]{3}); - socket.onAnyOutgoing(args -> { - for (Object arg : args) { - values.offer(arg); + socket.onAnyOutgoing(new Emitter.Listener() { + @Override + public void call(Object... args) { + for (Object arg : args) { + values.offer(arg); + } } }); From 831267728602a9a2f9bae6881916aac78923dafb Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 17:09:18 +0100 Subject: [PATCH 07/11] fix: replace lambda by anonymous classes --- src/main/java/io/socket/client/Socket.java | 13 +++-- .../java/io/socket/client/Connection.java | 48 +++++++++++-------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index d398a367..d13f9477 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -121,12 +121,15 @@ public boolean isActive() { * Connects the socket. */ public Socket open() { - EventThread.exec(() -> { - if (Socket.this.connected || Socket.this.io.isReconnecting()) return; + EventThread.exec(new Runnable() { + @Override + public void run() { + if (Socket.this.connected || Socket.this.io.isReconnecting()) return; - Socket.this.subEvents(); - Socket.this.io.open(); // ensure open - if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); + Socket.this.subEvents(); + Socket.this.io.open(); // ensure open + if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); + } }); return this; } diff --git a/src/test/java/io/socket/client/Connection.java b/src/test/java/io/socket/client/Connection.java index f270a31b..6ec4f04b 100644 --- a/src/test/java/io/socket/client/Connection.java +++ b/src/test/java/io/socket/client/Connection.java @@ -55,33 +55,39 @@ private Process startServerProcess(String script, int port) throws IOException { } private Future startServerOutput(Process process, String serverName, CountDownLatch latch) { - return serverService.submit(() -> { - BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream())); - String line; - try { - line = reader.readLine(); - latch.countDown(); - do { - logger.fine(serverName + " SERVER OUT: " + line); - } while ((line = reader.readLine()) != null); - } catch (IOException e) { - logger.warning(e.getMessage()); + return serverService.submit(new Runnable() { + @Override + public void run() { + BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getInputStream())); + String line; + try { + line = reader.readLine(); + latch.countDown(); + do { + logger.fine(serverName + " SERVER OUT: " + line); + } while ((line = reader.readLine()) != null); + } catch (IOException e) { + logger.warning(e.getMessage()); + } } }); } private Future startServerError(Process process, String serverName) { - return serverService.submit(() -> { - BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getErrorStream())); - String line; - try { - while ((line = reader.readLine()) != null) { - logger.fine(serverName + " SERVER ERR: " + line); + return serverService.submit(new Runnable() { + @Override + public void run() { + BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getErrorStream())); + String line; + try { + while ((line = reader.readLine()) != null) { + logger.fine(serverName + " SERVER ERR: " + line); + } + } catch (IOException e) { + logger.warning(e.getMessage()); } - } catch (IOException e) { - logger.warning(e.getMessage()); } }); } From f4031f15f9051baeb66cc1f35287fbaa91dd3e90 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 17:11:06 +0100 Subject: [PATCH 08/11] fix: replace lambda by anonymous classes --- .../java/io/socket/client/ConnectionTest.java | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/test/java/io/socket/client/ConnectionTest.java b/src/test/java/io/socket/client/ConnectionTest.java index ae0dcf26..fd57b124 100644 --- a/src/test/java/io/socket/client/ConnectionTest.java +++ b/src/test/java/io/socket/client/ConnectionTest.java @@ -77,24 +77,30 @@ public void workWithAcks() throws InterruptedException { socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... objects) { - socket.on("ack", args -> { - Ack fn = (Ack) args[0]; - JSONObject data = new JSONObject(); - try { - data.put("test", true); - } catch (JSONException e) { - throw new AssertionError(e); + socket.on("ack", new Emitter.Listener() { + @Override + public void call(Object... args) { + Ack fn = (Ack) args[0]; + JSONObject data = new JSONObject(); + try { + data.put("test", true); + } catch (JSONException e) { + throw new AssertionError(e); + } + fn.call(5, data); } - fn.call(5, data); }); - socket.on("ackBack", args -> { - JSONObject data = (JSONObject) args[1]; - try { - if ((Integer) args[0] == 5 && data.getBoolean("test")) { - values.offer("done"); + socket.on("ackBack", new Emitter.Listener() { + @Override + public void call(Object... args) { + JSONObject data = (JSONObject)args[1]; + try { + if ((Integer)args[0] == 5 && data.getBoolean("test")) { + values.offer("done"); + } + } catch (JSONException e) { + throw new AssertionError(e); } - } catch (JSONException e) { - throw new AssertionError(e); } }); socket.emit("callAck"); From 227c88a5f16a9ff64e69d07005d2ea00338e36e1 Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 17:13:05 +0100 Subject: [PATCH 09/11] fix --- src/test/java/io/socket/client/Connection.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/socket/client/Connection.java b/src/test/java/io/socket/client/Connection.java index 6ec4f04b..cc05a6e6 100644 --- a/src/test/java/io/socket/client/Connection.java +++ b/src/test/java/io/socket/client/Connection.java @@ -54,7 +54,7 @@ private Process startServerProcess(String script, int port) throws IOException { return Runtime.getRuntime().exec(String.format(script, nsp()), createEnv(port)); } - private Future startServerOutput(Process process, String serverName, CountDownLatch latch) { + private Future startServerOutput(final Process process, String serverName, CountDownLatch latch) { return serverService.submit(new Runnable() { @Override public void run() { @@ -74,7 +74,7 @@ public void run() { }); } - private Future startServerError(Process process, String serverName) { + private Future startServerError(final Process process, String serverName) { return serverService.submit(new Runnable() { @Override public void run() { From 750990ae1ae53f762ea113451dd13187f0927d3e Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 17:14:08 +0100 Subject: [PATCH 10/11] fix --- src/test/java/io/socket/client/Connection.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/socket/client/Connection.java b/src/test/java/io/socket/client/Connection.java index cc05a6e6..5e185dce 100644 --- a/src/test/java/io/socket/client/Connection.java +++ b/src/test/java/io/socket/client/Connection.java @@ -54,7 +54,7 @@ private Process startServerProcess(String script, int port) throws IOException { return Runtime.getRuntime().exec(String.format(script, nsp()), createEnv(port)); } - private Future startServerOutput(final Process process, String serverName, CountDownLatch latch) { + private Future startServerOutput(final Process process, final String serverName, final CountDownLatch latch) { return serverService.submit(new Runnable() { @Override public void run() { @@ -74,7 +74,7 @@ public void run() { }); } - private Future startServerError(final Process process, String serverName) { + private Future startServerError(final Process process, final String serverName) { return serverService.submit(new Runnable() { @Override public void run() { From 64a4bb73221b3c8ab4d0ca9d1d9b6dcdaf4e258b Mon Sep 17 00:00:00 2001 From: RobinPcrd Date: Thu, 4 Dec 2025 19:33:52 +0100 Subject: [PATCH 11/11] feat: Socket.engine and Manager.close() public --- src/main/java/io/socket/client/Manager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/socket/client/Manager.java b/src/main/java/io/socket/client/Manager.java index a98882b7..34bdbf09 100644 --- a/src/main/java/io/socket/client/Manager.java +++ b/src/main/java/io/socket/client/Manager.java @@ -79,7 +79,7 @@ public class Manager extends Emitter { private final List packetBuffer = new ArrayList<>(); private final Queue subs = new LinkedList<>();; private final Options opts; - /*package*/ io.socket.engineio.client.Socket engine; + public io.socket.engineio.client.Socket engine; private final Parser.Encoder encoder; private final Parser.Decoder decoder; @@ -446,7 +446,7 @@ private void cleanup() { this.decoder.destroy(); } - /*package*/ void close() { + public void close() { logger.fine("disconnect"); this.skipReconnect = true; this.reconnecting = false;