From 1929ac4c6f8e3fa6100968b69369d8b0de4c523f Mon Sep 17 00:00:00 2001 From: Mangat Toor Date: Sun, 3 May 2026 12:52:34 -0700 Subject: [PATCH] Fix celery worker exhaustion and increase task time limit to 3 hours --- airbnb_project/listings/tasks.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/airbnb_project/listings/tasks.py b/airbnb_project/listings/tasks.py index 58a8ecf..c723bd4 100644 --- a/airbnb_project/listings/tasks.py +++ b/airbnb_project/listings/tasks.py @@ -22,27 +22,26 @@ }) -def run_spider(): +def run_spider(extra_settings=None): """ - Run the Scrapy spider for harvesting listings. + Run the Scrapy spider for harvesting listings in a separate process. - This function initializes a Scrapy CrawlerProcess with the required settings, - schedules the `ListingsSpider` to run, and starts the crawling process. - The function runs in non-blocking mode with `stop_after_crawl=False` to keep - the process active after the spider completes. + This function spawns a subprocess to run the Scrapy CrawlerProcess. + Isolation is required because Scrapy uses the Twisted reactor, which + can only be started once per process and performs blocking operations. + Running in a subprocess ensures the Celery worker remains responsive. - Returns: - None + Args: + extra_settings (dict, optional): Additional Scrapy settings to override defaults. """ - runner = CrawlerProcess(settings=get_harvester_settings()) - runner.crawl(ListingsSpider) - runner.start(stop_after_crawl=False) - def _run(): try: - runner = CrawlerProcess(settings=get_harvester_settings()) + settings = get_harvester_settings() + if extra_settings: + settings.update(extra_settings) + runner = CrawlerProcess(settings=settings) runner.crawl(ListingsSpider) - runner.start() # This blocks until finished + runner.start() except Exception as e: logger.error(f"Spider subprocess failed: {e}") @@ -50,7 +49,7 @@ def _run(): p.start() p.join() -@shared_task(bind=True, retry_kwargs={'max_retries': 1}, ignore_result=True, time_limit=3600, soft_time_limit=3400) +@shared_task(bind=True, retry_kwargs={'max_retries': 1}, ignore_result=True, time_limit=10800, soft_time_limit=10600) def run_harvest_task(self): """ Celery task to trigger the Scrapy spider for harvesting listings.