From 9aedc2c8e148067c71235375d0b4454c21680ec0 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Tue, 28 Apr 2026 17:13:11 +0800 Subject: [PATCH 01/16] fix wps live tests Co-authored-by: Copilot --- .../azure/messaging/webpubsub/TestUtils.java | 4 ++-- sdk/webpubsub/test-resources.bicep | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java b/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java index 07c9dc896c4d..debc6bd78ec2 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java +++ b/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java @@ -31,12 +31,12 @@ final class TestUtils { static String getSocketIOEndpoint() { return Configuration.getGlobalConfiguration() - .get("WEB_PUB_SUB_SOCKETIO_ENDPOINT", "http://testsocketioendpoint.webpubsubdev.azure.com"); + .get("WEB_PUB_SUB_SOCKETIO_ENDPOINT", "https://testsocketioendpoint.webpubsubdev.azure.com"); } static String getEndpoint() { return Configuration.getGlobalConfiguration() - .get("WEB_PUB_SUB_ENDPOINT", "http://testendpoint.webpubsubdev.azure.com"); + .get("WEB_PUB_SUB_ENDPOINT", "https://testendpoint.webpubsubdev.azure.com"); } static String getConnectionString() { diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index b511b6ca477d..8e82c58bedca 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -8,11 +8,34 @@ param testApplicationOid string param location string = resourceGroup().location var webPubSubName = '${baseName}-e2e' +var webPubSubSocketIOName = '${baseName}-socketio-e2e' // Find role id by heading to the Web Pub Sub resource, selecting Access Control (IAM), Roles, choose the Role, // then click on View under Details and check out the JSON. var webPubSubContributorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') +resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2021-10-01' = { + name: webPubSubSocketIOName + location: location + kind: 'SocketIO' + sku: { + name: 'Standard_S1' + tier: 'Standard' + capacity: 1 + } + identity: { + type: 'None' + } + properties: { + tls: { + clientCertEnabled: false + } + publicNetworkAccess: 'Enabled' + disableLocalAuth: false + disableAadAuth: false + } +} + resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { name: webPubSubName location: location @@ -54,3 +77,4 @@ output AZURE_SUBSCRIPTION_ID string = subscription().subscriptionId output AZURE_RESOURCE_GROUP_NAME string = resourceGroup().name output WEB_PUB_SUB_CONNECTION_STRING string = webPubSub.listKeys().primaryConnectionString output WEB_PUB_SUB_ENDPOINT string = 'https://${webPubSub.properties.hostName}' +output WEB_PUB_SUB_SOCKETIO_ENDPOINT string = 'https://${webPubSubSocketIO.properties.hostName}' From 02872d6ad1fec2b1ff07ca6128bba4de306c3d2e Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Tue, 28 Apr 2026 17:50:13 +0800 Subject: [PATCH 02/16] update api version --- sdk/webpubsub/test-resources.bicep | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 8e82c58bedca..05123d70be6d 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -14,7 +14,7 @@ var webPubSubSocketIOName = '${baseName}-socketio-e2e' // then click on View under Details and check out the JSON. var webPubSubContributorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') -resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2021-10-01' = { +resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubSocketIOName location: location kind: 'SocketIO' From 8d245cf115aa6c98f91fa9e47de013c611807235 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Tue, 28 Apr 2026 18:48:34 +0800 Subject: [PATCH 03/16] add owner permission for wps instance Co-authored-by: Copilot --- sdk/webpubsub/test-resources.bicep | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 05123d70be6d..4efb4d1c7431 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -12,7 +12,7 @@ var webPubSubSocketIOName = '${baseName}-socketio-e2e' // Find role id by heading to the Web Pub Sub resource, selecting Access Control (IAM), Roles, choose the Role, // then click on View under Details and check out the JSON. -var webPubSubContributorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') +var webPubSubOwnerRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubSocketIOName @@ -36,7 +36,7 @@ resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-previe } } -resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { +resource webPubSub 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubName location: location sku: { @@ -64,12 +64,23 @@ resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { } } -resource webPubSubContributor 'Microsoft.Authorization/roleAssignments@2020-04-01-preview' = { - name: guid('contributor', webPubSubName) +resource webPubSubOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('owner', webPubSub.id, testApplicationOid) scope: webPubSub properties: { - roleDefinitionId: webPubSubContributorRoleId + roleDefinitionId: webPubSubOwnerRoleId principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + +resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('owner', webPubSubSocketIO.id, testApplicationOid) + scope: webPubSubSocketIO + properties: { + roleDefinitionId: webPubSubOwnerRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' } } From de46b505ac385bde288f469818d723474b9ca05a Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Wed, 29 Apr 2026 10:08:15 +0800 Subject: [PATCH 04/16] update bicep --- sdk/webpubsub/test-resources.bicep | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 4efb4d1c7431..567bd617ac04 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -39,6 +39,7 @@ resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-previe resource webPubSub 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubName location: location + kind: 'WebPubSub' sku: { name: 'Standard_S1' tier: 'Standard' From 27d3d12364a0eb50ec4d7d04350608395a21e8b8 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Wed, 29 Apr 2026 11:40:38 +0800 Subject: [PATCH 05/16] downgrade api version Co-authored-by: Copilot --- sdk/webpubsub/test-resources.bicep | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 567bd617ac04..257640c07ad3 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -36,10 +36,9 @@ resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-previe } } -resource webPubSub 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { +resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { name: webPubSubName location: location - kind: 'WebPubSub' sku: { name: 'Standard_S1' tier: 'Standard' From 0464d2d7e039790823a894b769dded9cde44a738 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Wed, 29 Apr 2026 13:20:50 +0800 Subject: [PATCH 06/16] add operator role Co-authored-by: Copilot --- sdk/webpubsub/test-resources.bicep | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 257640c07ad3..2521c05395e0 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -13,6 +13,7 @@ var webPubSubSocketIOName = '${baseName}-socketio-e2e' // Find role id by heading to the Web Pub Sub resource, selecting Access Control (IAM), Roles, choose the Role, // then click on View under Details and check out the JSON. var webPubSubOwnerRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') +var webPubSubOperatorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'c7393b34-138c-406f-901b-d8cf2b17e6ae') resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubSocketIOName @@ -74,6 +75,16 @@ resource webPubSubOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2 } } +resource webPubSubOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('operator', webPubSub.id, testApplicationOid) + scope: webPubSub + properties: { + roleDefinitionId: webPubSubOperatorRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { name: guid('owner', webPubSubSocketIO.id, testApplicationOid) scope: webPubSubSocketIO @@ -84,6 +95,16 @@ resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssig } } +resource webPubSubSocketIOOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('operator', webPubSubSocketIO.id, testApplicationOid) + scope: webPubSubSocketIO + properties: { + roleDefinitionId: webPubSubOperatorRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + output AZURE_SUBSCRIPTION_ID string = subscription().subscriptionId output AZURE_RESOURCE_GROUP_NAME string = resourceGroup().name output WEB_PUB_SUB_CONNECTION_STRING string = webPubSub.listKeys().primaryConnectionString From 971a4c7f6f2873a6483419339bb72dab6f140308 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Wed, 29 Apr 2026 14:30:57 +0800 Subject: [PATCH 07/16] reuse the client --- .../messaging/webpubsub/client/TestBase.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java index ac86a5b9aa5c..9a70d4dc94ca 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java @@ -21,16 +21,30 @@ */ public class TestBase extends TestProxyTestBase { + private static volatile WebPubSubServiceClient serviceClient; + + private static WebPubSubServiceClient getServiceClient() { + if (serviceClient == null) { + synchronized (TestBase.class) { + if (serviceClient == null) { + Configuration configuration = Configuration.getGlobalConfiguration(); + + serviceClient = new WebPubSubServiceClientBuilder().endpoint(configuration.get("WEB_PUB_SUB_ENDPOINT")) + .credential(TestUtils.getIdentityTestCredential(TestMode.LIVE)) + .hub("hub1") + .buildClient(); + } + } + } + return serviceClient; + } + protected static WebPubSubClientBuilder getClientBuilder() { return getClientBuilder("user1"); } protected static WebPubSubClientBuilder getClientBuilder(String userId) { - WebPubSubServiceClient client = new WebPubSubServiceClientBuilder() - .endpoint(Configuration.getGlobalConfiguration().get("WEB_PUB_SUB_ENDPOINT")) - .credential(TestUtils.getIdentityTestCredential(TestMode.LIVE)) - .hub("hub1") - .buildClient(); + WebPubSubServiceClient client = getServiceClient(); // client builder return new WebPubSubClientBuilder().credential(new WebPubSubClientCredential( From 199f2509891600fe4e94f40c36f8bffb9b6ecae8 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Wed, 29 Apr 2026 15:14:54 +0800 Subject: [PATCH 08/16] try chained credential Co-authored-by: Copilot --- .../com/azure/messaging/webpubsub/client/TestUtils.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java index 2ab8e4c663f7..dd97cd992018 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java @@ -35,9 +35,7 @@ private static TokenCredential getIdentityTestCredentialHelper() { Configuration config = Configuration.getGlobalConfiguration(); ChainedTokenCredentialBuilder builder - = new ChainedTokenCredentialBuilder().addLast(new EnvironmentCredentialBuilder().build()) - .addLast(new AzureCliCredentialBuilder().build()) - .addLast(new AzureDeveloperCliCredentialBuilder().build()); + = new ChainedTokenCredentialBuilder().addLast(new EnvironmentCredentialBuilder().build()); String serviceConnectionId = config.get("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID"); String clientId = config.get("AZURESUBSCRIPTION_CLIENT_ID"); @@ -59,7 +57,9 @@ private static TokenCredential getIdentityTestCredentialHelper() { builder.addLast(trc -> azurePipelinesCredential.getToken(trc).subscribeOn(Schedulers.boundedElastic())); } - builder.addLast(new AzurePowerShellCredentialBuilder().build()); + builder.addLast(new AzurePowerShellCredentialBuilder().build()) + .addLast(new AzureCliCredentialBuilder().build()) + .addLast(new AzureDeveloperCliCredentialBuilder().build()); return builder.build(); } From 6812cbe4186b132de7d3fa646e0eb5b573bf7311 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Thu, 30 Apr 2026 10:54:47 +0800 Subject: [PATCH 09/16] the first ackid should start from 1 --- .../client/WebPubSubAsyncClient.java | 23 ++++---- .../websocket/WebSocketClientHandler.java | 8 ++- .../webpubsub/client/MockClientTests.java | 55 ++++++++++++++++++- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java index ec5cc290bf88..692b7187b095 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java @@ -315,7 +315,7 @@ public Mono joinGroup(String group, Long ackId) { if (ackId == null) { ackId = nextAckId(); } - return sendMessage(new JoinGroupMessage().setGroup(group).setAckId(ackId)).then(waitForAckMessage(ackId)) + return sendMessageAndWaitForAck(new JoinGroupMessage().setGroup(group).setAckId(ackId), ackId) .retryWhen(sendMessageRetrySpec) .map(result -> { groups.compute(group, @@ -346,7 +346,7 @@ public Mono leaveGroup(String group, Long ackId) { if (ackId == null) { ackId = nextAckId(); } - return sendMessage(new LeaveGroupMessage().setGroup(group).setAckId(ackId)).then(waitForAckMessage(ackId)) + return sendMessageAndWaitForAck(new LeaveGroupMessage().setGroup(group).setAckId(ackId), ackId) .retryWhen(sendMessageRetrySpec) .map(result -> { groups.compute(group, @@ -414,8 +414,7 @@ public Mono sendToGroup(String group, BinaryData content, WebPu .setAckId(ackId) .setNoEcho(options.isEchoDisabled()); - Mono sendMessageMono = sendMessage(message); - Mono responseMono = sendMessageMono.then(waitForAckMessage(ackId)); + Mono responseMono = sendMessageAndWaitForAck(message, ackId); return responseMono.retryWhen(sendMessageRetrySpec); } @@ -454,8 +453,7 @@ public Mono sendEvent(String eventName, BinaryData content, Web .setDataType(dataFormat.toString()) .setAckId(ackId); - Mono sendMessageMono = sendMessage(message); - Mono responseMono = sendMessageMono.then(waitForAckMessage(ackId)); + Mono responseMono = sendMessageAndWaitForAck(message, ackId); return responseMono.retryWhen(sendMessageRetrySpec); } @@ -514,13 +512,7 @@ public Flux receiveRejoinGroupFailedEvents() { } private long nextAckId() { - return ackId.getAndUpdate(value -> { - // keep positive - if (++value < 0) { - value = 0; - } - return value; - }); + return ackId.updateAndGet(value -> value == Long.MAX_VALUE ? 1 : Math.max(0, value) + 1); } private Flux receiveAckMessages() { @@ -540,6 +532,11 @@ private Mono sendMessage(WebPubSubMessage message) { })); } + private Mono sendMessageAndWaitForAck(WebPubSubMessage message, Long ackId) { + return Mono.defer( + () -> Mono.zip(waitForAckMessage(ackId), sendMessage(message).thenReturn(true)).map(tuple -> tuple.getT1())); + } + private Mono checkStateBeforeSend() { return Mono.defer(() -> { WebPubSubClientState state = clientState.get(); diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java index eb167c731ec2..750a83512e30 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java @@ -163,6 +163,10 @@ private boolean processMessage(ChannelHandlerContext context, WebSocketFrame web } private void publishBuffer() { + if (compositeByteBuf == null || compositeByteBuf.refCnt() == 0 || compositeByteBuf.readableBytes() == 0) { + return; + } + final ByteBuffer[] nioBuffers = compositeByteBuf.nioBuffers(); if (nioBuffers.length == 0) { @@ -177,6 +181,7 @@ private void publishBuffer() { messageHandler.accept(deserialized); } finally { release(compositeByteBuf); + compositeByteBuf = null; } } @@ -198,9 +203,8 @@ CloseWebSocketFrame getServerCloseWebSocketFrame() { } private static void release(CompositeByteBuf buffer) { - if (buffer.refCnt() > 0) { + if (buffer != null && buffer.refCnt() > 0) { buffer.release(); - buffer.clear(); } } } diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java index eac5d5c101ac..c3b3dfd75cc4 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java @@ -4,13 +4,16 @@ package com.azure.messaging.webpubsub.client; import com.azure.messaging.webpubsub.client.implementation.WebPubSubClientState; +import com.azure.messaging.webpubsub.client.implementation.models.AckMessage; import com.azure.messaging.webpubsub.client.implementation.models.ConnectedMessage; import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessage; +import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessageAck; import com.azure.messaging.webpubsub.client.implementation.websocket.SendResult; import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketClient; import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketSession; import com.azure.messaging.webpubsub.client.models.ConnectFailedException; import com.azure.messaging.webpubsub.client.models.ConnectedEvent; +import com.azure.messaging.webpubsub.client.models.WebPubSubResult; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -18,9 +21,11 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class MockClientTests { @@ -71,6 +76,32 @@ public void testConnect() throws InterruptedException { Assertions.assertEquals(1, events.size()); } + @Test + public void testGeneratedAckIdStartsAtOne() { + List ackIds = new ArrayList<>(); + AtomicReference> messageHandlerReference = new AtomicReference<>(); + + WebSocketClient mockWsClient = (cec, path, loggerReference, messageHandler, openHandler, closeHandler) -> { + messageHandlerReference.set(messageHandler); + WebSocketSession mockWsSession = new MockWebSocketSession(true, messageHandlerReference, ackIds); + openHandler.accept(mockWsSession); + messageHandler.accept(new ConnectedMessage("mock_connection_id")); + return mockWsSession; + }; + + WebPubSubClientBuilder builder = new WebPubSubClientBuilder(); + builder.webSocketClient = mockWsClient; + WebPubSubClient client = builder.clientAccessUrl("mock").buildClient(); + + client.start(); + WebPubSubResult joinResult = client.joinGroup("group"); + WebPubSubResult sendResult = client.sendToGroup("group", "message"); + + Assertions.assertEquals(1L, joinResult.getAckId()); + Assertions.assertEquals(2L, sendResult.getAckId()); + Assertions.assertIterableEquals(Arrays.asList(1L, 2L), ackIds); + } + private static void sendConnectedEvent(Consumer messageHandler) { Mono.delay(SMALL_DELAY) .then(Mono.fromRunnable(() -> messageHandler.accept(new ConnectedMessage("mock_connection_id"))) @@ -79,14 +110,34 @@ private static void sendConnectedEvent(Consumer messageHandler } private static final class MockWebSocketSession implements WebSocketSession { + private final boolean open; + private final AtomicReference> messageHandlerReference; + private final List ackIds; + + private MockWebSocketSession() { + this(false, null, null); + } + + private MockWebSocketSession(boolean open, AtomicReference> messageHandlerReference, + List ackIds) { + this.open = open; + this.messageHandlerReference = messageHandlerReference; + this.ackIds = ackIds; + } + @Override public boolean isOpen() { - return false; + return open; } @Override public void sendObjectAsync(Object data, Consumer handler) { - + if (data instanceof WebPubSubMessageAck) { + long ackId = ((WebPubSubMessageAck) data).getAckId(); + ackIds.add(ackId); + messageHandlerReference.get().accept(new AckMessage().setAckId(ackId).setSuccess(true)); + } + handler.accept(new SendResult()); } @Override From 64eef0e9bd0830afa1a68254f35a5960c444ee00 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Thu, 30 Apr 2026 13:49:26 +0800 Subject: [PATCH 10/16] update bicep --- sdk/webpubsub/test-resources.bicep | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 2521c05395e0..4efb4d1c7431 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -13,7 +13,6 @@ var webPubSubSocketIOName = '${baseName}-socketio-e2e' // Find role id by heading to the Web Pub Sub resource, selecting Access Control (IAM), Roles, choose the Role, // then click on View under Details and check out the JSON. var webPubSubOwnerRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') -var webPubSubOperatorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'c7393b34-138c-406f-901b-d8cf2b17e6ae') resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubSocketIOName @@ -37,7 +36,7 @@ resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-previe } } -resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { +resource webPubSub 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubName location: location sku: { @@ -75,16 +74,6 @@ resource webPubSubOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2 } } -resource webPubSubOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { - name: guid('operator', webPubSub.id, testApplicationOid) - scope: webPubSub - properties: { - roleDefinitionId: webPubSubOperatorRoleId - principalId: testApplicationOid - principalType: 'ServicePrincipal' - } -} - resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { name: guid('owner', webPubSubSocketIO.id, testApplicationOid) scope: webPubSubSocketIO @@ -95,16 +84,6 @@ resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssig } } -resource webPubSubSocketIOOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { - name: guid('operator', webPubSubSocketIO.id, testApplicationOid) - scope: webPubSubSocketIO - properties: { - roleDefinitionId: webPubSubOperatorRoleId - principalId: testApplicationOid - principalType: 'ServicePrincipal' - } -} - output AZURE_SUBSCRIPTION_ID string = subscription().subscriptionId output AZURE_RESOURCE_GROUP_NAME string = resourceGroup().name output WEB_PUB_SUB_CONNECTION_STRING string = webPubSub.listKeys().primaryConnectionString From 93a888fb488f294c743e3221316a5975e2c16658 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Thu, 30 Apr 2026 14:17:22 +0800 Subject: [PATCH 11/16] revert the bicep --- sdk/webpubsub/test-resources.bicep | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index 4efb4d1c7431..2521c05395e0 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -13,6 +13,7 @@ var webPubSubSocketIOName = '${baseName}-socketio-e2e' // Find role id by heading to the Web Pub Sub resource, selecting Access Control (IAM), Roles, choose the Role, // then click on View under Details and check out the JSON. var webPubSubOwnerRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') +var webPubSubOperatorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'c7393b34-138c-406f-901b-d8cf2b17e6ae') resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { name: webPubSubSocketIOName @@ -36,7 +37,7 @@ resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-previe } } -resource webPubSub 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { +resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { name: webPubSubName location: location sku: { @@ -74,6 +75,16 @@ resource webPubSubOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2 } } +resource webPubSubOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('operator', webPubSub.id, testApplicationOid) + scope: webPubSub + properties: { + roleDefinitionId: webPubSubOperatorRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { name: guid('owner', webPubSubSocketIO.id, testApplicationOid) scope: webPubSubSocketIO @@ -84,6 +95,16 @@ resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssig } } +resource webPubSubSocketIOOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('operator', webPubSubSocketIO.id, testApplicationOid) + scope: webPubSubSocketIO + properties: { + roleDefinitionId: webPubSubOperatorRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + output AZURE_SUBSCRIPTION_ID string = subscription().subscriptionId output AZURE_RESOURCE_GROUP_NAME string = resourceGroup().name output WEB_PUB_SUB_CONNECTION_STRING string = webPubSub.listKeys().primaryConnectionString From 665ed49891b8569f771a5bab48e869f750cb8317 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Thu, 30 Apr 2026 15:24:41 +0800 Subject: [PATCH 12/16] fix buffer cleanup risk --- .../websocket/WebSocketClientHandler.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java index 750a83512e30..9dd2cbf7a0b2 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java @@ -163,17 +163,21 @@ private boolean processMessage(ChannelHandlerContext context, WebSocketFrame web } private void publishBuffer() { - if (compositeByteBuf == null || compositeByteBuf.refCnt() == 0 || compositeByteBuf.readableBytes() == 0) { + if (compositeByteBuf == null || compositeByteBuf.refCnt() == 0) { return; } - final ByteBuffer[] nioBuffers = compositeByteBuf.nioBuffers(); + try { + if (compositeByteBuf.readableBytes() == 0) { + return; + } - if (nioBuffers.length == 0) { - return; - } + final ByteBuffer[] nioBuffers = compositeByteBuf.nioBuffers(); + + if (nioBuffers.length == 0) { + return; + } - try { final BinaryData data = BinaryData.fromListByteBuffer(Arrays.asList(nioBuffers)); final String collected = data.toString(); final WebPubSubMessage deserialized = messageDecoder.decode(collected); From 3e77e18febffe0f13dd4335d112dc576df1e2840 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Thu, 30 Apr 2026 15:53:49 +0800 Subject: [PATCH 13/16] update changelog Co-authored-by: Copilot --- .../azure-messaging-webpubsub-client/CHANGELOG.md | 2 ++ .../messaging/webpubsub/client/ClientTests.java | 13 +++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md b/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md index 0e9980263a6f..bb9062b640e6 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed a race condition where Web PubSub client send operations could miss fast ack responses. + ### Other Changes ## 1.1.7 (2026-01-29) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java index d4b5e38fcc07..b78f4c09a911 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java @@ -93,9 +93,10 @@ public void testTwoClients() throws InterruptedException { @Test @LiveOnly - public void testClientCloseable() { + public void testClientCloseable() throws InterruptedException { CountDownLatch connectedLatch = new CountDownLatch(1); CountDownLatch stoppedLatch = new CountDownLatch(1); + CountDownLatch disconnectedLatch = new CountDownLatch(1); AtomicBoolean stoppedEventReceived = new AtomicBoolean(false); AtomicBoolean disconnectedEventReceived = new AtomicBoolean(false); @@ -105,16 +106,20 @@ public void testClientCloseable() { stoppedLatch.countDown(); }); client.addOnConnectedEventHandler(connectedEvent -> connectedLatch.countDown()); - client.addOnDisconnectedEventHandler(disconnectedEvent -> disconnectedEventReceived.set(true)); + client.addOnDisconnectedEventHandler(disconnectedEvent -> { + disconnectedEventReceived.set(true); + disconnectedLatch.countDown(); + }); client.start(); - connectedLatch.countDown(); + Assertions.assertTrue(connectedLatch.await(10, TimeUnit.SECONDS)); // stop not called explicitly } - stoppedLatch.countDown(); + Assertions.assertTrue(stoppedLatch.await(10, TimeUnit.SECONDS)); + Assertions.assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); // verify client stopped via Closeable Assertions.assertTrue(stoppedEventReceived.get()); From 73da815d9e6ae611df129d28490877f9e7324652 Mon Sep 17 00:00:00 2001 From: Shiying Chen Date: Thu, 30 Apr 2026 16:12:59 +0800 Subject: [PATCH 14/16] Update sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../implementation/websocket/WebSocketClientHandler.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java index 9dd2cbf7a0b2..bdbc706d1b6d 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java @@ -163,10 +163,14 @@ private boolean processMessage(ChannelHandlerContext context, WebSocketFrame web } private void publishBuffer() { - if (compositeByteBuf == null || compositeByteBuf.refCnt() == 0) { + if (compositeByteBuf == null) { return; } + if (compositeByteBuf.refCnt() == 0) { + compositeByteBuf = null; + return; + } try { if (compositeByteBuf.readableBytes() == 0) { return; From a778c1e148c94cb77fad5282af2e2210146c54f4 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Thu, 30 Apr 2026 16:43:33 +0800 Subject: [PATCH 15/16] formating Co-authored-by: Copilot --- .../messaging/webpubsub/client/WebPubSubAsyncClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java index 692b7187b095..cef63321654a 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java @@ -533,8 +533,8 @@ private Mono sendMessage(WebPubSubMessage message) { } private Mono sendMessageAndWaitForAck(WebPubSubMessage message, Long ackId) { - return Mono.defer( - () -> Mono.zip(waitForAckMessage(ackId), sendMessage(message).thenReturn(true)).map(tuple -> tuple.getT1())); + return Mono.defer(() -> Mono.zip(waitForAckMessage(ackId), sendMessage(message).thenReturn(true)) + .map(tuple -> tuple.getT1())); } private Mono checkStateBeforeSend() { From 451edfe0cfc7b79c112993c5e6a0b957c7a8c5c3 Mon Sep 17 00:00:00 2001 From: shiyingchen Date: Wed, 6 May 2026 13:23:13 +0800 Subject: [PATCH 16/16] formating --- .../com/azure/messaging/webpubsub/client/TestBase.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java index 9a70d4dc94ca..79296d70d651 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java @@ -29,10 +29,11 @@ private static WebPubSubServiceClient getServiceClient() { if (serviceClient == null) { Configuration configuration = Configuration.getGlobalConfiguration(); - serviceClient = new WebPubSubServiceClientBuilder().endpoint(configuration.get("WEB_PUB_SUB_ENDPOINT")) - .credential(TestUtils.getIdentityTestCredential(TestMode.LIVE)) - .hub("hub1") - .buildClient(); + serviceClient + = new WebPubSubServiceClientBuilder().endpoint(configuration.get("WEB_PUB_SUB_ENDPOINT")) + .credential(TestUtils.getIdentityTestCredential(TestMode.LIVE)) + .hub("hub1") + .buildClient(); } } }