Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pytest
"""A module for Split.io Factories."""
import logging
import threading
Expand Down Expand Up @@ -643,7 +642,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
)

telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)

internal_events_task.start()

if preforked_initialization:
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
synchronizer._split_synchronizers._segment_sync.shutdown()
Expand Down
32 changes: 24 additions & 8 deletions splitio/events/events_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import threading
import logging
from collections import namedtuple
import pytest

from splitio.events import EventsManagerInterface
from splitio.models.events import SdkEvent

_LOGGER = logging.getLogger(__name__)

Expand All @@ -25,10 +25,17 @@ def __init__(self, events_configurations, events_delivery):
self._lock = threading.RLock()

def register(self, sdk_event, event_handler):
if self._active_subscriptions.get(sdk_event) != None:
if self._active_subscriptions.get(sdk_event) != None and self._get_event_handler(sdk_event) != None:
return

with self._lock:
# SDK ready already fired
if sdk_event == SdkEvent.SDK_READY and self._event_already_triggered(sdk_event):
self._active_subscriptions[sdk_event] = ActiveSubscriptions(True, event_handler)
_LOGGER.debug("EventsManager: Firing SDK_READY event for new subscription")
self._fire_sdk_event(sdk_event, None)
return

self._active_subscriptions[sdk_event] = ActiveSubscriptions(False, event_handler)

def unregister(self, sdk_event):
Expand All @@ -42,18 +49,27 @@ def notify_internal_event(self, sdk_internal_event, event_metadata):
with self._lock:
for sorted_event in self._manager_config.evaluation_order:
if sorted_event in self._get_sdk_event_if_applicable(sdk_internal_event):
_LOGGER.debug("EventsManager: Firing Sdk event %s", sorted_event)
if self._get_event_handler(sorted_event) != None:
notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sorted_event, event_metadata, self._get_event_handler(sorted_event)],
name='SplitSDKEventNotify', daemon=True)
notify_event.start()
self._set_sdk_event_triggered(sorted_event)
self._fire_sdk_event(sorted_event, event_metadata)

# if client is not subscribed to SDK_READY
if sorted_event == SdkEvent.SDK_READY and self._get_event_handler(sorted_event) == None:
_LOGGER.debug("EventsManager: Registering SDK_READY event as fired")
self._active_subscriptions[SdkEvent.SDK_READY] = ActiveSubscriptions(True, None)


def destroy(self):
with self._lock:
self._active_subscriptions = {}
self._internal_events_status = {}

def _fire_sdk_event(self, sdk_event, event_metadata):
_LOGGER.debug("EventsManager: Firing Sdk event %s", sdk_event)
notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sdk_event, event_metadata, self._get_event_handler(sdk_event)],
name='SplitSDKEventNotify', daemon=True)
notify_event.start()
self._set_sdk_event_triggered(sdk_event)

def _event_already_triggered(self, sdk_event):
if self._active_subscriptions.get(sdk_event) != None:
return self._active_subscriptions.get(sdk_event).triggered
Expand Down
9 changes: 5 additions & 4 deletions splitio/storage/inmemmory.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,11 @@ def update(self, to_add, to_delete, new_change_number):
to_notify = []
[to_notify.append(feature.name) for feature in to_add]
to_notify.extend(to_delete)
self._internal_event_queue.put(
SdkInternalEventNotification(
SdkInternalEvent.FLAGS_UPDATED,
EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))
if len(to_notify) > 0:
self._internal_event_queue.put(
SdkInternalEventNotification(
SdkInternalEvent.FLAGS_UPDATED,
EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))

def _put(self, feature_flag):
"""
Expand Down
5 changes: 0 additions & 5 deletions splitio/sync/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,6 @@ def start_periodic_data_recording(self):
for task in self._periodic_data_recording_tasks:
task.start()

if self._split_tasks.internal_events_task:
self._split_tasks.internal_events_task.start()

def stop_periodic_data_recording(self, blocking):
"""
Stop recorders.
Expand Down Expand Up @@ -883,8 +880,6 @@ def start_periodic_fetching(self):
self._split_tasks.split_task.start()
if self._split_tasks.segment_task is not None:
self._split_tasks.segment_task.start()
if self._split_tasks.internal_events_task:
self._split_tasks.internal_events_task.start()

