Skip to content

Commit c552dc8

Browse files
committed
address copilot's comments
1 parent 7a65c38 commit c552dc8

4 files changed

Lines changed: 99 additions & 6 deletions

File tree

fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.fs.s3;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.config.ConfigBuilder;
2122
import org.apache.fluss.config.Configuration;
2223
import org.apache.fluss.fs.FileSystem;
@@ -44,6 +45,7 @@ public class S3FileSystemPlugin implements FileSystemPlugin {
4445
private static final String HADOOP_CONFIG_PREFIX = "fs.s3a.";
4546

4647
private static final String ACCESS_KEY_ID = "fs.s3a.access.key";
48+
private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key";
4749

4850
private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";
4951

@@ -123,8 +125,11 @@ private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopCon
123125
return fsUri;
124126
}
125127

126-
private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) {
127-
boolean hasStaticKeys = hadoopConfig.get(ACCESS_KEY_ID) != null;
128+
@VisibleForTesting
129+
void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) {
130+
boolean hasStaticKeys =
131+
hadoopConfig.get(ACCESS_KEY_ID) != null
132+
&& hadoopConfig.get(ACCESS_KEY_SECRET) != null;
128133
boolean hasRoleArn = hadoopConfig.get(ROLE_ARN_KEY) != null;
129134

130135
if (hasStaticKeys || hasRoleArn) {

fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Map;
4141
import java.util.UUID;
4242

43+
import static org.apache.fluss.utils.Preconditions.checkArgument;
4344
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4445

4546
/** Delegation token provider for S3 Hadoop filesystems. */
@@ -73,9 +74,12 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
7374
this.roleArn = conf.get(ROLE_ARN_KEY);
7475
this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);
7576

76-
if (accessKey == null || secretKey == null) {
77-
checkNotNull(
78-
roleArn,
77+
checkArgument(
78+
(accessKey == null) == (secretKey == null),
79+
"fs.s3a.access.key and fs.s3a.secret.key must both be set or both be unset.");
80+
if (accessKey == null) {
81+
checkArgument(
82+
roleArn != null,
7983
ROLE_ARN_KEY + " must be set when static credentials are not provided.");
8084
}
8185

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.fs.s3;
19+
20+
import org.apache.fluss.fs.s3.token.DynamicTemporaryAWSCredentialsProvider;
21+
import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver;
22+
import org.apache.fluss.fs.token.Credentials;
23+
import org.apache.fluss.fs.token.CredentialsJsonSerde;
24+
import org.apache.fluss.fs.token.ObtainedSecurityToken;
25+
26+
import org.apache.hadoop.conf.Configuration;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.Collections;
30+
31+
import static org.assertj.core.api.Assertions.assertThat;
32+
33+
/** Tests for server/client detection in {@link S3FileSystemPlugin}. */
34+
class S3FileSystemPluginTest {
35+
36+
private static final String PROVIDER_CONFIG = "fs.s3a.aws.credentials.provider";
37+
38+
@Test
39+
void testServerModeWithStaticKeys() {
40+
Configuration hadoopConfig = new Configuration();
41+
hadoopConfig.set("fs.s3a.access.key", "testAccessKey");
42+
hadoopConfig.set("fs.s3a.secret.key", "testSecretKey");
43+
44+
S3FileSystemPlugin plugin = new S3FileSystemPlugin();
45+
plugin.setCredentialProvider(hadoopConfig);
46+
47+
String providers = hadoopConfig.get(PROVIDER_CONFIG, "");
48+
assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
49+
}
50+
51+
@Test
52+
void testServerModeWithRoleArnOnly() {
53+
Configuration hadoopConfig = new Configuration();
54+
hadoopConfig.set("fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role");
55+
56+
S3FileSystemPlugin plugin = new S3FileSystemPlugin();
57+
plugin.setCredentialProvider(hadoopConfig);
58+
59+
String providers = hadoopConfig.get(PROVIDER_CONFIG, "");
60+
assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
61+
}
62+
63+
@Test
64+
void testClientModeWithDelegatedCredentials() {
65+
// Pre-populate receiver so updateHadoopConfig does not throw.
66+
Credentials creds = new Credentials("testKey", "testSecret", "testToken");
67+
ObtainedSecurityToken token =
68+
new ObtainedSecurityToken(
69+
"s3",
70+
CredentialsJsonSerde.toJson(creds),
71+
System.currentTimeMillis() + 3600000,
72+
Collections.singletonMap("fs.s3a.region", "us-east-1"));
73+
S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
74+
receiver.onNewTokensObtained(token);
75+
76+
Configuration hadoopConfig = new Configuration();
77+
78+
S3FileSystemPlugin plugin = new S3FileSystemPlugin();
79+
plugin.setCredentialProvider(hadoopConfig);
80+
81+
String providers = hadoopConfig.get(PROVIDER_CONFIG, "");
82+
assertThat(providers).contains(DynamicTemporaryAWSCredentialsProvider.NAME);
83+
}
84+
}

fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProviderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ void testDefaultChainWithoutRoleArnThrows() {
4141
conf.set("fs.s3a.region", "us-east-1");
4242

4343
assertThatThrownBy(() -> new S3DelegationTokenProvider("s3", conf))
44-
.isInstanceOf(NullPointerException.class)
44+
.isInstanceOf(IllegalArgumentException.class)
4545
.hasMessageContaining("fs.s3a.assumed.role.arn must be set");
4646
}
4747
}

0 commit comments

Comments
 (0)