Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.grpc.GrpcStubCallableFactory;
Expand Down Expand Up @@ -122,6 +123,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* @since 2.14.0
Expand All @@ -147,6 +149,7 @@ public final class GrpcStorageOptions extends StorageOptions
private final boolean grpcClientMetricsManuallyEnabled;
private final GrpcInterceptorProvider grpcInterceptorProvider;
private final BlobWriteSessionConfig blobWriteSessionConfig;
@Nullable private final ChannelPoolSettings channelPoolSettings;
private transient OpenTelemetry openTelemetry;

private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults) {
Expand All @@ -164,6 +167,7 @@ private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults)
this.grpcClientMetricsManuallyEnabled = builder.grpcMetricsManuallyEnabled;
this.grpcInterceptorProvider = builder.grpcInterceptorProvider;
this.blobWriteSessionConfig = builder.blobWriteSessionConfig;
this.channelPoolSettings = builder.channelPoolSettings;
this.openTelemetry = builder.openTelemetry;
}

Expand Down Expand Up @@ -192,6 +196,12 @@ GrpcInterceptorProvider getGrpcInterceptorProvider() {
return grpcInterceptorProvider;
}

@Nullable
@InternalApi
ChannelPoolSettings getChannelPoolSettings() {
return channelPoolSettings;
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
this.openTelemetry = HttpStorageOptions.getDefaultInstance().getOpenTelemetry();
Expand Down Expand Up @@ -324,6 +334,10 @@ private Tuple<StorageSettings, Opts<UserProject>> resolveSettingsAndOpts() throw
.setAllowNonDefaultServiceAccount(true)
.setAttemptDirectPath(attemptDirectPath);

if (channelPoolSettings != null) {
channelProviderBuilder.setChannelPoolSettings(channelPoolSettings);
}

if (!DIRECT_PATH_BOUND_TOKEN_DISABLED) {
channelProviderBuilder.setAllowHardBoundTokenTypes(
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
Expand Down Expand Up @@ -431,6 +445,7 @@ public int hashCode() {
enableGrpcClientMetrics,
grpcInterceptorProvider,
blobWriteSessionConfig,
channelPoolSettings,
openTelemetry,
baseHashCode());
}
Expand All @@ -450,6 +465,7 @@ public boolean equals(Object o) {
&& Objects.equals(terminationAwaitDuration, that.terminationAwaitDuration)
&& Objects.equals(grpcInterceptorProvider, that.grpcInterceptorProvider)
&& Objects.equals(blobWriteSessionConfig, that.blobWriteSessionConfig)
&& Objects.equals(channelPoolSettings, that.channelPoolSettings)
&& Objects.equals(openTelemetry, that.openTelemetry)
&& this.baseEquals(that);
}
Expand Down Expand Up @@ -500,6 +516,7 @@ public static final class Builder extends StorageOptions.Builder {
GrpcStorageDefaults.INSTANCE.grpcInterceptorProvider();
private BlobWriteSessionConfig blobWriteSessionConfig =
GrpcStorageDefaults.INSTANCE.getDefaultStorageWriterConfig();
@Nullable private ChannelPoolSettings channelPoolSettings;
private OpenTelemetry openTelemetry = GrpcStorageDefaults.INSTANCE.getDefaultOpenTelemetry();

private boolean grpcMetricsManuallyEnabled = false;
Expand All @@ -508,14 +525,17 @@ public static final class Builder extends StorageOptions.Builder {

Builder(StorageOptions options) {
super(options);
GrpcStorageOptions gso = (GrpcStorageOptions) options;
this.storageRetryStrategy = gso.getRetryAlgorithmManager().retryStrategy;
this.terminationAwaitDuration = gso.getTerminationAwaitDuration();
this.attemptDirectPath = gso.attemptDirectPath;
this.enableGrpcClientMetrics = gso.enableGrpcClientMetrics;
this.grpcInterceptorProvider = gso.grpcInterceptorProvider;
this.blobWriteSessionConfig = gso.blobWriteSessionConfig;
this.openTelemetry = gso.openTelemetry;
if (options instanceof GrpcStorageOptions) {
GrpcStorageOptions gso = (GrpcStorageOptions) options;
this.storageRetryStrategy = gso.getRetryAlgorithmManager().retryStrategy;
this.terminationAwaitDuration = gso.getTerminationAwaitDuration();
this.attemptDirectPath = gso.attemptDirectPath;
this.enableGrpcClientMetrics = gso.enableGrpcClientMetrics;
this.grpcInterceptorProvider = gso.grpcInterceptorProvider;
this.blobWriteSessionConfig = gso.blobWriteSessionConfig;
this.channelPoolSettings = gso.channelPoolSettings;
this.openTelemetry = gso.openTelemetry;
}
}

/**
Expand Down Expand Up @@ -723,6 +743,17 @@ public GrpcStorageOptions.Builder setBlobWriteSessionConfig(
return this;
}

/**
* @see InstantiatingGrpcChannelProvider.Builder#setChannelPoolSettings(ChannelPoolSettings)
* @since 2.48.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public GrpcStorageOptions.Builder setChannelPoolSettings(
@NonNull ChannelPoolSettings channelPoolSettings) {
this.channelPoolSettings = channelPoolSettings;
return this;
}

@BetaApi
@Override
public GrpcStorageOptions.Builder setUniverseDomain(String universeDomain) {
Expand Down
Loading