-
Notifications
You must be signed in to change notification settings - Fork 0
AJDA-594 (input-mapping) job-based workspace loading for async input mapping #414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b8f7d05
987f8ab
23343bf
57385f8
8c0da98
388cb2d
ceec70a
565899a
e8643cb
67f2f1e
661813d
6a2312e
be770e8
f907bcd
871a097
4cf446e
17c0c32
cc3ebcd
3defadd
2ddad76
faa4d6c
b701821
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace Keboola\InputMapping\Helper; | ||
|
|
||
| use Keboola\InputMapping\Table\Options\InputTableOptions; | ||
| use Keboola\StagingProvider\Staging\File\FileStagingInterface; | ||
|
|
||
| class PathHelper | ||
| { | ||
| public static function ensurePathDelimiter(string $path): string | ||
| { | ||
| return self::ensureNoPathDelimiter($path) . '/'; | ||
| } | ||
|
|
||
| public static function ensureNoPathDelimiter(string $path): string | ||
| { | ||
| return rtrim($path, '\\/'); | ||
| } | ||
|
|
||
| public static function getManifestPath( | ||
| FileStagingInterface $metadataStorage, | ||
| string $destination, | ||
| InputTableOptions $table, | ||
| ): string { | ||
| return self::ensurePathDelimiter($metadataStorage->getPath()) . | ||
| self::getDestinationFilePath($destination, $table) . '.manifest'; | ||
| } | ||
|
|
||
| public static function getDataFilePath( | ||
| FileStagingInterface $dataStorage, | ||
| string $destination, | ||
| InputTableOptions $table, | ||
| ): string { | ||
| return self::ensurePathDelimiter($dataStorage->getPath()) . | ||
| self::getDestinationFilePath($destination, $table); | ||
| } | ||
|
|
||
| private static function getDestinationFilePath(string $destination, InputTableOptions $table): string | ||
| { | ||
| if (!$table->getDestination()) { | ||
| return $destination . '/' . $table->getSource(); | ||
| } else { | ||
| return $destination . '/' . $table->getDestination(); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,6 +4,8 @@ | |||||||||||
|
|
||||||||||||
| namespace Keboola\InputMapping; | ||||||||||||
|
|
||||||||||||
| use InvalidArgumentException; | ||||||||||||
| use Keboola\InputMapping\Exception\InputOperationException; | ||||||||||||
| use Keboola\InputMapping\File\Options\InputFileOptions; | ||||||||||||
| use Keboola\InputMapping\File\Options\RewrittenInputFileOptions; | ||||||||||||
| use Keboola\InputMapping\Helper\InputBucketValidator; | ||||||||||||
|
|
@@ -14,8 +16,11 @@ | |||||||||||
| use Keboola\InputMapping\State\InputTableStateList; | ||||||||||||
| use Keboola\InputMapping\Table\Options\InputTableOptionsList; | ||||||||||||
| use Keboola\InputMapping\Table\Options\ReaderOptions; | ||||||||||||
| use Keboola\InputMapping\Table\Options\RewrittenInputTableOptionsList; | ||||||||||||
| use Keboola\InputMapping\Table\Result; | ||||||||||||
| use Keboola\InputMapping\Table\Strategy\AbstractStrategy as TableAbstractStrategy; | ||||||||||||
| use Keboola\InputMapping\Table\Strategy\AbstractWorkspaceStrategy; | ||||||||||||
| use Keboola\InputMapping\Table\Strategy\WorkspaceLoadQueue; | ||||||||||||
| use Keboola\InputMapping\Table\TableDefinitionResolver; | ||||||||||||
| use Keboola\StorageApiBranch\ClientWrapper; | ||||||||||||
| use Psr\Log\LoggerInterface; | ||||||||||||
|
|
@@ -47,32 +52,29 @@ public function downloadFiles( | |||||||||||
| return $strategy->downloadFiles($configuration, $destination); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * @param InputTableOptionsList $tablesDefinition list of input mappings | ||||||||||||
| * @param InputTableStateList $tablesState list of input mapping table states | ||||||||||||
| * @param string $destination destination folder | ||||||||||||
| * @param ReaderOptions $readerOptions | ||||||||||||
| * @return Result | ||||||||||||
| */ | ||||||||||||
| public function downloadTables( | ||||||||||||
| InputTableOptionsList $tablesDefinition, | ||||||||||||
| InputTableStateList $tablesState, | ||||||||||||
| string $destination, | ||||||||||||
| ReaderOptions $readerOptions, | ||||||||||||
| ): Result { | ||||||||||||
| $tableResolver = new TableDefinitionResolver( | ||||||||||||
| private function createTableResolver(): TableDefinitionResolver | ||||||||||||
| { | ||||||||||||
| return new TableDefinitionResolver( | ||||||||||||
| $this->clientWrapper->getTableAndFileStorageClient(), | ||||||||||||
| $this->logger, | ||||||||||||
| ); | ||||||||||||
| $tablesState = TableRewriteHelperFactory::getTableRewriteHelper( | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private function rewriteTableStatesDestinations(InputTableStateList $tablesState): InputTableStateList | ||||||||||||
| { | ||||||||||||
| return TableRewriteHelperFactory::getTableRewriteHelper( | ||||||||||||
| $this->clientWrapper->getClientOptionsReadOnly(), | ||||||||||||
| )->rewriteTableStatesDestinations( | ||||||||||||
| $tablesState, | ||||||||||||
| $this->clientWrapper, | ||||||||||||
| $this->logger, | ||||||||||||
| ); | ||||||||||||
| $tablesDefinition = $tableResolver->resolve($tablesDefinition); | ||||||||||||
| $strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private function validateAndRewriteDevBuckets( | ||||||||||||
| InputTableOptionsList $tablesDefinition, | ||||||||||||
| ReaderOptions $readerOptions, | ||||||||||||
| ): RewrittenInputTableOptionsList { | ||||||||||||
| if ($readerOptions->devInputsDisabled() | ||||||||||||
| && !$this->clientWrapper->getClientOptionsReadOnly()->useBranchStorage() | ||||||||||||
| ) { | ||||||||||||
|
|
@@ -83,17 +85,79 @@ public function downloadTables( | |||||||||||
| $this->clientWrapper, | ||||||||||||
| ); | ||||||||||||
| } | ||||||||||||
| $tablesDefinition = TableRewriteHelperFactory::getTableRewriteHelper( | ||||||||||||
|
|
||||||||||||
| return TableRewriteHelperFactory::getTableRewriteHelper( | ||||||||||||
| $this->clientWrapper->getClientOptionsReadOnly(), | ||||||||||||
| )->rewriteTableOptionsSources( | ||||||||||||
| $tablesDefinition, | ||||||||||||
| $this->clientWrapper, | ||||||||||||
| $this->logger, | ||||||||||||
| ); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * @param InputTableOptionsList $tablesDefinition list of input mappings | ||||||||||||
| * @param InputTableStateList $tablesState list of input mapping table states | ||||||||||||
| * @param string $destination destination folder | ||||||||||||
| * @param ReaderOptions $readerOptions | ||||||||||||
| * @return Result | ||||||||||||
| */ | ||||||||||||
| public function downloadTables( | ||||||||||||
| InputTableOptionsList $tablesDefinition, | ||||||||||||
| InputTableStateList $tablesState, | ||||||||||||
| string $destination, | ||||||||||||
| ReaderOptions $readerOptions, | ||||||||||||
| ): Result { | ||||||||||||
| $tablesState = $this->rewriteTableStatesDestinations($tablesState); | ||||||||||||
|
|
||||||||||||
| $tablesDefinition = $this->createTableResolver()->resolve($tablesDefinition); | ||||||||||||
| $strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState); | ||||||||||||
| $tablesDefinition = $this->validateAndRewriteDevBuckets($tablesDefinition, $readerOptions); | ||||||||||||
|
|
||||||||||||
| /** @var TableAbstractStrategy $strategy */ | ||||||||||||
| return $strategy->downloadTables($tablesDefinition->getTables(), $readerOptions->preserveWorkspace()); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Execute only prepare and execute phases for workspace table loading | ||||||||||||
| * Returns WorkspaceLoadQueue for later completion with waitForTableLoadCompletion() | ||||||||||||
| * | ||||||||||||
| * @param InputTableOptionsList $tablesDefinition list of input mappings | ||||||||||||
| * @param InputTableStateList $tablesState list of input mapping table states | ||||||||||||
| * @param string $destination destination folder | ||||||||||||
| * @param ReaderOptions $readerOptions | ||||||||||||
| * @return WorkspaceLoadQueue | ||||||||||||
| * @throws InvalidArgumentException if strategy is not workspace-based | ||||||||||||
| */ | ||||||||||||
| public function prepareAndExecuteTableLoads( | ||||||||||||
|
ErikZigo marked this conversation as resolved.
|
||||||||||||
| InputTableOptionsList $tablesDefinition, | ||||||||||||
| InputTableStateList $tablesState, | ||||||||||||
| string $destination, | ||||||||||||
| ReaderOptions $readerOptions, | ||||||||||||
| ): WorkspaceLoadQueue { | ||||||||||||
| $tablesState = $this->rewriteTableStatesDestinations($tablesState); | ||||||||||||
|
|
||||||||||||
| $tablesDefinition = $this->createTableResolver()->resolve($tablesDefinition); | ||||||||||||
| $strategy = $this->strategyFactory->getTableInputStrategy($destination, $tablesState); | ||||||||||||
|
|
||||||||||||
| // Ensure we have a workspace strategy. For file this method is not yet implemented. | ||||||||||||
| if (!$strategy instanceof AbstractWorkspaceStrategy) { | ||||||||||||
| throw new InputOperationException( | ||||||||||||
| 'prepareAndExecuteTableLoads() can only be used with workspace strategies', | ||||||||||||
|
||||||||||||
| 'prepareAndExecuteTableLoads() can only be used with workspace strategies', | |
| sprintf( | |
| 'prepareAndExecuteTableLoads() requires a workspace strategy but received %s. Available workspace strategies are: BigQueryWorkspaceStrategy, SnowflakeWorkspaceStrategy.', | |
| get_class($strategy) | |
| ), |
Uh oh!
There was an error while loading. Please reload this page.