From 51349f8149996d8faf8e3c004472528da8be11be Mon Sep 17 00:00:00 2001 From: Kevin Diem Date: Thu, 15 Dec 2022 14:30:44 -0500 Subject: [PATCH 1/2] Revert extended retry; close request --- tap_mailchimp_export/streams.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tap_mailchimp_export/streams.py b/tap_mailchimp_export/streams.py index 119bf61..46c5115 100644 --- a/tap_mailchimp_export/streams.py +++ b/tap_mailchimp_export/streams.py @@ -14,10 +14,6 @@ PAGE_SIZE = 500 -class RateLimitException(Exception): - pass - - class RemoteDisconnected(Exception): pass @@ -128,11 +124,11 @@ def transform_event(record, campaign, include_sends): return [] obj = json.loads(record) - if int(obj.get('code', 0)) == -50: - raise RateLimitException(record) if 'error' in obj.keys(): + record.close() raise Exception(record) elif int(obj.get('status', 0)) >= 400: + record.close() raise Exception(record) try: @@ -287,12 +283,6 @@ def run_export_request(ctx, entity, stream, last_updated, retries=0, param_id=No write_records_and_update_state( entity, stream, batched_records, last_updated) - except RateLimitException as e: - logger.info(e) - logger.info('Rate limit hit; waiting 90 seconds - then retrying') - time.sleep(90) - retries += 1 - run_export_request(ctx, entity, stream, last_updated, retries) except Exception as e: logger.info(e) logger.info('Waiting 30 seconds - then retrying') From 97d97159e22fee88043de46583b8b80156795117 Mon Sep 17 00:00:00 2001 From: Kevin Diem Date: Thu, 15 Dec 2022 15:40:26 -0500 Subject: [PATCH 2/2] Fix closing connection --- tap_mailchimp_export/streams.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/tap_mailchimp_export/streams.py b/tap_mailchimp_export/streams.py index 46c5115..493d536 100644 --- a/tap_mailchimp_export/streams.py +++ b/tap_mailchimp_export/streams.py @@ -125,10 +125,8 @@ def transform_event(record, campaign, include_sends): obj = json.loads(record) if 'error' in obj.keys(): - record.close() raise Exception(record) elif int(obj.get('status', 0)) >= 400: - record.close() raise Exception(record) try: @@ -267,21 +265,26 @@ def run_export_request(ctx, entity, stream, last_updated, retries=0, param_id=No with ctx.client.export_post( stream, entity, last_updated, params ) as res: - if stream in ( - IDS.CAMPAIGN_SUBSCRIBER_ACTIVITY, - IDS.AUTOMATION_WORKFLOW_SUBSCRIBER_ACTIVITY): - batched_records = \ - handle_subscriber_activity_response( - res, stream, entity, last_updated, include_sends + try: + if stream in ( + IDS.CAMPAIGN_SUBSCRIBER_ACTIVITY, + IDS.AUTOMATION_WORKFLOW_SUBSCRIBER_ACTIVITY): + batched_records = \ + handle_subscriber_activity_response( + res, stream, entity, last_updated, include_sends + ) + elif stream in (IDS.LIST_MEMBERS_BY_UPDATE, IDS.LIST_MEMBERS_BY_CREATE): + batched_records = handle_list_members_response( + res, stream, entity, last_updated ) - elif stream in (IDS.LIST_MEMBERS_BY_UPDATE, IDS.LIST_MEMBERS_BY_CREATE): - batched_records = handle_list_members_response( - res, stream, entity, last_updated - ) - if batched_records: - write_records_and_update_state( - entity, stream, batched_records, last_updated) + if batched_records: + write_records_and_update_state( + entity, stream, batched_records, last_updated) + except Exception as e: + res.close() + + raise e except Exception as e: logger.info(e)