From 17bf8aefbef45477fa3d7bbf3007f0ed3ad16b62 Mon Sep 17 00:00:00 2001 From: Santanu Sinha Date: Mon, 15 Sep 2025 11:29:55 +0530 Subject: [PATCH 1/2] Implemented HTTP based update from executor with fallback to ZK --- .../drove/executor/ExecutorCoreModule.java | 20 +++-- .../drove/executor/ExecutorOptions.java | 6 +- .../discovery/RemoteNodeDataStore.java | 89 +++++++++++++++++++ .../executor/discovery/RemoteUpdateMode.java | 9 ++ 4 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java create mode 100644 drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java index c57ae47b..9200c363 100644 --- a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java @@ -23,7 +23,10 @@ import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.zerodep.ZerodepDockerHttpClient; import com.google.common.base.Strings; -import com.google.inject.*; +import com.google.inject.AbstractModule; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import com.google.inject.name.Names; import com.phonepe.drove.auth.config.ClusterAuthenticationConfig; import com.phonepe.drove.common.CommonUtils; @@ -31,9 +34,9 @@ import com.phonepe.drove.common.discovery.ZkNodeDataStore; import com.phonepe.drove.common.model.ControllerMessageType; import com.phonepe.drove.common.model.controller.ControllerMessage; -import com.phonepe.drove.common.model.utils.Pair; import com.phonepe.drove.common.net.MessageSender; import com.phonepe.drove.common.zookeeper.ZkConfig; +import com.phonepe.drove.executor.discovery.RemoteNodeDataStore; import com.phonepe.drove.executor.dockerauth.DockerAuthConfig; import com.phonepe.drove.executor.engine.ApplicationInstanceEngine; import com.phonepe.drove.executor.engine.LocalServiceInstanceEngine; @@ -60,7 +63,6 @@ import javax.inject.Named; import javax.inject.Singleton; import java.net.URI; -import java.util.Map; import java.util.Objects; import java.util.concurrent.SynchronousQueue; @@ -72,7 +74,10 @@ public class ExecutorCoreModule extends AbstractModule { @Override protected void configure() { - bind(NodeDataStore.class).to(ZkNodeDataStore.class); + bind(NodeDataStore.class).to(RemoteNodeDataStore.class); + bind(NodeDataStore.class) + .annotatedWith(Names.named("PersistentNodeDataStore")) + .to(ZkNodeDataStore.class); bind(new TypeLiteral>() { }) .to(RemoteControllerMessageSender.class); @@ -180,7 +185,8 @@ public ResourceConfig resourceConfig(final AppConfig appConfig) { } @Provides - @Singleton MetadataConfig metadataConfig(final ResourceConfig resourceConfig) { + @Singleton + MetadataConfig metadataConfig(final ResourceConfig resourceConfig) { return Objects.requireNonNullElse(resourceConfig.getMetadata(), MetadataConfig.DEFAULT); } @@ -198,8 +204,8 @@ public DockerClient dockerClient(final ExecutorOptions executorOptions) { ExecutorOptions.DEFAULT_CONTAINER_COMMAND_TIMEOUT) .toMilliseconds()); val dockerSocketPath = Strings.isNullOrEmpty(executorOptions.getDockerSocketPath()) - ? ExecutorOptions.DEFAULT_DOCKER_SOCKET_PATH - : executorOptions.getDockerSocketPath(); + ? ExecutorOptions.DEFAULT_DOCKER_SOCKET_PATH + : executorOptions.getDockerSocketPath(); log.info("Using docker path: {}", dockerSocketPath); return DockerClientImpl.getInstance( DefaultDockerClientConfig.createDefaultConfigBuilder() diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java index c0ddb5d1..ab2f59a6 100644 --- a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java @@ -16,6 +16,7 @@ package com.phonepe.drove.executor; +import com.phonepe.drove.executor.discovery.RemoteUpdateMode; import com.phonepe.drove.models.application.nonroot.UserSpec; import io.dropwizard.util.DataSize; import io.dropwizard.util.DataSizeUnit; @@ -49,6 +50,7 @@ public class ExecutorOptions { public static final Duration DEFAULT_CONTAINER_COMMAND_TIMEOUT = Duration.seconds(30); @SuppressWarnings("java:S1075") public static final String DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock"; + public static final RemoteUpdateMode DEFAULT_UPDATE_MODE = RemoteUpdateMode.RPC; public static final ExecutorOptions DEFAULT = new ExecutorOptions(null, true, @@ -58,7 +60,8 @@ public class ExecutorOptions { DEFAULT_LOG_CACHE_COUNT, DEFAULT_CONTAINER_COMMAND_TIMEOUT, DEFAULT_DOCKER_SOCKET_PATH, - null); + null, + DEFAULT_UPDATE_MODE); @Length(max = 255) String hostname; @@ -84,4 +87,5 @@ public class ExecutorOptions { UserSpec defaultUserSpec; + RemoteUpdateMode remoteUpdateMode; } diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java new file mode 100644 index 00000000..154416ab --- /dev/null +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java @@ -0,0 +1,89 @@ +package com.phonepe.drove.executor.discovery; + +import com.google.common.annotations.VisibleForTesting; +import com.phonepe.drove.common.discovery.NodeDataStore; +import com.phonepe.drove.common.model.MessageDeliveryStatus; +import com.phonepe.drove.common.model.MessageHeader; +import com.phonepe.drove.common.model.controller.ExecutorSnapshotMessage; +import com.phonepe.drove.executor.ExecutorOptions; +import com.phonepe.drove.executor.engine.ExecutorCommunicator; +import com.phonepe.drove.models.info.nodedata.*; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Provider; +import java.util.List; +import java.util.Objects; + +/** + * This node data store is to be used by executors to send data to controller only. It is not supposed to + * store any data locally. Depending on config, it might route to either ZK or HTTP calls. + */ +@Slf4j +@AllArgsConstructor(access = AccessLevel.PROTECTED, onConstructor_ = {@VisibleForTesting}) +public class RemoteNodeDataStore implements NodeDataStore { + private final RemoteUpdateMode remoteUpdateMode; + private final Provider communicator; + private final NodeDataStore remoteStore; + + @Inject + public RemoteNodeDataStore( + ExecutorOptions executorOptions, + Provider communicator, + @Named("PersistentNodeDataStore") NodeDataStore remoteStore) { + this(Objects.requireNonNullElse(executorOptions.getRemoteUpdateMode(), RemoteUpdateMode.RPC), + communicator, + remoteStore); + } + + @Override + public void updateNodeData(NodeData nodeData) { + val status = nodeData.accept(new NodeDataVisitor() { + @Override + public Boolean visit(ControllerNodeData controllerData) { + return null; + } + + @Override + public Boolean visit(ExecutorNodeData executorData) { + try { + switch (remoteUpdateMode) { + case STORE -> { + remoteStore.updateNodeData(executorData); + } + case RPC -> { + val response = + communicator.get() + .send(new ExecutorSnapshotMessage(MessageHeader.controllerRequest(), + executorData)); + if (!response.getStatus().equals(MessageDeliveryStatus.ACCEPTED)) { + log.warn("RPC based update failed. Reverting to store based state update."); + remoteStore.updateNodeData(executorData); + } + } + } + return true; + } + catch (Throwable t) { + log.error("Could not update node state: {}", t.getMessage(), t); + } + return false; + } + }); + log.debug("Remote state update status: {}", status); + } + + @Override + public List nodes(NodeType nodeType) { + return remoteStore.nodes(nodeType); + } + + @Override + public void removeNodeData(NodeData nodeData) { + throw new UnsupportedOperationException("Not supported in remote mode"); + } +} diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java new file mode 100644 index 00000000..7c75b00e --- /dev/null +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java @@ -0,0 +1,9 @@ +package com.phonepe.drove.executor.discovery; + +/** + * + */ +public enum RemoteUpdateMode { + STORE, + RPC +} From 8a5dc892b876d8d1e402bcd15a16f732c985353f Mon Sep 17 00:00:00 2001 From: Santanu Sinha Date: Mon, 15 Sep 2025 12:53:42 +0530 Subject: [PATCH 2/2] Remote updates from executor with HTTP --- .../discovery/RemoteNodeDataStore.java | 46 +++- .../executor/discovery/RemoteUpdateMode.java | 8 +- .../discovery/RemoteNodeDataStoreTest.java | 211 ++++++++++++++++++ 3 files changed, 253 insertions(+), 12 deletions(-) create mode 100644 drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java index 154416ab..9cbfe786 100644 --- a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2025 Original Author(s), PhonePe India Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.phonepe.drove.executor.discovery; import com.google.common.annotations.VisibleForTesting; @@ -21,7 +37,8 @@ /** * This node data store is to be used by executors to send data to controller only. It is not supposed to - * store any data locally. Depending on config, it might route to either ZK or HTTP calls. + * store any data locally. Depending on config, it might route to either ZK or HTTP calls. In case HTTP call + * fails, it will route the call to ZK. */ @Slf4j @AllArgsConstructor(access = AccessLevel.PROTECTED, onConstructor_ = {@VisibleForTesting}) @@ -35,7 +52,8 @@ public RemoteNodeDataStore( ExecutorOptions executorOptions, Provider communicator, @Named("PersistentNodeDataStore") NodeDataStore remoteStore) { - this(Objects.requireNonNullElse(executorOptions.getRemoteUpdateMode(), RemoteUpdateMode.RPC), + this(Objects.requireNonNullElse(executorOptions.getRemoteUpdateMode(), + ExecutorOptions.DEFAULT_UPDATE_MODE), communicator, remoteStore); } @@ -45,22 +63,28 @@ public void updateNodeData(NodeData nodeData) { val status = nodeData.accept(new NodeDataVisitor() { @Override public Boolean visit(ControllerNodeData controllerData) { - return null; + throw new IllegalArgumentException("Invalid data. Why is executor sending controller data here?"); } @Override public Boolean visit(ExecutorNodeData executorData) { try { switch (remoteUpdateMode) { - case STORE -> { - remoteStore.updateNodeData(executorData); - } + case STORE -> remoteStore.updateNodeData(executorData); case RPC -> { - val response = - communicator.get() - .send(new ExecutorSnapshotMessage(MessageHeader.controllerRequest(), - executorData)); - if (!response.getStatus().equals(MessageDeliveryStatus.ACCEPTED)) { + var storeUpdateNeeded = false; + try { + val response = + communicator.get() + .send(new ExecutorSnapshotMessage(MessageHeader.controllerRequest(), + executorData)); + storeUpdateNeeded = !response.getStatus().equals(MessageDeliveryStatus.ACCEPTED); + } + catch (Exception e) { + log.error("RPC based update failed due to error: ", e); + storeUpdateNeeded = true; + } + if (storeUpdateNeeded) { log.warn("RPC based update failed. Reverting to store based state update."); remoteStore.updateNodeData(executorData); } diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java index 7c75b00e..3b486c33 100644 --- a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java @@ -1,9 +1,15 @@ package com.phonepe.drove.executor.discovery; /** - * + * Represents how executor will send updates to controller */ public enum RemoteUpdateMode { + /** + * Direct store update + */ STORE, + /** + * Send update over HTTP(S), in case of failure, send over store + */ RPC } diff --git a/drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java b/drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java new file mode 100644 index 00000000..f3ce4ba5 --- /dev/null +++ b/drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2025 Original Author(s), PhonePe India Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.drove.executor.discovery; + +import com.phonepe.drove.common.discovery.NodeDataStore; +import com.phonepe.drove.common.model.Message; +import com.phonepe.drove.common.model.MessageDeliveryStatus; +import com.phonepe.drove.common.model.MessageResponse; +import com.phonepe.drove.executor.ExecutorOptions; +import com.phonepe.drove.executor.engine.ExecutorCommunicator; +import com.phonepe.drove.models.info.nodedata.*; +import lombok.val; +import org.junit.jupiter.api.Test; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * Tests {@link RemoteNodeDataStore} + */ +class RemoteNodeDataStoreTest { + + @Test + void testRpcSendSuccess() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(ExecutorOptions.DEFAULT, + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + val message = invocationMock.getArgument(0, Message.class); + communicatorCalled.set(true); + return new MessageResponse(message.getHeader(), MessageDeliveryStatus.ACCEPTED); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertFalse(upstreamCalled.get()); + } + + @Test + void testUpstreamSendSuccess() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(ExecutorOptions.DEFAULT + .withRemoteUpdateMode(RemoteUpdateMode.STORE), + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + val message = invocationMock.getArgument(0, Message.class); + communicatorCalled.set(true); + return new MessageResponse(message.getHeader(), MessageDeliveryStatus.ACCEPTED); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertFalse(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testUpstreamFallbackOnFailure() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(ExecutorOptions.DEFAULT.withRemoteUpdateMode(null), //Default to RPC + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + val message = invocationMock.getArgument(0, Message.class); + communicatorCalled.set(true); + return new MessageResponse(message.getHeader(), MessageDeliveryStatus.FAILED); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testUpstreamFallbackOnError() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + communicatorCalled.set(true); + throw new IllegalStateException("Simulated comms error"); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testFailure() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + communicatorCalled.set(true); + throw new IllegalStateException("Simulated comms error"); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + throw new IllegalStateException("Update failure"); + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testIllegalOps() { + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + assertThrows(IllegalArgumentException.class, + () -> nds.updateNodeData(new ControllerNodeData(null, 0, null, null, false))); + assertThrows(UnsupportedOperationException.class, + () -> nds.removeNodeData(null)); + } + + @Test + void testNodePassthrough() { + val nodesCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + when(upstream.nodes(any())) + .thenAnswer(invocationOnMock -> { + nodesCalled.set(true); + return List.of(); + }); + assertTrue(nds.nodes(NodeType.CONTROLLER).isEmpty()); + assertTrue(nodesCalled.get()); + } + + private static ExecutorNodeData generateDummyData() { + return new ExecutorNodeData("localhost", + 8080, + NodeTransportType.HTTP, + new Date(), + null, + List.of(), + List.of(), + List.of(), + Set.of(), + Map.of(), + ExecutorState.ACTIVE); + } +} \ No newline at end of file