From 9e24992e605a17d1ab35dafa3d8d986b4df67e65 Mon Sep 17 00:00:00 2001 From: Maxence Lange Date: Mon, 27 Oct 2025 14:29:57 -0100 Subject: [PATCH] new sync service Signed-off-by: Maxence Lange --- appinfo/info.xml | 7 + lib/AppInfo/Application.php | 12 +- lib/Command/Stop.php | 2 +- lib/Command/Sync.php | 85 +++++++++ lib/ConfigLexicon.php | 9 +- lib/Db/SyncMapper.php | 64 +++++++ lib/Enum/SessionType.php | 17 ++ .../Version23001Date20220408140253.php | 4 +- .../Version33001Date202511271645.php | 55 ++++++ lib/Model/DocumentSync.php | 45 +++++ lib/Model/IndexQueryHelper.php | 26 +++ lib/RepairStep/AppEnabled.php | 31 ++++ lib/Service/FullTextSearchService.php | 47 +++++ lib/Service/LoggerService.php | 72 ++++++++ lib/Service/ProcessService.php | 67 +++++++ lib/Service/ProviderService.php | 1 - lib/Service/RunningService.php | 6 +- lib/Service/SyncService.php | 170 ++++++++++++++++++ 18 files changed, 706 insertions(+), 14 deletions(-) create mode 100644 lib/Command/Sync.php create mode 100644 lib/Db/SyncMapper.php create mode 100644 lib/Enum/SessionType.php create mode 100644 lib/Migration/Version33001Date202511271645.php create mode 100644 lib/Model/DocumentSync.php create mode 100644 lib/Model/IndexQueryHelper.php create mode 100644 lib/RepairStep/AppEnabled.php create mode 100644 lib/Service/FullTextSearchService.php create mode 100644 lib/Service/LoggerService.php create mode 100644 lib/Service/ProcessService.php create mode 100644 lib/Service/SyncService.php diff --git a/appinfo/info.xml b/appinfo/info.xml index a4c4bba7..f7a30a64 100644 --- a/appinfo/info.xml +++ b/appinfo/info.xml @@ -34,6 +34,12 @@ Core App of the full-text search framework for your Nextcloud. OCA\FullTextSearch\Cron\Index + + + OCA\FullTextSearch\RepairStep\AppEnabled + + + OCA\FullTextSearch\Command\Check OCA\FullTextSearch\Command\CollectionInit @@ -50,6 +56,7 @@ Core App of the full-text search framework for your Nextcloud. OCA\FullTextSearch\Command\Reset OCA\FullTextSearch\Command\Search OCA\FullTextSearch\Command\Stop + OCA\FullTextSearch\Command\Sync OCA\FullTextSearch\Command\Test diff --git a/lib/AppInfo/Application.php b/lib/AppInfo/Application.php index b5994b91..23792fd6 100644 --- a/lib/AppInfo/Application.php +++ b/lib/AppInfo/Application.php @@ -11,10 +11,14 @@ use Closure; +use NCU\FullTextSearch\IManager; use OC; +use OC\FullTextSearch\FullTextSearchManager; use OCA\FullTextSearch\Capabilities; use OCA\FullTextSearch\ConfigLexicon; +use OCA\FullTextSearch\Provider\FilesContentProvider; use OCA\FullTextSearch\Search\UnifiedSearchProvider; +use OCA\FullTextSearch\Service\FullTextSearchService; use OCA\FullTextSearch\Service\IndexService; use OCA\FullTextSearch\Service\ProviderService; use OCA\FullTextSearch\Service\SearchService; @@ -41,6 +45,9 @@ public function register(IRegistrationContext $context): void { $context->registerCapability(Capabilities::class); $context->registerSearchProvider(UnifiedSearchProvider::class); $context->registerConfigLexicon(ConfigLexicon::class); + + $context->registerFullTextSearchService(FullTextSearchService::class); + $this->registerServices($this->getContainer()); } @@ -53,11 +60,8 @@ public function boot(IBootContext $context): void { $context->injectFn(Closure::fromCallable([$this, 'registerNavigation'])); } - /** - * Register Navigation Tab - * - * @param ContainerInterface $container + * @deprecated */ protected function registerServices(ContainerInterface $container) { /** @var IFullTextSearchManager $fullTextSearchManager */ diff --git a/lib/Command/Stop.php b/lib/Command/Stop.php index 6d9e545b..d016219f 100644 --- a/lib/Command/Stop.php +++ b/lib/Command/Stop.php @@ -41,7 +41,7 @@ protected function configure() { protected function execute(InputInterface $input, OutputInterface $output) { $output->writeln('stopping all running indexes'); - $this->runningService->forceStop(); + $this->runningService->stop(); return 0; } diff --git a/lib/Command/Sync.php b/lib/Command/Sync.php new file mode 100644 index 00000000..d081c401 --- /dev/null +++ b/lib/Command/Sync.php @@ -0,0 +1,85 @@ +setName('fulltextsearch:sync') + ->setDescription('Index files') + ->addOption('info', '', InputOption::VALUE_NONE, 'display info entries') + ->addOption('no-output', '', InputOption::VALUE_NONE, 'no output, use nextcloud logs'); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + if (!$input->getOption('no-output')) { + $this->loggerService->setOutputInterface($output, $input->getOption('info')); + } +// $this->fullTextSearchService->requestIndex('files', '123'); + $this->lockService->lock(); + + while (true) { + try { + $this->lockService->update(); + $this->syncProcess(); + } catch (Exception $e) { + $this->loggerService->error('Exception while running fulltextsearch:sync', ['exception' => $e]); + } + sleep(10); + } + + return 0; + } + + private function syncProcess() { + $this->loggerService->info('initiating a new sync session'); + $this->syncService->smartSync(); + $this->loggerService->info('sync session closed'); + } +} + diff --git a/lib/ConfigLexicon.php b/lib/ConfigLexicon.php index df348e06..ca119f7a 100644 --- a/lib/ConfigLexicon.php +++ b/lib/ConfigLexicon.php @@ -26,8 +26,10 @@ class ConfigLexicon implements ILexicon { public const COLLECTION_INDEXING_LIST = 'collection_indexing_list'; public const COLLECTION_INTERNAL = 'collection_internal'; public const COLLECTION_LINKS = 'collection_links'; + public const SYNC_REQUIREMENT_LEVEL = 'sync_requirement'; public const LOCK_ID = 'lock_id'; public const LOCK_PING = 'lock_ping'; + public const ENABLED_SINCE = 'enabled_since'; public function getStrictness(): Strictness { return Strictness::NOTICE; @@ -42,9 +44,10 @@ public function getAppConfigs(): array { new Entry(key: self::COLLECTION_INDEXING_LIST, type: ValueType::INT, defaultRaw: 50, definition: 'size of chunks of async documents on collection queue request'), new Entry(key: self::COLLECTION_INTERNAL, type: ValueType::STRING, defaultRaw: 'local', definition: 'name of the local collection'), new Entry(key: self::COLLECTION_LINKS, type: ValueType::ARRAY, defaultRaw: [], definition: '(internal) data relative to collections'), - // IAppConfig::FLAG_INTERNAL) - new Entry(key: self::LOCK_ID, type: ValueType::STRING, defaultRaw: '', definition: 'internal lock id', lazy: true), - new Entry(key: self::LOCK_PING, type: ValueType::INT, defaultRaw: 0, definition: 'internal lock time', lazy: true), + new Entry(key: self::SYNC_REQUIREMENT_LEVEL, type: ValueType::INT, defaultRaw: 0, definition: 'requirement level to confirm the need for sync'), + new Entry(key: self::LOCK_ID, type: ValueType::STRING, defaultRaw: '', definition: 'internal lock id', lazy: true, flags: IAppConfig::FLAG_INTERNAL), + new Entry(key: self::LOCK_PING, type: ValueType::INT, defaultRaw: 0, definition: 'internal lock time', lazy: true, flags: IAppConfig::FLAG_INTERNAL), + new Entry(key: self::ENABLED_SINCE, type: ValueType::INT, defaultRaw: 0, definition: 'the time since the fulltextsearch app is enabled', lazy: true, flags: IAppConfig::FLAG_INTERNAL), ]; } diff --git a/lib/Db/SyncMapper.php b/lib/Db/SyncMapper.php new file mode 100644 index 00000000..4ad25095 --- /dev/null +++ b/lib/Db/SyncMapper.php @@ -0,0 +1,64 @@ + + */ +class SyncMapper extends QBMapper { + public const TABLE = 'fulltextsearch_sync'; + + public function __construct( + IDBConnection $db, + ) { + parent::__construct($db, self::TABLE, DocumentSync::class); + } + + /** + * @return DocumentSync[] + */ + public function getForcedSyncs(int $limit = 100): array { + $qb = $this->db->getQueryBuilder(); + $qb->select('*') + ->from($this->getTableName()) + ->where($qb->expr()->eq('indexed', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT))) + ->setMaxResults($limit); + + return $this->findEntities($qb); + } + + public function update(Entity $entity): Entity { + $qb = $this->db->getQueryBuilder(); + $qb->update($this->getTableName()) + ->set('indexed', $qb->createNamedParameter($entity->getIndexed(), IQueryBuilder::PARAM_INT)) + ->where( + $qb->expr()->eq('provider_id', $qb->createNamedParameter($entity->getProviderId())), + $qb->expr()->eq('document_id', $qb->createNamedParameter($entity->getDocumentId())), + ); + $qb->executeStatement(); + return $entity; + } + +// public function reset(string $providerId, string $documentId): void { +// $qb = $this->db->getQueryBuilder(); +// $qb->update($this->getTableName()) +// ->set('indexed', $qb->createNamedParameter(0, IQueryBuilder::PARAM_INT)) +// ->where( +// $qb->expr()->eq('provider_id', $qb->createNamedParameter($providerId())), +// $qb->expr()->eq('document_id', $qb->createNamedParameter($documentId())), +// ); +// $qb->executeStatement(); +// } +} diff --git a/lib/Enum/SessionType.php b/lib/Enum/SessionType.php new file mode 100644 index 00000000..b43f7ec0 --- /dev/null +++ b/lib/Enum/SessionType.php @@ -0,0 +1,17 @@ +hasTable('fulltextsearch_indexes')) { + if (!$schema->hasTable('fulltextsearch_sync')) { return null; } - $table = $schema->getTable('fulltextsearch_indexes'); + $table = $schema->getTable('fulltextsearch_sync'); $column = $table->getColumn('message'); if ($column->getType()->getName() === Types::TEXT) { diff --git a/lib/Migration/Version33001Date202511271645.php b/lib/Migration/Version33001Date202511271645.php new file mode 100644 index 00000000..098eb7c5 --- /dev/null +++ b/lib/Migration/Version33001Date202511271645.php @@ -0,0 +1,55 @@ +hasTable('fulltextsearch_sync')) { + return null; + } + + $table = $schema->createTable('fulltextsearch_sync'); + $table->addColumn('provider_id', Types::STRING, [ + 'length' => 31, + 'notnull' => true, + ]); + $table->addColumn('document_id', Types::STRING, [ + 'length' => 31, + 'notnull' => true, + ]); + $table->addColumn('flags', Types::INTEGER, [ + 'length' => 7, + 'default' => 0, + ]); + $table->addColumn('indexed', Types::BIGINT, [ + 'length' => 11, + 'default' => 0, + ]); + $table->addColumn('checksum', Types::STRING, [ + 'length' => 16, + 'notnull' => true, + ]); + + $table->setPrimaryKey(['provider_id', 'document_id'], 'fts_i_pd'); + $table->addIndex(['indexed'], 'fts_i_i'); + + return $schema; + } +} diff --git a/lib/Model/DocumentSync.php b/lib/Model/DocumentSync.php new file mode 100644 index 00000000..67de7f5d --- /dev/null +++ b/lib/Model/DocumentSync.php @@ -0,0 +1,45 @@ +addType('providerId', 'string'); + $this->addType('documentId', 'string'); + $this->addType('flags', 'integer'); + $this->addType('indexed', 'integer'); + $this->addType('checksum', 'string'); + } + + public function definition(): string { + return $this->getProviderId() . '/' . $this->getDocumentId(); + } +} diff --git a/lib/Model/IndexQueryHelper.php b/lib/Model/IndexQueryHelper.php new file mode 100644 index 00000000..f9f9e9df --- /dev/null +++ b/lib/Model/IndexQueryHelper.php @@ -0,0 +1,26 @@ +needed = false; + } + + public function isNeeded(): bool { + return $this->needed; + } +} diff --git a/lib/RepairStep/AppEnabled.php b/lib/RepairStep/AppEnabled.php new file mode 100644 index 00000000..85f74c9e --- /dev/null +++ b/lib/RepairStep/AppEnabled.php @@ -0,0 +1,31 @@ +appConfig->setValueInt(Application::APP_ID, ConfigLexicon::ENABLED_SINCE, time()); + } +} diff --git a/lib/Service/FullTextSearchService.php b/lib/Service/FullTextSearchService.php new file mode 100644 index 00000000..eb49b169 --- /dev/null +++ b/lib/Service/FullTextSearchService.php @@ -0,0 +1,47 @@ +loggerService; + } + + public function requestIndex(string $providerId, string $documentId): void { + $index = new DocumentSync(); + $index->setProviderId($providerId); + $index->setDocumentId($documentId); + $index->setIndexed(0); + $index->setChecksum(''); + + try { + $this->mapper->insertOrUpdate($index); + } catch (Exception $e) { + $this->logger->warning('could not store index', ['exception' => $e]); + } + } + + public function deleteIndex(string $providerId, string $documentId): void { + } +} diff --git a/lib/Service/LoggerService.php b/lib/Service/LoggerService.php new file mode 100644 index 00000000..5a02d894 --- /dev/null +++ b/lib/Service/LoggerService.php @@ -0,0 +1,72 @@ +verbose = $verbose; + $this->output = $output; + $this->logger = null; + } + + public function info(string $entry): void { + if ($this->verbose) { + $this->output?->writeln($this->prepOutput() . ' ' . $this->prepEntry($entry, 'comment')); + } + $this->logger?->debug('[' . $this->currentSessionType->value . '] ' . 'info: ' . $entry); + } + + public function action(string $entry): void { + $this->output?->writeln($this->prepOutput() . ' ' . $this->prepEntry($entry)); + $this->logger?->debug('[' . $this->currentSessionType->value . '] ' . 'action: ' . $entry); + } + + public function warning(string $entry, array $data = []): void { + $this->output?->writeln($this->prepOutput() . ' ' . $this->prepEntry($entry, 'error')); + $this->logger?->warning('[' . $this->currentSessionType->value . '] ' . $entry, $data); + } + + public function error(string $entry, array $data = []): void { + $this->output?->writeln($this->prepOutput() . ' ' . $this->prepEntry($entry, 'error')); + $this->loggerError->warning('[' . $this->currentSessionType->value . '] ' . $entry, $data); + } + + public function session(SessionType $sessionType = SessionType::UNKNOWN): void { + $this->currentSessionType = $sessionType; + } + + private function prepOutput(): string { + return '' . date('[H:i:s]') . ' '; + } + + private function prepEntry(string $entry, string $tag = ''): string { + $entry = ($tag !== '') ? '<' . $tag . '>' . $entry . '' : $entry; + $prefix = '(' . str_pad($this->currentSessionType->value . ')', 14, ' ') . ''; + return $prefix . $entry; + } +} diff --git a/lib/Service/ProcessService.php b/lib/Service/ProcessService.php new file mode 100644 index 00000000..5d2cc5cc --- /dev/null +++ b/lib/Service/ProcessService.php @@ -0,0 +1,67 @@ +connection->close(); + + if ($pid === -1) { + // TODO: manage issue while forking + } else if ($pid === 0) { + // forked process + $indexDocument($sync); + exit(); + } else { + // main process, counting forks + $this->forkCount++; + while (true) { + if (pcntl_waitpid(0, $status, WNOHANG) !== 0) { + $this->forkCount--; + } + if ($this->forkCount < self::FORK_LIMIT) { + return; + } + usleep(self::FORK_SLEEP); + } + } + } + + public function waitForChild(): void { + if (!extension_loaded('posix')) { + return; + } + + while (pcntl_waitpid(0, $status) !== -1) { + } + } +} diff --git a/lib/Service/ProviderService.php b/lib/Service/ProviderService.php index 6e2251e2..6142736b 100644 --- a/lib/Service/ProviderService.php +++ b/lib/Service/ProviderService.php @@ -269,5 +269,4 @@ public function addJavascriptAPI() { Util::addScript(Application::APP_ID, 'fulltextsearch.v1'); } - } diff --git a/lib/Service/RunningService.php b/lib/Service/RunningService.php index 8337d657..bacb0baa 100644 --- a/lib/Service/RunningService.php +++ b/lib/Service/RunningService.php @@ -25,7 +25,7 @@ public function __construct( /** * @deprecated */ - public function start(string $source): int { + public function start(string $source = ''): int { try { $this->lockService->lock(); } catch (LockException $e) { @@ -37,7 +37,7 @@ public function start(string $source): int { /** * @deprecated */ - public function update(int $runId, string $action = '') { + public function update(int $runId = 0, string $action = '') { try { $this->lockService->update(); } catch (LockException $e) { @@ -48,7 +48,7 @@ public function update(int $runId, string $action = '') { /** * @deprecated */ - public function stop(int $runId, string $reason = '') { + public function stop(int $runId = 0, string $reason = '') { $this->lockService->unlock(); } } diff --git a/lib/Service/SyncService.php b/lib/Service/SyncService.php new file mode 100644 index 00000000..41dd5375 --- /dev/null +++ b/lib/Service/SyncService.php @@ -0,0 +1,170 @@ +syncForcedIndexes() + || $this->syncContentProviders() + || $this->resyncRecentDocuments() + || $this->syncOlderDocuments(6 * 30 * 24 * 3600) + || $this->syncOlderDocuments(3 * 30 * 24 * 3600) + || $this->syncOlderDocuments(4 * 7 * 24 * 3600) + || $this->syncOlderDocuments(2 * 7 * 24 * 3600) + || $this->syncOlderDocuments(7 * 24 * 3600) + || $this->syncOlderDocuments() + ); + } + + /** + * index documents that have been set as out-of-sync + * + * @return bool FALSE if no out-of-sync documents were found + */ + public function syncForcedIndexes(int $limit = 100): bool { + $this->logger->session(SessionType::FORCED); + $syncDocuments = $this->mapper->getForcedSyncs($limit); + if (empty($syncDocuments)) { + return false; + } + $this->syncDocuments($syncDocuments); + return true; + } + + /** + * request provider for possible unknown documents to be indexed + * + * @return bool FALSE if none were found + */ + public function syncContentProviders(): bool { + $this->logger->session(SessionType::SYNC); + foreach ($this->manager->getContentProviders() as $provider) { + $this->syncContentProvider($provider); + } + + return false; + } + + public function syncContentProvider(string $providerId): void { + //$this->logger->action('providerId: ' . $providerId); + } + + private function resyncRecentDocuments(): bool { + $this->logger->session(SessionType::FORCED); + return false; + } + + private function syncOlderDocuments($olderThan = 0): bool { + return false; + } + + public function syncDocuments(array $syncDocuments): void { + foreach ($syncDocuments as $sync) { + $this->processService->forkIndexDocument($sync, [$this, 'syncDocument']); + } + $this->processService->waitForChild(); + } + + public function syncDocument(DocumentSync $sync): void { + $time = time(); + $this->logger->action('indexing document ' . $sync->definition()); + $this->indexDocument($sync); + $sync->setIndexed($time); + +// $this->mapper->insertOrUpdate($sync); + } + + private function indexDocument(DocumentSync $sync): void { + $provider = $this->getContentProvider($sync->getProviderId()); + if ($provider === null) { + $this->logger->error('provider ' . $sync->getProviderId() . ' not found'); + return; + } + + $document = $provider->getDocument($sync->getDocumentId()); + if ($document === null) { + $this->logger->info('document ' . $sync->definition() . ' not found'); + return; + } + + $checksum = $document->getChecksum(); + if (($sync->getChecksum() === $checksum) && ($this->appConfig->getAppValueString(ConfigLexicon::SYNC_REQUIREMENT_LEVEL) >= self::SYNC_LEVEL_CHECKSUM)) { + $this->logger->logger('document ' . $sync->definition() . ' seems to be identical to the indexed version'); + return; + } + + $sync->setChecksum($checksum); + + echo json_encode($document->getFlags()) . ' ' . json_encode($document->getContent()) . "\n"; + // get document + // send document to ES + // confirm it is done + sleep(rand(1, 15)); + } + + private function getContentProvider(string $providerId): ?IContentProvider { + foreach ($this->manager->getContentProviders() as $providerClass) { + try { + $provider = Server::get($providerClass); + if ($provider->getId() === $providerId) { + return $provider; + } + } catch (NotFoundExceptionInterface|ContainerExceptionInterface $e) { + $this->coreLogger->warning('could not load ' . $providerClass, ['exception' => $e]); + } + } + + return null; + } + +}