Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.fs.s3;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigBuilder;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.fs.FileSystem;
Expand Down Expand Up @@ -44,6 +45,9 @@ public class S3FileSystemPlugin implements FileSystemPlugin {
private static final String HADOOP_CONFIG_PREFIX = "fs.s3a.";

private static final String ACCESS_KEY_ID = "fs.s3a.access.key";
private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key";

private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";

private static final String[][] MIRRORED_CONFIG_KEYS = {
{"fs.s3a.access-key", "fs.s3a.access.key"},
Expand All @@ -58,18 +62,22 @@ public String getScheme() {

@Override
public FileSystem create(URI fsUri, Configuration flussConfig) throws IOException {
org.apache.hadoop.conf.Configuration hadoopConfig =
mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));

// set credential provider
setCredentialProvider(hadoopConfig);
org.apache.hadoop.conf.Configuration hadoopConfig = buildHadoopConfiguration(flussConfig);

// create the Hadoop FileSystem
org.apache.hadoop.fs.FileSystem fs = new S3AFileSystem();
fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
return new S3FileSystem(getScheme(), fs, hadoopConfig);
}

@VisibleForTesting
org.apache.hadoop.conf.Configuration buildHadoopConfiguration(Configuration flussConfig) {
org.apache.hadoop.conf.Configuration hadoopConfig =
mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));
setCredentialProvider(hadoopConfig);
return hadoopConfig;
}

