diff --git a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java index 8a8ea4e8e430..f0b5eacbd555 100644 --- a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java +++ b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java @@ -50,6 +50,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -538,6 +539,85 @@ private String createContentDataCenter(final GitCreateContentRequest request, fi return getRequiredLatestCommit(branch, resolvedPath); } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (newBranchName == null || newBranchName.isBlank()) { + throw new IllegalArgumentException("Branch name must be specified"); + } + if (sourceBranch == null || sourceBranch.isBlank()) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String trimmedNewBranch = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + + if (getBranches().contains(trimmedNewBranch)) { + throw new FlowRegistryException("Branch [%s] already exists".formatted(trimmedNewBranch)); + } + + logger.info("Creating branch [{}] from [{}] in repository [{}]", trimmedNewBranch, trimmedSourceBranch, repoName); + + if (formFactor == BitbucketFormFactor.DATA_CENTER) { + createBranchDataCenter(trimmedNewBranch, trimmedSourceBranch, sourceCommitSha); + } else { + createBranchCloud(trimmedNewBranch, trimmedSourceBranch, sourceCommitSha); + } + } + + private void createBranchCloud(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) throws FlowRegistryException { + final String targetHash; + if (sourceCommitSha.isPresent() && !sourceCommitSha.get().isBlank()) { + targetHash = sourceCommitSha.get(); + } else { + targetHash = getBranchHeadCloud(sourceBranch); + } + + final URI uri = getRepositoryUriBuilder().addPathSegment("refs").addPathSegment("branches").build(); + final String json; + try { + json = objectMapper.writeValueAsString(Map.of(FIELD_NAME, newBranchName, FIELD_TARGET, Map.of(FIELD_HASH, targetHash))); + } catch (final Exception e) { + throw new FlowRegistryException("Failed to serialize branch creation request", e); + } + + try (final HttpResponseEntity response = this.webClient.getWebClientService() + .post() + .uri(uri) + .header(AUTHORIZATION_HEADER, authToken.getAuthzHeaderValue()) + .header(CONTENT_TYPE_HEADER, "application/json") + .body(json) + .retrieve()) { + verifyStatusCode(response, "Error creating branch [%s] in repository [%s]".formatted(newBranchName, repoName), HttpURLConnection.HTTP_CREATED); + } catch (final IOException e) { + throw new FlowRegistryException("Failed closing Bitbucket create branch response", e); + } + } + + private void createBranchDataCenter(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) throws FlowRegistryException { + final String startPoint = sourceCommitSha.filter(sha -> !sha.isBlank()).orElse("refs/heads/" + sourceBranch); + final URI uri = getRepositoryUriBuilder().addPathSegment("branches").build(); + + final String json; + try { + json = objectMapper.writeValueAsString(Map.of(FIELD_NAME, newBranchName, "startPoint", startPoint)); + } catch (final Exception e) { + throw new FlowRegistryException("Failed to serialize branch creation request", e); + } + + try (final HttpResponseEntity response = this.webClient.getWebClientService() + .post() + .uri(uri) + .header(AUTHORIZATION_HEADER, authToken.getAuthzHeaderValue()) + .header(CONTENT_TYPE_HEADER, "application/json") + .body(json) + .retrieve()) { + verifyStatusCode(response, "Error creating branch [%s] in repository [%s]".formatted(newBranchName, repoName), HttpURLConnection.HTTP_OK); + } catch (final IOException e) { + throw new FlowRegistryException("Failed closing Bitbucket create branch response", e); + } + } + @Override public InputStream deleteContent(final String filePath, final String commitMessage, final String branch) throws FlowRegistryException { final String resolvedPath = getResolvedPath(filePath); diff --git a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java index 0730b4352f97..3b77d0eb6e40 100644 --- a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java +++ b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java @@ -35,10 +35,12 @@ import org.mockito.stubbing.OngoingStubbing; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.OptionalLong; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -126,14 +128,21 @@ private void stubPostChain(final HttpResponseEntity... responses) { postAfterHeaders = mock(HttpRequestBodySpec.class); lenient().when(webClientService.post()).thenReturn(postSpec); lenient().when(postSpec.uri(any(URI.class))).thenReturn(postBodySpec); + lenient().when(postBodySpec.header(anyString(), anyString())).thenReturn(postBodySpec); lenient().when(postBodySpec.body(any(InputStream.class), any(OptionalLong.class))).thenReturn(afterBody); + lenient().when(postBodySpec.body(anyString())).thenReturn(afterBody); lenient().when(afterBody.header(anyString(), anyString())).thenReturn(postAfterHeaders); lenient().when(postAfterHeaders.header(anyString(), anyString())).thenReturn(postAfterHeaders); - OngoingStubbing stubbing = when(postAfterHeaders.retrieve()); + OngoingStubbing stubbing = lenient().when(postAfterHeaders.retrieve()); for (final HttpResponseEntity response : responses) { stubbing = stubbing.thenReturn(response); } + + OngoingStubbing directStubbing = lenient().when(afterBody.retrieve()); + for (final HttpResponseEntity response : responses) { + directStubbing = directStubbing.thenReturn(response); + } } private HttpResponseEntity mockResponse(final int statusCode, final String body) { @@ -275,6 +284,52 @@ void testCreateContentCloudNullExpectedCommitSha() throws FlowRegistryException assertEquals(RESULT_COMMIT_SHA, commitSha); } + @Test + void testCreateBranchCloudSuccess() throws FlowRegistryException, IOException { + stubGetChain( + branchListResponse(), + branchListResponse(), + branchHeadResponse(BRANCH_HEAD_SHA) + ); + stubPostChain(createdResponse()); + + final BitbucketRepositoryClient client = buildCloudClient(); + client.createBranch("feature", "main", Optional.empty()); + } + + @Test + void testCreateBranchCloudWithCommitSha() throws FlowRegistryException, IOException { + stubGetChain(branchListResponse(), branchListResponse()); + stubPostChain(createdResponse()); + + final BitbucketRepositoryClient client = buildCloudClient(); + client.createBranch("feature", "main", Optional.of("abc123")); + } + + @Test + void testCreateBranchCloudAlreadyExists() throws FlowRegistryException { + stubGetChain(branchListResponse(), branchListResponse()); + + final BitbucketRepositoryClient client = buildCloudClient(); + final FlowRegistryException exception = assertThrows(FlowRegistryException.class, + () -> client.createBranch("main", "main", Optional.empty())); + assertTrue(exception.getMessage().contains("already exists")); + } + + @Test + void testCreateBranchBlankNameRejected() throws FlowRegistryException { + stubGetChain(branchListResponse()); + final BitbucketRepositoryClient client = buildCloudClient(); + assertThrows(IllegalArgumentException.class, () -> client.createBranch(" ", "main", Optional.empty())); + } + + @Test + void testCreateBranchBlankSourceRejected() throws FlowRegistryException { + stubGetChain(branchListResponse()); + final BitbucketRepositoryClient client = buildCloudClient(); + assertThrows(IllegalArgumentException.class, () -> client.createBranch("feature", " ", Optional.empty())); + } + @Test void testCreateContentDataCenterUnchanged() throws FlowRegistryException { final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java index dc6859317c0f..1a4738a300f1 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java @@ -121,6 +121,7 @@ public class AzureDevOpsRepositoryClient implements GitRepositoryClient { private static final String CHANGE_TYPE_DELETE = "delete"; private static final String CONTENT_TYPE_BASE64 = "base64encoded"; private static final int MAX_PUSH_ATTEMPTS = 3; + private static final String ZERO_OBJECT_ID = "0000000000000000000000000000000000000000"; // Common query parameter names and values private static final String VERSION_DESCRIPTOR_VERSION = "versionDescriptor.version"; @@ -443,6 +444,75 @@ private HttpResponseEntity executePush(final URI pushUri, final String branch, f .retrieve(); } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (newBranchName == null || newBranchName.isBlank()) { + throw new IllegalArgumentException("Branch name must be specified"); + } + if (sourceBranch == null || sourceBranch.isBlank()) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String trimmedNewBranch = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + + if (branchExists(trimmedNewBranch)) { + throw new FlowRegistryException("Branch [%s] already exists".formatted(trimmedNewBranch)); + } + + final String baseCommitSha; + if (sourceCommitSha.isPresent() && !sourceCommitSha.get().isBlank()) { + baseCommitSha = sourceCommitSha.get(); + } else { + baseCommitSha = fetchBranchHead(trimmedSourceBranch); + } + + logger.info("Creating branch [{}] from [{}] at commit [{}] in repo [{}]", trimmedNewBranch, trimmedSourceBranch, baseCommitSha, repoName); + + final URI refsUri = getUriBuilder().addPathSegment(SEGMENT_REFS) + .addQueryParameter(API, API_VERSION) + .build(); + + final String json; + try { + json = MAPPER.writeValueAsString(List.of(new CreateRefRequest(REFS_HEADS_PREFIX + trimmedNewBranch, ZERO_OBJECT_ID, baseCommitSha))); + } catch (final Exception e) { + throw new FlowRegistryException("Failed to serialize branch creation request", e); + } + + final HttpResponseEntity response = this.webClient.getWebClientService() + .post() + .uri(refsUri) + .header(AUTHORIZATION_HEADER, bearerToken()) + .header(CONTENT_TYPE_HEADER, MediaType.APPLICATION_JSON.getMediaType()) + .body(json) + .retrieve(); + + if (response.statusCode() != HttpURLConnection.HTTP_OK) { + throw new FlowRegistryException("Failed to create branch [%s] in repo [%s] - %s".formatted(trimmedNewBranch, repoName, getErrorMessage(response))); + } + } + + private boolean branchExists(final String branchName) throws FlowRegistryException { + final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS) + .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + branchName) + .addQueryParameter(API, API_VERSION) + .build(); + final JsonNode refResponse = executeGet(refUri); + final JsonNode values = refResponse.get(JSON_FIELD_VALUE); + if (values == null || !values.isArray()) { + return false; + } + for (final JsonNode ref : values) { + final String refName = ref.get(JSON_FIELD_NAME).asText(); + if (refName.equals(REFS_HEADS_PREFIX + branchName)) { + return true; + } + } + return false; + } + @Override public InputStream deleteContent(final String filePath, final String commitMessage, final String branch) throws FlowRegistryException, IOException { final String path = getResolvedPath(filePath); @@ -515,6 +585,8 @@ private record Change(String changeType, Item item, NewContent newContent) { } private record NewContent(String content, String contentType) { } + private record CreateRefRequest(String name, String oldObjectId, String newObjectId) { } + /** * Create URI builder for accessing the repository. * diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java index ba6c1831de80..7168a879aa70 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java @@ -42,6 +42,7 @@ import java.net.HttpURLConnection; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -303,4 +304,50 @@ void testDeleteContentUsesFetchBranchHead() throws FlowRegistryException, IOExce assertTrue(body.contains("delete-commit")); } } + + @Test + void testCreateBranchSuccess() throws FlowRegistryException, IOException { + final HttpResponseEntity emptyRefsResponse = mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[]}"); + stubGetChain(repoInfoResponse(), permissionsResponse(), emptyRefsResponse, branchHeadResponse(BRANCH_HEAD_SHA)); + stubPostChain(mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[{\"name\":\"refs/heads/feature\"}]}")); + + final AzureDevOpsRepositoryClient client = buildClient(); + client.createBranch("feature", "main", Optional.empty()); + } + + @Test + void testCreateBranchWithSourceCommitSha() throws FlowRegistryException, IOException { + final HttpResponseEntity emptyRefsResponse = mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[]}"); + stubGetChain(repoInfoResponse(), permissionsResponse(), emptyRefsResponse); + stubPostChain(mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[{\"name\":\"refs/heads/feature\"}]}")); + + final AzureDevOpsRepositoryClient client = buildClient(); + client.createBranch("feature", "main", Optional.of("abc123")); + } + + @Test + void testCreateBranchAlreadyExists() throws FlowRegistryException { + final HttpResponseEntity existingBranchResponse = mockResponse(HttpURLConnection.HTTP_OK, + "{\"value\":[{\"name\":\"refs/heads/feature\",\"objectId\":\"abc123\"}]}"); + stubGetChain(repoInfoResponse(), permissionsResponse(), existingBranchResponse); + + final AzureDevOpsRepositoryClient client = buildClient(); + final FlowRegistryException exception = assertThrows(FlowRegistryException.class, + () -> client.createBranch("feature", "main", Optional.empty())); + assertTrue(exception.getMessage().contains("already exists")); + } + + @Test + void testCreateBranchBlankNameRejected() throws FlowRegistryException { + stubGetChain(repoInfoResponse(), permissionsResponse()); + final AzureDevOpsRepositoryClient client = buildClient(); + assertThrows(IllegalArgumentException.class, () -> client.createBranch(" ", "main", Optional.empty())); + } + + @Test + void testCreateBranchBlankSourceRejected() throws FlowRegistryException { + stubGetChain(repoInfoResponse(), permissionsResponse()); + final AzureDevOpsRepositoryClient client = buildClient(); + assertThrows(IllegalArgumentException.class, () -> client.createBranch("feature", " ", Optional.empty())); + } } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java index b866f9d604cb..59aad4d4032e 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java @@ -208,6 +208,42 @@ public FlowRegistryBranch getDefaultBranch(final FlowRegistryClientConfiguration return defaultBranch; } + @Override + public void createBranch(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation sourceLocation, final String newBranchName) + throws FlowRegistryException, IOException { + if (StringUtils.isBlank(newBranchName)) { + throw new IllegalArgumentException("Branch name must be specified when creating a new branch"); + } + + final GitRepositoryClient repositoryClient = getRepositoryClient(context); + verifyWritePermissions(repositoryClient); + + final String sourceBranch = resolveSourceBranch(context, sourceLocation); + if (StringUtils.isBlank(sourceBranch)) { + throw new FlowRegistryException("Unable to determine source branch for new branch creation"); + } + + final Optional sourceCommitSha = sourceLocation == null ? Optional.empty() : Optional.ofNullable(sourceLocation.getVersion()); + final String trimmedBranchName = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + + getLogger().info("Creating branch [{}] from branch [{}]", trimmedBranchName, trimmedSourceBranch); + + try { + repositoryClient.createBranch(trimmedBranchName, trimmedSourceBranch, sourceCommitSha); + } catch (final UnsupportedOperationException e) { + throw new FlowRegistryException("Configured repository client does not support branch creation", e); + } + } + + private String resolveSourceBranch(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation sourceLocation) { + if (sourceLocation != null && StringUtils.isNotBlank(sourceLocation.getBranch())) { + return sourceLocation.getBranch(); + } + final String defaultBranch = context.getProperty(REPOSITORY_BRANCH).getValue(); + return StringUtils.isBlank(defaultBranch) ? null : defaultBranch; + } + @Override public Set getBuckets(final FlowRegistryClientConfigurationContext context, final String branch) throws IOException, FlowRegistryException { final GitRepositoryClient repositoryClient = getRepositoryClient(context); diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java index 3d02cdbeaba1..5440f6299443 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java @@ -155,6 +155,21 @@ default Optional getContentShaAtCommit(String path, String commitSha) th */ InputStream deleteContent(String filePath, String commitMessage, String branch) throws FlowRegistryException, IOException; + /** + * Creates a new branch in the repository. + * + * @param newBranchName the name of the branch to create + * @param sourceBranch the name of the source branch + * @param sourceCommitSha optional commit SHA to use as the starting point for the new branch. If empty, the head commit of the source branch should be used. + * @throws IOException if an I/O error occurs + * @throws FlowRegistryException if a non-I/O error occurs + * @throws UnsupportedOperationException if the repository implementation does not support branch creation + */ + default void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + throw new UnsupportedOperationException("Branch creation is not supported"); + } + /** * Closes any resources held by the client. * diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java index 21e85bc4ea23..ec98d490beeb 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java @@ -22,7 +22,9 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext; +import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext; import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.git.client.GitCommit; import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest; import org.apache.nifi.registry.flow.git.client.GitRepositoryClient; @@ -38,8 +40,10 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLContext; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class AbstractGitFlowRegistryClientTest { @@ -49,6 +53,7 @@ void verifySuccessful() throws Exception { final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a", ".git")); final AtomicReference suppliedClient = new AtomicReference<>(repositoryClient); final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> suppliedClient.getAndSet(null), "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -71,6 +76,7 @@ void verifyAuthenticationFailure() { final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> { throw new FlowRegistryException("Authentication failed"); }, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -85,6 +91,7 @@ void verifyAuthenticationFailure() { void verifyReadFailureSkipsBucketListing() throws Exception { final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(false, false, Set.of()); final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -100,10 +107,11 @@ void verifyReadFailureSkipsBucketListing() throws Exception { @Test void verifyBucketListingFailureReported() throws Exception { - final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of()); + final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a")); repositoryClient.setGetTopLevelDirectoryNamesException(new FlowRegistryException("listing error")); final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -118,6 +126,41 @@ void verifyBucketListingFailureReported() throws Exception { assertTrue(repositoryClient.isClosed()); } + @Test + void createBranchDelegatesToRepositoryClient() throws Exception { + final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a")); + final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); + final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("source-branch", "bucket-a", "flow-x", "commit-1"); + + flowRegistryClient.createBranch(context, sourceLocation, " new-branch "); + + assertEquals("new-branch", repositoryClient.getCreatedBranch()); + assertEquals("source-branch", repositoryClient.getCreatedBranchSource()); + assertEquals(Optional.of("commit-1"), repositoryClient.getCreatedBranchCommit()); + } + + @Test + void createBranchUnsupportedThrowsFlowRegistryException() throws Exception { + final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a")); + repositoryClient.setBranchCreationUnsupported(true); + + final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); + final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation(); + sourceLocation.setBranch("source"); + + final FlowRegistryException exception = assertThrows(FlowRegistryException.class, + () -> flowRegistryClient.createBranch(context, sourceLocation, "new-branch")); + + assertEquals("Configured repository client does not support branch creation", exception.getMessage()); + assertTrue(repositoryClient.getCreatedBranchCommit().isEmpty()); + } + private FlowRegistryClientConfigurationContext createContext(final String branch, final String exclusionPattern) { final Map properties = Map.of( AbstractGitFlowRegistryClient.REPOSITORY_BRANCH, new MockPropertyValue(branch), @@ -145,6 +188,25 @@ public Optional getNiFiUserIdentity() { }; } + private FlowRegistryClientInitializationContext createInitializationContext() { + return new FlowRegistryClientInitializationContext() { + @Override + public String getIdentifier() { + return "test-git-client"; + } + + @Override + public ComponentLog getLogger() { + return new MockComponentLog("test-git-client", AbstractGitFlowRegistryClientTest.this); + } + + @Override + public Optional getSystemSslContext() { + return Optional.empty(); + } + }; + } + private static class TestGitFlowRegistryClient extends AbstractGitFlowRegistryClient { private final RepositoryClientSupplier repositoryClientSupplier; private final String storageLocation; @@ -186,6 +248,11 @@ private static class TestGitRepositoryClient implements GitRepositoryClient { private FlowRegistryException topLevelDirectoryNamesException; private IOException topLevelDirectoryNamesIOException; private boolean closed; + private boolean branchCreationUnsupported; + private FlowRegistryException createBranchException; + private String createdBranch; + private String createdBranchSource; + private Optional createdBranchCommit = Optional.empty(); TestGitRepositoryClient(final boolean canRead, final boolean canWrite, final Set bucketNames) { this.canRead = canRead; @@ -198,11 +265,31 @@ void setGetTopLevelDirectoryNamesException(final FlowRegistryException exception this.topLevelDirectoryNamesIOException = null; } - void setGetTopLevelDirectoryNamesException(final IOException exception) { + void setGetTopLevelDirectoryNamesIOException(final IOException exception) { this.topLevelDirectoryNamesIOException = exception; this.topLevelDirectoryNamesException = null; } + void setBranchCreationUnsupported(final boolean unsupported) { + this.branchCreationUnsupported = unsupported; + } + + void setCreateBranchException(final FlowRegistryException exception) { + this.createBranchException = exception; + } + + String getCreatedBranch() { + return createdBranch; + } + + String getCreatedBranchSource() { + return createdBranchSource; + } + + Optional getCreatedBranchCommit() { + return createdBranchCommit; + } + boolean isClosed() { return closed; } @@ -228,6 +315,21 @@ public Set getTopLevelDirectoryNames(final String branch) throws IOExcep return bucketNames; } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (branchCreationUnsupported) { + throw new UnsupportedOperationException("Branch creation not supported"); + } + if (createBranchException != null) { + throw createBranchException; + } + + createdBranch = newBranchName; + createdBranchSource = sourceBranch; + createdBranchCommit = sourceCommitSha; + } + @Override public void close() { closed = true; @@ -270,7 +372,7 @@ public Optional getContentShaAtCommit(final String path, final String co @Override public String createContent(final GitCreateContentRequest request) { - throw new UnsupportedOperationException("Not required for test"); + return "test-commit"; } @Override diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java index ca20991d8c1b..b50abaae6da0 100644 --- a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java @@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.flow.FlowRegistryException; import org.apache.nifi.registry.flow.git.client.GitCommit; @@ -464,6 +465,58 @@ public InputStream deleteContent(final String filePath, final String commitMessa }); } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (StringUtils.isBlank(newBranchName)) { + throw new IllegalArgumentException("Branch name must be specified"); + } + if (StringUtils.isBlank(sourceBranch)) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String trimmedNewBranch = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + final String newBranchRefPath = "heads/" + trimmedNewBranch; + final String sourceBranchRefPath = "heads/" + trimmedSourceBranch; + + // FileNotFoundException indicates the branch does not exist, which is the expected case. + // Other exceptions (FlowRegistryException, IOException) propagate as communication errors. + try { + execute(() -> repository.getRef(newBranchRefPath)); + throw new FlowRegistryException("Branch [" + trimmedNewBranch + "] already exists"); + } catch (final FileNotFoundException notFound) { + logger.debug("Branch [{}] does not exist and will be created", trimmedNewBranch, notFound); + } + + final GHRef sourceBranchRef; + try { + sourceBranchRef = execute(() -> repository.getRef(sourceBranchRefPath)); + } catch (final FileNotFoundException notFound) { + throw new FlowRegistryException("Source branch [" + trimmedSourceBranch + "] does not exist", notFound); + } + + final String baseCommitSha; + if (sourceCommitSha.isPresent()) { + final String requestedCommitSha = sourceCommitSha.get(); + try { + baseCommitSha = execute(() -> repository.getCommit(requestedCommitSha).getSHA1()); + } catch (final FileNotFoundException notFound) { + throw new FlowRegistryException("Commit [" + requestedCommitSha + "] was not found in the repository", notFound); + } + } else { + baseCommitSha = sourceBranchRef.getObject().getSha(); + } + + logger.info("Creating branch [{}] from [{}] at commit [{}] for repository [{}]", + trimmedNewBranch, trimmedSourceBranch, baseCommitSha, repository.getFullName()); + + execute(() -> { + repository.createRef(BRANCH_REF_PATTERN.formatted(trimmedNewBranch), baseCommitSha); + return null; + }); + } + private String getResolvedPath(final String path) { return repoPath == null ? path : repoPath + "/" + path; } diff --git a/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java b/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java index a6292e4ab283..9d5bc399a402 100644 --- a/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java +++ b/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.flow.FlowRegistryException; @@ -293,6 +294,40 @@ public InputStream deleteContent(final String filePath, final String commitMessa }); } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (StringUtils.isBlank(newBranchName)) { + throw new IllegalArgumentException("Branch name must be specified"); + } + if (StringUtils.isBlank(sourceBranch)) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String trimmedNewBranch = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + final RepositoryApi repositoryApi = gitLab.getRepositoryApi(); + + try { + repositoryApi.getBranch(projectPath, trimmedNewBranch); + throw new FlowRegistryException("Branch [%s] already exists".formatted(trimmedNewBranch)); + } catch (final GitLabApiException e) { + if (e.getHttpStatus() != HttpURLConnection.HTTP_NOT_FOUND) { + throw new FlowRegistryException("Failed to check existence of branch [%s]".formatted(trimmedNewBranch), e); + } + logger.debug("Branch [{}] does not exist and will be created", trimmedNewBranch); + } + + final String ref = sourceCommitSha.filter(sha -> !sha.isBlank()).orElse(trimmedSourceBranch); + + logger.info("Creating branch [{}] from ref [{}] in repository [{}]", trimmedNewBranch, ref, projectPath); + + execute(() -> { + repositoryApi.createBranch(projectPath, trimmedNewBranch, ref); + return null; + }); + } + @Override public void close() { gitLab.close(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CreateFlowBranchRequestEntity.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CreateFlowBranchRequestEntity.java new file mode 100644 index 000000000000..8e57e8387eea --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CreateFlowBranchRequestEntity.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.RevisionDTO; + +@XmlRootElement(name = "createFlowBranchRequestEntity") +public class CreateFlowBranchRequestEntity extends Entity { + + private RevisionDTO processGroupRevision; + private String branch; + private String sourceBranch; + private String sourceVersion; + private Boolean disconnectedNodeAcknowledged; + + @Schema(description = "The Revision of the Process Group under Version Control") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(final RevisionDTO processGroupRevision) { + this.processGroupRevision = processGroupRevision; + } + + @Schema(description = "The name of the new branch to create") + public String getBranch() { + return branch; + } + + public void setBranch(final String branch) { + this.branch = branch; + } + + @Schema(description = "The name of the source branch to create the new branch from. Defaults to the branch currently tracking in NiFi.") + public String getSourceBranch() { + return sourceBranch; + } + + public void setSourceBranch(final String sourceBranch) { + this.sourceBranch = sourceBranch; + } + + @Schema(description = "The version on the source branch to use when creating the new branch. Defaults to the version currently tracked by NiFi.") + public String getSourceVersion() { + return sourceVersion; + } + + public void setSourceVersion(final String sourceVersion) { + this.sourceVersion = sourceVersion; + } + + @Schema(description = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.") + public Boolean isDisconnectedNodeAcknowledged() { + return disconnectedNodeAcknowledged; + } + + public void setDisconnectedNodeAcknowledged(final Boolean disconnectedNodeAcknowledged) { + this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java index 2d3bb71a3e49..3c76eb6601bb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java @@ -457,6 +457,12 @@ public Set getFlows(final FlowRegistryClientUserContext context, return node.getFlows(context, bucketLocation); } + @Override + public void createBranch(final FlowRegistryClientUserContext context, final FlowVersionLocation sourceLocation, final String newBranchName) + throws FlowRegistryException, IOException { + node.createBranch(context, sourceLocation, newBranchName); + } + @Override public FlowSnapshotContainer getFlowContents(final FlowRegistryClientUserContext context, final FlowVersionLocation flowVersionLocation, final boolean fetchRemoteFlows) throws FlowRegistryException, IOException { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java index e8e936df2708..ea4352e507ae 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java @@ -247,6 +247,15 @@ public Set getFlows(final FlowRegistryClientUserContext context, return execute(() -> client.get().getComponent().getFlows(getConfigurationContext(context), bucketLocation)); } + @Override + public void createBranch(final FlowRegistryClientUserContext context, final FlowVersionLocation sourceLocation, final String newBranchName) + throws FlowRegistryException, IOException { + execute(() -> { + client.get().getComponent().createBranch(getConfigurationContext(context), sourceLocation, newBranchName); + return null; + }); + } + @Override public FlowSnapshotContainer getFlowContents(final FlowRegistryClientUserContext context, final FlowVersionLocation flowVersionLocation, final boolean fetchRemoteFlows) throws FlowRegistryException, IOException { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java index 968f31138530..e6af6beedab8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java @@ -53,6 +53,7 @@ public interface FlowRegistryClientNode extends ComponentNode { RegisteredFlow getFlow(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException; Set getFlows(FlowRegistryClientUserContext context, BucketLocation bucketLocation) throws FlowRegistryException, IOException; + void createBranch(FlowRegistryClientUserContext context, FlowVersionLocation sourceLocation, String newBranchName) throws FlowRegistryException, IOException; FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, FlowVersionLocation flowVersionLocation, boolean fetchRemoteFlows) throws FlowRegistryException, IOException; RegisteredFlowSnapshot registerFlowSnapshot( diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 10f5c9c585c0..b98b2e4c58ab 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1787,6 +1787,18 @@ RegisteredFlowSnapshot registerVersionedFlowSnapshot(String registryId, Register VersionControlInformationEntity setVersionControlInformation(Revision processGroupRevision, String processGroupId, VersionControlInformationDTO versionControlInfo, Map versionedComponentMapping); + /** + * Creates a new branch in the associated Flow Registry for the specified Process Group and updates the local Version Control information to track the new branch. + * + * @param revision the revision for the Process Group + * @param processGroupId the Process Group identifier + * @param newBranchName the name of the new branch to create + * @param sourceBranch the branch to branch from + * @param sourceVersion the commit/version on the source branch to branch from + * @return the updated Version Control information + */ + VersionControlInformationEntity createFlowBranch(Revision revision, String processGroupId, String newBranchName, String sourceBranch, String sourceVersion); + /** * Disconnects the specified Process Group from version control. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index f9057f97249d..f5ad436978ed 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -198,9 +198,11 @@ import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VerifiableFlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; @@ -6264,6 +6266,138 @@ public VersionControlInformationEntity setVersionControlInformation(final Revisi return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification())); } + @Override + public VersionControlInformationEntity createFlowBranch(final Revision revision, final String processGroupId, final String newBranchName, + final String sourceBranch, final String sourceVersion) { + final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation versionControlInformation = group.getVersionControlInformation(); + + if (versionControlInformation == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is not currently under Version Control"); + } + + if (StringUtils.isBlank(newBranchName)) { + throw new IllegalArgumentException("Branch name must be specified"); + } + final String trimmedBranchName = newBranchName.trim(); + if (trimmedBranchName.equals(versionControlInformation.getBranch())) { + throw new IllegalArgumentException("Process Group is already tracking branch " + trimmedBranchName); + } + + final String resolvedSourceBranch = StringUtils.isNotBlank(sourceBranch) ? sourceBranch : versionControlInformation.getBranch(); + if (StringUtils.isBlank(resolvedSourceBranch)) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String resolvedSourceVersion = StringUtils.isNotBlank(sourceVersion) ? sourceVersion : versionControlInformation.getVersion(); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation(resolvedSourceBranch, + versionControlInformation.getBucketIdentifier(), + versionControlInformation.getFlowIdentifier(), + resolvedSourceVersion); + + final FlowRegistryClientUserContext clientUserContext = FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()); + + try { + flowRegistryDAO.createBranchForUser(clientUserContext, versionControlInformation.getRegistryIdentifier(), sourceLocation, trimmedBranchName); + } catch (final UnsupportedOperationException e) { + throw new IllegalArgumentException("Configured Flow Registry does not support branch creation", e); + } + + final VersionControlInformationDTO updatedVersionControlInformation = new VersionControlInformationDTO(); + updatedVersionControlInformation.setGroupId(processGroupId); + updatedVersionControlInformation.setRegistryId(versionControlInformation.getRegistryIdentifier()); + updatedVersionControlInformation.setRegistryName(versionControlInformation.getRegistryName()); + updatedVersionControlInformation.setBucketId(versionControlInformation.getBucketIdentifier()); + updatedVersionControlInformation.setBucketName(versionControlInformation.getBucketName()); + updatedVersionControlInformation.setFlowId(versionControlInformation.getFlowIdentifier()); + updatedVersionControlInformation.setFlowName(versionControlInformation.getFlowName()); + updatedVersionControlInformation.setFlowDescription(versionControlInformation.getFlowDescription()); + updatedVersionControlInformation.setStorageLocation(versionControlInformation.getStorageLocation()); + updatedVersionControlInformation.setBranch(trimmedBranchName); + updatedVersionControlInformation.setVersion(resolvedSourceVersion); + + final VersionedFlowStatus status = versionControlInformation.getStatus(); + VersionedFlowState updatedState = null; + String stateExplanation = status == null ? null : status.getStateExplanation(); + if (status != null) { + final VersionedFlowState state = status.getState(); + if (state != null) { + updatedState = switch (state) { + case LOCALLY_MODIFIED_AND_STALE -> { + stateExplanation = "Process Group has local modifications"; + yield VersionedFlowState.LOCALLY_MODIFIED; + } + case STALE -> VersionedFlowState.UP_TO_DATE; + default -> state; + }; + } + } + + if (updatedState != null) { + updatedVersionControlInformation.setState(updatedState.name()); + updatedVersionControlInformation.setStateExplanation(stateExplanation); + } + + final FlowManager flowManager = controllerFacade.getFlowManager(); + + VersionedProcessGroup registrySnapshot = null; + if (flowManager == null) { + logger.warn("Failed to synchronize Process Group {} with Flow Registry after creating branch {} because Flow Manager is unavailable", group.getIdentifier(), trimmedBranchName); + } else { + try { + final FlowRegistryClientNode registryClient = flowManager.getFlowRegistryClient(versionControlInformation.getRegistryIdentifier()); + if (registryClient == null) { + logger.warn("Unable to retrieve Flow Registry client with identifier {} for Process Group {}", versionControlInformation.getRegistryIdentifier(), group.getIdentifier()); + } else { + final FlowVersionLocation branchLocation = new FlowVersionLocation(trimmedBranchName, + versionControlInformation.getBucketIdentifier(), + versionControlInformation.getFlowIdentifier(), + resolvedSourceVersion); + + final FlowSnapshotContainer snapshotContainer = registryClient.getFlowContents(clientUserContext, branchLocation, false); + final RegisteredFlowSnapshot flowSnapshot = snapshotContainer == null ? null : snapshotContainer.getFlowSnapshot(); + if (flowSnapshot != null) { + registrySnapshot = flowSnapshot.getFlowContents(); + } + } + } catch (final IOException | FlowRegistryException e) { + logger.warn("Failed to retrieve Flow Registry snapshot for Process Group {}", group.getIdentifier(), e); + } + } + + final RevisionUpdate snapshot = updateComponent(revision, + group, + () -> processGroupDAO.updateVersionControlInformation(updatedVersionControlInformation, Collections.emptyMap()), + processGroup -> dtoFactory.createVersionControlInformationDto(processGroup)); + + final VersionControlInformation updatedVci = group.getVersionControlInformation(); + if (registrySnapshot != null) { + final StandardVersionControlInformation restoredInfo = StandardVersionControlInformation.Builder.fromDto(updatedVersionControlInformation) + .flowSnapshot(registrySnapshot) + .build(); + restoredInfo.setBucketName(updatedVersionControlInformation.getBucketName()); + restoredInfo.setFlowName(updatedVersionControlInformation.getFlowName()); + restoredInfo.setFlowDescription(updatedVersionControlInformation.getFlowDescription()); + restoredInfo.setStorageLocation(updatedVersionControlInformation.getStorageLocation()); + + group.setVersionControlInformation(restoredInfo, Collections.emptyMap()); + } else if (updatedVci instanceof StandardVersionControlInformation standardVci) { + standardVci.setFlowSnapshot(null); + } + + if (flowManager != null) { + try { + group.synchronizeWithFlowRegistry(flowManager); + } catch (final Exception e) { + logger.warn("Failed to synchronize Process Group {} with Flow Registry after creating branch {}", group.getIdentifier(), trimmedBranchName, e); + } + } + + final VersionControlInformationDTO refreshedDto = dtoFactory.createVersionControlInformationDto(group); + return entityFactory.createVersionControlInformationEntity(refreshedDto, dtoFactory.createRevisionDTO(snapshot.getLastModification())); + } + @Override public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index acf2f05102b8..3b9246bbd671 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -64,6 +64,7 @@ import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.CreateActiveRequestEntity; +import org.apache.nifi.web.api.entity.CreateFlowBranchRequestEntity; import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; @@ -623,6 +624,83 @@ private void unlockVersionControl(final URI requestUri, final String groupId) { } } + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("process-groups/{id}/branches") + @Operation( + summary = "Creates a new branch for a version controlled Process Group", + description = NON_GUARANTEED_ENDPOINT, + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = VersionControlInformationEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Read - /process-groups/{uuid}"), + @SecurityRequirement(name = "Write - /process-groups/{uuid}") + } + ) + public Response createFlowBranch( + @Parameter(description = "The process group id.") @PathParam("id") final String groupId, + @Parameter(description = "The branch creation request.", required = true) final CreateFlowBranchRequestEntity requestEntity) { + + if (requestEntity == null) { + throw new IllegalArgumentException("Branch creation request must be specified."); + } + + final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); + if (revisionDto == null) { + throw new IllegalArgumentException("Process Group Revision must be specified"); + } + if (StringUtils.isBlank(requestEntity.getBranch())) { + throw new IllegalArgumentException("Branch name must be specified"); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.POST, requestEntity); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged()); + } + + final Revision requestRevision = getRevision(revisionDto, groupId); + + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + processGroup.authorize(authorizer, RequestAction.READ, user); + processGroup.authorize(authorizer, RequestAction.WRITE, user); + }, + () -> { + final VersionControlInformationEntity currentVersionControlInfo = serviceFacade.getVersionControlInformation(groupId); + if (currentVersionControlInfo == null || currentVersionControlInfo.getVersionControlInformation() == null) { + throw new IllegalStateException("Process Group with ID " + groupId + " is not currently under Version Control"); + } + + final VersionControlInformationDTO currentInfo = currentVersionControlInfo.getVersionControlInformation(); + if (VersionControlInformationDTO.SYNC_FAILURE.equals(currentInfo.getState())) { + throw new IllegalStateException("Process Group with ID " + groupId + " cannot create a new branch while reporting Sync Failure"); + } + }, + (revision, entity) -> { + final VersionControlInformationEntity responseEntity = serviceFacade.createFlowBranch( + revision, + groupId, + entity.getBranch(), + entity.getSourceBranch(), + entity.getSourceVersion()); + + return generateOkResponse(responseEntity).build(); + }); + } + private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException { final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), "/nifi-api/versions/active-requests", null, originalUri.getFragment()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java index 3c572cf4bb69..a773a6838b20 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java @@ -21,6 +21,7 @@ import org.apache.nifi.registry.flow.FlowRegistryBucket; import org.apache.nifi.registry.flow.FlowRegistryClientNode; import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO; @@ -54,6 +55,8 @@ public interface FlowRegistryDAO { Set getFlowVersionsForUser(FlowRegistryClientUserContext context, String branch, String registryId, String bucketId, String flowId); + void createBranchForUser(FlowRegistryClientUserContext context, String registryId, FlowVersionLocation sourceLocation, String newBranchName); + FlowRegistryClientNode removeFlowRegistry(String registryId); void verifyConfigVerification(String registryId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java index 973b2c6839c7..f405b9684a67 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java @@ -33,6 +33,7 @@ import org.apache.nifi.registry.flow.FlowRegistryClientNode; import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.util.BundleUtils; @@ -208,6 +209,22 @@ public RegisteredFlow getFlowForUser(final FlowRegistryClientUserContext context } } + @Override + public void createBranchForUser(final FlowRegistryClientUserContext context, final String registryId, final FlowVersionLocation sourceLocation, final String newBranchName) { + final FlowRegistryClientNode flowRegistry = flowController.getFlowManager().getFlowRegistryClient(registryId); + if (flowRegistry == null) { + throw new IllegalArgumentException("Registry ID [%s] not found".formatted(registryId)); + } + + try { + flowRegistry.createBranch(context, sourceLocation, newBranchName); + } catch (final UnsupportedOperationException e) { + throw e; + } catch (final IOException | FlowRegistryException ioe) { + throw new NiFiCoreException("Unable to create branch [%s] in registry with ID %s".formatted(newBranchName, registryId), ioe); + } + } + @Override public Set getFlowVersionsForUser(final FlowRegistryClientUserContext context, final String registryId, final String branch, final String bucketId, final String flowId) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java index 3de939f586ea..6bff1c398298 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java @@ -34,6 +34,7 @@ import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserDetails; +import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.authorization.user.StandardNiFiUser.Builder; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.connector.ConnectorNode; @@ -71,9 +72,17 @@ import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.Processor; +import org.apache.nifi.registry.flow.FlowRegistryClientNode; +import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; +import org.apache.nifi.registry.flow.FlowRegistryException; import org.apache.nifi.registry.flow.FlowRegistryUtil; +import org.apache.nifi.registry.flow.FlowSnapshotContainer; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.DifferenceType; import org.apache.nifi.registry.flow.diff.FlowComparator; @@ -103,6 +112,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; @@ -119,26 +129,32 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.TenantsEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.ComponentStateDAO; import org.apache.nifi.web.dao.ConnectorDAO; +import org.apache.nifi.web.dao.FlowRegistryDAO; import org.apache.nifi.web.dao.ProcessGroupDAO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; import org.apache.nifi.web.dao.UserDAO; import org.apache.nifi.web.dao.UserGroupDAO; import org.apache.nifi.web.revision.NaiveRevisionManager; +import org.apache.nifi.web.revision.RevisionClaim; import org.apache.nifi.web.revision.RevisionManager; import org.apache.nifi.web.revision.RevisionUpdate; import org.apache.nifi.web.revision.StandardRevisionUpdate; +import org.apache.nifi.web.revision.UpdateRevisionTask; import org.apache.nifi.web.security.token.NiFiAuthenticationToken; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Answers; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; @@ -177,7 +193,10 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -2076,4 +2095,213 @@ public void testGetConnectorClusterNodeRequest() { assertNotNull(entity.getComponent(), "Component should be populated when clusterNodeRequest is true"); assertEquals("RUNNING", entity.getComponent().getState()); } + + private StandardNiFiServiceFacade createBranchTestFacade(final ProcessGroupDAO branchProcessGroupDAO, final FlowRegistryDAO flowRegistryDAO, + final DtoFactory branchDtoFactory, final EntityFactory branchEntityFactory, + final FlowManager branchFlowManager) { + final ControllerFacade branchControllerFacade = mock(ControllerFacade.class); + lenient().when(branchControllerFacade.getFlowManager()).thenReturn(branchFlowManager); + + final RevisionManager branchRevisionManager = mock(RevisionManager.class); + lenient().when(branchRevisionManager.updateRevision(any(RevisionClaim.class), any(NiFiUser.class), any(UpdateRevisionTask.class))) + .thenAnswer(invocation -> { + final UpdateRevisionTask task = invocation.getArgument(2); + return task.update(); + }); + lenient().when(branchRevisionManager.getRevision(anyString())).thenAnswer(invocation -> { + final String componentId = invocation.getArgument(0, String.class); + return new Revision(1L, "client-1", componentId); + }); + + final StandardNiFiServiceFacade facade = new StandardNiFiServiceFacade(); + facade.setProcessGroupDAO(branchProcessGroupDAO); + facade.setFlowRegistryDAO(flowRegistryDAO); + facade.setDtoFactory(branchDtoFactory); + facade.setEntityFactory(branchEntityFactory); + facade.setControllerFacade(branchControllerFacade); + facade.setRevisionManager(branchRevisionManager); + + final NiFiUser user = new StandardNiFiUser.Builder().identity("unit-test").build(); + final TestingAuthenticationToken authenticationToken = new TestingAuthenticationToken(new NiFiUserDetails(user), null); + SecurityContextHolder.getContext().setAuthentication(authenticationToken); + + return facade; + } + + @Test + public void testCreateFlowBranchSuccess() throws IOException, FlowRegistryException { + final ProcessGroupDAO branchProcessGroupDAO = mock(ProcessGroupDAO.class); + final FlowRegistryDAO flowRegistryDAO = mock(FlowRegistryDAO.class); + final DtoFactory branchDtoFactory = mock(DtoFactory.class); + final EntityFactory branchEntityFactory = mock(EntityFactory.class); + final FlowManager branchFlowManager = mock(FlowManager.class); + lenient().when(branchFlowManager.getFlowRegistryClient(anyString())).thenReturn(null); + + final StandardNiFiServiceFacade facade = createBranchTestFacade(branchProcessGroupDAO, flowRegistryDAO, branchDtoFactory, branchEntityFactory, branchFlowManager); + final Revision revision = new Revision(1L, "client-1", "pg-1"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(branchProcessGroupDAO.getProcessGroup("pg-1")).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1"); + when(versionControlInformation.getBranch()).thenReturn("main"); + when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1"); + when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1"); + when(versionControlInformation.getFlowDescription()).thenReturn("desc"); + when(versionControlInformation.getFlowName()).thenReturn("name"); + when(versionControlInformation.getStorageLocation()).thenReturn("loc"); + when(versionControlInformation.getVersion()).thenReturn("1"); + + final VersionedFlowStatus flowStatus = mock(VersionedFlowStatus.class); + when(versionControlInformation.getStatus()).thenReturn(flowStatus); + when(flowStatus.getState()).thenReturn(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE); + when(flowStatus.getStateExplanation()).thenReturn("Up to date"); + + when(branchProcessGroupDAO.updateVersionControlInformation(any(VersionControlInformationDTO.class), eq(Collections.emptyMap()))) + .thenReturn(processGroup); + + final VersionControlInformationDTO updatedDto = new VersionControlInformationDTO(); + updatedDto.setBranch("feature"); + updatedDto.setRegistryId("registry-1"); + + final VersionControlInformationDTO refreshedDto = new VersionControlInformationDTO(); + refreshedDto.setBranch("feature"); + refreshedDto.setRegistryId("registry-1"); + refreshedDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED); + refreshedDto.setStateExplanation("Process Group has local modifications"); + + when(branchDtoFactory.createVersionControlInformationDto(processGroup)).thenReturn(updatedDto, refreshedDto); + + final VersionControlInformationEntity resultEntity = new VersionControlInformationEntity(); + resultEntity.setVersionControlInformation(refreshedDto); + when(branchEntityFactory.createVersionControlInformationEntity(eq(refreshedDto), any(RevisionDTO.class))).thenReturn(resultEntity); + when(branchDtoFactory.createRevisionDTO(any(FlowModification.class))).thenReturn(new RevisionDTO()); + + final FlowRegistryClientNode registryClient = mock(FlowRegistryClientNode.class); + when(branchFlowManager.getFlowRegistryClient("registry-1")).thenReturn(registryClient); + final VersionedProcessGroup registrySnapshot = new VersionedProcessGroup(); + final RegisteredFlowSnapshot registeredFlowSnapshot = new RegisteredFlowSnapshot(); + registeredFlowSnapshot.setFlowContents(registrySnapshot); + final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(registeredFlowSnapshot); + when(registryClient.getFlowContents(any(), any(FlowVersionLocation.class), eq(false))).thenReturn(snapshotContainer); + + final VersionControlInformationEntity response = facade.createFlowBranch(revision, "pg-1", " feature ", null, null); + assertEquals(resultEntity, response); + + final ArgumentCaptor locationCaptor = ArgumentCaptor.forClass(FlowVersionLocation.class); + verify(flowRegistryDAO).createBranchForUser(any(FlowRegistryClientUserContext.class), eq("registry-1"), locationCaptor.capture(), eq("feature")); + + final FlowVersionLocation capturedLocation = locationCaptor.getValue(); + assertEquals("main", capturedLocation.getBranch()); + assertEquals("bucket-1", capturedLocation.getBucketId()); + assertEquals("flow-1", capturedLocation.getFlowId()); + assertEquals("1", capturedLocation.getVersion()); + + verify(registryClient).getFlowContents(any(), any(FlowVersionLocation.class), eq(false)); + verify(processGroup).setVersionControlInformation(argThat(vci -> vci instanceof StandardVersionControlInformation + && ((StandardVersionControlInformation) vci).getFlowSnapshot() == registrySnapshot), eq(Collections.emptyMap())); + verify(processGroup).synchronizeWithFlowRegistry(branchFlowManager); + } + + @Test + public void testCreateFlowBranchSameBranchRejected() { + final ProcessGroupDAO branchProcessGroupDAO = mock(ProcessGroupDAO.class); + final FlowRegistryDAO flowRegistryDAO = mock(FlowRegistryDAO.class); + final FlowManager branchFlowManager = mock(FlowManager.class); + final StandardNiFiServiceFacade facade = createBranchTestFacade(branchProcessGroupDAO, flowRegistryDAO, mock(DtoFactory.class), mock(EntityFactory.class), branchFlowManager); + final Revision revision = new Revision(1L, "client-1", "pg-1"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(branchProcessGroupDAO.getProcessGroup("pg-1")).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getBranch()).thenReturn("main"); + + assertThrows(IllegalArgumentException.class, () -> facade.createFlowBranch(revision, "pg-1", "main", null, null)); + + verify(flowRegistryDAO, never()).createBranchForUser(any(), any(), any(), any()); + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } + + @Test + public void testCreateFlowBranchUnsupportedRegistry() throws IOException, FlowRegistryException { + final ProcessGroupDAO branchProcessGroupDAO = mock(ProcessGroupDAO.class); + final FlowRegistryDAO flowRegistryDAO = mock(FlowRegistryDAO.class); + final FlowManager branchFlowManager = mock(FlowManager.class); + final StandardNiFiServiceFacade facade = createBranchTestFacade(branchProcessGroupDAO, flowRegistryDAO, mock(DtoFactory.class), mock(EntityFactory.class), branchFlowManager); + final Revision revision = new Revision(1L, "client-1", "pg-1"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(branchProcessGroupDAO.getProcessGroup("pg-1")).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1"); + when(versionControlInformation.getBranch()).thenReturn("main"); + when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1"); + when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1"); + when(versionControlInformation.getVersion()).thenReturn("1"); + + doThrow(new UnsupportedOperationException("not supported")) + .when(flowRegistryDAO) + .createBranchForUser(any(FlowRegistryClientUserContext.class), eq("registry-1"), any(FlowVersionLocation.class), eq("feature")); + + assertThrows(IllegalArgumentException.class, () -> facade.createFlowBranch(revision, "pg-1", "feature", null, null)); + + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } + + @Test + public void testCreateFlowBranchNotVersionControlled() { + final ProcessGroupDAO branchProcessGroupDAO = mock(ProcessGroupDAO.class); + final FlowRegistryDAO flowRegistryDAO = mock(FlowRegistryDAO.class); + final FlowManager branchFlowManager = mock(FlowManager.class); + final StandardNiFiServiceFacade facade = createBranchTestFacade(branchProcessGroupDAO, flowRegistryDAO, mock(DtoFactory.class), mock(EntityFactory.class), branchFlowManager); + final Revision revision = new Revision(1L, "client-1", "pg-1"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(branchProcessGroupDAO.getProcessGroup("pg-1")).thenReturn(processGroup); + when(processGroup.getVersionControlInformation()).thenReturn(null); + + assertThrows(IllegalStateException.class, () -> facade.createFlowBranch(revision, "pg-1", "feature", null, null)); + + verify(flowRegistryDAO, never()).createBranchForUser(any(), any(), any(), any()); + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } + + @Test + public void testCreateFlowBranchPropagatesRegistryErrors() throws IOException, FlowRegistryException { + final ProcessGroupDAO branchProcessGroupDAO = mock(ProcessGroupDAO.class); + final FlowRegistryDAO flowRegistryDAO = mock(FlowRegistryDAO.class); + final FlowManager branchFlowManager = mock(FlowManager.class); + final StandardNiFiServiceFacade facade = createBranchTestFacade(branchProcessGroupDAO, flowRegistryDAO, mock(DtoFactory.class), mock(EntityFactory.class), branchFlowManager); + final Revision revision = new Revision(1L, "client-1", "pg-1"); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(branchProcessGroupDAO.getProcessGroup("pg-1")).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1"); + when(versionControlInformation.getBranch()).thenReturn("main"); + when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1"); + when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1"); + when(versionControlInformation.getVersion()).thenReturn("1"); + + final FlowRegistryException cause = new FlowRegistryException("Branch [feature] already exists"); + doThrow(new NiFiCoreException("Unable to create branch [feature] in registry with ID registry-1", cause)) + .when(flowRegistryDAO) + .createBranchForUser(any(FlowRegistryClientUserContext.class), eq("registry-1"), any(FlowVersionLocation.class), eq("feature")); + + final NiFiCoreException exception = assertThrows(NiFiCoreException.class, () -> facade.createFlowBranch(revision, "pg-1", "feature", null, null)); + + assertTrue(exception.getMessage().contains("registry-1")); + assertTrue(exception.getMessage().contains("[feature]")); + assertEquals(cause, exception.getCause()); + + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java index 2526d6407179..f5a1fd88f634 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java @@ -21,18 +21,36 @@ import org.apache.nifi.registry.flow.FlowSnapshotContainer; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.entity.CreateFlowBranchRequestEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.mock.web.MockHttpServletRequest; +import java.net.HttpURLConnection; import java.util.Collections; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -45,6 +63,17 @@ public class TestVersionsResource { @Mock private NiFiServiceFacade serviceFacade; + @Mock + private NiFiProperties properties; + + @BeforeEach + public void setUp() { + versionsResource.setProperties(properties); + lenient().when(properties.isNode()).thenReturn(false); + lenient().when(properties.isClustered()).thenReturn(false); + versionsResource.httpServletRequest = new MockHttpServletRequest(); + } + @Test public void testExportFlowVersion() { final String groupId = UUID.randomUUID().toString(); @@ -70,7 +99,7 @@ public void testExportFlowVersion() { final RegisteredFlowSnapshot resultEntity = (RegisteredFlowSnapshot) response.getEntity(); - assertEquals(200, response.getStatus()); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); assertEquals(versionedFlowSnapshot, resultEntity); verify(versionedFlowSnapshot).setFlow(null); @@ -81,4 +110,238 @@ public void testExportFlowVersion() { verify(innerInnerVersionedProcessGroup).setVersionedFlowCoordinates(null); } + @Test + public void testCreateFlowBranchRequiresBranchName() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(0L); + requestEntity.setProcessGroupRevision(revisionDTO); + + assertThrows(IllegalArgumentException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchInvokesService() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("3"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + final VersionControlInformationEntity expectedEntity = new VersionControlInformationEntity(); + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenReturn(expectedEntity); + + final Response response = versionsResource.createFlowBranch(groupId, requestEntity); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); + assertEquals(expectedEntity, response.getEntity()); + + ArgumentCaptor revisionCaptor = ArgumentCaptor.forClass(Revision.class); + verify(serviceFacade).createFlowBranch(revisionCaptor.capture(), eq(groupId), eq("feature"), isNull(), isNull()); + + final Revision capturedRevision = revisionCaptor.getValue(); + assertEquals(1L, capturedRevision.getVersion()); + assertEquals("client-id", capturedRevision.getClientId()); + assertEquals(groupId, capturedRevision.getComponentId()); + } + + @Test + public void testCreateFlowBranchFailsWhenBranchExists() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("main"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("main"), any(), any())) + .thenThrow(new IllegalArgumentException("Process Group is already tracking branch main")); + + assertThrows(IllegalArgumentException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchUnsupported() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("2"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenThrow(new IllegalArgumentException("Registry does not support branching")); + + assertThrows(IllegalArgumentException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchAllowedWhenLocallyModified() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("1"); + currentDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + final VersionControlInformationEntity expectedEntity = new VersionControlInformationEntity(); + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenReturn(expectedEntity); + + final Response response = versionsResource.createFlowBranch(groupId, requestEntity); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); + assertEquals(expectedEntity, response.getEntity()); + } + + @Test + public void testCreateFlowBranchBlockedWhenSyncFailure() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("1"); + currentDto.setState(VersionControlInformationDTO.SYNC_FAILURE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + assertThrows(IllegalStateException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + + verify(serviceFacade, never()).createFlowBranch(any(), anyString(), anyString(), any(), any()); + } + + @Test + public void testCreateFlowBranchAllowedWhenLocallyModifiedAndStale() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("1"); + currentDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED_AND_STALE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + final VersionControlInformationEntity expectedEntity = new VersionControlInformationEntity(); + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenReturn(expectedEntity); + + final Response response = versionsResource.createFlowBranch(groupId, requestEntity); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); + assertEquals(expectedEntity, response.getEntity()); + } + + @Test + public void testCreateFlowBranchBranchAlreadyExists() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("3"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenThrow(new NiFiCoreException("Unable to create branch [feature] in registry with ID registry-1: Branch [feature] already exists")); + + assertThrows(NiFiCoreException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchNotVersionControlled() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(null); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + assertThrows(IllegalStateException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java new file mode 100644 index 000000000000..43f4d69377b0 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.dao.impl; + +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.registry.flow.FlowRegistryClientNode; +import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; +import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowVersionLocation; +import org.apache.nifi.web.NiFiCoreException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StandardFlowRegistryDAOTest { + + @Mock + private FlowController flowController; + + @Mock + private FlowManager flowManager; + + @Mock + private FlowRegistryClientUserContext userContext; + + private StandardFlowRegistryDAO dao; + + @BeforeEach + void setUp() { + dao = new StandardFlowRegistryDAO(); + dao.setFlowController(flowController); + + when(flowController.getFlowManager()).thenReturn(flowManager); + } + + @Test + void testCreateBranchDelegatesToRegistryClient() throws IOException, FlowRegistryException { + final FlowRegistryClientNode clientNode = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + + dao.createBranchForUser(userContext, "registry-1", sourceLocation, "feature"); + + ArgumentCaptor locationCaptor = ArgumentCaptor.forClass(FlowVersionLocation.class); + verify(clientNode).createBranch(eq(userContext), locationCaptor.capture(), eq("feature")); + + final FlowVersionLocation capturedLocation = locationCaptor.getValue(); + assertEquals("main", capturedLocation.getBranch()); + assertEquals("bucket", capturedLocation.getBucketId()); + assertEquals("flow", capturedLocation.getFlowId()); + assertEquals("1", capturedLocation.getVersion()); + } + + @Test + void testCreateBranchUnsupported() throws IOException, FlowRegistryException { + final FlowRegistryClientNode clientNode = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode); + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + + doThrow(new UnsupportedOperationException("not supported")) + .when(clientNode) + .createBranch(userContext, sourceLocation, "feature"); + + assertThrows(UnsupportedOperationException.class, + () -> dao.createBranchForUser(userContext, "registry-1", sourceLocation, "feature")); + } + + @Test + void testCreateBranchUnknownRegistry() { + when(flowManager.getFlowRegistryClient("missing")).thenReturn(null); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + assertThrows(IllegalArgumentException.class, + () -> dao.createBranchForUser(userContext, "missing", sourceLocation, "feature")); + } + + @Test + void testCreateBranchFlowRegistryExceptionWrapped() throws IOException, FlowRegistryException { + final FlowRegistryClientNode clientNode = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + final FlowRegistryException cause = new FlowRegistryException("Branch [feature] already exists"); + doThrow(cause) + .when(clientNode) + .createBranch(userContext, sourceLocation, "feature"); + + final NiFiCoreException exception = assertThrows(NiFiCoreException.class, + () -> dao.createBranchForUser(userContext, "registry-1", sourceLocation, "feature")); + + assertTrue(exception.getMessage().contains("registry-1")); + assertTrue(exception.getMessage().contains("[feature]")); + assertEquals(cause, exception.getCause()); + } +}