Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-blob-nio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 12.0.0-beta.37 (Unreleased)

### Features Added
- Extended Azure NIO filesystem URI support to allow a unique identifier: `azb://?endpoint=<account_endpoint>&uid=<unique_id>`

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public final class AzureFileSystem extends FileSystem {
*/
public static final String AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK = "AzureStorageSkipInitialContainerCheck";

/**
* Expected type: String
*/
public static final String AZURE_STORAGE_FILESYSTEM_UID = "AzureStorageFileSystemUid";

static final String PATH_SEPARATOR = "/";

private static final Map<String, String> PROPERTIES = CoreUtils.getProperties("azure-storage-blob-nio.properties");
Expand All @@ -161,6 +166,7 @@ public final class AzureFileSystem extends FileSystem {
private final AzureSasCredential sasCredential;
private FileStore defaultFileStore;
private boolean closed;
private String uid;

AzureFileSystem(AzureFileSystemProvider parentFileSystemProvider, String endpoint, Map<String, ?> config)
throws IOException {
Expand All @@ -178,6 +184,9 @@ public final class AzureFileSystem extends FileSystem {
this.putBlobThreshold = (Long) config.get(AZURE_STORAGE_PUT_BLOB_THRESHOLD);
this.maxConcurrencyPerRequest = (Integer) config.get(AZURE_STORAGE_MAX_CONCURRENCY_PER_REQUEST);
this.sasCredential = (AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL);
this.uid = config.containsKey(AZURE_STORAGE_FILESYSTEM_UID)
? (String) config.get(AZURE_STORAGE_FILESYSTEM_UID)
: endpoint;

// Initialize and ensure access to FileStores.
this.fileStores = this.initializeFileStores(config);
Expand Down Expand Up @@ -223,7 +232,7 @@ public FileSystemProvider provider() {
@Override
public void close() throws IOException {
this.closed = true;
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl());
this.parentFileSystemProvider.closeFileSystem(this.uid);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.azure.storage.blob.models.BlobCopyInfo;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.ParallelTransferOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -171,6 +169,8 @@ private static final class ClientLoggerHolder {
public static final String CACHE_CONTROL = "Cache-Control";

private static final String ENDPOINT_QUERY_KEY = "endpoint";
private static final String UID_QUERY_KEY = "uid";

private static final int COPY_TIMEOUT_SECONDS = 30;
private static final Set<OpenOption> OUTPUT_STREAM_DEFAULT_OPTIONS = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)));
Expand Down Expand Up @@ -217,15 +217,18 @@ public String getScheme() {
*/
@Override
public FileSystem newFileSystem(URI uri, Map<String, ?> config) throws IOException {
String endpoint = extractAccountEndpoint(uri);
FileSystemUriParameters parameters = parseFileSystemParameters(uri);

if (this.openFileSystems.containsKey(endpoint)) {
if (this.openFileSystems.containsKey(parameters.uid())) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new FileSystemAlreadyExistsException("Name: " + endpoint));
new FileSystemAlreadyExistsException("Name: " + parameters.endpoint() + " UID: " + parameters.uid()));
}

AzureFileSystem afs = new AzureFileSystem(this, endpoint, config);
this.openFileSystems.put(endpoint, afs);
Map<String, Object> configCopy = new HashMap<>(config);
configCopy.put(AzureFileSystem.AZURE_STORAGE_FILESYSTEM_UID, parameters.uid());

AzureFileSystem afs = new AzureFileSystem(this, parameters.endpoint(), configCopy);
this.openFileSystems.put(parameters.uid(), afs);

return afs;
}
Expand All @@ -246,12 +249,12 @@ public FileSystem newFileSystem(URI uri, Map<String, ?> config) throws IOExcepti
*/
@Override
public FileSystem getFileSystem(URI uri) {
String endpoint = extractAccountEndpoint(uri);
if (!this.openFileSystems.containsKey(endpoint)) {
FileSystemUriParameters parameters = parseFileSystemParameters(uri);
if (!this.openFileSystems.containsKey(parameters.uid())) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new FileSystemNotFoundException("Name: " + endpoint));
new FileSystemNotFoundException("Name: " + parameters.endpoint()));
}
return this.openFileSystems.get(endpoint);
return this.openFileSystems.get(parameters.uid());
}

/**
Expand Down Expand Up @@ -1144,7 +1147,12 @@ void closeFileSystem(String fileSystemName) {
this.openFileSystems.remove(fileSystemName);
}

private String extractAccountEndpoint(URI uri) {
/**
* Uses azure blob file system uri to extract the file system parameters.
* @param uri The URI to extract the file system details from. Format: {@code azb://?endpoint=<account_endpoint>&uid=<unique_id>}
* @return The file system details.
*/
private FileSystemUriParameters parseFileSystemParameters(URI uri) {
if (!uri.getScheme().equals(this.getScheme())) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new IllegalArgumentException("URI scheme does not match this provider"));
Expand All @@ -1155,19 +1163,50 @@ private String extractAccountEndpoint(URI uri) {
+ "the format \"azb://?endpoint=<account_endpoint>\"."));
}

