Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b8f7d05
tests (input-mapping) Table Strategy getWorkspaceType()
ErikZigo Sep 1, 2025
987f8ab
refactor (input-mapping) unify manifest and data path building for Ta…
ErikZigo Sep 2, 2025
23343bf
chore (input-mapping) Table Strategy WorkspaceJobType and WorkspaceLo…
ErikZigo Sep 2, 2025
57385f8
refactor (input-mapping) unify manifest and data path building for Ta…
ErikZigo Sep 3, 2025
8c0da98
feature (input-mapping) prepareTableLoadsToWorkspace method for Works…
ErikZigo Sep 3, 2025
388cb2d
tests (input-mapping) testPrepareTableLoadsToWorkspaceChecksViableLoa…
ErikZigo Sep 4, 2025
ceec70a
feature (input-mapping) Table Strategy WorkspaceLoadJob + WorkspaceLo…
ErikZigo Sep 5, 2025
565899a
feature (input-mapping) Table Strategy executeTableLoadsToWorkspace
ErikZigo Sep 5, 2025
e8643cb
feature (input-mapping) Table Strategy prepareAndExecuteTableLoads
ErikZigo Sep 5, 2025
67f2f1e
refactor (input-mapping) Reader::downloadTables() complex logic extra…
ErikZigo Sep 5, 2025
661813d
feature (input-mapping) Reader::prepareAndExecuteTableLoads()
ErikZigo Sep 5, 2025
6a2312e
refactor (input-mapping): extract decideTableLoadMethod() method in A…
ErikZigo Sep 6, 2025
be770e8
refactor (input-mapping): extract buildCloneInput() method in Abstrac…
ErikZigo Sep 6, 2025
f907bcd
refactor (input-mapping): extract buildCopyInput() method in Abstract…
ErikZigo Sep 6, 2025
871a097
chore (input-mapping): fix typos 'stragegy' to 'strategy'
ErikZigo Sep 9, 2025
4cf446e
fix (input-mapping): fix PHPUnit assert method calls to use static co…
ErikZigo Sep 9, 2025
17c0c32
refactor (input-mapping): remove redundant sourceBranchId check in bu…
ErikZigo Sep 9, 2025
cc3ebcd
fix (input-mapping): replace deprecated withConsecutive with willRetu…
ErikZigo Sep 9, 2025
3defadd
refactor (input-mapping): remove unused CLEAN case from WorkspaceJobT…
ErikZigo Sep 9, 2025
2ddad76
refactor (input-mapping): merge WorkspaceJobType into WorkspaceLoadType
ErikZigo Sep 9, 2025
faa4d6c
refactor (input-mapping): extract path utilities to PathHelper
ErikZigo Sep 9, 2025
b701821
refactor (input-mapping): remove getMetadataStorage() and getDestinat…
ErikZigo Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions libs/input-mapping/src/Helper/PathHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace Keboola\InputMapping\Helper;

use Keboola\InputMapping\Table\Options\InputTableOptions;
use Keboola\StagingProvider\Staging\File\FileStagingInterface;

class PathHelper
{
public static function ensurePathDelimiter(string $path): string
{
return self::ensureNoPathDelimiter($path) . '/';
}

public static function ensureNoPathDelimiter(string $path): string
{
return rtrim($path, '\\/');
}

public static function getManifestPath(
FileStagingInterface $metadataStorage,
string $destination,
InputTableOptions $table,
): string {
return self::ensurePathDelimiter($metadataStorage->getPath()) .
self::getDestinationFilePath($destination, $table) . '.manifest';
}

public static function getDataFilePath(
FileStagingInterface $dataStorage,
string $destination,
InputTableOptions $table,
): string {
return self::ensurePathDelimiter($dataStorage->getPath()) .
self::getDestinationFilePath($destination, $table);
}

private static function getDestinationFilePath(string $destination, InputTableOptions $table): string
{
if (!$table->getDestination()) {
return $destination . '/' . $table->getSource();
} else {
return $destination . '/' . $table->getDestination();
}
}
}
100 changes: 82 additions & 18 deletions libs/input-mapping/src/Reader.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Keboola\InputMapping;

