Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@
import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient;
import com.google.common.base.Strings;
import com.google.inject.*;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import com.phonepe.drove.auth.config.ClusterAuthenticationConfig;
import com.phonepe.drove.common.CommonUtils;
import com.phonepe.drove.common.discovery.NodeDataStore;
import com.phonepe.drove.common.discovery.ZkNodeDataStore;
import com.phonepe.drove.common.model.ControllerMessageType;
import com.phonepe.drove.common.model.controller.ControllerMessage;
import com.phonepe.drove.common.model.utils.Pair;
import com.phonepe.drove.common.net.MessageSender;
import com.phonepe.drove.common.zookeeper.ZkConfig;
import com.phonepe.drove.executor.discovery.RemoteNodeDataStore;
import com.phonepe.drove.executor.dockerauth.DockerAuthConfig;
import com.phonepe.drove.executor.engine.ApplicationInstanceEngine;
import com.phonepe.drove.executor.engine.LocalServiceInstanceEngine;
Expand All @@ -60,7 +63,6 @@
import javax.inject.Named;
import javax.inject.Singleton;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.SynchronousQueue;

Expand All @@ -72,7 +74,10 @@ public class ExecutorCoreModule extends AbstractModule {

@Override
protected void configure() {
bind(NodeDataStore.class).to(ZkNodeDataStore.class);
bind(NodeDataStore.class).to(RemoteNodeDataStore.class);
bind(NodeDataStore.class)
.annotatedWith(Names.named("PersistentNodeDataStore"))
.to(ZkNodeDataStore.class);
bind(new TypeLiteral<MessageSender<ControllerMessageType, ControllerMessage>>() {
})
.to(RemoteControllerMessageSender.class);
Expand Down Expand Up @@ -180,7 +185,8 @@ public ResourceConfig resourceConfig(final AppConfig appConfig) {
}

@Provides
@Singleton MetadataConfig metadataConfig(final ResourceConfig resourceConfig) {
@Singleton
MetadataConfig metadataConfig(final ResourceConfig resourceConfig) {
return Objects.requireNonNullElse(resourceConfig.getMetadata(), MetadataConfig.DEFAULT);
}

Expand All @@ -198,8 +204,8 @@ public DockerClient dockerClient(final ExecutorOptions executorOptions) {
ExecutorOptions.DEFAULT_CONTAINER_COMMAND_TIMEOUT)
.toMilliseconds());
val dockerSocketPath = Strings.isNullOrEmpty(executorOptions.getDockerSocketPath())
? ExecutorOptions.DEFAULT_DOCKER_SOCKET_PATH
: executorOptions.getDockerSocketPath();
? ExecutorOptions.DEFAULT_DOCKER_SOCKET_PATH
: executorOptions.getDockerSocketPath();
log.info("Using docker path: {}", dockerSocketPath);
return DockerClientImpl.getInstance(
DefaultDockerClientConfig.createDefaultConfigBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.phonepe.drove.executor;

import com.phonepe.drove.executor.discovery.RemoteUpdateMode;
import com.phonepe.drove.models.application.nonroot.UserSpec;
import io.dropwizard.util.DataSize;
import io.dropwizard.util.DataSizeUnit;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class ExecutorOptions {
public static final Duration DEFAULT_CONTAINER_COMMAND_TIMEOUT = Duration.seconds(30);
@SuppressWarnings("java:S1075")
public static final String DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock";
public static final RemoteUpdateMode DEFAULT_UPDATE_MODE = RemoteUpdateMode.RPC;

public static final ExecutorOptions DEFAULT = new ExecutorOptions(null,
true,
Expand All @@ -58,7 +60,8 @@ public class ExecutorOptions {
DEFAULT_LOG_CACHE_COUNT,
DEFAULT_CONTAINER_COMMAND_TIMEOUT,
DEFAULT_DOCKER_SOCKET_PATH,
null);
null,
DEFAULT_UPDATE_MODE);

@Length(max = 255)
String hostname;
Expand All @@ -84,4 +87,5 @@ public class ExecutorOptions {

UserSpec defaultUserSpec;

RemoteUpdateMode remoteUpdateMode;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2025 Original Author(s), PhonePe India Pvt. Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.phonepe.drove.executor.discovery;

import com.google.common.annotations.VisibleForTesting;
import com.phonepe.drove.common.discovery.NodeDataStore;
import com.phonepe.drove.common.model.MessageDeliveryStatus;
import com.phonepe.drove.common.model.MessageHeader;
import com.phonepe.drove.common.model.controller.ExecutorSnapshotMessage;
import com.phonepe.drove.executor.ExecutorOptions;
import com.phonepe.drove.executor.engine.ExecutorCommunicator;
import com.phonepe.drove.models.info.nodedata.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import java.util.List;
import java.util.Objects;

/**
* This node data store is to be used by executors to send data to controller only. It is not supposed to
* store any data locally. Depending on config, it might route to either ZK or HTTP calls. In case HTTP call
* fails, it will route the call to ZK.
*/
@Slf4j
@AllArgsConstructor(access = AccessLevel.PROTECTED, onConstructor_ = {@VisibleForTesting})
public class RemoteNodeDataStore implements NodeDataStore {
private final RemoteUpdateMode remoteUpdateMode;
private final Provider<ExecutorCommunicator> communicator;
private final NodeDataStore remoteStore;

@Inject
public RemoteNodeDataStore(
ExecutorOptions executorOptions,
Provider<ExecutorCommunicator> communicator,
@Named("PersistentNodeDataStore") NodeDataStore remoteStore) {
this(Objects.requireNonNullElse(executorOptions.getRemoteUpdateMode(),
ExecutorOptions.DEFAULT_UPDATE_MODE),
communicator,
remoteStore);
}

@Override
public void updateNodeData(NodeData nodeData) {
val status = nodeData.accept(new NodeDataVisitor<Boolean>() {
@Override
public Boolean visit(ControllerNodeData controllerData) {
throw new IllegalArgumentException("Invalid data. Why is executor sending controller data here?");
}

@Override
public Boolean visit(ExecutorNodeData executorData) {
try {
switch (remoteUpdateMode) {
case STORE -> remoteStore.updateNodeData(executorData);
case RPC -> {
var storeUpdateNeeded = false;
try {
val response =
communicator.get()
.send(new ExecutorSnapshotMessage(MessageHeader.controllerRequest(),
executorData));
storeUpdateNeeded = !response.getStatus().equals(MessageDeliveryStatus.ACCEPTED);
}
catch (Exception e) {
log.error("RPC based update failed due to error: ", e);
storeUpdateNeeded = true;
}
if (storeUpdateNeeded) {
log.warn("RPC based update failed. Reverting to store based state update.");
remoteStore.updateNodeData(executorData);
}
}
}
return true;
}
catch (Throwable t) {
log.error("Could not update node state: {}", t.getMessage(), t);
}
return false;
}
});
log.debug("Remote state update status: {}", status);
}

@Override
public List<NodeData> nodes(NodeType nodeType) {
return remoteStore.nodes(nodeType);
}

@Override
public void removeNodeData(NodeData nodeData) {
throw new UnsupportedOperationException("Not supported in remote mode");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.phonepe.drove.executor.discovery;

/**
* Represents how executor will send updates to controller
*/
public enum RemoteUpdateMode {
/**
* Direct store update
*/
STORE,
/**
* Send update over HTTP(S), in case of failure, send over store
*/
RPC
}
Loading