String endpoint = Flux.fromArray(uri.getQuery().split("&"))
.filter(s -> s.startsWith(ENDPOINT_QUERY_KEY + "="))
.switchIfEmpty(Mono.defer(() -> Mono.error(LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new IllegalArgumentException("URI does not contain an \"" + ENDPOINT_QUERY_KEY + "=\" parameter. "
+ "FileSystems require a URI of the format \"azb://?endpoint=<endpoint>\"")))))
.map(s -> s.substring(ENDPOINT_QUERY_KEY.length() + 1)) // Trim the query key and =
.blockLast();
String endpoint = "", uid = "";
for (String queryPart : uri.getQuery().split("&")) {
String[] parts = queryPart.split("=");
switch (parts[0]) {
case ENDPOINT_QUERY_KEY:
endpoint = parts.length > 1 ? parts[1] : "";
break;

case UID_QUERY_KEY:
uid = parts.length > 1 ? parts[1] : "";
break;
Comment on lines +1167 to +1176
}
}
uid = CoreUtils.isNullOrEmpty(uid) ? endpoint : uid;

String expectedFormat
= "FileSystems require a URI of the format \"azb://?endpoint=<endpoint>&uid=<unique_id>\"";
if (CoreUtils.isNullOrEmpty(uid)) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER, new IllegalArgumentException(
"URI does not contain an \"" + UID_QUERY_KEY + "=\" parameter. " + expectedFormat));
}
if (CoreUtils.isNullOrEmpty(endpoint)) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new IllegalArgumentException("No account endpoint provided in URI query."));
new IllegalArgumentException("No account endpoint provided in URI query. " + expectedFormat));
Comment on lines +1166 to +1189
}

return endpoint;
return new FileSystemUriParameters(endpoint, uid);
}

static class FileSystemUriParameters {
private final String endpoint;
private final String uid;

public FileSystemUriParameters(String endpoint, String uid) {
this.endpoint = Objects.requireNonNull(endpoint);
this.uid = Objects.requireNonNull(uid);
}

public String endpoint() {
return endpoint;
}

public String uid() {
return uid;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -109,9 +110,43 @@ public void createFileSystem() throws IOException {
URI uri = getFileSystemUri();
provider.newFileSystem(uri, config);

assertTrue(provider.getFileSystem(uri).isOpen());
assertEquals(primaryBlobServiceClient.getAccountUrl(),
((AzureFileSystem) provider.getFileSystem(uri)).getFileSystemUrl());
verifyFileSystem((AzureFileSystem) provider.getFileSystem(uri));
}

@ParameterizedTest
@ValueSource(strings = { "", "test-uid" })
public void getFileSystemNotFoundAfterFileSystemIsClosed(String uid) throws IOException {
URI uri = getFileSystemUri(uid == "" ? null : uid);

config.put(AzureFileSystem.AZURE_STORAGE_SHARED_KEY_CREDENTIAL, ENV.getPrimaryAccount().getCredential());
config.put(AzureFileSystem.AZURE_STORAGE_FILE_STORES, generateContainerName());
provider.newFileSystem(uri, config);

provider.getFileSystem(uri).close();
assertThrows(FileSystemNotFoundException.class, () -> provider.getFileSystem(uri));
}

@Test
public void createFileSystemDifferentUid() throws IOException {
config.put(AzureFileSystem.AZURE_STORAGE_SHARED_KEY_CREDENTIAL, ENV.getPrimaryAccount().getCredential());
config.put(AzureFileSystem.AZURE_STORAGE_FILE_STORES, generateContainerName());
URI uri1 = getFileSystemUri("uid1");
URI uri2 = getFileSystemUri("uid2");

provider.newFileSystem(uri1, config);
provider.newFileSystem(uri2, config);

AzureFileSystem fs1 = (AzureFileSystem) provider.getFileSystem(uri1);
AzureFileSystem fs2 = (AzureFileSystem) provider.getFileSystem(uri2);

assertNotSame(fs1, fs2);
verifyFileSystem(fs1);
verifyFileSystem(fs2);
}

private void verifyFileSystem(AzureFileSystem fileSystem) {
assertTrue(fileSystem.isOpen());
assertEquals(primaryBlobServiceClient.getAccountUrl(), fileSystem.getFileSystemUrl());
}

@ParameterizedTest
Expand All @@ -120,13 +155,16 @@ public void createFileSystemInvalidUri(String uri) {
assertThrows(IllegalArgumentException.class, () -> provider.newFileSystem(new URI(uri), config));
}

@Test
public void createFileSystemDuplicate() throws IOException {
@ParameterizedTest
@ValueSource(strings = { "", "test-uid" })
public void createFileSystemDuplicate(String uid) throws IOException {
URI uri = getFileSystemUri(uid == "" ? null : uid);

config.put(AzureFileSystem.AZURE_STORAGE_FILE_STORES, generateContainerName());
config.put(AzureFileSystem.AZURE_STORAGE_SHARED_KEY_CREDENTIAL, ENV.getPrimaryAccount().getCredential());
provider.newFileSystem(getFileSystemUri(), config);
provider.newFileSystem(uri, config);

assertThrows(FileSystemAlreadyExistsException.class, () -> provider.newFileSystem(getFileSystemUri(), config));
assertThrows(FileSystemAlreadyExistsException.class, () -> provider.newFileSystem(uri, config));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,13 @@ protected Map<String, Object> initializeConfigMap(HttpPipelinePolicy... policies
}

protected URI getFileSystemUri() {
return getFileSystemUri(null);
}

protected URI getFileSystemUri(String uid) {
try {
return new URI("azb://?endpoint=" + ENV.getPrimaryAccount().getBlobEndpoint());
return new URI(
"azb://?endpoint=" + ENV.getPrimaryAccount().getBlobEndpoint() + (uid != null ? "&uid=" + uid : ""));
} catch (URISyntaxException ex) {
throw new RuntimeException(ex);
}
Expand Down
Loading