def stop_periodic_fetching(self):
"""Stop fetchers for feature flags and segments."""
Expand Down
2 changes: 1 addition & 1 deletion splitio/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '10.5.1'
__version__ = '10.6.0'
178 changes: 177 additions & 1 deletion tests/integration/test_client_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from splitio.events.events_manager_config import EventsManagerConfig
from splitio.events.events_task import EventsTask
from splitio.models import splits, segments, rule_based_segments
from splitio.models.events import SdkEvent
from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator
from splitio.models.fallback_treatment import FallbackTreatment
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder, StandardRecorderAsync, PipelinedRecorderAsync
Expand Down Expand Up @@ -2424,6 +2425,181 @@ def clear_cache(self):
for key in keys_to_delete:
redis_client.delete(key)

class InMemoryEventsNotificationTests(object):
"""Inmemory storage-based events notification tests."""

ready_flag = False
timeout_flag = False

def test_sdk_timeout_fire(self):
"""Prepare storages with test data."""
factory2 = get_factory('some_api_key')
client = factory2.client()
client.on(SdkEvent.SDK_READY_TIMED_OUT, self._timeout_callback)
try:
factory2.block_until_ready(1)
except Exception as e:
print(e)
pass

time.sleep(1)
assert self.timeout_flag

"""Shut down the factory."""
event = threading.Event()
factory2.destroy(event)
event.wait()

def test_sdk_ready(self):
"""Prepare storages with test data."""
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)

split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
with open(split_fn, 'r') as flo:
data = json.loads(flo.read())
for split in data['ff']['d']:
split_storage.update([splits.from_raw(split)], [], 0)

for rbs in data['rbs']['d']:
rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
segment_storage.put(segments.from_raw(data))

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
segment_storage.put(segments.from_raw(data))

telemetry_storage = InMemoryTelemetryStorage()
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()

storages = {
'splits': split_storage,
'segments': segment_storage,
'rule_based_segments': rb_segment_storage,
'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer),
'events': InMemoryEventStorage(5000, telemetry_runtime_producer),
}
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue)

# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
try:
factory = SplitFactory('some_api_key',
storages,
True,
recorder,
events_queue,
events_manager,
None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
) # pylint:disable=attribute-defined-outside-init
internal_events_task.start()
except:
pass

client = factory.client()
client.on(SdkEvent.SDK_READY, self._ready_callback)
factory.block_until_ready(5)
assert self.ready_flag

"""Shut down the factory."""
event = threading.Event()
factory.destroy(event)
event.wait()

def test_sdk_ready_fire_later(self):
"""Prepare storages with test data."""
events_queue = queue.Queue()
split_storage = InMemorySplitStorage(events_queue)
segment_storage = InMemorySegmentStorage(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)

split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
with open(split_fn, 'r') as flo:
data = json.loads(flo.read())
for split in data['ff']['d']:
split_storage.update([splits.from_raw(split)], [], 0)

for rbs in data['rbs']['d']:
rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
segment_storage.put(segments.from_raw(data))

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
segment_storage.put(segments.from_raw(data))

telemetry_storage = InMemoryTelemetryStorage()
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()

storages = {
'splits': split_storage,
'segments': segment_storage,
'rule_based_segments': rb_segment_storage,
'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer),
'events': InMemoryEventStorage(5000, telemetry_runtime_producer),
}
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue)

# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
try:
factory = SplitFactory('some_api_key',
storages,
True,
recorder,
events_queue,
events_manager,
None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
) # pylint:disable=attribute-defined-outside-init
internal_events_task.start()
except:
pass

client = factory.client()
factory.block_until_ready(5)

assert client.get_treatment('user1', 'sample_feature', evaluation_options=EvaluationOptions({"prop": "value"})) == 'on'

self.ready_flag = False
client.on(SdkEvent.SDK_READY, self._ready_callback)
assert self.ready_flag

"""Shut down the factory."""
event = threading.Event()
factory.destroy(event)
event.wait()

def _ready_callback(self, metadata):
self.ready_flag = True

def _timeout_callback(self, metadata):
self.timeout_flag = True

class InMemoryIntegrationAsyncTests(object):
"""Inmemory storage-based integration tests."""

Expand Down Expand Up @@ -4984,4 +5160,4 @@ async def _manager_methods_async(factory, skip_rbs=False):
return

assert len(await manager.split_names()) == 9
assert len(await manager.splits()) == 9
assert len(await manager.splits()) == 9
Loading