diff --git a/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php b/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php index 89b1d9589..18c03aa8a 100644 --- a/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php +++ b/libs/input-mapping/src/Table/Strategy/AbstractWorkspaceStrategy.php @@ -9,6 +9,8 @@ use Keboola\InputMapping\Helper\ManifestCreator; use Keboola\InputMapping\State\InputTableStateList; use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions; +use Keboola\InputMapping\Table\Result; +use Keboola\InputMapping\Table\Result\TableInfo; use Keboola\StagingProvider\Staging\File\FileFormat; use Keboola\StagingProvider\Staging\File\FileStagingInterface; use Keboola\StagingProvider\Staging\StagingInterface; @@ -244,6 +246,87 @@ public function executeTableLoadsToWorkspace(WorkspaceLoadPlan $plan): Workspace return new WorkspaceLoadQueue($jobs); } + /** + * Phase 3: Wait - Wait for table loading jobs to complete and create manifests + * This phase handles job completion and prepares metadata + */ + public function waitForTableLoadCompletion(WorkspaceLoadQueue $loadQueue): array + { + // Wait for all table loading jobs to complete + $jobResults = $this->clientWrapper->getBranchClient()->handleAsyncTasks( + $loadQueue->getJobIds(), + ); + + $this->logger->info('Processed ' . count($jobResults) . ' workspace exports.'); + + // Create manifests for tables now available in Workspace for SQL analysis + foreach ($loadQueue->getAllTables() as $table) { + $manifestPath = $this->getManifestPath($table); + $this->manifestCreator->writeTableManifest( + $table->getTableInfo(), + $manifestPath, + $table->getColumnNamesFromTypes(), + $this->format, + ); + } + + return $jobResults; + } + + /** + * New three-phase public method that can be used instead of downloadTables workflow + * + * @param RewrittenInputTableOptions[] $tables + */ + public function downloadTablesThreePhase(array $tables, bool $preserve): Result + { + // Phase 1: Prepare + $instructions = $this->prepareTableLoadsToWorkspace($tables); + $plan = new WorkspaceLoadPlan( + $instructions, + $preserve, + ); + + // Phase 2: Execute + $loadQueue = $this->executeTableLoadsToWorkspace($plan); + + // Phase 3: Wait + $jobResults = $this->waitForTableLoadCompletion($loadQueue); + + // Build final result compatible with existing Result interface + $outputStateConfiguration = []; + $result = new Result(); + foreach ($tables as $table) { + $outputStateConfiguration[] = [ + 'source' => $table->getSource(), + 'lastImportDate' => $table->getTableInfo()['lastImportDate'], + ]; + $result->addTable(new TableInfo($table->getTableInfo())); + } + + $result->setMetrics($jobResults); + $result->setInputTableStateList(new InputTableStateList($outputStateConfiguration)); + $this->logger->info('All tables were fetched.'); + + return $result; + } + + /** + * Override parent's downloadTables to optionally use three-phase approach + * For now, keeps using the existing method for backward compatibility + * In the future, this can be switched to use downloadTablesThreePhase() + * + * @param RewrittenInputTableOptions[] $tables + */ + public function downloadTables(array $tables, bool $preserve): Result + { + // Option 1: Use new three-phase approach (uncomment to enable) + return $this->downloadTablesThreePhase($tables, $preserve); + + // Option 2: Use existing approach (current default for backward compatibility) + //return parent::downloadTables($tables, $preserve); + } + /** * Execute only Phase 1 & 2: Prepare and Execute workspace table loading * Returns WorkspaceLoadQueue for later completion with waitForTableLoadCompletion() diff --git a/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php index f1f2117b9..d9d640846 100644 --- a/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php +++ b/libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php @@ -16,6 +16,18 @@ public function __construct( ) { } + /** + * @return string[] + */ + public function getJobIds(): array + { + $jobIds = []; + foreach ($this->jobs as $job) { + $jobIds[] = $job->jobId; + } + return $jobIds; + } + /** * @return RewrittenInputTableOptions[] */ diff --git a/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadQueueTest.php b/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadQueueTest.php new file mode 100644 index 000000000..ad61c9951 --- /dev/null +++ b/libs/input-mapping/tests/Table/Strategy/WorkspaceLoadQueueTest.php @@ -0,0 +1,72 @@ +getJobIds(); + + self::assertSame(['job-123', 'job-456', 'job-789'], $result); + self::assertCount(3, $result); + } + + public function testGetAllTablesReturnsTablesFromMultipleJobs(): void + { + $table1 = $this->createMockTableOptions('table1'); + $table2 = $this->createMockTableOptions('table2'); + $table3 = $this->createMockTableOptions('table3'); + $table4 = $this->createMockTableOptions('table4'); + + $job1 = new WorkspaceLoadJob('job-123', WorkspaceJobType::CLONE, [$table1, $table2]); + $job2 = new WorkspaceLoadJob('job-456', WorkspaceJobType::LOAD, [$table3]); + $job3 = new WorkspaceLoadJob('job-789', WorkspaceJobType::CLONE, [$table4]); + + $queue = new WorkspaceLoadQueue([$job1, $job2, $job3]); + + $result = $queue->getAllTables(); + + // Should merge tables from all jobs in order + self::assertSame([$table1, $table2, $table3, $table4], $result); + self::assertCount(4, $result); + } + + private function createMockTableOptions(string $tableName): RewrittenInputTableOptions + { + return new RewrittenInputTableOptions( + [ + 'source' => "in.c-test-bucket.{$tableName}", + 'destination' => $tableName, + ], + "in.c-test-bucket.{$tableName}", + 123, + [ + 'id' => "in.c-test-bucket.{$tableName}", + 'name' => $tableName, + 'displayName' => "Test {$tableName}", + 'columns' => ['col1', 'col2'], + 'columnMetadata' => [ + 'col1' => [['key' => 'KBC.datatype.type', 'value' => 'VARCHAR']], + 'col2' => [['key' => 'KBC.datatype.type', 'value' => 'INTEGER']], + ], + 'lastImportDate' => '2023-01-01 10:00:00', + ], + ); + } +}