66import logging
77
88from django .apps import apps
9+ from django .db .models import Q , OuterRef , Subquery
10+ from django .contrib .contenttypes .models import ContentType
911from celery .utils .time import get_exponential_backoff_interval
1012import requests
1113
14+
1215from framework .celery_tasks import app as celery_app
1316from framework .celery_tasks .handlers import enqueue_task
1417from framework .encryption import ensure_bytes
@@ -80,6 +83,7 @@ def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name
8083 raise ValueError (f'unknown osfguid "{ guid } "' )
8184 _resource = _osfid_instance .referent
8285 _is_deletion = _should_delete_indexcard (_resource )
86+ _resource .mark_indexing_failed ()
8387 try :
8488 _response = (
8589 pls_delete_trove_record (_resource , osfmap_partition = _osfmap_partition )
@@ -115,6 +119,7 @@ def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name
115119 if HTTPStatus (_response .status_code ).is_server_error :
116120 raise self .retry (exc = e )
117121 else : # success response
122+ _resource .mark_indexing_success ()
118123 if not _is_deletion :
119124 # enqueue followup task for supplementary metadata
120125 _next_partition = _next_osfmap_partition (_osfmap_partition )
@@ -126,6 +131,39 @@ def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name
126131 )
127132
128133
@celery_app.task
def task__reindex_failed_or_not_indexed_resource_into_share(
    resource_type: str,
    start_id: int = 0,
    chunk_count: int = 200,
    chunk_size: int = 500,
):
    """Recatalog the resources of ``resource_type`` that are not marked as indexed.

    Selects the resources with
    ``get_not_indexed_guids_for_resource_with_no_indexed_guid`` (asking for the
    plain model queryset, i.e. ``only_oldest_guid=False``) and passes that
    queryset to ``recatalog``.

    Args:
        resource_type: key naming which kind of resource to select.
        start_id: forwarded unchanged to ``recatalog``.
        chunk_count: forwarded unchanged to ``recatalog``.
        chunk_size: forwarded unchanged to ``recatalog``.
    """
    # function-scope import, kept local exactly as the task was written
    from osf.management.commands.recatalog_metadata import recatalog

    # NOTE: the chunk_count / chunk_size defaults are provisional -- the best
    # values are still to be discussed with the Cloud Team.
    not_indexed_resources = get_not_indexed_guids_for_resource_with_no_indexed_guid(
        resource_type,
        only_oldest_guid=False,
    )
    recatalog(not_indexed_resources, start_id, chunk_count, chunk_size)
140+
141+
def get_not_indexed_guids_for_resource_with_no_indexed_guid(resource_type: str, only_oldest_guid: bool = True):
    """Build a queryset of resources of one type that are not marked as indexed.

    Args:
        resource_type: one of ``'projects'``, ``'preprints'``, ``'registries'``,
            ``'users'`` or ``'files'``. Any other value falls back to the
            ``'projects'`` configuration.
        only_oldest_guid: when True, each row is annotated with ``first_guid``
            (the ``_id`` of the earliest-created ``Guid`` pointing at the row),
            rows without any guid are excluded, and the result is a
            ``.values(...)`` queryset limited to the per-type field names.
            When False, the filtered model queryset is returned as-is.

    Returns:
        A lazy Django queryset; nothing is evaluated here.
    """
    from osf.models import Guid, Registration, Preprint, Node, OSFUser
    from addons.osfstorage.models import OsfStorageFile

    # "not indexed" means the flag is either explicitly False or never set
    not_indexed_query = Q(has_been_indexed=False) | Q(has_been_indexed__isnull=True)
    common_not_indexed_public_resource_extract_query = (
        Q(is_public=True) & Q(deleted__isnull=True) & not_indexed_query
    )
    titled_resource_values = ('first_guid', 'date_last_indexed', 'title')

    # resource_type -> (model, filter, field names used with only_oldest_guid)
    resource_mapper = {
        'projects': (Node, common_not_indexed_public_resource_extract_query, titled_resource_values),
        'preprints': (
            Preprint,
            common_not_indexed_public_resource_extract_query & Q(is_published=True),
            titled_resource_values,
        ),
        'registries': (Registration, common_not_indexed_public_resource_extract_query, titled_resource_values),
        'users': (
            OSFUser,
            Q(is_active=True) & Q(deleted__isnull=True) & not_indexed_query,
            ('first_guid', 'fullname', 'date_last_indexed'),
        ),
        # NOTE(review): unlike the other entries, files are not filtered on
        # has_been_indexed, so every non-deleted file matches -- confirm this
        # is intended.
        'files': (OsfStorageFile, Q(deleted__isnull=True), ('first_guid', 'name', 'date_last_indexed')),
    }

    # Fall back to the 'projects' *entry*. Using the bare string 'projects' as
    # the default made the three-way unpacking below raise ValueError for any
    # unknown resource_type.
    resource_model, query, values_to_return = resource_mapper.get(
        resource_type,
        resource_mapper['projects'],
    )

    matching_resources = resource_model.objects.filter(query)
    if not only_oldest_guid:
        return matching_resources

    # Correlated subquery: the _id of the oldest Guid referring to each row.
    model_content_type = ContentType.objects.get_for_model(resource_model)
    first_guid_sq = Guid.objects.filter(
        content_type=model_content_type,
        object_id=OuterRef('pk'),
    ).order_by('created').values('_id')[:1]
    return (
        matching_resources
        .annotate(first_guid=Subquery(first_guid_sq))
        .exclude(first_guid__isnull=True)
        .values(*values_to_return)
    )
165+
166+
129167def pls_send_trove_record (osf_item , * , is_backfill : bool , osfmap_partition : OsfmapPartition ):
130168 try :
131169 _iri = osf_item .get_semantic_iri ()
0 commit comments