Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Keboola\InputMapping\Helper\ManifestCreator;
use Keboola\InputMapping\State\InputTableStateList;
use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions;
use Keboola\InputMapping\Table\Result;
use Keboola\InputMapping\Table\Result\TableInfo;
use Keboola\StagingProvider\Staging\File\FileFormat;
use Keboola\StagingProvider\Staging\File\FileStagingInterface;
use Keboola\StagingProvider\Staging\StagingInterface;
Expand Down Expand Up @@ -244,6 +246,87 @@ public function executeTableLoadsToWorkspace(WorkspaceLoadPlan $plan): Workspace
return new WorkspaceLoadQueue($jobs);
}

/**
* Phase 3: Wait - Wait for table loading jobs to complete and create manifests
* This phase handles job completion and prepares metadata
*/
public function waitForTableLoadCompletion(WorkspaceLoadQueue $loadQueue): array
{
// Wait for all table loading jobs to complete
$jobResults = $this->clientWrapper->getBranchClient()->handleAsyncTasks(
$loadQueue->getJobIds(),
);

$this->logger->info('Processed ' . count($jobResults) . ' workspace exports.');

// Create manifests for tables now available in Workspace for SQL analysis
foreach ($loadQueue->getAllTables() as $table) {
$manifestPath = $this->getManifestPath($table);
$this->manifestCreator->writeTableManifest(
$table->getTableInfo(),
$manifestPath,
$table->getColumnNamesFromTypes(),
$this->format,
);
}

return $jobResults;
}

/**
* New three-phase public method that can be used instead of downloadTables workflow
*
* @param RewrittenInputTableOptions[] $tables
*/
public function downloadTablesThreePhase(array $tables, bool $preserve): Result
{
// Phase 1: Prepare
$instructions = $this->prepareTableLoadsToWorkspace($tables);
$plan = new WorkspaceLoadPlan(
$instructions,
$preserve,
);

// Phase 2: Execute
$loadQueue = $this->executeTableLoadsToWorkspace($plan);

// Phase 3: Wait
$jobResults = $this->waitForTableLoadCompletion($loadQueue);

// Build final result compatible with existing Result interface
$outputStateConfiguration = [];
$result = new Result();
foreach ($tables as $table) {
$outputStateConfiguration[] = [
'source' => $table->getSource(),
'lastImportDate' => $table->getTableInfo()['lastImportDate'],
];
$result->addTable(new TableInfo($table->getTableInfo()));
}

$result->setMetrics($jobResults);
$result->setInputTableStateList(new InputTableStateList($outputStateConfiguration));
$this->logger->info('All tables were fetched.');

return $result;
}

/**
* Override parent's downloadTables to optionally use three-phase approach
* For now, keeps using the existing method for backward compatibility
* In the future, this can be switched to use downloadTablesThreePhase()
*
* @param RewrittenInputTableOptions[] $tables
*/
public function downloadTables(array $tables, bool $preserve): Result
{
// Option 1: Use new three-phase approach (uncomment to enable)
return $this->downloadTablesThreePhase($tables, $preserve);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zkusmo jsem to prepnul ze stareho zpusobu na novy

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urcite ale nemam ambice to delat v produkci. Ale dalo by se to skryt pod feature flag.


// Option 2: Use existing approach (current default for backward compatibility)
//return parent::downloadTables($tables, $preserve);
}

/**
* Execute only Phase 1 & 2: Prepare and Execute workspace table loading
* Returns WorkspaceLoadQueue for later completion with waitForTableLoadCompletion()
Expand Down
12 changes: 12 additions & 0 deletions libs/input-mapping/src/Table/Strategy/WorkspaceLoadQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ public function __construct(
) {
}

/**
* @return string[]
*/
public function getJobIds(): array
{
$jobIds = [];
foreach ($this->jobs as $job) {
$jobIds[] = $job->jobId;
}
return $jobIds;
}

/**
* @return RewrittenInputTableOptions[]
*/
Expand Down
72 changes: 72 additions & 0 deletions libs/input-mapping/tests/Table/Strategy/WorkspaceLoadQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);

namespace Keboola\InputMapping\Tests\Table\Strategy;

use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions;
use Keboola\InputMapping\Table\Strategy\WorkspaceJobType;
use Keboola\InputMapping\Table\Strategy\WorkspaceLoadJob;
use Keboola\InputMapping\Table\Strategy\WorkspaceLoadQueue;
use PHPUnit\Framework\TestCase;

class WorkspaceLoadQueueTest extends TestCase
{
public function testGetJobIds(): void
{
$job1 = new WorkspaceLoadJob('job-123', WorkspaceJobType::CLONE, []);
$job2 = new WorkspaceLoadJob('job-456', WorkspaceJobType::LOAD, []);
$job3 = new WorkspaceLoadJob('job-789', WorkspaceJobType::CLONE, []);
$jobs = [$job1, $job2, $job3];

$queue = new WorkspaceLoadQueue($jobs);

$result = $queue->getJobIds();

self::assertSame(['job-123', 'job-456', 'job-789'], $result);
self::assertCount(3, $result);
}

public function testGetAllTablesReturnsTablesFromMultipleJobs(): void
{
$table1 = $this->createMockTableOptions('table1');
$table2 = $this->createMockTableOptions('table2');
$table3 = $this->createMockTableOptions('table3');
$table4 = $this->createMockTableOptions('table4');

$job1 = new WorkspaceLoadJob('job-123', WorkspaceJobType::CLONE, [$table1, $table2]);
$job2 = new WorkspaceLoadJob('job-456', WorkspaceJobType::LOAD, [$table3]);
$job3 = new WorkspaceLoadJob('job-789', WorkspaceJobType::CLONE, [$table4]);

$queue = new WorkspaceLoadQueue([$job1, $job2, $job3]);

$result = $queue->getAllTables();

// Should merge tables from all jobs in order
self::assertSame([$table1, $table2, $table3, $table4], $result);
self::assertCount(4, $result);
}

private function createMockTableOptions(string $tableName): RewrittenInputTableOptions
{
return new RewrittenInputTableOptions(
[
'source' => "in.c-test-bucket.{$tableName}",
'destination' => $tableName,
],
"in.c-test-bucket.{$tableName}",
123,
[
'id' => "in.c-test-bucket.{$tableName}",
'name' => $tableName,
'displayName' => "Test {$tableName}",
'columns' => ['col1', 'col2'],
'columnMetadata' => [
'col1' => [['key' => 'KBC.datatype.type', 'value' => 'VARCHAR']],
'col2' => [['key' => 'KBC.datatype.type', 'value' => 'INTEGER']],
],
'lastImportDate' => '2023-01-01 10:00:00',
],
);
}
}