use InvalidArgumentException;
use Keboola\InputMapping\Exception\InputOperationException;
use Keboola\InputMapping\File\Options\InputFileOptions;
use Keboola\InputMapping\File\Options\RewrittenInputFileOptions;
use Keboola\InputMapping\Helper\InputBucketValidator;
Expand All @@ -14,8 +16,11 @@
use Keboola\InputMapping\State\InputTableStateList;
use Keboola\InputMapping\Table\Options\InputTableOptionsList;
use Keboola\InputMapping\Table\Options\ReaderOptions;
use Keboola\InputMapping\Table\Options\RewrittenInputTableOptionsList;
use Keboola\InputMapping\Table\Result;
use Keboola\InputMapping\Table\Strategy\AbstractStrategy as TableAbstractStrategy;
use Keboola\InputMapping\Table\Strategy\AbstractWorkspaceStrategy;
use Keboola\InputMapping\Table\Strategy\WorkspaceLoadQueue;
use Keboola\InputMapping\Table\TableDefinitionResolver;
use Keboola\StorageApiBranch\ClientWrapper;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -47,32 +52,29 @@ public function downloadFiles(
return $strategy->downloadFiles($configuration, $destination);
}

/**
* @param InputTableOptionsList $tablesDefinition list of input mappings
* @param InputTableStateList $tablesState list of input mapping table states
* @param string $destination destination folder
* @param ReaderOptions $readerOptions
* @return Result
*/
public function downloadTables(
InputTableOptionsList $tablesDefinition,
InputTableStateList $tablesState,
string $destination,
ReaderOptions $readerOptions,
): Result {
$tableResolver = new TableDefinitionResolver(
private function createTableResolver(): TableDefinitionResolver
{
return new TableDefinitionResolver(
$this->clientWrapper->getTableAndFileStorageClient(),
$this->logger,
);
$tablesState = TableRewriteHelperFactory::getTableRewriteHelper(
}

private function rewriteTableStatesDestinations(InputTableStateList $tablesState): InputTableStateList
{
return TableRewriteHelperFactory::getTableRewriteHelper(
$this->clientWrapper->getClientOptionsReadOnly(),
)->rewriteTableStatesDestinations(
$tablesState,
$this->clientWrapper,
$this->logger,
);
$tablesDefinition = $tableResolver->resolve($tablesDefinition);
$strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState);
}

private function validateAndRewriteDevBuckets(
InputTableOptionsList $tablesDefinition,
ReaderOptions $readerOptions,
): RewrittenInputTableOptionsList {
if ($readerOptions->devInputsDisabled()
&& !$this->clientWrapper->getClientOptionsReadOnly()->useBranchStorage()
) {
Expand All @@ -83,17 +85,79 @@ public function downloadTables(
$this->clientWrapper,
);
}
$tablesDefinition = TableRewriteHelperFactory::getTableRewriteHelper(

return TableRewriteHelperFactory::getTableRewriteHelper(
$this->clientWrapper->getClientOptionsReadOnly(),
)->rewriteTableOptionsSources(
$tablesDefinition,
$this->clientWrapper,
$this->logger,
);
}


/**
* @param InputTableOptionsList $tablesDefinition list of input mappings
* @param InputTableStateList $tablesState list of input mapping table states
* @param string $destination destination folder
* @param ReaderOptions $readerOptions
* @return Result
*/
public function downloadTables(
InputTableOptionsList $tablesDefinition,
InputTableStateList $tablesState,
string $destination,
ReaderOptions $readerOptions,
): Result {
$tablesState = $this->rewriteTableStatesDestinations($tablesState);

$tablesDefinition = $this->createTableResolver()->resolve($tablesDefinition);
$strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState);
$tablesDefinition = $this->validateAndRewriteDevBuckets($tablesDefinition, $readerOptions);
Comment thread
ErikZigo marked this conversation as resolved.

/** @var TableAbstractStrategy $strategy */
return $strategy->downloadTables($tablesDefinition->getTables(), $readerOptions->preserveWorkspace());
}

/**
* Execute only prepare and execute phases for workspace table loading
* Returns WorkspaceLoadQueue for later completion with waitForTableLoadCompletion()
*
* @param InputTableOptionsList $tablesDefinition list of input mappings
* @param InputTableStateList $tablesState list of input mapping table states
* @param string $destination destination folder
* @param ReaderOptions $readerOptions
* @return WorkspaceLoadQueue
* @throws InvalidArgumentException if strategy is not workspace-based
*/
public function prepareAndExecuteTableLoads(
Comment thread
ErikZigo marked this conversation as resolved.
InputTableOptionsList $tablesDefinition,
InputTableStateList $tablesState,
string $destination,
ReaderOptions $readerOptions,
): WorkspaceLoadQueue {
$tablesState = $this->rewriteTableStatesDestinations($tablesState);

$tablesDefinition = $this->createTableResolver()->resolve($tablesDefinition);
$strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState);

// Ensure we have a workspace strategy. For file this method is not yet implemented.
if (!$strategy instanceof AbstractWorkspaceStrategy) {
throw new InputOperationException(
'prepareAndExecuteTableLoads() can only be used with workspace strategies',
Copy link

Copilot AI Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message could be more descriptive by indicating what type of strategy was provided and what workspace strategies are available. Consider: 'prepareAndExecuteTableLoads() requires a workspace strategy but received {strategyType}. Available workspace strategies are: BigQuery, Snowflake.'

Suggested change
'prepareAndExecuteTableLoads() can only be used with workspace strategies',
sprintf(
'prepareAndExecuteTableLoads() requires a workspace strategy but received %s. Available workspace strategies are: BigQueryWorkspaceStrategy, SnowflakeWorkspaceStrategy.',
get_class($strategy)
),

Copilot uses AI. Check for mistakes.
);
}
Comment thread
ErikZigo marked this conversation as resolved.

$tablesDefinition = $this->validateAndRewriteDevBuckets($tablesDefinition, $readerOptions);

// Execute only Phase 1 & 2: Prepare and Execute
// Let the strategy handle the planning internally
return $strategy->prepareAndExecuteTableLoads(
$tablesDefinition->getTables(),
$readerOptions->preserveWorkspace(),
);
}

/**
* Get parent runId to the current runId (defined by SAPI client)
* @param string $runId
Expand Down
8 changes: 6 additions & 2 deletions libs/input-mapping/src/Table/Strategy/ABS.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Keboola\InputMapping\Table\Strategy;

use Keboola\InputMapping\Exception\InvalidInputException;
use Keboola\InputMapping\Helper\PathHelper;
use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions;
use Keboola\StorageApi\Options\GetFileOptions;

Expand Down Expand Up @@ -36,8 +37,11 @@ public function handleExports(array $exports, bool $preserve): array
foreach ($exports as $export) {
/** @var RewrittenInputTableOptions $table */
[$jobId, $table] = $export;
$manifestPath = $this->ensurePathDelimiter($this->metadataStorage->getPath()) .
$this->getDestinationFilePath($this->destination, $table) . '.manifest';
$manifestPath = PathHelper::getManifestPath(
$this->metadataStorage,
$this->destination,
$table,
);
$tableInfo = $table->getTableInfo();
$fileInfo = $this->clientWrapper->getTableAndFileStorageClient()->getFile(
$keyedResults[$jobId]['results']['file']['id'],
Expand Down
20 changes: 0 additions & 20 deletions libs/input-mapping/src/Table/Strategy/AbstractStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Keboola\InputMapping\Table\Strategy;

use Keboola\InputMapping\State\InputTableStateList;
use Keboola\InputMapping\Table\Options\InputTableOptions;
use Keboola\InputMapping\Table\Options\RewrittenInputTableOptions;
use Keboola\InputMapping\Table\Result;
use Keboola\InputMapping\Table\Result\TableInfo;
Expand All @@ -16,16 +15,6 @@ abstract class AbstractStrategy implements StrategyInterface
{
protected readonly LoggerInterface $logger; // @phpstan-ignore-line initialized in child classes

protected function ensurePathDelimiter(string $path): string
{
return $this->ensureNoPathDelimiter($path) . '/';
}

protected function ensureNoPathDelimiter(string $path): string
{
return rtrim($path, '\\/');
}

/**
* @param RewrittenInputTableOptions[] $tables
* @param bool $preserve
Expand All @@ -52,13 +41,4 @@ public function downloadTables(array $tables, bool $preserve): Result

return $result;
}

protected function getDestinationFilePath(string $destination, InputTableOptions $table): string
{
if (!$table->getDestination()) {
return $destination . '/' . $table->getSource();
} else {
return $destination . '/' . $table->getDestination();
}
}
}
Loading