org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussConfig) {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
if (flussConfig == null) {
Expand Down Expand Up @@ -122,7 +130,17 @@ private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopCon
}

private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) {
if (hadoopConfig.get(ACCESS_KEY_ID) == null) {
boolean hasStaticKeys =
hadoopConfig.get(ACCESS_KEY_ID) != null
&& hadoopConfig.get(ACCESS_KEY_SECRET) != null;
boolean hasRoleArn = hadoopConfig.get(ROLE_ARN_KEY) != null;

if (hasStaticKeys || hasRoleArn) {
LOG.info(
hasStaticKeys
? "Using provided static credentials."
: "Using default AWS credential chain with AssumeRole.");
} else {
if (Objects.equals(getScheme(), "s3")) {
S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
} else if (Objects.equals(getScheme(), "s3a")) {
Expand All @@ -131,11 +149,8 @@ private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopCo
throw new IllegalArgumentException("Unsupported scheme: " + getScheme());
}
LOG.info(
"{} is not set, using credential provider {}.",
ACCESS_KEY_ID,
"Using credential provider {} for delegated tokens.",
hadoopConfig.get(PROVIDER_CONFIG_NAME));
} else {
LOG.info("{} is set, using provided access key id and secret.", ACCESS_KEY_ID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Map;
import java.util.UUID;

import static org.apache.fluss.utils.Preconditions.checkNotNull;
import static org.apache.fluss.utils.Preconditions.checkArgument;

/** Delegation token provider for S3 Hadoop filesystems. */
public class S3DelegationTokenProvider {
Expand All @@ -58,20 +58,30 @@ public class S3DelegationTokenProvider {

private final String scheme;
private final String region;
private final String accessKey;
private final String secretKey;
@Nullable private final String accessKey;
@Nullable private final String secretKey;
@Nullable private final String roleArn;
@Nullable private final String stsEndpoint;
private final Map<String, String> additionInfos;

public S3DelegationTokenProvider(String scheme, Configuration conf) {
this.scheme = scheme;
this.region = conf.get(REGION_KEY);
checkNotNull(region, "Region is not set.");
checkArgument(region != null, "Region is not set.");
this.accessKey = conf.get(ACCESS_KEY_ID);
this.secretKey = conf.get(ACCESS_KEY_SECRET);
this.roleArn = conf.get(ROLE_ARN_KEY);
this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);

checkArgument(
(accessKey == null) == (secretKey == null),
"S3 access key and secret key must both be set or both be unset.");
if (accessKey == null) {
checkArgument(
roleArn != null,
"Role ARN must be set when static credentials are not provided.");
}

this.additionInfos = new HashMap<>();
for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) {
if (conf.get(key) != null) {
Expand All @@ -86,10 +96,7 @@ public ObtainedSecurityToken obtainSecurityToken() {
Credentials credentials;

if (roleArn != null) {
LOG.info(
"Obtaining session credentials via AssumeRole with access key: {}, role: {}",
accessKey,
roleArn);
LOG.info("Obtaining session credentials via AssumeRole, role: {}", roleArn);
AssumeRoleRequest request =
new AssumeRoleRequest()
.withRoleArn(roleArn)
Expand Down Expand Up @@ -121,10 +128,13 @@ public ObtainedSecurityToken obtainSecurityToken() {

private AWSSecurityTokenService buildStsClient() {
AWSSecurityTokenServiceClientBuilder builder =
AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(accessKey, secretKey)));
AWSSecurityTokenServiceClientBuilder.standard();

if (accessKey != null && secretKey != null) {
builder.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(accessKey, secretKey)));
}

if (stsEndpoint != null) {
builder.withEndpointConfiguration(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs.s3;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.fs.s3.token.DynamicTemporaryAWSCredentialsProvider;
import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver;
import org.apache.fluss.fs.token.Credentials;
import org.apache.fluss.fs.token.CredentialsJsonSerde;
import org.apache.fluss.fs.token.ObtainedSecurityToken;

import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for server/client detection in {@link S3FileSystemPlugin}. */
class S3FileSystemPluginTest {

    private static final String PROVIDER_CONFIG = "fs.s3a.aws.credentials.provider";

    /** Runs the plugin's Hadoop-configuration build step for the given Fluss config. */
    private static org.apache.hadoop.conf.Configuration buildConfig(Configuration flussConfig) {
        return new S3FileSystemPlugin().buildHadoopConfiguration(flussConfig);
    }

    @Test
    void testServerModeWithStaticKeys() {
        // Static access/secret keys present: the dynamic token provider must not be installed.
        Configuration flussConfig = new Configuration();
        flussConfig.setString("fs.s3a.access.key", "testAccessKey");
        flussConfig.setString("fs.s3a.secret.key", "testSecretKey");

        String providers = buildConfig(flussConfig).get(PROVIDER_CONFIG, "");
        assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
    }

    @Test
    void testServerModeWithRoleArnOnly() {
        // Role ARN alone (IRSA / instance-profile style): still server mode, no dynamic provider.
        Configuration flussConfig = new Configuration();
        flussConfig.setString(
                "fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role");

        String providers = buildConfig(flussConfig).get(PROVIDER_CONFIG, "");
        assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
    }

    @Test
    void testClientModeWithDelegatedCredentials() {
        // Pre-populate receiver so updateHadoopConfig does not throw.
        Credentials creds = new Credentials("testKey", "testSecret", "testToken");
        ObtainedSecurityToken token =
                new ObtainedSecurityToken(
                        "s3",
                        CredentialsJsonSerde.toJson(creds),
                        System.currentTimeMillis() + 3600000,
                        Collections.singletonMap("fs.s3a.region", "us-east-1"));
        new S3DelegationTokenReceiver().onNewTokensObtained(token);

        // No static keys and no role ARN: client mode, dynamic provider must be installed.
        String providers = buildConfig(new Configuration()).get(PROVIDER_CONFIG, "");
        assertThat(providers).contains(DynamicTemporaryAWSCredentialsProvider.NAME);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs.s3.token;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link S3DelegationTokenProvider} constructor validation. */
class S3DelegationTokenProviderTest {
Comment thread
fresh-borzoni marked this conversation as resolved.

@Test
void testDefaultChainWithRoleArn() {
Configuration conf = new Configuration();
conf.set("fs.s3a.region", "us-east-1");
conf.set("fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role");

assertThatCode(() -> new S3DelegationTokenProvider("s3", conf)).doesNotThrowAnyException();
}

@Test
void testDefaultChainWithoutRoleArnThrows() {
Configuration conf = new Configuration();
conf.set("fs.s3a.region", "us-east-1");

assertThatThrownBy(() -> new S3DelegationTokenProvider("s3", conf))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Role ARN must be set");
}

@Test
void testPartialStaticCredentialsThrows() {
Configuration conf = new Configuration();
conf.set("fs.s3a.region", "us-east-1");
conf.set("fs.s3a.access.key", "testAccessKey");

assertThatThrownBy(() -> new S3DelegationTokenProvider("s3", conf))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("must both be set or both be unset");
}
}
20 changes: 20 additions & 0 deletions website/docs/maintenance/filesystems/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,23 @@ s3.assumed.role.sts.endpoint: http://<your-s3-compatible-endpoint>:9000
:::note
Without `s3.assumed.role.arn`, Fluss falls back to `GetSessionToken` (the default AWS behavior). This is fully backward compatible — existing AWS users do not need to change their configuration.
:::

### Default AWS Credential Chain (IRSA, Instance Profiles)

When running Fluss on Kubernetes with [IAM Roles for Service Accounts (IRSA)](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html) or on EC2 with instance profiles, you can omit `s3.access-key` and `s3.secret-key`. The server will authenticate using the default AWS credential chain.

In this mode, `s3.assumed.role.arn` is required — the server uses `AssumeRole` to generate temporary credentials for clients (Flink/Spark connectors) that read tiered data from S3.

```yaml
remote.data.dir: s3://<your-bucket>/path/to/remote/storage
s3.region: <your-s3-region>
s3.assumed.role.arn: <your-delegation-role-arn>
```

The server's IAM role (e.g., the IRSA service account role) must have:
- Read/write permissions on the S3 bucket (for the server's own data access)
- `sts:AssumeRole` permission on the role specified in `s3.assumed.role.arn`

:::note
The server authenticates using its own credentials (static keys, IRSA, instance profile, or environment variables) for its S3 data access. For delegation, the server calls AssumeRole with the configured `s3.assumed.role.arn`, so clients receive credentials for that role, which can carry different (e.g., read-only) permissions. Note that the server currently uses the same identity both for its own S3 access and for the STS AssumeRole call; decoupling these two identities is planned.
:::