diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java index c57ae47b..9200c363 100644 --- a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorCoreModule.java @@ -23,7 +23,10 @@ import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.zerodep.ZerodepDockerHttpClient; import com.google.common.base.Strings; -import com.google.inject.*; +import com.google.inject.AbstractModule; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import com.google.inject.name.Names; import com.phonepe.drove.auth.config.ClusterAuthenticationConfig; import com.phonepe.drove.common.CommonUtils; @@ -31,9 +34,9 @@ import com.phonepe.drove.common.discovery.ZkNodeDataStore; import com.phonepe.drove.common.model.ControllerMessageType; import com.phonepe.drove.common.model.controller.ControllerMessage; -import com.phonepe.drove.common.model.utils.Pair; import com.phonepe.drove.common.net.MessageSender; import com.phonepe.drove.common.zookeeper.ZkConfig; +import com.phonepe.drove.executor.discovery.RemoteNodeDataStore; import com.phonepe.drove.executor.dockerauth.DockerAuthConfig; import com.phonepe.drove.executor.engine.ApplicationInstanceEngine; import com.phonepe.drove.executor.engine.LocalServiceInstanceEngine; @@ -60,7 +63,6 @@ import javax.inject.Named; import javax.inject.Singleton; import java.net.URI; -import java.util.Map; import java.util.Objects; import java.util.concurrent.SynchronousQueue; @@ -72,7 +74,10 @@ public class ExecutorCoreModule extends AbstractModule { @Override protected void configure() { - bind(NodeDataStore.class).to(ZkNodeDataStore.class); + bind(NodeDataStore.class).to(RemoteNodeDataStore.class); + bind(NodeDataStore.class) + .annotatedWith(Names.named("PersistentNodeDataStore")) + .to(ZkNodeDataStore.class); bind(new TypeLiteral>() { }) .to(RemoteControllerMessageSender.class); @@ -180,7 +185,8 @@ public ResourceConfig resourceConfig(final AppConfig appConfig) { } @Provides - @Singleton MetadataConfig metadataConfig(final ResourceConfig resourceConfig) { + @Singleton + MetadataConfig metadataConfig(final ResourceConfig resourceConfig) { return Objects.requireNonNullElse(resourceConfig.getMetadata(), MetadataConfig.DEFAULT); } @@ -198,8 +204,8 @@ public DockerClient dockerClient(final ExecutorOptions executorOptions) { ExecutorOptions.DEFAULT_CONTAINER_COMMAND_TIMEOUT) .toMilliseconds()); val dockerSocketPath = Strings.isNullOrEmpty(executorOptions.getDockerSocketPath()) - ? ExecutorOptions.DEFAULT_DOCKER_SOCKET_PATH - : executorOptions.getDockerSocketPath(); + ? ExecutorOptions.DEFAULT_DOCKER_SOCKET_PATH + : executorOptions.getDockerSocketPath(); log.info("Using docker path: {}", dockerSocketPath); return DockerClientImpl.getInstance( DefaultDockerClientConfig.createDefaultConfigBuilder() diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java index c0ddb5d1..ab2f59a6 100644 --- a/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/ExecutorOptions.java @@ -16,6 +16,7 @@ package com.phonepe.drove.executor; +import com.phonepe.drove.executor.discovery.RemoteUpdateMode; import com.phonepe.drove.models.application.nonroot.UserSpec; import io.dropwizard.util.DataSize; import io.dropwizard.util.DataSizeUnit; @@ -49,6 +50,7 @@ public class ExecutorOptions { public static final Duration DEFAULT_CONTAINER_COMMAND_TIMEOUT = Duration.seconds(30); @SuppressWarnings("java:S1075") public static final String DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock"; + public static final RemoteUpdateMode DEFAULT_UPDATE_MODE = RemoteUpdateMode.RPC; public static final ExecutorOptions DEFAULT = new ExecutorOptions(null, true, @@ -58,7 +60,8 @@ public class ExecutorOptions { DEFAULT_LOG_CACHE_COUNT, DEFAULT_CONTAINER_COMMAND_TIMEOUT, DEFAULT_DOCKER_SOCKET_PATH, - null); + null, + DEFAULT_UPDATE_MODE); @Length(max = 255) String hostname; @@ -84,4 +87,5 @@ public class ExecutorOptions { UserSpec defaultUserSpec; + RemoteUpdateMode remoteUpdateMode; } diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java new file mode 100644 index 00000000..9cbfe786 --- /dev/null +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStore.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2025 Original Author(s), PhonePe India Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.drove.executor.discovery; + +import com.google.common.annotations.VisibleForTesting; +import com.phonepe.drove.common.discovery.NodeDataStore; +import com.phonepe.drove.common.model.MessageDeliveryStatus; +import com.phonepe.drove.common.model.MessageHeader; +import com.phonepe.drove.common.model.controller.ExecutorSnapshotMessage; +import com.phonepe.drove.executor.ExecutorOptions; +import com.phonepe.drove.executor.engine.ExecutorCommunicator; +import com.phonepe.drove.models.info.nodedata.*; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Provider; +import java.util.List; +import java.util.Objects; + +/** + * This node data store is to be used by executors to send data to controller only. It is not supposed to + * store any data locally. Depending on config, it might route to either ZK or HTTP calls. In case HTTP call + * fails, it will route the call to ZK. + */ +@Slf4j +@AllArgsConstructor(access = AccessLevel.PROTECTED, onConstructor_ = {@VisibleForTesting}) +public class RemoteNodeDataStore implements NodeDataStore { + private final RemoteUpdateMode remoteUpdateMode; + private final Provider communicator; + private final NodeDataStore remoteStore; + + @Inject + public RemoteNodeDataStore( + ExecutorOptions executorOptions, + Provider communicator, + @Named("PersistentNodeDataStore") NodeDataStore remoteStore) { + this(Objects.requireNonNullElse(executorOptions.getRemoteUpdateMode(), + ExecutorOptions.DEFAULT_UPDATE_MODE), + communicator, + remoteStore); + } + + @Override + public void updateNodeData(NodeData nodeData) { + val status = nodeData.accept(new NodeDataVisitor() { + @Override + public Boolean visit(ControllerNodeData controllerData) { + throw new IllegalArgumentException("Invalid data. Why is executor sending controller data here?"); + } + + @Override + public Boolean visit(ExecutorNodeData executorData) { + try { + switch (remoteUpdateMode) { + case STORE -> remoteStore.updateNodeData(executorData); + case RPC -> { + var storeUpdateNeeded = false; + try { + val response = + communicator.get() + .send(new ExecutorSnapshotMessage(MessageHeader.controllerRequest(), + executorData)); + storeUpdateNeeded = !response.getStatus().equals(MessageDeliveryStatus.ACCEPTED); + } + catch (Exception e) { + log.error("RPC based update failed due to error: ", e); + storeUpdateNeeded = true; + } + if (storeUpdateNeeded) { + log.warn("RPC based update failed. Reverting to store based state update."); + remoteStore.updateNodeData(executorData); + } + } + } + return true; + } + catch (Throwable t) { + log.error("Could not update node state: {}", t.getMessage(), t); + } + return false; + } + }); + log.debug("Remote state update status: {}", status); + } + + @Override + public List nodes(NodeType nodeType) { + return remoteStore.nodes(nodeType); + } + + @Override + public void removeNodeData(NodeData nodeData) { + throw new UnsupportedOperationException("Not supported in remote mode"); + } +} diff --git a/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java new file mode 100644 index 00000000..3b486c33 --- /dev/null +++ b/drove-executor/src/main/java/com/phonepe/drove/executor/discovery/RemoteUpdateMode.java @@ -0,0 +1,15 @@ +package com.phonepe.drove.executor.discovery; + +/** + * Represents how executor will send updates to controller + */ +public enum RemoteUpdateMode { + /** + * Direct store update + */ + STORE, + /** + * Send update over HTTP(S), in case of failure, send over store + */ + RPC +} diff --git a/drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java b/drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java new file mode 100644 index 00000000..f3ce4ba5 --- /dev/null +++ b/drove-executor/src/test/java/com/phonepe/drove/executor/discovery/RemoteNodeDataStoreTest.java @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2025 Original Author(s), PhonePe India Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.drove.executor.discovery; + +import com.phonepe.drove.common.discovery.NodeDataStore; +import com.phonepe.drove.common.model.Message; +import com.phonepe.drove.common.model.MessageDeliveryStatus; +import com.phonepe.drove.common.model.MessageResponse; +import com.phonepe.drove.executor.ExecutorOptions; +import com.phonepe.drove.executor.engine.ExecutorCommunicator; +import com.phonepe.drove.models.info.nodedata.*; +import lombok.val; +import org.junit.jupiter.api.Test; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * Tests {@link RemoteNodeDataStore} + */ +class RemoteNodeDataStoreTest { + + @Test + void testRpcSendSuccess() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(ExecutorOptions.DEFAULT, + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + val message = invocationMock.getArgument(0, Message.class); + communicatorCalled.set(true); + return new MessageResponse(message.getHeader(), MessageDeliveryStatus.ACCEPTED); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertFalse(upstreamCalled.get()); + } + + @Test + void testUpstreamSendSuccess() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(ExecutorOptions.DEFAULT + .withRemoteUpdateMode(RemoteUpdateMode.STORE), + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + val message = invocationMock.getArgument(0, Message.class); + communicatorCalled.set(true); + return new MessageResponse(message.getHeader(), MessageDeliveryStatus.ACCEPTED); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertFalse(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testUpstreamFallbackOnFailure() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(ExecutorOptions.DEFAULT.withRemoteUpdateMode(null), //Default to RPC + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + val message = invocationMock.getArgument(0, Message.class); + communicatorCalled.set(true); + return new MessageResponse(message.getHeader(), MessageDeliveryStatus.FAILED); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testUpstreamFallbackOnError() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + communicatorCalled.set(true); + throw new IllegalStateException("Simulated comms error"); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + return null; + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testFailure() { + val communicatorCalled = new AtomicBoolean(false); + val upstreamCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + when(communicator.send(any())) + .thenAnswer(invocationMock -> { + communicatorCalled.set(true); + throw new IllegalStateException("Simulated comms error"); + }); + doAnswer(invocationOnMock -> { + upstreamCalled.set(true); + throw new IllegalStateException("Update failure"); + }).when(upstream).updateNodeData(any(NodeData.class)); + val data = generateDummyData(); + nds.updateNodeData(data); + assertTrue(communicatorCalled.get()); + assertTrue(upstreamCalled.get()); + } + + @Test + void testIllegalOps() { + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + assertThrows(IllegalArgumentException.class, + () -> nds.updateNodeData(new ControllerNodeData(null, 0, null, null, false))); + assertThrows(UnsupportedOperationException.class, + () -> nds.removeNodeData(null)); + } + + @Test + void testNodePassthrough() { + val nodesCalled = new AtomicBoolean(false); + val communicator = mock(ExecutorCommunicator.class); + val upstream = mock(NodeDataStore.class); + val nds = new RemoteNodeDataStore(RemoteUpdateMode.RPC, + () -> communicator, + upstream); + when(upstream.nodes(any())) + .thenAnswer(invocationOnMock -> { + nodesCalled.set(true); + return List.of(); + }); + assertTrue(nds.nodes(NodeType.CONTROLLER).isEmpty()); + assertTrue(nodesCalled.get()); + } + + private static ExecutorNodeData generateDummyData() { + return new ExecutorNodeData("localhost", + 8080, + NodeTransportType.HTTP, + new Date(), + null, + List.of(), + List.of(), + List.of(), + Set.of(), + Map.of(), + ExecutorState.ACTIVE); + } +} \ No newline at end of file