Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 18 additions & 25 deletions tap_mailchimp_export/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
PAGE_SIZE = 500


class RateLimitException(Exception):
pass


class RemoteDisconnected(Exception):
pass

Expand Down Expand Up @@ -128,8 +124,6 @@ def transform_event(record, campaign, include_sends):
return []
obj = json.loads(record)

if int(obj.get('code', 0)) == -50:
raise RateLimitException(record)
if 'error' in obj.keys():
raise Exception(record)
elif int(obj.get('status', 0)) >= 400:
Expand Down Expand Up @@ -271,28 +265,27 @@ def run_export_request(ctx, entity, stream, last_updated, retries=0, param_id=No
with ctx.client.export_post(
stream, entity, last_updated, params
) as res:
if stream in (
IDS.CAMPAIGN_SUBSCRIBER_ACTIVITY,
IDS.AUTOMATION_WORKFLOW_SUBSCRIBER_ACTIVITY):
batched_records = \
handle_subscriber_activity_response(
res, stream, entity, last_updated, include_sends
try:
if stream in (
IDS.CAMPAIGN_SUBSCRIBER_ACTIVITY,
IDS.AUTOMATION_WORKFLOW_SUBSCRIBER_ACTIVITY):
batched_records = \
handle_subscriber_activity_response(
res, stream, entity, last_updated, include_sends
)
elif stream in (IDS.LIST_MEMBERS_BY_UPDATE, IDS.LIST_MEMBERS_BY_CREATE):
batched_records = handle_list_members_response(
res, stream, entity, last_updated
)
elif stream in (IDS.LIST_MEMBERS_BY_UPDATE, IDS.LIST_MEMBERS_BY_CREATE):
batched_records = handle_list_members_response(
res, stream, entity, last_updated
)

if batched_records:
write_records_and_update_state(
entity, stream, batched_records, last_updated)
if batched_records:
write_records_and_update_state(
entity, stream, batched_records, last_updated)
except Exception as e:
res.close()

raise e

except RateLimitException as e:
logger.info(e)
logger.info('Rate limit hit; waiting 90 seconds - then retrying')
time.sleep(90)
retries += 1
run_export_request(ctx, entity, stream, last_updated, retries)
except Exception as e:
logger.info(e)
logger.info('Waiting 30 seconds - then retrying')
Expand Down