diff --git a/libs/input-mapping/src/Helper/PathHelper.php b/libs/input-mapping/src/Helper/PathHelper.php new file mode 100644 index 000000000..91aadb0db --- /dev/null +++ b/libs/input-mapping/src/Helper/PathHelper.php @@ -0,0 +1,48 @@ +getPath()) . + self::getDestinationFilePath($destination, $table) . '.manifest'; + } + + public static function getDataFilePath( + FileStagingInterface $dataStorage, + string $destination, + InputTableOptions $table, + ): string { + return self::ensurePathDelimiter($dataStorage->getPath()) . + self::getDestinationFilePath($destination, $table); + } + + private static function getDestinationFilePath(string $destination, InputTableOptions $table): string + { + if (!$table->getDestination()) { + return $destination . '/' . $table->getSource(); + } else { + return $destination . '/' . $table->getDestination(); + } + } +} diff --git a/libs/input-mapping/src/Reader.php b/libs/input-mapping/src/Reader.php index 11ddeee9b..556a3be6c 100644 --- a/libs/input-mapping/src/Reader.php +++ b/libs/input-mapping/src/Reader.php @@ -4,6 +4,8 @@ namespace Keboola\InputMapping; +use InvalidArgumentException; +use Keboola\InputMapping\Exception\InputOperationException; use Keboola\InputMapping\File\Options\InputFileOptions; use Keboola\InputMapping\File\Options\RewrittenInputFileOptions; use Keboola\InputMapping\Helper\InputBucketValidator; @@ -14,8 +16,11 @@ use Keboola\InputMapping\State\InputTableStateList; use Keboola\InputMapping\Table\Options\InputTableOptionsList; use Keboola\InputMapping\Table\Options\ReaderOptions; +use Keboola\InputMapping\Table\Options\RewrittenInputTableOptionsList; use Keboola\InputMapping\Table\Result; use Keboola\InputMapping\Table\Strategy\AbstractStrategy as TableAbstractStrategy; +use Keboola\InputMapping\Table\Strategy\AbstractWorkspaceStrategy; +use Keboola\InputMapping\Table\Strategy\WorkspaceLoadQueue; use Keboola\InputMapping\Table\TableDefinitionResolver; use Keboola\StorageApiBranch\ClientWrapper; use Psr\Log\LoggerInterface; @@ -47,32 +52,29 @@ public function downloadFiles( return $strategy->downloadFiles($configuration, $destination); } - /** - * @param InputTableOptionsList $tablesDefinition list of input mappings - * @param InputTableStateList $tablesState list of input mapping table states - * @param string $destination destination folder - * @param ReaderOptions $readerOptions - * @return Result - */ - public function downloadTables( - InputTableOptionsList $tablesDefinition, - InputTableStateList $tablesState, - string $destination, - ReaderOptions $readerOptions, - ): Result { - $tableResolver = new TableDefinitionResolver( + private function createTableResolver(): TableDefinitionResolver + { + return new TableDefinitionResolver( $this->clientWrapper->getTableAndFileStorageClient(), $this->logger, ); - $tablesState = TableRewriteHelperFactory::getTableRewriteHelper( + } + + private function rewriteTableStatesDestinations(InputTableStateList $tablesState): InputTableStateList + { + return TableRewriteHelperFactory::getTableRewriteHelper( $this->clientWrapper->getClientOptionsReadOnly(), )->rewriteTableStatesDestinations( $tablesState, $this->clientWrapper, $this->logger, ); - $tablesDefinition = $tableResolver->resolve($tablesDefinition); - $strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState); + } + + private function validateAndRewriteDevBuckets( + InputTableOptionsList $tablesDefinition, + ReaderOptions $readerOptions, + ): RewrittenInputTableOptionsList { if ($readerOptions->devInputsDisabled() && !$this->clientWrapper->getClientOptionsReadOnly()->useBranchStorage() ) { @@ -83,17 +85,79 @@ public function downloadTables( $this->clientWrapper, ); } - $tablesDefinition = TableRewriteHelperFactory::getTableRewriteHelper( + + return TableRewriteHelperFactory::getTableRewriteHelper( $this->clientWrapper->getClientOptionsReadOnly(), )->rewriteTableOptionsSources( $tablesDefinition, $this->clientWrapper, $this->logger, ); + } + + + /** + * @param InputTableOptionsList $tablesDefinition list of input mappings + * @param InputTableStateList $tablesState list of input mapping table states + * @param string $destination destination folder + * @param ReaderOptions $readerOptions + * @return Result + */ + public function downloadTables( + InputTableOptionsList $tablesDefinition, + InputTableStateList $tablesState, + string $destination, + ReaderOptions $readerOptions, + ): Result { + $tablesState = $this->rewriteTableStatesDestinations($tablesState); + + $tablesDefinition = $this->createTableResolver()->resolve($tablesDefinition); + $strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState); + $tablesDefinition = $this->validateAndRewriteDevBuckets($tablesDefinition, $readerOptions); + /** @var TableAbstractStrategy $strategy */ return $strategy->downloadTables($tablesDefinition->getTables(), $readerOptions->preserveWorkspace()); } + /** + * Execute only prepare and execute phases for workspace table loading + * Returns WorkspaceLoadQueue for later completion with waitForTableLoadCompletion() + * + * @param InputTableOptionsList $tablesDefinition list of input mappings + * @param InputTableStateList $tablesState list of input mapping table states + * @param string $destination destination folder + * @param ReaderOptions $readerOptions + * @return WorkspaceLoadQueue + * @throws InvalidArgumentException if strategy is not workspace-based + */ + public function prepareAndExecuteTableLoads( + InputTableOptionsList $tablesDefinition, + InputTableStateList $tablesState, + string $destination, + ReaderOptions $readerOptions, + ): WorkspaceLoadQueue { + $tablesState = $this->rewriteTableStatesDestinations($tablesState); + + $tablesDefinition = $this->createTableResolver()->resolve($tablesDefinition); + $strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState); + + // Ensure we have a workspace strategy. For file this method is not yet implemented. + if (!$strategy instanceof AbstractWorkspaceStrategy) { + throw new InputOperationException( + 'prepareAndExecuteTableLoads() can only be used with workspace strategies', + ); + } + + $tablesDefinition = $this->validateAndRewriteDevBuckets($tablesDefinition, $readerOptions); + + // Execute only Phase 1 & 2: Prepare and Execute + // Let the strategy handle the planning internally + return $strategy->prepareAndExecuteTableLoads( + $tablesDefinition->getTables(), + $readerOptions->preserveWorkspace(), + ); + } + /** * Get parent runId to the current runId (defined by SAPI client) * @param string $runId diff --git a/libs/input-mapping/src/Table/Strategy/ABS.php b/libs/input-mapping/src/Table/Strategy/ABS.php index 2ca5be178..aba0a5adf 100644 --- a/libs/input-mapping/src/Table/Strategy/ABS.php +++ b/libs/input-mapping/src/Table/Strategy/ABS.php @@ -5,6 +5,7 @@ namespace Keboola\InputMapping\Table\Strategy; use Keboola\InputMapping\Exception\InvalidInputException; +use Keboola\InputMapping\Helper\PathHelper; use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions; use Keboola\StorageApi\Options\GetFileOptions; @@ -36,8 +37,11 @@ public function handleExports(array $exports, bool $preserve): array foreach ($exports as $export) { /** @var RewrittenInputTableOptions $table */ [$jobId, $table] = $export; - $manifestPath = $this->ensurePathDelimiter($this->metadataStorage->getPath()) . - $this->getDestinationFilePath($this->destination, $table) . '.manifest'; + $manifestPath = PathHelper::getManifestPath( + $this->metadataStorage, + $this->destination, + $table, + ); $tableInfo = $table->getTableInfo(); $fileInfo = $this->clientWrapper->getTableAndFileStorageClient()->getFile( $keyedResults[$jobId]['results']['file']['id'], diff --git a/libs/input-mapping/src/Table/Strategy/AbstractStrategy.php b/libs/input-mapping/src/Table/Strategy/AbstractStrategy.php index 19ae2bde8..412fb29ff 100644 --- a/libs/input-mapping/src/Table/Strategy/AbstractStrategy.php +++ b/libs/input-mapping/src/Table/Strategy/AbstractStrategy.php @@ -5,7 +5,6 @@ namespace Keboola\InputMapping\Table\Strategy; use Keboola\InputMapping\State\InputTableStateList; -use Keboola\InputMapping\Table\Options\InputTableOptions; use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions; use Keboola\InputMapping\Table\Result; use Keboola\InputMapping\Table\Result\TableInfo; @@ -16,16 +15,6 @@ abstract class AbstractStrategy implements StrategyInterface { protected readonly LoggerInterface $logger; // @phpstan-ignore-line initialized in child classes - protected function ensurePathDelimiter(string $path): string - { - return $this->ensureNoPathDelimiter($path) . '/'; - } - - protected function ensureNoPathDelimiter(string $path): string - { - return rtrim($path, '\\/'); - } - /** * @param RewrittenInputTableOptions[] $tables * @param bool $preserve @@ -52,13 +41,4 @@ public function downloadTables(array $tables, bool $preserve): Result return $result; } - - protected function getDestinationFilePath(string $destination, InputTableOptions $table): string - { - if (!$table->getDestination()) { - return $destination . '/' . $table->getSource(); - } else { - return $destination . '/' . $table->getDestination(); - } - } } diff --git a/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php b/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php index 15f9c59cf..a17cc37b5 100644 --- a/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php +++ b/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php @@ -7,6 +7,7 @@ use InvalidArgumentException; use Keboola\InputMapping\Helper\LoadTypeDecider; use Keboola\InputMapping\Helper\ManifestCreator; +use Keboola\InputMapping\Helper\PathHelper; use Keboola\InputMapping\State\InputTableStateList; use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions; use Keboola\StagingProvider\Staging\File\FileFormat; @@ -19,10 +20,6 @@ abstract class AbstractWorkspaceStrategy extends AbstractStrategy { - private const LOAD_TYPE_CLONE = 'clone'; - private const LOAD_TYPE_COPY = 'copy'; - private const LOAD_TYPE_VIEW = 'view'; - protected readonly WorkspaceStagingInterface $dataStorage; protected readonly ManifestCreator $manifestCreator; @@ -43,39 +40,23 @@ public function __construct( $this->manifestCreator = new ManifestCreator(); } - abstract protected function getWorkspaceType(): string; + abstract public function getWorkspaceType(): string; public function downloadTable(RewrittenInputTableOptions $table): array { $loadOptions = $table->getStorageApiLoadOptions($this->tablesState); - LoadTypeDecider::checkViableLoadMethod( - $table->getTableInfo(), - $this->getWorkspaceType(), - $loadOptions, - $this->clientWrapper->getToken()->getProjectId(), - ); + $loadType = $this->decideTableLoadMethod($table, $loadOptions); - if (LoadTypeDecider::canClone($table->getTableInfo(), $this->getWorkspaceType(), $loadOptions)) { - $this->logger->info(sprintf('Table "%s" will be cloned.', $table->getSource())); + if ($loadType === WorkspaceLoadType::CLONE) { return [ 'table' => $table, - 'type' => self::LOAD_TYPE_CLONE, + 'type' => $loadType->value, ]; } - if (LoadTypeDecider::canUseView( - $table->getTableInfo(), - $this->getWorkspaceType(), - )) { - $this->logger->info(sprintf('Table "%s" will be created as view.', $table->getSource())); - return [ - 'table' => [$table, $loadOptions], - 'type' => self::LOAD_TYPE_VIEW, - ]; - } - $this->logger->info(sprintf('Table "%s" will be copied.', $table->getSource())); + return [ 'table' => [$table, $loadOptions], - 'type' => self::LOAD_TYPE_COPY, + 'type' => $loadType->value, ]; } @@ -86,45 +67,17 @@ public function handleExports(array $exports, bool $preserve): array $workspaceTables = []; foreach ($exports as $export) { - if ($export['type'] === self::LOAD_TYPE_CLONE) { + if ($export['type'] === WorkspaceLoadType::CLONE->value) { /** @var RewrittenInputTableOptions $table */ $table = $export['table']; - $cloneInput = [ - 'source' => $table->getSource(), - 'destination' => $table->getDestination(), - 'sourceBranchId' => $table->getSourceBranchId(), - 'overwrite' => $table->getOverwrite(), - 'dropTimestampColumn' => !$table->keepInternalTimestampColumn(), - ]; - if ($table->getSourceBranchId() !== null) { - // practically, sourceBranchId should never be null, but i'm not able to make that statically safe - // and passing null causes application error in connection, so here is a useless condition. - $cloneInput['sourceBranchId'] = $table->getSourceBranchId(); - } - $cloneInputs[] = $cloneInput; + $cloneInputs[] = $this->buildCloneInput($table); $workspaceTables[] = $table; } - if (in_array($export['type'], [self::LOAD_TYPE_COPY, self::LOAD_TYPE_VIEW], true)) { + if (in_array($export['type'], [WorkspaceLoadType::COPY->value, WorkspaceLoadType::VIEW->value], true)) { [$table, $exportOptions] = $export['table']; - if ($table->getSourceBranchId() !== null) { - // practically, sourceBranchId should never be null, but i'm not able to make that statically safe - // and passing null causes application error in connection, so here is a useless condition. - $exportOptions['sourceBranchId'] = $table->getSourceBranchId(); - } - $copyInput = array_merge( - [ - 'source' => $table->getSource(), - 'destination' => $table->getDestination(), - ], - $exportOptions, - ); - - if ($table->isUseView() || $export['type'] === self::LOAD_TYPE_VIEW) { - $copyInput['useView'] = true; - } - + $loadType = WorkspaceLoadType::from($export['type']); + $copyInputs[] = $this->buildCopyInput($table, $exportOptions, $loadType); $workspaceTables[] = $table; - $copyInputs[] = $copyInput; } } @@ -132,7 +85,7 @@ public function handleExports(array $exports, bool $preserve): array $copyJobResult = []; $hasBeenCleaned = false; - $workspaces = new Workspaces($this->clientWrapper->getBranchClient()); + $workspaces = $this->createWorkspaces(); if ($cloneInputs) { $this->logger->info( @@ -172,8 +125,11 @@ public function handleExports(array $exports, bool $preserve): array $this->logger->info('Processed ' . count($jobResults) . ' workspace exports.'); foreach ($workspaceTables as $table) { - $manifestPath = $this->ensurePathDelimiter($this->metadataStorage->getPath()) . - $this->getDestinationFilePath($this->destination, $table) . '.manifest'; + $manifestPath = PathHelper::getManifestPath( + $this->metadataStorage, + $this->destination, + $table, + ); $this->manifestCreator->writeTableManifest( $table->getTableInfo(), $manifestPath, @@ -183,4 +139,187 @@ public function handleExports(array $exports, bool $preserve): array } return $jobResults; } + + /** + * Phase 1: Prepare - Analyze tables and create workspace load instructions + * Determines how each table from Table Storage should be loaded into Workspace + * + * @param RewrittenInputTableOptions[] $tables + * @return WorkspaceTableLoadInstruction[] + */ + public function prepareTableLoadsToWorkspace(array $tables): array + { + $instructions = []; + + foreach ($tables as $table) { + $loadOptions = $table->getStorageApiLoadOptions($this->tablesState); + $loadType = $this->decideTableLoadMethod($table, $loadOptions); + + $instructionLoadOptions = $loadType === WorkspaceLoadType::CLONE ? null : $loadOptions; + $instructions[] = new WorkspaceTableLoadInstruction( + $loadType, + $table, + $instructionLoadOptions, + ); + } + + return $instructions; + } + + /** + * Phase 2: Execute - Submit async jobs to load tables from Table Storage into Workspace + * CLEAN jobs must complete before other jobs are submitted + */ + public function executeTableLoadsToWorkspace(WorkspaceLoadPlan $plan): WorkspaceLoadQueue + { + $jobs = []; + $workspaces = $this->createWorkspaces(); + $workspaceId = (int) $this->dataStorage->getWorkspaceId(); + + // Step 1: Clean workspace if needed - MUST complete before other operations + if (!$plan->preserve) { + $this->logger->info('Cleaning workspace before loading tables.'); + $cleanJobId = $workspaces->queueWorkspaceCloneInto($workspaceId, [ + 'input' => [], // workspace will be only cleaned + 'preserve' => 0, + ]); + + // Wait for clean job to complete before proceeding + $this->clientWrapper->getBranchClient()->handleAsyncTasks([$cleanJobId]); + + // Don't add CLEAN job to queue since it's already completed + } + + // Step 2: Submit clone operations (after workspace is clean) + $cloneInstructions = $plan->getCloneInstructions(); + if ($plan->hasCloneInstructions()) { + $cloneInputs = []; + $cloneTables = []; + + foreach ($cloneInstructions as $instruction) { + $cloneInputs[] = $this->buildCloneInput($instruction->table); + $cloneTables[] = $instruction->table; + } + + $this->logger->info( + sprintf('Cloning %s tables to workspace.', count($cloneInputs)), + ); + $jobId = $workspaces->queueWorkspaceCloneInto($workspaceId, [ + 'input' => $cloneInputs, + 'preserve' => 1, + ]); + $jobs[] = new WorkspaceLoadJob((string) $jobId, WorkspaceLoadType::CLONE, $cloneTables); + } + + // Step 3: Submit copy/load operations (after workspace is clean) + $copyInstructions = $plan->getCopyInstructions(); + if ($plan->hasCopyInstructions()) { + $copyInputs = []; + $copyTables = []; + + foreach ($copyInstructions as $instruction) { + $copyInputs[] = $this->buildCopyInput( + $instruction->table, + $instruction->loadOptions ?? [], + $instruction->loadType, + ); + $copyTables[] = $instruction->table; + } + + $this->logger->info( + sprintf('Copying %s tables to workspace.', count($copyInputs)), + ); + $jobId = $workspaces->queueWorkspaceLoadData($workspaceId, [ + 'input' => $copyInputs, + 'preserve' => 1, + ]); + $jobs[] = new WorkspaceLoadJob((string) $jobId, WorkspaceLoadType::COPY, $copyTables); + } + + return new WorkspaceLoadQueue($jobs); + } + + /** + * Execute only Phase 1 & 2: Prepare and Execute workspace table loading + * Returns WorkspaceLoadQueue for later completion with waitForTableLoadCompletion() + * + * @param RewrittenInputTableOptions[] $tables + * @param bool $preserve + * @return WorkspaceLoadQueue + */ + public function prepareAndExecuteTableLoads(array $tables, bool $preserve): WorkspaceLoadQueue + { + // Phase 1: Prepare + $instructions = $this->prepareTableLoadsToWorkspace($tables); + $plan = new WorkspaceLoadPlan( + $instructions, + $preserve, + ); + + // Phase 2: Execute + return $this->executeTableLoadsToWorkspace($plan); + } + + private function buildCopyInput( + RewrittenInputTableOptions $table, + array $loadOptions, + WorkspaceLoadType $loadType, + ): array { + // Add sourceBranchId to loadOptions first (preserving handleExports behavior) + if ($table->getSourceBranchId() !== null) { + // practically, sourceBranchId should never be null, but i'm not able to make that statically safe + // and passing null causes application error in connection, so here is a useless condition. + $loadOptions['sourceBranchId'] = $table->getSourceBranchId(); + } + + $copyInput = array_merge([ + 'source' => $table->getSource(), + 'destination' => $table->getDestination(), + ], $loadOptions); + + // Views point to Table Storage, copies transfer data to Workspace + if ($loadType === WorkspaceLoadType::VIEW || $table->isUseView()) { + $copyInput['useView'] = true; + } + + return $copyInput; + } + + private function buildCloneInput(RewrittenInputTableOptions $table): array + { + return [ + 'source' => $table->getSource(), + 'destination' => $table->getDestination(), + 'sourceBranchId' => $table->getSourceBranchId(), + 'overwrite' => $table->getOverwrite(), + 'dropTimestampColumn' => !$table->keepInternalTimestampColumn(), + ]; + } + + private function decideTableLoadMethod(RewrittenInputTableOptions $table, array $loadOptions): WorkspaceLoadType + { + // Validate that table can be loaded to this workspace type + LoadTypeDecider::checkViableLoadMethod( + $table->getTableInfo(), + $this->getWorkspaceType(), + $loadOptions, + $this->clientWrapper->getToken()->getProjectId(), + ); + + if (LoadTypeDecider::canClone($table->getTableInfo(), $this->getWorkspaceType(), $loadOptions)) { + $this->logger->info(sprintf('Table "%s" will be cloned.', $table->getSource())); + return WorkspaceLoadType::CLONE; + } + if (LoadTypeDecider::canUseView($table->getTableInfo(), $this->getWorkspaceType())) { + $this->logger->info(sprintf('Table "%s" will be created as view.', $table->getSource())); + return WorkspaceLoadType::VIEW; + } + $this->logger->info(sprintf('Table "%s" will be copied.', $table->getSource())); + return WorkspaceLoadType::COPY; + } + + protected function createWorkspaces(): Workspaces + { + return new Workspaces($this->clientWrapper->getBranchClient()); + } } diff --git a/libs/input-mapping/src/Table/Strategy/BigQuery.php b/libs/input-mapping/src/Table/Strategy/BigQuery.php index 1b613b2ca..927d845d1 100644 --- a/libs/input-mapping/src/Table/Strategy/BigQuery.php +++ b/libs/input-mapping/src/Table/Strategy/BigQuery.php @@ -6,7 +6,7 @@ class BigQuery extends AbstractWorkspaceStrategy { - protected function getWorkspaceType(): string + public function getWorkspaceType(): string { return 'bigquery'; } diff --git a/libs/input-mapping/src/Table/Strategy/Local.php b/libs/input-mapping/src/Table/Strategy/Local.php index 50648e93d..cc0c1cf3b 100644 --- a/libs/input-mapping/src/Table/Strategy/Local.php +++ b/libs/input-mapping/src/Table/Strategy/Local.php @@ -5,6 +5,7 @@ namespace Keboola\InputMapping\Table\Strategy; use Keboola\InputMapping\Exception\InvalidInputException; +use Keboola\InputMapping\Helper\PathHelper; use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions; use Keboola\StorageApi\TableExporter; @@ -21,8 +22,11 @@ public function downloadTable(RewrittenInputTableOptions $table): array $exportLimit = $tokenInfo['owner']['limits'][self::EXPORT_SIZE_LIMIT_NAME]['value']; } - $file = $this->ensurePathDelimiter($this->dataStorage->getPath()) . - $this->getDestinationFilePath($this->destination, $table); + $file = PathHelper::getDataFilePath( + $this->dataStorage, + $this->destination, + $table, + ); $tableInfo = $table->getTableInfo(); if ($tableInfo['dataSizeBytes'] > $exportLimit) { throw new InvalidInputException(sprintf( @@ -36,8 +40,11 @@ public function downloadTable(RewrittenInputTableOptions $table): array $this->manifestCreator->writeTableManifest( $tableInfo, - $this->ensurePathDelimiter($this->metadataStorage->getPath()) . - $this->getDestinationFilePath($this->destination, $table) . '.manifest', + PathHelper::getManifestPath( + $this->metadataStorage, + $this->destination, + $table, + ), $table->getColumnNamesFromTypes(), $this->format, ); diff --git a/libs/input-mapping/src/Table/Strategy/S3.php b/libs/input-mapping/src/Table/Strategy/S3.php index 63182b166..189c61cfc 100644 --- a/libs/input-mapping/src/Table/Strategy/S3.php +++ b/libs/input-mapping/src/Table/Strategy/S3.php @@ -5,6 +5,7 @@ namespace Keboola\InputMapping\Table\Strategy; use Keboola\InputMapping\Exception\InvalidInputException; +use Keboola\InputMapping\Helper\PathHelper; use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions; use Keboola\StorageApi\Options\GetFileOptions; @@ -35,8 +36,11 @@ public function handleExports(array $exports, bool $preserve): array foreach ($exports as $export) { $table = $export['table']; - $manifestPath = $this->ensurePathDelimiter($this->metadataStorage->getPath()) . - $this->getDestinationFilePath($this->destination, $table) . '.manifest'; + $manifestPath = PathHelper::getManifestPath( + $this->metadataStorage, + $this->destination, + $table, + ); $tableInfo = $table->getTableInfo(); $fileInfo = $this->clientWrapper->getTableAndFileStorageClient()->getFile( $keyedResults[$export['jobId']]['results']['file']['id'], diff --git a/libs/input-mapping/src/Table/Strategy/Snowflake.php b/libs/input-mapping/src/Table/Strategy/Snowflake.php index 07d3118f7..3dc596d30 100644 --- a/libs/input-mapping/src/Table/Strategy/Snowflake.php +++ b/libs/input-mapping/src/Table/Strategy/Snowflake.php @@ -6,7 +6,7 @@ class Snowflake extends AbstractWorkspaceStrategy { - protected function getWorkspaceType(): string + public function getWorkspaceType(): string { return 'snowflake'; } diff --git a/libs/input-mapping/src/Table/Strategy/WorkspaceLoadJob.php b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadJob.php new file mode 100644 index 000000000..b517a1db2 --- /dev/null +++ b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadJob.php @@ -0,0 +1,27 @@ +value), + ); + } + } +} diff --git a/libs/input-mapping/src/Table/Strategy/WorkspaceLoadPlan.php b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadPlan.php new file mode 100644 index 000000000..6cb766474 --- /dev/null +++ b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadPlan.php @@ -0,0 +1,53 @@ +instructions, + fn(WorkspaceTableLoadInstruction $instruction) => $instruction->loadType === WorkspaceLoadType::CLONE, + ); + } + + /** + * @return WorkspaceTableLoadInstruction[] + */ + public function getCopyInstructions(): array + { + return array_filter( + $this->instructions, + fn(WorkspaceTableLoadInstruction $instruction) => in_array( + $instruction->loadType, + [WorkspaceLoadType::COPY, WorkspaceLoadType::VIEW], + true, + ), + ); + } + + public function hasCloneInstructions(): bool + { + return !empty($this->getCloneInstructions()); + } + + public function hasCopyInstructions(): bool + { + return !empty($this->getCopyInstructions()); + } +} diff --git a/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php new file mode 100644 index 000000000..f1f2117b9 --- /dev/null +++ b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php @@ -0,0 +1,30 @@ +jobs as $job) { + $tables = array_merge($tables, $job->tables); + } + return $tables; + } +} diff --git a/libs/input-mapping/src/Table/Strategy/WorkspaceLoadType.php b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadType.php new file mode 100644 index 000000000..7abe8e0f6 --- /dev/null +++ b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadType.php @@ -0,0 +1,12 @@ + [ + 'metadataPath' => '/tmp/metadata', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => '/tmp/metadata/destination-folder/my-table.manifest', + ]; + + yield 'table without custom destination (uses source)' => [ + 'metadataPath' => '/tmp/metadata', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => null, + 'expected' => '/tmp/metadata/destination-folder/test-table.manifest', + ]; + + yield 'metadata path with trailing slash' => [ + 'metadataPath' => '/tmp/metadata/', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => '/tmp/metadata/destination-folder/my-table.manifest', + ]; + + yield 'metadata path without leading slash' => [ + 'metadataPath' => 'tmp/metadata', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => 'tmp/metadata/destination-folder/my-table.manifest', + ]; + + yield 'complex paths with multiple segments' => [ + 'metadataPath' => '/var/tmp/keboola/metadata', + 'destination' => 'input/tables', + 'tableSource' => 'bucket.table-name', + 'tableDestination' => 'transformed-table', + 'expected' => '/var/tmp/keboola/metadata/input/tables/transformed-table.manifest', + ]; + + yield 'metadata path with multiple trailing slashes' => [ + 'metadataPath' => '/tmp/metadata///', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => '/tmp/metadata/destination-folder/my-table.manifest', + ]; + } + + /** + * @dataProvider getManifestPathDataProvider + */ + public function testGetManifestPath( + string $metadataPath, + string $destination, + string $tableSource, + ?string $tableDestination, + string $expected, + ): void { + $metadataStorageMock = $this->createMock(FileStagingInterface::class); + $metadataStorageMock->expects(self::once())->method('getPath')->willReturn($metadataPath); + + $tableOptionsConfig = ['source' => $tableSource]; + if ($tableDestination !== null) { + $tableOptionsConfig['destination'] = $tableDestination; + } + + $tableOptions = new RewrittenInputTableOptions( + $tableOptionsConfig, + $tableSource, + 123, + [], + ); + + $result = PathHelper::getManifestPath($metadataStorageMock, $destination, $tableOptions); + + self::assertSame($expected, $result); + } + + public static function getDataFilePathDataProvider(): Generator + { + yield 'basic case with custom destination' => [ + 'dataPath' => '/tmp/data', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => '/tmp/data/destination-folder/my-table', + ]; + + yield 'table without custom destination (uses source)' => [ + 'dataPath' => '/tmp/data', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => null, + 'expected' => '/tmp/data/destination-folder/test-table', + ]; + + yield 'data path with trailing slash' => [ + 'dataPath' => '/tmp/data/', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => '/tmp/data/destination-folder/my-table', + ]; + + yield 'data path without leading slash' => [ + 'dataPath' => 'tmp/data', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => 'tmp/data/destination-folder/my-table', + ]; + + yield 'complex paths with multiple segments' => [ + 'dataPath' => '/var/tmp/keboola/data', + 'destination' => 'input/tables', + 'tableSource' => 'bucket.table-name', + 'tableDestination' => 'transformed-table', + 'expected' => '/var/tmp/keboola/data/input/tables/transformed-table', + ]; + + yield 'data path with multiple trailing slashes' => [ + 'dataPath' => '/tmp/data///', + 'destination' => 'destination-folder', + 'tableSource' => 'test-table', + 'tableDestination' => 'my-table', + 'expected' => '/tmp/data/destination-folder/my-table', + ]; + } + + /** + * @dataProvider getDataFilePathDataProvider + */ + public function testGetDataFilePath( + string $dataPath, + string $destination, + string $tableSource, + ?string $tableDestination, + string $expected, + ): void { + $dataStorageMock = $this->createMock(FileStagingInterface::class); + $dataStorageMock->expects(self::once())->method('getPath')->willReturn($dataPath); + + $tableOptionsConfig = ['source' => $tableSource]; + if ($tableDestination !== null) { + $tableOptionsConfig['destination'] = $tableDestination; + } + + $tableOptions = new RewrittenInputTableOptions( + $tableOptionsConfig, + $tableSource, + 123, + [], + ); + + $result = PathHelper::getDataFilePath($dataStorageMock, $destination, $tableOptions); + + self::assertSame($expected, $result); + } + + public static function ensurePathDelimiterDataProvider(): Generator + { + yield 'path without trailing slash' => [ + 'input' => '/tmp/data', + 'expected' => '/tmp/data/', + ]; + + yield 'path with trailing slash' => [ + 'input' => '/tmp/data/', + 'expected' => '/tmp/data/', + ]; + + yield 'path with multiple trailing slashes' => [ + 'input' => '/tmp/data///', + 'expected' => '/tmp/data/', + ]; + + yield 'path with backslashes' => [ + 'input' => '/tmp/data\\\\', + 'expected' => '/tmp/data/', + ]; + + yield 'empty path' => [ + 'input' => '', + 'expected' => '/', + ]; + + yield 'root path' => [ + 'input' => '/', + 'expected' => '/', + ]; + } + + /** + * @dataProvider ensurePathDelimiterDataProvider + */ + public function testEnsurePathDelimiter(string $input, string $expected): void + { + $result = PathHelper::ensurePathDelimiter($input); + self::assertSame($expected, $result); + } +} diff --git a/libs/input-mapping/tests/ReaderTest.php b/libs/input-mapping/tests/ReaderTest.php index 8fa00b28a..d8fe631d3 100644 --- a/libs/input-mapping/tests/ReaderTest.php +++ b/libs/input-mapping/tests/ReaderTest.php @@ -4,12 +4,16 @@ namespace Keboola\InputMapping\Tests; +use Generator; use Keboola\Csv\CsvFile; +use Keboola\InputMapping\Exception\InputOperationException; use Keboola\InputMapping\Reader; use Keboola\InputMapping\Staging\StrategyFactory; use Keboola\InputMapping\State\InputTableStateList; use Keboola\InputMapping\Table\Options\InputTableOptionsList; use Keboola\InputMapping\Table\Options\ReaderOptions; +use Keboola\InputMapping\Table\Strategy\AbstractWorkspaceStrategy; +use Keboola\InputMapping\Table\Strategy\WorkspaceLoadQueue; use Keboola\InputMapping\Tests\Needs\NeedsDevBranch; use Keboola\InputMapping\Tests\Needs\TestSatisfyer; use Keboola\StagingProvider\Staging\File\FileFormat; @@ -18,6 +22,7 @@ use Keboola\StagingProvider\Staging\StagingType; use Keboola\StorageApi\Client; use Keboola\StorageApiBranch\ClientWrapper; +use Keboola\StorageApiBranch\Factory\ClientOptions; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Symfony\Component\Finder\Finder; @@ -183,4 +188,78 @@ public function testReadTablesDefaultBackendBranchRewrite(): void self::assertEquals(sprintf('%s.test', $branchBucketId), $data[0]['source']); self::assertArrayHasKey('lastImportDate', $data[0]); } + + public function testPrepareAndExecuteTableLoadsWithNonWorkspaceStrategy(): void + { + $clientWrapper = $this->initClient(); + $reader = new Reader( + $clientWrapper, + $this->testLogger, + $this->getStagingFactory($clientWrapper, StagingType::Local), + ); + + $configuration = new InputTableOptionsList([]); + $state = new InputTableStateList([]); + + $this->expectException(InputOperationException::class); + $this->expectExceptionMessage('prepareAndExecuteTableLoads() can only be used with workspace strategies'); + + $reader->prepareAndExecuteTableLoads( + $configuration, + $state, + 'destination', + new ReaderOptions(true), + ); + } + + /** + * @dataProvider preserveFlagProvider + */ + public function testPrepareAndExecuteTableLoadsDelegatesToWorkspaceStrategy(bool $preserveFlag): void + { + // Arrange + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->method('getTableAndFileStorageClient') + ->willReturn($this->createMock(Client::class)); + $clientWrapper->method('getClientOptionsReadOnly') + ->willReturn($this->createMock(ClientOptions::class)); + + $expectedQueue = new WorkspaceLoadQueue([]); + + $workspaceStrategy = $this->createMock(AbstractWorkspaceStrategy::class); + $workspaceStrategy + ->expects(self::once()) + ->method('prepareAndExecuteTableLoads') + ->with([], $preserveFlag) + ->willReturn($expectedQueue); + + $strategyFactory = $this->createMock(StrategyFactory::class); + $strategyFactory + ->expects(self::once()) + ->method('getTableInputStrategy') + ->willReturn($workspaceStrategy); + + // Act + $reader = new Reader( + $clientWrapper, + $this->testLogger, + $strategyFactory, + ); + + $result = $reader->prepareAndExecuteTableLoads( + new InputTableOptionsList([]), + new InputTableStateList([]), + 'destination', + new ReaderOptions(false, $preserveFlag), + ); + + // Assert + self::assertSame($expectedQueue, $result); + } + + public function preserveFlagProvider(): Generator + { + yield 'with preserve enabled' => [true]; + yield 'with preserve disabled' => [false]; + } } diff --git a/libs/input-mapping/tests/Table/Strategy/AbstractFileStrategyTest.php b/libs/input-mapping/tests/Table/Strategy/AbstractFileStrategyTest.php new file mode 100644 index 000000000..ae40c31dc --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/AbstractFileStrategyTest.php @@ -0,0 +1,45 @@ +expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Data storage must be instance of FileStagingInterface'); + + new class( + $this->createMock(ClientWrapper::class), + $this->createMock(LoggerInterface::class), + $this->createMock(StagingInterface::class), // This should fail validation + $this->createMock(FileStagingInterface::class), + $this->createMock(InputTableStateList::class), + 'destination', + FileFormat::Json, + ) extends AbstractFileStrategy { + public function downloadTable(RewrittenInputTableOptions $table): array + { + return []; + } + + public function handleExports(array $exports, bool $preserve): array + { + return []; + } + }; + } +} diff --git a/libs/input-mapping/tests/Table/Strategy/AbstractWorkspaceStrategyTest.php b/libs/input-mapping/tests/Table/Strategy/AbstractWorkspaceStrategyTest.php new file mode 100644 index 000000000..61c01a609 --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/AbstractWorkspaceStrategyTest.php @@ -0,0 +1,744 @@ +testHandler = new TestHandler(); + $this->testLogger = new Logger('testLogger', [$this->testHandler]); + } + + + public function testConstructorValidatesDataStorageIsWorkspaceStagingInterface(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Data storage must be instance of WorkspaceStagingInterface'); + + new class( + $this->createMock(ClientWrapper::class), + $this->createMock(LoggerInterface::class), + $this->createMock(StagingInterface::class), // This should fail validation + $this->createMock(FileStagingInterface::class), + $this->createMock(InputTableStateList::class), + 'destination', + FileFormat::Json, + ) extends AbstractWorkspaceStrategy { + public function getWorkspaceType(): string + { + return 'test'; + } + + public function handleExports(array $exports, bool $preserve): array + { + return []; + } + }; + } + + public function testPrepareTableLoadsToWorkspaceClone(): void + { + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->once()) + ->method('getToken') + ->willReturn(new StorageApiToken( + [ + 'owner' => ['id' => 12345], + ], + 'my-secret-token', + )) + ; + + $strategy = $this->createTestStrategy($clientWrapper, 'snowflake'); + + // Table that can be cloned (Snowflake backend, no filters) + $tableOptions = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table1', 'destination' => 'table1'], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'snowflake'], + 'isAlias' => false, + ], + ); + + $instructions = $strategy->prepareTableLoadsToWorkspace([$tableOptions]); + + self::assertCount(1, $instructions); + self::assertInstanceOf(WorkspaceTableLoadInstruction::class, $instructions[0]); + self::assertSame(WorkspaceLoadType::CLONE, $instructions[0]->loadType); + self::assertSame($tableOptions, $instructions[0]->table); + self::assertNull($instructions[0]->loadOptions); + + self::assertTrue($this->testHandler->hasInfoThatContains('Table "in.c-test-bucket.table1" will be cloned.')); + } + + public function testPrepareTableLoadsToWorkspaceView(): void + { + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->once()) + ->method('getToken') + ->willReturn(new StorageApiToken( + [ + 'owner' => ['id' => 12345], + ], + 'my-secret-token', + )) + ; + + $strategy = $this->createTestStrategy($clientWrapper, 'bigquery'); + + // Table that can use view (BigQuery backend) + $tableOptions = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table1', 'destination' => 'table1'], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'bigquery'], + 'isAlias' => false, + ], + ); + + $instructions = $strategy->prepareTableLoadsToWorkspace([$tableOptions]); + + self::assertCount(1, $instructions); + self::assertEquals(WorkspaceLoadType::VIEW, $instructions[0]->loadType); + self::assertSame($tableOptions, $instructions[0]->table); + self::assertSame(['overwrite' => false], $instructions[0]->loadOptions); + + self::assertTrue( + $this->testHandler->hasInfoThatContains('Table "in.c-test-bucket.table1" will be created as view.'), + ); + } + + public function testPrepareTableLoadsToWorkspaceCopy(): void + { + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->once()) + ->method('getToken') + ->willReturn(new StorageApiToken( + [ + 'owner' => ['id' => 12345], + ], + 'my-secret-token', + )) + ; + + $strategy = $this->createTestStrategy($clientWrapper, 'snowflake'); + + // Table that must be copied (Different backend than workspace) + $tableOptions = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table1', 'destination' => 'table1'], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'bigquery'], + 'isAlias' => false, + ], + ); + + $instructions = $strategy->prepareTableLoadsToWorkspace([$tableOptions]); + + self::assertCount(1, $instructions); + self::assertEquals(WorkspaceLoadType::COPY, $instructions[0]->loadType); + self::assertSame($tableOptions, $instructions[0]->table); + self::assertSame(['overwrite' => false], $instructions[0]->loadOptions); + + self::assertTrue($this->testHandler->hasInfoThatContains('Table "in.c-test-bucket.table1" will be copied.')); + } + + public function testPrepareTableLoadsToWorkspaceChecksViableLoadMethod(): void + { + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->once()) + ->method('getToken') + ->willReturn(new StorageApiToken( + [ + 'owner' => ['id' => 12345], + ], + 'my-secret-token', + )) + ; + + $strategy = $this->createTestStrategy($clientWrapper, 'bigquery'); + + // Create a table that will cause checkViableLoadMethod to throw an exception + // (BigQuery workspace with alias table from the same project) + $tableOptions = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table1', 'destination' => 'table1'], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'bigquery'], + 'isAlias' => true, + 'sourceTable' => ['project' => ['id' => 12345]], // Same project ID + ], + ); + + // This should throw an InvalidInputException because checkViableLoadMethod is called + $this->expectException(InvalidInputException::class); + $this->expectExceptionMessage( + 'Table "in.c-test-bucket.table1" is an alias, which is not supported when loading Bigquery tables.', + ); + + $strategy->prepareTableLoadsToWorkspace([$tableOptions]); + } + + public function testExecuteTableLoadsToWorkspaceWithCleanAndPreserveFalse(): void + { + $branchClient = $this->createMock(BranchAwareClient::class); + $branchClient->expects($this->once()) + ->method('apiPostJson') + ->with('workspaces/456/load-clone', [ + 'input' => [], + 'preserve' => 0, + ], false) + ->willReturn(['id' => 123]); + + $branchClient->expects($this->once()) + ->method('handleAsyncTasks') + ->with([123]) + ->willReturn([]); + + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->exactly(2)) + ->method('getBranchClient') + ->willReturn($branchClient); + + $dataStorage = $this->createMock(WorkspaceStagingInterface::class); + $dataStorage->expects($this->atLeastOnce()) + ->method('getWorkspaceId') + ->willReturn('456'); + + $strategy = $this->createTestStrategyWithDataStorage($clientWrapper, 'snowflake', $dataStorage); + + // Create plan with preserve=false (should trigger clean) + $plan = new WorkspaceLoadPlan( + [], + false, // preserve=false should trigger clean + ); + + $result = $strategy->executeTableLoadsToWorkspace($plan); + self::assertEmpty($result->jobs); + self::assertTrue($this->testHandler->hasInfoThatContains('Cleaning workspace before loading tables.')); + } + + public function testExecuteTableLoadsToWorkspaceWithMixedOperations(): void + { + $branchClient = $this->createMock(BranchAwareClient::class); + + // Set up expected API calls in execution order: + // 1. Cleanup: workspaces/{id}/load-clone with empty input (preserve=false trigger) + // 2. Clone: workspaces/{id}/load-clone with clone instructions + // 3. Load: workspaces/{id}/load with copy + view instructions batched together + $expectedApiCalls = [ + [ + // Step 1: Cleanup operation (executed synchronously, not returned in queue) + 'endpoint' => 'workspaces/456/load-clone', + 'data' => [ + 'input' => [], // workspace will be only cleaned + 'preserve' => 0, + ], + 'async' => false, + 'returnValue' => ['id' => 100], // cleanup job ID + ], + [ + // Step 2: Clone instruction group - both clone operations batched together + 'endpoint' => 'workspaces/456/load-clone', + 'data' => [ + 'input' => [ + [ + 'source' => 'in.c-test-bucket.table1', + 'destination' => 'table1', + 'overwrite' => true, + 'dropTimestampColumn' => true, + 'sourceBranchId' => 123, + ], + [ + 'source' => 'in.c-test-bucket.table2', + 'destination' => 'table2', + 'overwrite' => false, + 'dropTimestampColumn' => false, + 'sourceBranchId' => 124, + ], + ], + 'preserve' => 1, + ], + 'async' => false, + 'returnValue' => ['id' => 456], + ], + [ + // Step 3: Load instruction group - copy + view operations batched together + 'endpoint' => 'workspaces/456/load', + 'data' => [ + 'input' => [ + [ + // Copy instruction + 'source' => 'in.c-test-bucket.table3', + 'destination' => 'table3', + 'overwrite' => true, + 'sourceBranchId' => 125, + ], + [ + // View instruction (note: useView=true parameter) + 'source' => 'in.c-test-bucket.table4', + 'destination' => 'table4', + 'overwrite' => false, + 'sourceBranchId' => 126, + 'useView' => true, + ], + ], + 'preserve' => 1, + ], + 'async' => false, + 'returnValue' => ['id' => 789], + ], + ]; + + // Mock API calls with callback verification + $branchClient->expects($this->exactly(3)) + ->method('apiPostJson') + ->willReturnCallback(function (string $endpoint, array $data, bool $async) use (&$expectedApiCalls) { + $expectedCall = array_shift($expectedApiCalls); + self::assertNotNull($expectedCall); + + self::assertSame($expectedCall['endpoint'], $endpoint); + self::assertEquals($expectedCall['data'], $data); + self::assertSame($expectedCall['async'], $async); + + return $expectedCall['returnValue']; + }); + + // Mock handleAsyncTasks for cleanup job completion + $branchClient->expects($this->once()) + ->method('handleAsyncTasks') + ->with([100]) // cleanup job ID + ->willReturn([]); + + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->exactly(2)) + ->method('getBranchClient') + ->willReturn($branchClient); + + $dataStorage = $this->createMock(WorkspaceStagingInterface::class); + $dataStorage->expects(self::once()) + ->method('getWorkspaceId') + ->willReturn('456'); + + $strategy = $this->createTestStrategyWithDataStorage($clientWrapper, 'snowflake', $dataStorage); + + // Create table options for clone, copy, and view operations + $cloneTableOptions1 = new RewrittenInputTableOptions( + [ + 'source' => 'in.c-test-bucket.table1', + 'destination' => 'table1', + 'overwrite' => true, + 'keep_internal_timestamp_column' => false, // dropTimestampColumn will be true + ], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'snowflake'], + 'isAlias' => false, + ], + ); + + $cloneTableOptions2 = new RewrittenInputTableOptions( + [ + 'source' => 'in.c-test-bucket.table2', + 'destination' => 'table2', + 'overwrite' => false, + 'keep_internal_timestamp_column' => true, // dropTimestampColumn will be false + ], + 'in.c-test-bucket.table2', + 124, + [ + 'id' => 'in.c-test-bucket.table2', + 'bucket' => ['backend' => 'snowflake'], + 'isAlias' => false, + ], + ); + + $copyTableOptions = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table3', 'destination' => 'table3'], + 'in.c-test-bucket.table3', + 125, + [ + 'id' => 'in.c-test-bucket.table3', + 'bucket' => ['backend' => 'bigquery'], + 'isAlias' => false, + ], + ); + + $viewTableOptions = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table4', 'destination' => 'table4'], + 'in.c-test-bucket.table4', + 126, + [ + 'id' => 'in.c-test-bucket.table4', + 'bucket' => ['backend' => 'bigquery'], + 'isAlias' => false, + ], + ); + + $plan = new WorkspaceLoadPlan( + [ + new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $cloneTableOptions1, + null, + ), + new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $copyTableOptions, + ['overwrite' => true], + ), + new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $cloneTableOptions2, + null, + ), + new WorkspaceTableLoadInstruction( + WorkspaceLoadType::VIEW, + $viewTableOptions, + ['overwrite' => false], + ), + ], + false, // preserve=false (trigger cleanup) + ); + + $result = $strategy->executeTableLoadsToWorkspace($plan); + + self::assertCount(2, $result->jobs); + + $cloneJob = $result->jobs[0]; + self::assertSame(WorkspaceLoadType::CLONE, $cloneJob->jobType); + self::assertSame([$cloneTableOptions1, $cloneTableOptions2], $cloneJob->tables); + + $loadJob = $result->jobs[1]; + self::assertSame(WorkspaceLoadType::COPY, $loadJob->jobType); + self::assertSame([$copyTableOptions, $viewTableOptions], $loadJob->tables); + + self::assertTrue($this->testHandler->hasInfoThatContains('Cleaning workspace before loading tables.')); + self::assertTrue($this->testHandler->hasInfoThatContains('Cloning 2 tables to workspace.')); + self::assertTrue($this->testHandler->hasInfoThatContains('Copying 2 tables to workspace.')); + } + + public function testExecuteTableLoadsToWorkspaceEmptyPlanWithPreserveTrue(): void + { + $branchClient = $this->createMock(BranchAwareClient::class); + $branchClient->expects(self::never())->method(self::anything()); + + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->once()) + ->method('getBranchClient') + ->willReturn($branchClient) + ; + + $strategy = $this->createTestStrategy($clientWrapper, 'snowflake'); + + // Empty plan with preserve=true should not trigger any operations + $plan = new WorkspaceLoadPlan( + [], + true, // preserve=true (no clean) + ); + + $result = $strategy->executeTableLoadsToWorkspace($plan); + + self::assertInstanceOf(WorkspaceLoadQueue::class, $result); + self::assertEmpty($result->jobs); + } + + private function createTestStrategy( + ClientWrapper $clientWrapper, + string $workspaceType, + ): TestWorkspaceStrategy { + $strategy = new TestWorkspaceStrategy( + $clientWrapper, + $this->testLogger, + $this->createMock(WorkspaceStagingInterface::class), + $this->createMock(FileStagingInterface::class), + $this->createMock(InputTableStateList::class), + 'destination', + FileFormat::Json, + ); + $strategy->setWorkspaceType($workspaceType); + return $strategy; + } + + private function createTestStrategyWithDataStorage( + ClientWrapper $clientWrapper, + string $workspaceType, + WorkspaceStagingInterface $dataStorage, + ): TestWorkspaceStrategy { + $strategy = new TestWorkspaceStrategy( + $clientWrapper, + $this->testLogger, + $dataStorage, + $this->createMock(FileStagingInterface::class), + $this->createMock(InputTableStateList::class), + 'destination', + FileFormat::Json, + ); + $strategy->setWorkspaceType($workspaceType); + return $strategy; + } + + public function testPrepareAndExecuteTableLoadsEmpty(): void + { + $clientWrapper = $this->createMock(ClientWrapper::class); + $strategy = $this->createTestStrategy($clientWrapper, 'snowflake'); + + $result = $strategy->prepareAndExecuteTableLoads([], true); + + self::assertInstanceOf(WorkspaceLoadQueue::class, $result); + self::assertEmpty($result->jobs); + } + + public function testPrepareAndExecuteTableLoadsWithCloneAndCopy(): void + { + $branchClient = $this->createMock(BranchAwareClient::class); + + // Mock API calls in sequence + $expectedApiCalls = [ + [ + 'endpoint' => 'workspaces/456/load-clone', + 'data' => [ + 'input' => [ + [ + 'source' => 'in.c-test-bucket.table1', + 'destination' => 'table1', + 'overwrite' => false, + 'dropTimestampColumn' => false, + 'sourceBranchId' => 123, + ], + ], + 'preserve' => 1, + ], + 'async' => false, + 'returnValue' => ['id' => 789], + ], + [ + 'endpoint' => 'workspaces/456/load', + 'data' => [ + 'input' => [ + [ + 'source' => 'in.c-test-bucket.table2', + 'destination' => 'table2', + 'overwrite' => false, + 'sourceBranchId' => 124, + ], + ], + 'preserve' => 1, + ], + 'async' => false, + 'returnValue' => ['id' => 456], + ], + ]; + + $branchClient->expects($this->exactly(2)) + ->method('apiPostJson') + ->willReturnCallback(function (string $endpoint, array $data, bool $async) use (&$expectedApiCalls) { + $expectedCall = array_shift($expectedApiCalls); + self::assertNotNull($expectedCall); + + self::assertSame($expectedCall['endpoint'], $endpoint); + self::assertEquals($expectedCall['data'], $data); + self::assertSame($expectedCall['async'], $async); + + return $expectedCall['returnValue']; + }); + + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->exactly(2)) + ->method('getToken') + ->willReturn(new StorageApiToken( + [ + 'owner' => ['id' => 12345], + ], + 'my-secret-token', + )); + $clientWrapper->expects($this->once()) + ->method('getBranchClient') + ->willReturn($branchClient); + + $dataStorage = $this->createMock(WorkspaceStagingInterface::class); + $dataStorage->expects(self::once()) + ->method('getWorkspaceId') + ->willReturn('456'); + + $strategy = $this->createTestStrategyWithDataStorage($clientWrapper, 'snowflake', $dataStorage); + + // Create tables: one clone, one copy + $cloneTable = new RewrittenInputTableOptions( + [ + 'source' => 'in.c-test-bucket.table1', + 'destination' => 'table1', + 'keep_internal_timestamp_column' => true, // dropTimestampColumn will be false + ], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'snowflake'], + 'isAlias' => false, + ], + ); + + $copyTable = new RewrittenInputTableOptions( + ['source' => 'in.c-test-bucket.table2', 'destination' => 'table2'], + 'in.c-test-bucket.table2', + 124, + [ + 'id' => 'in.c-test-bucket.table2', + 'bucket' => ['backend' => 'bigquery'], + 'isAlias' => false, + ], + ); + + $result = $strategy->prepareAndExecuteTableLoads([$cloneTable, $copyTable], true); + + self::assertInstanceOf(WorkspaceLoadQueue::class, $result); + self::assertCount(2, $result->jobs); + + // Verify clone job + $cloneJob = $result->jobs[0]; + self::assertSame(WorkspaceLoadType::CLONE, $cloneJob->jobType); + self::assertSame([$cloneTable], $cloneJob->tables); + + // Verify load job + $loadJob = $result->jobs[1]; + self::assertSame(WorkspaceLoadType::COPY, $loadJob->jobType); + self::assertSame([$copyTable], $loadJob->tables); + } + + public function testPrepareAndExecuteTableLoadsWithCleanWorkspace(): void + { + $branchClient = $this->createMock(BranchAwareClient::class); + + // Expect clean operation followed by clone operation + $expectedApiCalls = [ + [ + 'endpoint' => 'workspaces/456/load-clone', + 'data' => [ + 'input' => [], // clean operation + 'preserve' => 0, + ], + 'async' => false, + 'returnValue' => ['id' => 100], // clean job + ], + [ + 'endpoint' => 'workspaces/456/load-clone', + 'data' => [ + 'input' => [ + [ + 'source' => 'in.c-test-bucket.table1', + 'destination' => 'table1', + 'overwrite' => false, + 'dropTimestampColumn' => false, + 'sourceBranchId' => 123, + ], + ], + 'preserve' => 1, + ], + 'async' => false, + 'returnValue' => ['id' => 789], // clone job + ], + ]; + + $branchClient->expects($this->exactly(2)) + ->method('apiPostJson') + ->willReturnCallback(function (string $endpoint, array $data, bool $async) use (&$expectedApiCalls) { + $expectedCall = array_shift($expectedApiCalls); + self::assertNotNull($expectedCall); + + self::assertSame($expectedCall['endpoint'], $endpoint); + self::assertEquals($expectedCall['data'], $data); + self::assertSame($expectedCall['async'], $async); + + return $expectedCall['returnValue']; + }); + + // Clean job completion + $branchClient->expects($this->once()) + ->method('handleAsyncTasks') + ->with([100]) + ->willReturn([]); + + $clientWrapper = $this->createMock(ClientWrapper::class); + $clientWrapper->expects($this->once()) + ->method('getToken') + ->willReturn(new StorageApiToken( + [ + 'owner' => ['id' => 12345], + ], + 'my-secret-token', + )); + $clientWrapper->expects($this->exactly(2)) + ->method('getBranchClient') + ->willReturn($branchClient); + + $dataStorage = $this->createMock(WorkspaceStagingInterface::class); + $dataStorage->expects(self::atLeastOnce()) + ->method('getWorkspaceId') + ->willReturn('456'); + + $strategy = $this->createTestStrategyWithDataStorage($clientWrapper, 'snowflake', $dataStorage); + + $cloneTable = new RewrittenInputTableOptions( + [ + 'source' => 'in.c-test-bucket.table1', + 'destination' => 'table1', + 'keep_internal_timestamp_column' => true, // dropTimestampColumn will be false + ], + 'in.c-test-bucket.table1', + 123, + [ + 'id' => 'in.c-test-bucket.table1', + 'bucket' => ['backend' => 'snowflake'], + 'isAlias' => false, + ], + ); + + $result = $strategy->prepareAndExecuteTableLoads([$cloneTable], false); // preserve=false + + self::assertInstanceOf(WorkspaceLoadQueue::class, $result); + self::assertCount(1, $result->jobs); + self::assertTrue($this->testHandler->hasInfoThatContains('Cleaning workspace before loading tables.')); + self::assertTrue($this->testHandler->hasInfoThatContains('Cloning 1 tables to workspace.')); + } +} diff --git a/libs/input-mapping/tests/Table/Strategy/BigQueryTest.php b/libs/input-mapping/tests/Table/Strategy/BigQueryTest.php index f523fea25..dbbbe2d8c 100644 --- a/libs/input-mapping/tests/Table/Strategy/BigQueryTest.php +++ b/libs/input-mapping/tests/Table/Strategy/BigQueryTest.php @@ -98,4 +98,19 @@ public function testBigQueryDownloadTableAlias(): void $this->clientWrapper->getBasicClient()->getTable($aliasId), )); } + + public function testGetWorkspaceType(): void + { + $strategy = new BigQuery( + $this->initClient(), + new NullLogger(), + $this->createMock(WorkspaceStagingInterface::class), + $this->createMock(FileStagingInterface::class), + new InputTableStateList([]), + 'test', + FileFormat::Json, + ); + + self::assertEquals('bigquery', $strategy->getWorkspaceType()); + } } diff --git a/libs/input-mapping/tests/Table/Strategy/SnowflakeTest.php b/libs/input-mapping/tests/Table/Strategy/SnowflakeTest.php new file mode 100644 index 000000000..9ffb7a8dd --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/SnowflakeTest.php @@ -0,0 +1,31 @@ +initClient(), + new NullLogger(), + $this->createMock(WorkspaceStagingInterface::class), + $this->createMock(FileStagingInterface::class), + new InputTableStateList([]), + 'test', + FileFormat::Json, + ); + + self::assertEquals('snowflake', $strategy->getWorkspaceType()); + } +} diff --git a/libs/input-mapping/tests/Table/Strategy/TestWorkspaceStrategy.php b/libs/input-mapping/tests/Table/Strategy/TestWorkspaceStrategy.php new file mode 100644 index 000000000..48cdef6b2 --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/TestWorkspaceStrategy.php @@ -0,0 +1,27 @@ +workspaceType = $workspaceType; + } + + public function getWorkspaceType(): string + { + return $this->workspaceType; + } + + public function handleExports(array $exports, bool $preserve): array + { + return []; + } +} diff --git a/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadJobTest.php b/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadJobTest.php new file mode 100644 index 000000000..37de43b95 --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadJobTest.php @@ -0,0 +1,51 @@ +expectNotToPerformAssertions(); + + $tableOptions = $this->createMock(RewrittenInputTableOptions::class); + new WorkspaceLoadJob('123', $jobType, [$tableOptions]); + } + + /** + * @dataProvider invalidJobTypeProvider + */ + public function testConstructorThrowsExceptionForInvalidJobTypes(WorkspaceLoadType $jobType): void + { + $this->expectException(InputOperationException::class); + $this->expectExceptionMessage( + sprintf('Invalid job type "%s". Only CLONE and COPY are allowed for jobs.', $jobType->value), + ); + + $tableOptions = $this->createMock(RewrittenInputTableOptions::class); + new WorkspaceLoadJob('789', $jobType, [$tableOptions]); + } + + public static function validJobTypeProvider(): Generator + { + yield 'CLONE job type' => [WorkspaceLoadType::CLONE]; + yield 'COPY job type' => [WorkspaceLoadType::COPY]; + } + + public static function invalidJobTypeProvider(): Generator + { + yield 'VIEW job type' => [WorkspaceLoadType::VIEW]; + } +} diff --git a/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadPlanTest.php b/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadPlanTest.php new file mode 100644 index 000000000..668006b0f --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadPlanTest.php @@ -0,0 +1,205 @@ +createMockTableOptions('table1'), + null, + ); + $instructions = [$instruction]; + $preserve = true; + + $plan = new WorkspaceLoadPlan($instructions, $preserve); + + self::assertSame($preserve, $plan->preserve); + } + + public function testGetCloneInstructionsReturnsOnlyCloneType(): void + { + $cloneInstruction1 = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone1'), + null, + ); + $cloneInstruction2 = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone2'), + null, + ); + $copyInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $this->createMockTableOptions('copy1'), + ['overwrite' => false], + ); + $viewInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::VIEW, + $this->createMockTableOptions('view1'), + ['overwrite' => false], + ); + + $plan = new WorkspaceLoadPlan( + [$cloneInstruction1, $copyInstruction, $cloneInstruction2, $viewInstruction], + false, + ); + + $cloneInstructions = $plan->getCloneInstructions(); + + self::assertCount(2, $cloneInstructions); + self::assertSame($cloneInstruction1, $cloneInstructions[0]); + self::assertSame($cloneInstruction2, $cloneInstructions[2]); + } + + public function testGetCopyInstructionsReturnsCopyAndViewTypes(): void + { + $cloneInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone1'), + null, + ); + $copyInstruction1 = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $this->createMockTableOptions('copy1'), + ['overwrite' => false], + ); + $copyInstruction2 = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $this->createMockTableOptions('copy2'), + ['overwrite' => true], + ); + $viewInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::VIEW, + $this->createMockTableOptions('view1'), + ['overwrite' => false], + ); + + $plan = new WorkspaceLoadPlan( + [$cloneInstruction, $copyInstruction1, $viewInstruction, $copyInstruction2], + false, + ); + + $copyInstructions = $plan->getCopyInstructions(); + + self::assertCount(3, $copyInstructions); + self::assertSame($copyInstruction1, $copyInstructions[1]); + self::assertSame($viewInstruction, $copyInstructions[2]); + self::assertSame($copyInstruction2, $copyInstructions[3]); + } + + public function testHasCloneInstructionsReturnsTrueWhenCloneInstructionsExist(): void + { + $cloneInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone1'), + null, + ); + $copyInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $this->createMockTableOptions('copy1'), + ['overwrite' => false], + ); + + $plan = new WorkspaceLoadPlan([$cloneInstruction, $copyInstruction], false); + + self::assertTrue($plan->hasCloneInstructions()); + } + + public function testHasCloneInstructionsReturnsFalseWhenNoCloneInstructionsExist(): void + { + $copyInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $this->createMockTableOptions('copy1'), + ['overwrite' => false], + ); + $viewInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::VIEW, + $this->createMockTableOptions('view1'), + ['overwrite' => false], + ); + + $plan = new WorkspaceLoadPlan([$copyInstruction, $viewInstruction], false); + + self::assertFalse($plan->hasCloneInstructions()); + } + + public function testHasCopyInstructionsReturnsTrueWhenCopyOrViewInstructionsExist(): void + { + $cloneInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone1'), + null, + ); + $copyInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::COPY, + $this->createMockTableOptions('copy1'), + ['overwrite' => false], + ); + + $planWithCopy = new WorkspaceLoadPlan([$cloneInstruction, $copyInstruction], false); + self::assertTrue($planWithCopy->hasCopyInstructions()); + + $viewInstruction = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::VIEW, + $this->createMockTableOptions('view1'), + ['overwrite' => false], + ); + + $planWithView = new WorkspaceLoadPlan([$cloneInstruction, $viewInstruction], false); + self::assertTrue($planWithView->hasCopyInstructions()); + } + + public function testHasCopyInstructionsReturnsFalseWhenOnlyCloneInstructionsExist(): void + { + $cloneInstruction1 = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone1'), + null, + ); + $cloneInstruction2 = new WorkspaceTableLoadInstruction( + WorkspaceLoadType::CLONE, + $this->createMockTableOptions('clone2'), + null, + ); + + $plan = new WorkspaceLoadPlan([$cloneInstruction1, $cloneInstruction2], false); + + self::assertFalse($plan->hasCopyInstructions()); + } + + public function testEmptyPlan(): void + { + $plan = new WorkspaceLoadPlan([], true); + + self::assertEmpty($plan->getCloneInstructions()); + self::assertEmpty($plan->getCopyInstructions()); + self::assertFalse($plan->hasCloneInstructions()); + self::assertFalse($plan->hasCopyInstructions()); + self::assertTrue($plan->preserve); + } + + private function createMockTableOptions(string $source): RewrittenInputTableOptions + { + return new RewrittenInputTableOptions( + ['source' => $source, 'destination' => $source], + $source, + 123, + [ + 'id' => $source, + 'bucket' => ['backend' => 'snowflake'], + 'isAlias' => false, + ], + ); + } +}