diff --git a/splitio/client/client.py b/splitio/client/client.py index 02bfbbb8..d4c37fa4 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -3,7 +3,7 @@ from splitio.engine.evaluator import Evaluator, CONTROL, EvaluationDataFactory, AsyncEvaluationDataFactory from splitio.engine.splitters import Splitter -from splitio.models.impressions import Impression, Label +from splitio.models.impressions import Impression, Label, ImpressionDecorated from splitio.models.events import Event, EventWrapper from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies from splitio.client import input_validator @@ -22,7 +22,8 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes 'impression': { 'label': Label.EXCEPTION, 'change_number': None, - } + }, + 'impressions_disabled': False } _NON_READY_EVAL_RESULT = { @@ -31,7 +32,8 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes 'impression': { 'label': Label.NOT_READY, 'change_number': None - } + }, + 'impressions_disabled': False } def __init__(self, factory, recorder, labels_enabled=True): @@ -116,14 +118,15 @@ def _validate_treatments_input(key, features, attributes, method): def _build_impression(self, key, bucketing, feature, result): """Build an impression based on evaluation data & it's result.""" - return Impression( - matching_key=key, + return ImpressionDecorated( + Impression(matching_key=key, feature_name=feature, treatment=result['treatment'], label=result['impression']['label'] if self._labels_enabled else None, change_number=result['impression']['change_number'], bucketing_key=bucketing, - time=utctime_ms()) + time=utctime_ms()), + disabled=result['impressions_disabled']) def _build_impressions(self, key, bucketing, results): """Build an impression based on evaluation data & it's result.""" @@ -296,8 +299,8 @@ def _get_treatment(self, method, key, feature, attributes=None): result = self._FAILED_EVAL_RESULT if result['impression']['label'] != Label.SPLIT_NOT_FOUND: - impression = self._build_impression(key, bucketing, feature, result) - self._record_stats([(impression, attributes)], start, method) + impression_decorated = self._build_impression(key, bucketing, feature, result) + self._record_stats([(impression_decorated, attributes)], start, method) return result['treatment'], result['configurations'] @@ -571,23 +574,23 @@ def _get_treatments(self, key, features, method, attributes=None): self._telemetry_evaluation_producer.record_exception(method) results = {n: self._FAILED_EVAL_RESULT for n in features} - imp_attrs = [ + imp_decorated_attrs = [ (i, attributes) for i in self._build_impressions(key, bucketing, results) - if i.label != Label.SPLIT_NOT_FOUND + if i.Impression.label != Label.SPLIT_NOT_FOUND ] - self._record_stats(imp_attrs, start, method) + self._record_stats(imp_decorated_attrs, start, method) return { feature: (results[feature]['treatment'], results[feature]['configurations']) for feature in results } - def _record_stats(self, impressions, start, operation): + def _record_stats(self, impressions_decorated, start, operation): """ Record impressions. - :param impressions: Generated impressions - :type impressions: list[tuple[splitio.models.impression.Impression, dict]] + :param impressions_decorated: Generated impressions + :type impressions_decorated: list[tuple[splitio.models.impression.ImpressionDecorated, dict]] :param start: timestamp when get_treatment or get_treatments was called :type start: int @@ -596,7 +599,7 @@ def _record_stats(self, impressions, start, operation): :type operation: str """ end = get_current_epoch_time_ms() - self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start), + self._recorder.record_treatment_stats(impressions_decorated, get_latency_bucket_index(end - start), operation, 'get_' + operation.value) def track(self, key, traffic_type, event_type, value=None, properties=None): @@ -763,8 +766,8 @@ async def _get_treatment(self, method, key, feature, attributes=None): result = self._FAILED_EVAL_RESULT if result['impression']['label'] != Label.SPLIT_NOT_FOUND: - impression = self._build_impression(key, bucketing, feature, result) - await self._record_stats([(impression, attributes)], start, method) + impression_decorated = self._build_impression(key, bucketing, feature, result) + await self._record_stats([(impression_decorated, attributes)], start, method) return result['treatment'], result['configurations'] async def get_treatments(self, key, feature_flag_names, attributes=None): @@ -960,23 +963,23 @@ async def _get_treatments(self, key, features, method, attributes=None): await self._telemetry_evaluation_producer.record_exception(method) results = {n: self._FAILED_EVAL_RESULT for n in features} - imp_attrs = [ + imp_decorated_attrs = [ (i, attributes) for i in self._build_impressions(key, bucketing, results) - if i.label != Label.SPLIT_NOT_FOUND + if i.Impression.label != Label.SPLIT_NOT_FOUND ] - await self._record_stats(imp_attrs, start, method) + await self._record_stats(imp_decorated_attrs, start, method) return { feature: (res['treatment'], res['configurations']) for feature, res in results.items() } - async def _record_stats(self, impressions, start, operation): + async def _record_stats(self, impressions_decorated, start, operation): """ Record impressions for async calls - :param impressions: Generated impressions - :type impressions: list[tuple[splitio.models.impression.Impression, dict]] + :param impressions_decorated: Generated impressions decorated + :type impressions_decorated: list[tuple[splitio.models.impression.Impression, dict]] :param start: timestamp when get_treatment or get_treatments was called :type start: int @@ -985,7 +988,7 @@ async def _record_stats(self, impressions, start, operation): :type operation: str """ end = get_current_epoch_time_ms() - await self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start), + await self._recorder.record_treatment_stats(impressions_decorated, get_latency_bucket_index(end - start), operation, 'get_' + operation.value) async def track(self, key, traffic_type, event_type, value=None, properties=None): diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 8c3b7572..bb402bb5 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -13,7 +13,7 @@ from splitio.client.listener import ImpressionListenerWrapper, ImpressionListenerWrapperAsync from splitio.engine.impressions.impressions import Manager as ImpressionsManager from splitio.engine.impressions import set_classes, set_classes_async -from splitio.engine.impressions.strategies import StrategyDebugMode +from splitio.engine.impressions.strategies import StrategyDebugMode, StrategyNoneMode from splitio.engine.telemetry import TelemetryStorageProducer, TelemetryStorageConsumer, \ TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync from splitio.engine.impressions.manager import Counter as ImpressionsCounter @@ -553,10 +553,10 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl unique_keys_tracker = UniqueKeysTracker(_UNIQUE_KEYS_CACHE_SIZE) unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis, imp_counter, unique_keys_tracker) + imp_strategy, none_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis, imp_counter, unique_keys_tracker) imp_manager = ImpressionsManager( - imp_strategy, telemetry_runtime_producer) + imp_strategy, none_strategy, telemetry_runtime_producer) synchronizers = SplitSynchronizers( SplitSynchronizer(apis['splits'], storages['splits']), @@ -681,10 +681,10 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url= unique_keys_tracker = UniqueKeysTrackerAsync(_UNIQUE_KEYS_CACHE_SIZE) unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes_async('MEMORY', cfg['impressionsMode'], apis, imp_counter, unique_keys_tracker) + imp_strategy, none_strategy = set_classes_async('MEMORY', cfg['impressionsMode'], apis, imp_counter, unique_keys_tracker) imp_manager = ImpressionsManager( - imp_strategy, telemetry_runtime_producer) + imp_strategy, none_strategy, telemetry_runtime_producer) synchronizers = SplitSynchronizers( SplitSynchronizerAsync(apis['splits'], storages['splits']), @@ -775,10 +775,10 @@ def _build_redis_factory(api_key, cfg): unique_keys_tracker = UniqueKeysTracker(_UNIQUE_KEYS_CACHE_SIZE) unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter, imp_counter, unique_keys_tracker) + imp_strategy, none_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter, imp_counter, unique_keys_tracker) imp_manager = ImpressionsManager( - imp_strategy, + imp_strategy, none_strategy, telemetry_runtime_producer) synchronizers = SplitSynchronizers(None, None, None, None, @@ -858,10 +858,10 @@ async def _build_redis_factory_async(api_key, cfg): unique_keys_tracker = UniqueKeysTrackerAsync(_UNIQUE_KEYS_CACHE_SIZE) unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes_async('REDIS', cfg['impressionsMode'], redis_adapter, imp_counter, unique_keys_tracker) + imp_strategy, none_strategy = set_classes_async('REDIS', cfg['impressionsMode'], redis_adapter, imp_counter, unique_keys_tracker) imp_manager = ImpressionsManager( - imp_strategy, + imp_strategy, none_strategy, telemetry_runtime_producer) synchronizers = SplitSynchronizers(None, None, None, None, @@ -936,10 +936,10 @@ def _build_pluggable_factory(api_key, cfg): unique_keys_tracker = UniqueKeysTracker(_UNIQUE_KEYS_CACHE_SIZE) unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes('PLUGGABLE', cfg['impressionsMode'], pluggable_adapter, imp_counter, unique_keys_tracker, storage_prefix) + imp_strategy, none_strategy = set_classes('PLUGGABLE', cfg['impressionsMode'], pluggable_adapter, imp_counter, unique_keys_tracker, storage_prefix) imp_manager = ImpressionsManager( - imp_strategy, + imp_strategy, none_strategy, telemetry_runtime_producer) synchronizers = SplitSynchronizers(None, None, None, None, @@ -1017,10 +1017,10 @@ async def _build_pluggable_factory_async(api_key, cfg): unique_keys_tracker = UniqueKeysTrackerAsync(_UNIQUE_KEYS_CACHE_SIZE) unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes_async('PLUGGABLE', cfg['impressionsMode'], pluggable_adapter, imp_counter, unique_keys_tracker, storage_prefix) + imp_strategy, none_strategy = set_classes_async('PLUGGABLE', cfg['impressionsMode'], pluggable_adapter, imp_counter, unique_keys_tracker, storage_prefix) imp_manager = ImpressionsManager( - imp_strategy, + imp_strategy, none_strategy, telemetry_runtime_producer) synchronizers = SplitSynchronizers(None, None, None, None, @@ -1123,7 +1123,7 @@ def _build_localhost_factory(cfg): manager.start() recorder = StandardRecorder( - ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer), + ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer), storages['events'], storages['impressions'], telemetry_evaluation_producer, @@ -1192,7 +1192,7 @@ async def _build_localhost_factory_async(cfg): await manager.start() recorder = StandardRecorderAsync( - ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer), + ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer), storages['events'], storages['impressions'], telemetry_evaluation_producer, diff --git a/splitio/engine/evaluator.py b/splitio/engine/evaluator.py index c6588c6f..f913ebba 100644 --- a/splitio/engine/evaluator.py +++ b/splitio/engine/evaluator.py @@ -67,7 +67,8 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx): 'impression': { 'label': label, 'change_number': _change_number - } + }, + 'impressions_disabled': feature.impressions_disabled if feature else None } def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx): diff --git a/splitio/engine/impressions/__init__.py b/splitio/engine/impressions/__init__.py index 3e5ae13e..fdd84211 100644 --- a/splitio/engine/impressions/__init__.py +++ b/splitio/engine/impressions/__init__.py @@ -53,24 +53,24 @@ def set_classes(storage_mode, impressions_mode, api_adapter, imp_counter, unique api_impressions_adapter = api_adapter['impressions'] sender_adapter = InMemorySenderAdapter(api_telemetry_adapter) + none_strategy = StrategyNoneMode() + unique_keys_synchronizer = UniqueKeysSynchronizer(sender_adapter, unique_keys_tracker) + unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all) + clear_filter_sync = ClearFilterSynchronizer(unique_keys_tracker) + impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter) + impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters) + clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all) + unique_keys_tracker.set_queue_full_hook(unique_keys_task.flush) + if impressions_mode == ImpressionsMode.NONE: imp_strategy = StrategyNoneMode() - unique_keys_synchronizer = UniqueKeysSynchronizer(sender_adapter, unique_keys_tracker) - unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all) - clear_filter_sync = ClearFilterSynchronizer(unique_keys_tracker) - impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter) - impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters) - clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all) - unique_keys_tracker.set_queue_full_hook(unique_keys_task.flush) elif impressions_mode == ImpressionsMode.DEBUG: imp_strategy = StrategyDebugMode() else: imp_strategy = StrategyOptimizedMode() - impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter) - impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters) return unique_keys_synchronizer, clear_filter_sync, unique_keys_task, clear_filter_task, \ - impressions_count_sync, impressions_count_task, imp_strategy + impressions_count_sync, impressions_count_task, imp_strategy, none_strategy def set_classes_async(storage_mode, impressions_mode, api_adapter, imp_counter, unique_keys_tracker, prefix=None): """ @@ -118,21 +118,21 @@ def set_classes_async(storage_mode, impressions_mode, api_adapter, imp_counter, api_impressions_adapter = api_adapter['impressions'] sender_adapter = InMemorySenderAdapterAsync(api_telemetry_adapter) + none_strategy = StrategyNoneMode() + unique_keys_synchronizer = UniqueKeysSynchronizerAsync(sender_adapter, unique_keys_tracker) + unique_keys_task = UniqueKeysSyncTaskAsync(unique_keys_synchronizer.send_all) + clear_filter_sync = ClearFilterSynchronizerAsync(unique_keys_tracker) + impressions_count_sync = ImpressionsCountSynchronizerAsync(api_impressions_adapter, imp_counter) + impressions_count_task = ImpressionsCountSyncTaskAsync(impressions_count_sync.synchronize_counters) + clear_filter_task = ClearFilterSyncTaskAsync(clear_filter_sync.clear_all) + unique_keys_tracker.set_queue_full_hook(unique_keys_task.flush) + if impressions_mode == ImpressionsMode.NONE: imp_strategy = StrategyNoneMode() - unique_keys_synchronizer = UniqueKeysSynchronizerAsync(sender_adapter, unique_keys_tracker) - unique_keys_task = UniqueKeysSyncTaskAsync(unique_keys_synchronizer.send_all) - clear_filter_sync = ClearFilterSynchronizerAsync(unique_keys_tracker) - impressions_count_sync = ImpressionsCountSynchronizerAsync(api_impressions_adapter, imp_counter) - impressions_count_task = ImpressionsCountSyncTaskAsync(impressions_count_sync.synchronize_counters) - clear_filter_task = ClearFilterSyncTaskAsync(clear_filter_sync.clear_all) - unique_keys_tracker.set_queue_full_hook(unique_keys_task.flush) elif impressions_mode == ImpressionsMode.DEBUG: imp_strategy = StrategyDebugMode() else: imp_strategy = StrategyOptimizedMode() - impressions_count_sync = ImpressionsCountSynchronizerAsync(api_impressions_adapter, imp_counter) - impressions_count_task = ImpressionsCountSyncTaskAsync(impressions_count_sync.synchronize_counters) return unique_keys_synchronizer, clear_filter_sync, unique_keys_task, clear_filter_task, \ - impressions_count_sync, impressions_count_task, imp_strategy + impressions_count_sync, impressions_count_task, imp_strategy, none_strategy diff --git a/splitio/engine/impressions/impressions.py b/splitio/engine/impressions/impressions.py index 541e2f36..428fdd13 100644 --- a/splitio/engine/impressions/impressions.py +++ b/splitio/engine/impressions/impressions.py @@ -11,7 +11,7 @@ class ImpressionsMode(Enum): class Manager(object): # pylint:disable=too-few-public-methods """Impression manager.""" - def __init__(self, strategy, telemetry_runtime_producer): + def __init__(self, strategy, none_strategy, telemetry_runtime_producer): """ Construct a manger to track and forward impressions to the queue. @@ -23,19 +23,33 @@ def __init__(self, strategy, telemetry_runtime_producer): """ self._strategy = strategy + self._none_strategy = none_strategy self._telemetry_runtime_producer = telemetry_runtime_producer - def process_impressions(self, impressions): + def process_impressions(self, impressions_decorated): """ Process impressions. Impressions are analyzed to see if they've been seen before and counted. - :param impressions: List of impression objects with attributes - :type impressions: list[tuple[splitio.models.impression.Impression, dict]] + :param impressions_decorated: List of impression objects with attributes + :type impressions_decorated: list[tuple[splitio.models.impression.ImpressionDecorated, dict]] :return: processed and deduped impressions. :rtype: tuple(list[tuple[splitio.models.impression.Impression, dict]], list(int)) """ - for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions(impressions) - return for_log, len(impressions) - len(for_log), for_listener, for_counter, for_unique_keys_tracker + for_listener_all = [] + for_log_all = [] + for_counter_all = [] + for_unique_keys_tracker_all = [] + for impression_decorated, att in impressions_decorated: + if impression_decorated.disabled: + for_log, for_listener, for_counter, for_unique_keys_tracker = self._none_strategy.process_impressions([(impression_decorated.Impression, att)]) + else: + for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions([(impression_decorated.Impression, att)]) + for_listener_all.extend(for_listener) + for_log_all.extend(for_log) + for_counter_all.extend(for_counter) + for_unique_keys_tracker_all.extend(for_unique_keys_tracker) + + return for_log_all, len(impressions_decorated) - len(for_log_all), for_listener_all, for_counter_all, for_unique_keys_tracker_all diff --git a/splitio/models/impressions.py b/splitio/models/impressions.py index b08d31fb..9bdfb3a9 100644 --- a/splitio/models/impressions.py +++ b/splitio/models/impressions.py @@ -16,6 +16,14 @@ ] ) +ImpressionDecorated = namedtuple( + 'ImpressionDecorated', + [ + 'Impression', + 'disabled' + ] +) + # pre-python3.7 hack to make previous_time optional Impression.__new__.__defaults__ = (None,) diff --git a/splitio/models/splits.py b/splitio/models/splits.py index b5158ac5..92a277c4 100644 --- a/splitio/models/splits.py +++ b/splitio/models/splits.py @@ -10,7 +10,7 @@ SplitView = namedtuple( 'SplitView', - ['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets'] + ['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets', 'impressions_disabled'] ) _DEFAULT_CONDITIONS_TEMPLATE = { @@ -73,7 +73,8 @@ def __init__( # pylint: disable=too-many-arguments traffic_allocation=None, traffic_allocation_seed=None, configurations=None, - sets=None + sets=None, + impressions_disabled=None ): """ Class constructor. @@ -96,6 +97,8 @@ def __init__( # pylint: disable=too-many-arguments :type traffic_allocation_seed: int :pram sets: list of flag sets :type sets: list + :pram impressions_disabled: track impressions flag + :type impressions_disabled: boolean """ self._name = name self._seed = seed @@ -125,6 +128,7 @@ def __init__( # pylint: disable=too-many-arguments self._configurations = configurations self._sets = set(sets) if sets is not None else set() + self._impressions_disabled = impressions_disabled if impressions_disabled is not None else False @property def name(self): @@ -186,6 +190,11 @@ def sets(self): """Return the flag sets of the split.""" return self._sets + @property + def impressions_disabled(self): + """Return impressions_disabled of the split.""" + return self._impressions_disabled + def get_configurations_for(self, treatment): """Return the mapping of treatments to configurations.""" return self._configurations.get(treatment) if self._configurations else None @@ -214,7 +223,8 @@ def to_json(self): 'algo': self.algo.value, 'conditions': [c.to_json() for c in self.conditions], 'configurations': self._configurations, - 'sets': list(self._sets) + 'sets': list(self._sets), + 'impressionsDisabled': self._impressions_disabled } def to_split_view(self): @@ -232,7 +242,8 @@ def to_split_view(self): self.change_number, self._configurations if self._configurations is not None else {}, self._default_treatment, - list(self._sets) if self._sets is not None else [] + list(self._sets) if self._sets is not None else [], + self._impressions_disabled ) def local_kill(self, default_treatment, change_number): @@ -288,5 +299,6 @@ def from_raw(raw_split): traffic_allocation=raw_split.get('trafficAllocation'), traffic_allocation_seed=raw_split.get('trafficAllocationSeed'), configurations=raw_split.get('configurations'), - sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else [] + sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else [], + impressions_disabled=raw_split.get('impressionsDisabled') if raw_split.get('impressionsDisabled') is not None else False ) diff --git a/splitio/recorder/recorder.py b/splitio/recorder/recorder.py index 31a5a7db..4c0ec155 100644 --- a/splitio/recorder/recorder.py +++ b/splitio/recorder/recorder.py @@ -151,7 +151,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem self._telemetry_evaluation_producer = telemetry_evaluation_producer self._telemetry_runtime_producer = telemetry_runtime_producer - def record_treatment_stats(self, impressions, latency, operation, method_name): + def record_treatment_stats(self, impressions_decorated, latency, operation, method_name): """ Record stats for treatment evaluation. @@ -165,7 +165,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name): try: if method_name is not None: self._telemetry_evaluation_producer.record_latency(operation, latency) - impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions) + impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated) if deduped > 0: self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped) self._impression_storage.put(impressions) @@ -210,7 +210,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem self._telemetry_evaluation_producer = telemetry_evaluation_producer self._telemetry_runtime_producer = telemetry_runtime_producer - async def record_treatment_stats(self, impressions, latency, operation, method_name): + async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name): """ Record stats for treatment evaluation. @@ -224,7 +224,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n try: if method_name is not None: await self._telemetry_evaluation_producer.record_latency(operation, latency) - impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions) + impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated) if deduped > 0: await self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped) @@ -277,7 +277,7 @@ def __init__(self, pipe, impressions_manager, event_storage, self._data_sampling = data_sampling self._telemetry_redis_storage = telemetry_redis_storage - def record_treatment_stats(self, impressions, latency, operation, method_name): + def record_treatment_stats(self, impressions_decorated, latency, operation, method_name): """ Record stats for treatment evaluation. @@ -294,7 +294,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name): if self._data_sampling < rnumber: return - impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions) + impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated) if impressions: pipe = self._make_pipe() self._impression_storage.add_impressions_to_pipe(impressions, pipe) @@ -367,7 +367,7 @@ def __init__(self, pipe, impressions_manager, event_storage, self._data_sampling = data_sampling self._telemetry_redis_storage = telemetry_redis_storage - async def record_treatment_stats(self, impressions, latency, operation, method_name): + async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name): """ Record stats for treatment evaluation. @@ -384,7 +384,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n if self._data_sampling < rnumber: return - impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions) + impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated) if impressions: pipe = self._make_pipe() self._impression_storage.add_impressions_to_pipe(impressions, pipe) diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 096df432..48a0fba2 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -17,10 +17,12 @@ InMemoryImpressionStorageAsync, InMemorySegmentStorageAsync, InMemoryTelemetryStorageAsync, InMemoryEventStorageAsync from splitio.models.splits import Split, Status, from_raw from splitio.engine.impressions.impressions import Manager as ImpressionManager +from splitio.engine.impressions.manager import Counter as ImpressionsCounter +from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageProducer, TelemetryStorageProducerAsync from splitio.engine.evaluator import Evaluator from splitio.recorder.recorder import StandardRecorder, StandardRecorderAsync -from splitio.engine.impressions.strategies import StrategyDebugMode +from splitio.engine.impressions.strategies import StrategyDebugMode, StrategyNoneMode, StrategyOptimizedMode from tests.integration import splits_json @@ -43,8 +45,10 @@ def test_get_treatment(self, mocker): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) - recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) + recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer(), + unique_keys_tracker=UniqueKeysTracker(), + imp_counter=ImpressionsCounter()) class TelemetrySubmitterMock(): def synchronize_config(*_): pass @@ -61,7 +65,9 @@ def synchronize_config(*_): telemetry_producer.get_telemetry_init_producer(), TelemetrySubmitterMock(), ) - + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property factory.block_until_ready(5) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) @@ -74,6 +80,7 @@ def synchronize_config(*_): 'label': 'some_label', 'change_number': 123 }, + 'impressions_disabled': False } _logger = mocker.Mock() assert client.get_treatment('some_key', 'SPLIT_2') == 'on' @@ -84,6 +91,7 @@ def synchronize_config(*_): ready_property = mocker.PropertyMock() ready_property.return_value = False type(factory).ready = ready_property + # pytest.set_trace() assert client.get_treatment('some_key', 'SPLIT_2', {'some_attribute': 1}) == 'control' assert impression_storage.pop_many(100) == [Impression('some_key', 'SPLIT_2', 'control', Label.NOT_READY, None, None, 1000)] @@ -104,7 +112,7 @@ def test_get_treatment_with_config(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) destroyed_property = mocker.PropertyMock() @@ -141,7 +149,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } _logger = mocker.Mock() client._send_impression_to_listener = mocker.Mock() @@ -178,7 +187,7 @@ def test_get_treatments(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -215,7 +224,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_2': evaluation, @@ -223,7 +233,8 @@ def synchronize_config(*_): } _logger = mocker.Mock() client._send_impression_to_listener = mocker.Mock() - assert client.get_treatments('key', ['SPLIT_2', 'SPLIT_1']) == {'SPLIT_2': 'on', 'SPLIT_1': 'on'} + treatments = client.get_treatments('key', ['SPLIT_2', 'SPLIT_1']) + assert treatments == {'SPLIT_2': 'on', 'SPLIT_1': 'on'} impressions_called = impression_storage.pop_many(100) assert Impression('key', 'SPLIT_2', 'on', 'some_label', 123, None, 1000) in impressions_called @@ -254,7 +265,7 @@ def test_get_treatments_by_flag_set(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -291,7 +302,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_2': evaluation, @@ -330,7 +342,7 @@ def test_get_treatments_by_flag_sets(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -367,7 +379,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_2': evaluation, @@ -406,7 +419,7 @@ def test_get_treatments_with_config(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -442,7 +455,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_1': evaluation, @@ -486,7 +500,7 @@ def test_get_treatments_with_config_by_flag_set(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -522,7 +536,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_1': evaluation, @@ -563,7 +578,7 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker): segment_storage = InMemorySegmentStorage() telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -599,7 +614,8 @@ def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_1': evaluation, @@ -632,6 +648,182 @@ def _raise(*_): assert client.get_treatments_with_config_by_flag_sets('key', ['set_1']) == {'SPLIT_1': ('control', None), 'SPLIT_2': ('control', None)} factory.destroy() + def test_impression_toggle_optimized(self, mocker): + """Test get_treatment execution paths.""" + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) + event_storage = mocker.Mock(spec=EventStorage) + + destroyed_property = mocker.PropertyMock() + destroyed_property.return_value = False + + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) + mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) + + impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) + recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + class TelemetrySubmitterMock(): + def synchronize_config(*_): + pass + + factory = SplitFactory(mocker.Mock(), + {'splits': split_storage, + 'segments': segment_storage, + 'impressions': impression_storage, + 'events': event_storage}, + mocker.Mock(), + recorder, + mocker.Mock(), + mocker.Mock(), + telemetry_producer, + telemetry_producer.get_telemetry_init_producer(), + TelemetrySubmitterMock(), + ) + + factory.block_until_ready(5) + + split_storage.update([ + from_raw(splits_json['splitChange1_1']['splits'][0]), + from_raw(splits_json['splitChange1_1']['splits'][1]), + from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + client = Client(factory, recorder, True) + assert client.get_treatment('some_key', 'SPLIT_1') == 'off' + assert client.get_treatment('some_key', 'SPLIT_2') == 'on' + assert client.get_treatment('some_key', 'SPLIT_3') == 'on' + + impressions = impression_storage.pop_many(100) + assert len(impressions) == 2 + + found1 = False + found2 = False + for impression in impressions: + if impression[1] == 'SPLIT_1': + found1 = True + if impression[1] == 'SPLIT_2': + found2 = True + assert found1 + assert found2 + factory.destroy() + + def test_impression_toggle_debug(self, mocker): + """Test get_treatment execution paths.""" + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) + event_storage = mocker.Mock(spec=EventStorage) + + destroyed_property = mocker.PropertyMock() + destroyed_property.return_value = False + + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) + mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) + + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) + recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + class TelemetrySubmitterMock(): + def synchronize_config(*_): + pass + + factory = SplitFactory(mocker.Mock(), + {'splits': split_storage, + 'segments': segment_storage, + 'impressions': impression_storage, + 'events': event_storage}, + mocker.Mock(), + recorder, + mocker.Mock(), + mocker.Mock(), + telemetry_producer, + telemetry_producer.get_telemetry_init_producer(), + TelemetrySubmitterMock(), + ) + + factory.block_until_ready(5) + + split_storage.update([ + from_raw(splits_json['splitChange1_1']['splits'][0]), + from_raw(splits_json['splitChange1_1']['splits'][1]), + from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + client = Client(factory, recorder, True) + assert client.get_treatment('some_key', 'SPLIT_1') == 'off' + assert client.get_treatment('some_key', 'SPLIT_2') == 'on' + assert client.get_treatment('some_key', 'SPLIT_3') == 'on' + + impressions = impression_storage.pop_many(100) + assert len(impressions) == 2 + + found1 = False + found2 = False + for impression in impressions: + if impression[1] == 'SPLIT_1': + found1 = True + if impression[1] == 'SPLIT_2': + found2 = True + assert found1 + assert found2 + factory.destroy() + + def test_impression_toggle_none(self, mocker): + """Test get_treatment execution paths.""" + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) + event_storage = mocker.Mock(spec=EventStorage) + + destroyed_property = mocker.PropertyMock() + destroyed_property.return_value = False + + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) + mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) + non_strategy = StrategyNoneMode() + impmanager = ImpressionManager(non_strategy, non_strategy, telemetry_runtime_producer) + recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + class TelemetrySubmitterMock(): + def synchronize_config(*_): + pass + + factory = SplitFactory(mocker.Mock(), + {'splits': split_storage, + 'segments': segment_storage, + 'impressions': impression_storage, + 'events': event_storage}, + mocker.Mock(), + recorder, + mocker.Mock(), + mocker.Mock(), + telemetry_producer, + telemetry_producer.get_telemetry_init_producer(), + TelemetrySubmitterMock(), + ) + + factory.block_until_ready(5) + + split_storage.update([ + from_raw(splits_json['splitChange1_1']['splits'][0]), + from_raw(splits_json['splitChange1_1']['splits'][1]), + from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + client = Client(factory, recorder, True) + assert client.get_treatment('some_key', 'SPLIT_1') == 'off' + assert client.get_treatment('some_key', 'SPLIT_2') == 'on' + assert client.get_treatment('some_key', 'SPLIT_3') == 'on' + + impressions = impression_storage.pop_many(100) + assert len(impressions) == 0 + factory.destroy() + @mock.patch('splitio.client.factory.SplitFactory.destroy') def test_destroy(self, mocker): """Test that destroy/destroyed calls are forwarded to the factory.""" @@ -717,7 +909,7 @@ def test_evaluations_before_running_post_fork(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) split_storage = InMemorySplitStorage() segment_storage = InMemorySegmentStorage() split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) @@ -796,7 +988,7 @@ def test_telemetry_not_ready(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) split_storage = InMemorySplitStorage() segment_storage = InMemorySegmentStorage() split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) @@ -930,7 +1122,7 @@ def test_telemetry_method_latency(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) split_storage = InMemorySplitStorage() segment_storage = InMemorySegmentStorage() split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) @@ -1049,7 +1241,7 @@ async def test_get_treatment_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) @@ -1085,6 +1277,7 @@ async def synchronize_config(*_): 'label': 'some_label', 'change_number': 123 }, + 'impressions_disabled': False } _logger = mocker.Mock() assert await client.get_treatment('some_key', 'SPLIT_2') == 'on' @@ -1117,7 +1310,7 @@ async def test_get_treatment_with_config_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) @@ -1153,7 +1346,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } _logger = mocker.Mock() client._send_impression_to_listener = mocker.Mock() @@ -1191,7 +1385,7 @@ async def test_get_treatments_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -1227,7 +1421,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_2': evaluation, @@ -1268,7 +1463,7 @@ async def test_get_treatments_by_flag_set_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -1304,7 +1499,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_2': evaluation, @@ -1345,7 +1541,7 @@ async def test_get_treatments_by_flag_sets_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -1381,7 +1577,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_2': evaluation, @@ -1422,7 +1619,7 @@ async def test_get_treatments_with_config(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -1457,7 +1654,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_1': evaluation, @@ -1503,7 +1701,7 @@ async def test_get_treatments_with_config_by_flag_set(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -1538,7 +1736,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_1': evaluation, @@ -1584,7 +1783,7 @@ async def test_get_treatments_with_config_by_flag_sets(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = mocker.Mock(spec=EventStorage) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0]), from_raw(splits_json['splitChange1_1']['splits'][1])], [], -1) @@ -1619,7 +1818,8 @@ async def synchronize_config(*_): 'impression': { 'label': 'some_label', 'change_number': 123 - } + }, + 'impressions_disabled': False } client._evaluator.eval_many_with_context.return_value = { 'SPLIT_1': evaluation, @@ -1655,6 +1855,173 @@ def _raise(*_): } await factory.destroy() + @pytest.mark.asyncio + async def test_impression_toggle_optimized(self, mocker): + """Test get_treatment execution paths.""" + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + split_storage = InMemorySplitStorageAsync() + segment_storage = InMemorySegmentStorageAsync() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) + event_storage = mocker.Mock(spec=EventStorage) + + destroyed_property = mocker.PropertyMock() + destroyed_property.return_value = False + + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) + mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) + + impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) + recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + factory = SplitFactoryAsync(mocker.Mock(), + {'splits': split_storage, + 'segments': segment_storage, + 'impressions': impression_storage, + 'events': event_storage}, + mocker.Mock(), + recorder, + mocker.Mock(), + telemetry_producer, + telemetry_producer.get_telemetry_init_producer(), + mocker.Mock() + ) + + await factory.block_until_ready(5) + + await split_storage.update([ + from_raw(splits_json['splitChange1_1']['splits'][0]), + from_raw(splits_json['splitChange1_1']['splits'][1]), + from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + client = ClientAsync(factory, recorder, True) + treatment = await client.get_treatment('some_key', 'SPLIT_1') + assert treatment == 'off' + treatment = await client.get_treatment('some_key', 'SPLIT_2') + assert treatment == 'on' + treatment = await client.get_treatment('some_key', 'SPLIT_3') + assert treatment == 'on' + + impressions = await impression_storage.pop_many(100) + assert len(impressions) == 2 + + found1 = False + found2 = False + for impression in impressions: + if impression[1] == 'SPLIT_1': + found1 = True + if impression[1] == 'SPLIT_2': + found2 = True + assert found1 + assert found2 + await factory.destroy() + + @pytest.mark.asyncio + async def test_impression_toggle_debug(self, mocker): + """Test get_treatment execution paths.""" + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + split_storage = InMemorySplitStorageAsync() + segment_storage = InMemorySegmentStorageAsync() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) + event_storage = mocker.Mock(spec=EventStorage) + + destroyed_property = mocker.PropertyMock() + destroyed_property.return_value = False + + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) + mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) + + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) + recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + factory = SplitFactoryAsync(mocker.Mock(), + {'splits': split_storage, + 'segments': segment_storage, + 'impressions': impression_storage, + 'events': event_storage}, + mocker.Mock(), + recorder, + mocker.Mock(), + telemetry_producer, + telemetry_producer.get_telemetry_init_producer(), + mocker.Mock() + ) + + await factory.block_until_ready(5) + + await split_storage.update([ + from_raw(splits_json['splitChange1_1']['splits'][0]), + from_raw(splits_json['splitChange1_1']['splits'][1]), + from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + client = ClientAsync(factory, recorder, True) + assert await client.get_treatment('some_key', 'SPLIT_1') == 'off' + assert await client.get_treatment('some_key', 'SPLIT_2') == 'on' + assert await client.get_treatment('some_key', 'SPLIT_3') == 'on' + + impressions = await impression_storage.pop_many(100) + assert len(impressions) == 2 + + found1 = False + found2 = False + for impression in impressions: + if impression[1] == 'SPLIT_1': + found1 = True + if impression[1] == 'SPLIT_2': + found2 = True + assert found1 + assert found2 + await factory.destroy() + + @pytest.mark.asyncio + async def test_impression_toggle_none(self, mocker): + """Test get_treatment execution paths.""" + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + split_storage = InMemorySplitStorageAsync() + segment_storage = InMemorySegmentStorageAsync() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) + event_storage = mocker.Mock(spec=EventStorage) + + destroyed_property = mocker.PropertyMock() + destroyed_property.return_value = False + + mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) + mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) + non_strategy = StrategyNoneMode() + impmanager = ImpressionManager(non_strategy, non_strategy, telemetry_runtime_producer) + recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) + factory = SplitFactoryAsync(mocker.Mock(), + {'splits': split_storage, + 'segments': segment_storage, + 'impressions': impression_storage, + 'events': event_storage}, + mocker.Mock(), + recorder, + mocker.Mock(), + telemetry_producer, + telemetry_producer.get_telemetry_init_producer(), + mocker.Mock() + ) + + await factory.block_until_ready(5) + + await split_storage.update([ + from_raw(splits_json['splitChange1_1']['splits'][0]), + from_raw(splits_json['splitChange1_1']['splits'][1]), + from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + client = ClientAsync(factory, recorder, True) + assert await client.get_treatment('some_key', 'SPLIT_1') == 'off' + assert await client.get_treatment('some_key', 'SPLIT_2') == 'on' + assert await client.get_treatment('some_key', 'SPLIT_3') == 'on' + + impressions = await impression_storage.pop_many(100) + assert len(impressions) == 0 + await factory.destroy() + @pytest.mark.asyncio async def test_track_async(self, mocker): """Test that destroy/destroyed calls are forwarded to the factory.""" @@ -1712,7 +2079,7 @@ async def test_telemetry_not_ready_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = InMemoryEventStorageAsync(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) factory = SplitFactoryAsync('localhost', @@ -1753,7 +2120,7 @@ async def test_telemetry_record_treatment_exception_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = InMemoryEventStorageAsync(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) destroyed_property = mocker.PropertyMock() @@ -1825,7 +2192,7 @@ async def test_telemetry_method_latency_async(self, mocker): telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() impression_storage = InMemoryImpressionStorageAsync(10, telemetry_runtime_producer) event_storage = InMemoryEventStorageAsync(10, telemetry_runtime_producer) - impmanager = ImpressionManager(StrategyDebugMode(), telemetry_runtime_producer) + impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) recorder = StandardRecorderAsync(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer()) await split_storage.update([from_raw(splits_json['splitChange1_1']['splits'][0])], [], -1) destroyed_property = mocker.PropertyMock() diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index e3bcd092..fbe499d6 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -941,6 +941,6 @@ async def _make_factory_with_apikey(apikey, *_, **__): factory = await get_factory_async("none", config=config) await factory.destroy() - await asyncio.sleep(0.1) + await asyncio.sleep(0.5) assert factory.destroyed assert len(build_redis.mock_calls) == 2 diff --git a/tests/engine/test_evaluator.py b/tests/engine/test_evaluator.py index b56b7040..67c7387d 100644 --- a/tests/engine/test_evaluator.py +++ b/tests/engine/test_evaluator.py @@ -52,6 +52,7 @@ def test_evaluate_treatment_ok(self, mocker): assert result['impression']['change_number'] == 123 assert result['impression']['label'] == 'some_label' assert mocked_split.get_configurations_for.mock_calls == [mocker.call('on')] + assert result['impressions_disabled'] == mocked_split.impressions_disabled def test_evaluate_treatment_ok_no_config(self, mocker): diff --git a/tests/engine/test_impressions.py b/tests/engine/test_impressions.py index d736829b..b9f6a607 100644 --- a/tests/engine/test_impressions.py +++ b/tests/engine/test_impressions.py @@ -5,7 +5,7 @@ from splitio.engine.impressions.impressions import Manager, ImpressionsMode from splitio.engine.impressions.manager import Hasher, Observer, Counter, truncate_time from splitio.engine.impressions.strategies import StrategyDebugMode, StrategyOptimizedMode, StrategyNoneMode -from splitio.models.impressions import Impression +from splitio.models.impressions import Impression, ImpressionDecorated from splitio.client.listener import ImpressionListenerWrapper import splitio.models.telemetry as ModelTelemetry from splitio.engine.telemetry import TelemetryStorageProducer @@ -105,14 +105,15 @@ def test_standalone_optimized(self, mocker): telemetry_producer = TelemetryStorageProducer(telemetry_storage) telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() - manager = Manager(StrategyOptimizedMode(), telemetry_runtime_producer) # no listener + manager = Manager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener assert manager._strategy._observer is not None assert isinstance(manager._strategy, StrategyOptimizedMode) + assert isinstance(manager._none_strategy, StrategyNoneMode) # An impression that hasn't happened in the last hour (pt = None) should be tracked imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), - (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), False), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) ]) assert for_unique_keys_tracker == [] @@ -122,7 +123,7 @@ def test_standalone_optimized(self, mocker): # Tracking the same impression a ms later should be empty imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [] assert deduped == 1 @@ -130,7 +131,7 @@ def test_standalone_optimized(self, mocker): # Tracking an impression with a different key makes it to the queue imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert imps == [Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] assert deduped == 0 @@ -143,8 +144,8 @@ def test_standalone_optimized(self, mocker): # Track the same impressions but "one hour later" imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None), + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)] @@ -157,14 +158,14 @@ def test_standalone_optimized(self, mocker): # Test counting only from the second impression imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert for_counter == [] assert deduped == 0 assert for_unique_keys_tracker == [] imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert for_counter == [Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1, utc_now-1)] assert deduped == 1 @@ -179,14 +180,14 @@ def test_standalone_debug(self, mocker): utc_time_mock.return_value = utc_now mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) - manager = Manager(StrategyDebugMode(), mocker.Mock()) # no listener + manager = Manager(StrategyDebugMode(), StrategyNoneMode(), mocker.Mock()) # no listener assert manager._strategy._observer is not None assert isinstance(manager._strategy, StrategyDebugMode) # An impression that hasn't happened in the last hour (pt = None) should be tracked imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), - (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), False), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] @@ -195,7 +196,7 @@ def test_standalone_debug(self, mocker): # Tracking the same impression a ms later should return the impression imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3)] assert for_counter == [] @@ -203,7 +204,7 @@ def test_standalone_debug(self, mocker): # Tracking a in impression with a different key makes it to the queue imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert imps == [Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] assert for_counter == [] @@ -217,8 +218,8 @@ def test_standalone_debug(self, mocker): # Track the same impressions but "one hour later" imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None), + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)] @@ -236,13 +237,13 @@ def test_standalone_none(self, mocker): utc_time_mock.return_value = utc_now mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) - manager = Manager(StrategyNoneMode(), mocker.Mock()) # no listener + manager = Manager(StrategyNoneMode(), StrategyNoneMode(), mocker.Mock()) # no listener assert isinstance(manager._strategy, StrategyNoneMode) # no impressions are tracked, only counter and mtk imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), - (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), False), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) ]) assert imps == [] assert for_counter == [ @@ -253,13 +254,13 @@ def test_standalone_none(self, mocker): # Tracking the same impression a ms later should not return the impression and no change on mtk cache imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [] # Tracking an impression with a different key, will only increase mtk imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k3', 'f1', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k3', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert imps == [] assert for_unique_keys_tracker == [('k3', 'f1')] @@ -275,8 +276,8 @@ def test_standalone_none(self, mocker): # Track the same impressions but "one hour later", no changes on mtk imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None), + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [] assert for_counter == [ @@ -294,14 +295,14 @@ def test_standalone_optimized_listener(self, mocker): # mocker.patch('splitio.util.time.utctime_ms', return_value=utc_time_mock) mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) - manager = Manager(StrategyOptimizedMode(), mocker.Mock()) + manager = Manager(StrategyOptimizedMode(), StrategyNoneMode(), mocker.Mock()) assert manager._strategy._observer is not None assert isinstance(manager._strategy, StrategyOptimizedMode) # An impression that hasn't happened in the last hour (pt = None) should be tracked imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), - (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), False), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] @@ -312,7 +313,7 @@ def test_standalone_optimized_listener(self, mocker): # Tracking the same impression a ms later should return empty imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [] assert deduped == 1 @@ -321,7 +322,7 @@ def test_standalone_optimized_listener(self, mocker): # Tracking a in impression with a different key makes it to the queue imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert imps == [Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] assert deduped == 0 @@ -336,8 +337,8 @@ def test_standalone_optimized_listener(self, mocker): # Track the same impressions but "one hour later" imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None), + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)] @@ -355,14 +356,14 @@ def test_standalone_optimized_listener(self, mocker): # Test counting only from the second impression imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert for_counter == [] assert deduped == 0 assert for_unique_keys_tracker == [] imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert for_counter == [ Impression('k3', 'f3', 'on', 'l1', 123, None, utc_now-1, utc_now-1) @@ -381,13 +382,13 @@ def test_standalone_debug_listener(self, mocker): imps = [] listener = mocker.Mock(spec=ImpressionListenerWrapper) - manager = Manager(StrategyDebugMode(), mocker.Mock()) + manager = Manager(StrategyDebugMode(), StrategyNoneMode(), mocker.Mock()) assert isinstance(manager._strategy, StrategyDebugMode) # An impression that hasn't happened in the last hour (pt = None) should be tracked imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), - (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), False), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] @@ -397,7 +398,7 @@ def test_standalone_debug_listener(self, mocker): # Tracking the same impression a ms later should return the imp imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3)] assert listen == [(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, utc_now-3), None)] @@ -406,7 +407,7 @@ def test_standalone_debug_listener(self, mocker): # Tracking a in impression with a different key makes it to the queue imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert imps == [Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1)] assert listen == [(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)] @@ -421,8 +422,8 @@ def test_standalone_debug_listener(self, mocker): # Track the same impressions but "one hour later" imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None), + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3), Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)] @@ -443,13 +444,13 @@ def test_standalone_none_listener(self, mocker): utc_time_mock.return_value = utc_now mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) - manager = Manager(StrategyNoneMode(), mocker.Mock()) + manager = Manager(StrategyNoneMode(), StrategyNoneMode(), mocker.Mock()) assert isinstance(manager._strategy, StrategyNoneMode) # An impression that hasn't happened in the last hour (pt = None) should not be tracked imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), - (Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), False), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) ]) assert imps == [] assert listen == [(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None), @@ -461,7 +462,7 @@ def test_standalone_none_listener(self, mocker): # Tracking the same impression a ms later should return empty, no updates on mtk imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [] assert listen == [(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2, None), None)] @@ -470,7 +471,7 @@ def test_standalone_none_listener(self, mocker): # Tracking a in impression with a different key update mtk imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None) + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None) ]) assert imps == [] assert listen == [(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-1), None)] @@ -485,8 +486,8 @@ def test_standalone_none_listener(self, mocker): # Track the same impressions but "one hour later" imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ - (Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), None), - (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), None) + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), False), None), + (ImpressionDecorated(Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2), False), None) ]) assert imps == [] assert for_counter == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1), @@ -496,3 +497,80 @@ def test_standalone_none_listener(self, mocker): (Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, None), None) ] assert for_unique_keys_tracker == [('k1', 'f1'), ('k2', 'f1')] + + def test_impression_toggle_optimized(self, mocker): + """Test impressions manager in optimized mode with sdk in standalone mode.""" + + # Mock utc_time function to be able to play with the clock + utc_now = truncate_time(utctime_ms_reimplement()) + 1800 * 1000 + utc_time_mock = mocker.Mock() + utc_time_mock.return_value = utc_now + mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + manager = Manager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + assert manager._strategy._observer is not None + assert isinstance(manager._strategy, StrategyOptimizedMode) + assert isinstance(manager._none_strategy, StrategyNoneMode) + + # An impression that hasn't happened in the last hour (pt = None) should be tracked + imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), True), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) + ]) + + assert for_unique_keys_tracker == [('k1', 'f1')] + assert imps == [Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] + assert deduped == 1 + + def test_impression_toggle_debug(self, mocker): + """Test impressions manager in optimized mode with sdk in standalone mode.""" + + # Mock utc_time function to be able to play with the clock + utc_now = truncate_time(utctime_ms_reimplement()) + 1800 * 1000 + utc_time_mock = mocker.Mock() + utc_time_mock.return_value = utc_now + mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + manager = Manager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + assert manager._strategy._observer is not None + + # An impression that hasn't happened in the last hour (pt = None) should be tracked + imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), True), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) + ]) + + assert for_unique_keys_tracker == [('k1', 'f1')] + assert imps == [Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)] + assert deduped == 1 + + def test_impression_toggle_none(self, mocker): + """Test impressions manager in optimized mode with sdk in standalone mode.""" + + # Mock utc_time function to be able to play with the clock + utc_now = truncate_time(utctime_ms_reimplement()) + 1800 * 1000 + utc_time_mock = mocker.Mock() + utc_time_mock.return_value = utc_now + mocker.patch('splitio.engine.impressions.strategies.utctime_ms', return_value=utc_time_mock()) + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + strategy = StrategyNoneMode() + manager = Manager(strategy, strategy, telemetry_runtime_producer) # no listener + + # An impression that hasn't happened in the last hour (pt = None) should be tracked + imps, deduped, listen, for_counter, for_unique_keys_tracker = manager.process_impressions([ + (ImpressionDecorated(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), True), None), + (ImpressionDecorated(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), False), None) + ]) + + assert for_unique_keys_tracker == [('k1', 'f1'), ('k1', 'f2')] + assert imps == [] + assert deduped == 2 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index b3ecce57..ee2475df 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -1,6 +1,13 @@ -split11 = {"splits": [{"trafficTypeName": "user", "name": "SPLIT_2","trafficAllocation": 100,"trafficAllocationSeed": 1057590779, "seed": -113875324, "status": "ACTIVE","killed": False, "defaultTreatment": "off", "changeNumber": 1675443569027,"algo": 2, "configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 100 },{ "treatment": "off", "size": 0 }],"label": "default rule"}], "sets": ["set_1"]},{"trafficTypeName": "user", "name": "SPLIT_1", "trafficAllocation": 100, "trafficAllocationSeed": -1780071202,"seed": -1442762199, "status": "ACTIVE","killed": False, "defaultTreatment": "off", "changeNumber": 1675443537882,"algo": 2, "configurations": {},"conditions": [{"conditionType": "ROLLOUT", "matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 0 },{ "treatment": "off", "size": 100 }],"label": "default rule"}], "sets": ["set_1", "set_2"]}],"since": -1,"till": 1675443569027} +split11 = {"splits": [ + {"trafficTypeName": "user", "name": "SPLIT_2","trafficAllocation": 100,"trafficAllocationSeed": 1057590779, "seed": -113875324, "status": "ACTIVE","killed": False, "defaultTreatment": "off", "changeNumber": 1675443569027,"algo": 2, "configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 100 },{ "treatment": "off", "size": 0 }],"label": "default rule"}], "sets": ["set_1"], "impressionsDisabled": False}, + {"trafficTypeName": "user", "name": "SPLIT_1", "trafficAllocation": 100, "trafficAllocationSeed": -1780071202,"seed": -1442762199, "status": "ACTIVE","killed": False, "defaultTreatment": "off", "changeNumber": 1675443537882,"algo": 2, "configurations": {},"conditions": [{"conditionType": "ROLLOUT", "matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 0 },{ "treatment": "off", "size": 100 }],"label": "default rule"}], "sets": ["set_1", "set_2"]}, + {"trafficTypeName": "user", "name": "SPLIT_3","trafficAllocation": 100,"trafficAllocationSeed": 1057590779, "seed": -113875324, "status": "ACTIVE","killed": False, "defaultTreatment": "off", "changeNumber": 1675443569027,"algo": 2, "configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 100 },{ "treatment": "off", "size": 0 }],"label": "default rule"}], "sets": ["set_1"], "impressionsDisabled": True} + ],"since": -1,"till": 1675443569027} split12 = {"splits": [{"trafficTypeName": "user","name": "SPLIT_2","trafficAllocation": 100,"trafficAllocationSeed": 1057590779,"seed": -113875324,"status": "ACTIVE","killed": True,"defaultTreatment": "off","changeNumber": 1675443767288,"algo": 2,"configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 100 },{ "treatment": "off", "size": 0 }],"label": "default rule"}]}],"since": 1675443569027,"till": 167544376728} -split13 = {"splits": [{"trafficTypeName": "user","name": "SPLIT_1","trafficAllocation": 100,"trafficAllocationSeed": -1780071202,"seed": -1442762199,"status": "ARCHIVED","killed": False,"defaultTreatment": "off","changeNumber": 1675443984594,"algo": 2,"configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 0 },{ "treatment": "off", "size": 100 }],"label": "default rule"}]},{"trafficTypeName": "user","name": "SPLIT_2","trafficAllocation": 100,"trafficAllocationSeed": 1057590779,"seed": -113875324,"status": "ACTIVE","killed": False,"defaultTreatment": "off","changeNumber": 1675443954220,"algo": 2,"configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 100 },{ "treatment": "off", "size": 0 }],"label": "default rule"}]}],"since": 1675443767288,"till": 1675443984594} +split13 = {"splits": [ + {"trafficTypeName": "user","name": "SPLIT_1","trafficAllocation": 100,"trafficAllocationSeed": -1780071202,"seed": -1442762199,"status": "ARCHIVED","killed": False,"defaultTreatment": "off","changeNumber": 1675443984594,"algo": 2,"configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 0 },{ "treatment": "off", "size": 100 }],"label": "default rule"}]}, + {"trafficTypeName": "user","name": "SPLIT_2","trafficAllocation": 100,"trafficAllocationSeed": 1057590779,"seed": -113875324,"status": "ACTIVE","killed": False,"defaultTreatment": "off","changeNumber": 1675443954220,"algo": 2,"configurations": {},"conditions": [{"conditionType": "ROLLOUT","matcherGroup": {"combiner": "AND","matchers": [{"keySelector": { "trafficType": "user", "attribute": None },"matcherType": "ALL_KEYS","negate": False,"userDefinedSegmentMatcherData": None,"whitelistMatcherData": None,"unaryNumericMatcherData": None,"betweenMatcherData": None,"booleanMatcherData": None,"dependencyMatcherData": None,"stringMatcherData": None}]},"partitions": [{ "treatment": "on", "size": 100 },{ "treatment": "off", "size": 0 }],"label": "default rule"}]} + ],"since": 1675443767288,"till": 1675443984594} split41 = split11 split42 = split12 diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index f20e4f66..94a11624 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -441,7 +441,7 @@ def _manager_methods(factory): assert len(manager.split_names()) == 7 assert len(manager.splits()) == 7 -class InMemoryIntegrationTests(object): +class InMemoryDebugIntegrationTests(object): """Inmemory storage-based integration tests.""" def setup_method(self): @@ -476,7 +476,7 @@ def setup_method(self): 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer) # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. try: @@ -632,7 +632,7 @@ def setup_method(self): 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), } - impmanager = ImpressionsManager(StrategyOptimizedMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer) self.factory = SplitFactory('some_api_key', storages, @@ -766,7 +766,7 @@ def setup_method(self): 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage) self.factory = SplitFactory('some_api_key', @@ -946,7 +946,7 @@ def setup_method(self): 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage) self.factory = SplitFactory('some_api_key', @@ -974,103 +974,98 @@ def test_localhost_json_e2e(self): # Tests 1 self.factory._storages['splits'].update([], ['SPLIT_1'], -1) -# self.factory._sync_manager._synchronizer._split_synchronizers._feature_flag_sync._feature_flag_storage.set_change_number(-1) self._update_temp_file(splits_json['splitChange1_1']) self._synchronize_now() - assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'off' assert client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange1_2']) self._synchronize_now() - assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'off' assert client.get_treatment("key", "SPLIT_2", None) == 'off' self._update_temp_file(splits_json['splitChange1_3']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert self.factory.manager().split_names() == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'control' assert client.get_treatment("key", "SPLIT_2", None) == 'on' # Tests 3 self.factory._storages['splits'].update([], ['SPLIT_1'], -1) -# self.factory._sync_manager._synchronizer._split_synchronizers._feature_flag_sync._feature_flag_storage.set_change_number(-1) self._update_temp_file(splits_json['splitChange3_1']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert self.factory.manager().split_names() == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange3_2']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert self.factory.manager().split_names() == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_2", None) == 'off' # Tests 4 self.factory._storages['splits'].update([], ['SPLIT_2'], -1) -# self.factory._sync_manager._synchronizer._split_synchronizers._feature_flag_sync._feature_flag_storage.set_change_number(-1) self._update_temp_file(splits_json['splitChange4_1']) self._synchronize_now() - assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'off' assert client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange4_2']) self._synchronize_now() - assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'off' assert client.get_treatment("key", "SPLIT_2", None) == 'off' self._update_temp_file(splits_json['splitChange4_3']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'control' assert client.get_treatment("key", "SPLIT_2", None) == 'on' # Tests 5 self.factory._storages['splits'].update([], ['SPLIT_1', 'SPLIT_2'], -1) -# self.factory._sync_manager._synchronizer._split_synchronizers._feature_flag_sync._feature_flag_storage.set_change_number(-1) self._update_temp_file(splits_json['splitChange5_1']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange5_2']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_2", None) == 'on' # Tests 6 self.factory._storages['splits'].update([], ['SPLIT_2'], -1) -# self.factory._sync_manager._synchronizer._split_synchronizers._feature_flag_sync._feature_flag_storage.set_change_number(-1) self._update_temp_file(splits_json['splitChange6_1']) self._synchronize_now() - assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'off' assert client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange6_2']) self._synchronize_now() - assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'off' assert client.get_treatment("key", "SPLIT_2", None) == 'off' self._update_temp_file(splits_json['splitChange6_3']) self._synchronize_now() - assert self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert client.get_treatment("key", "SPLIT_1", None) == 'control' assert client.get_treatment("key", "SPLIT_2", None) == 'on' @@ -1165,7 +1160,7 @@ def setup_method(self): 'telemetry': telemetry_pluggable_storage } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer) @@ -1352,7 +1347,7 @@ def setup_method(self): 'telemetry': telemetry_pluggable_storage } - impmanager = ImpressionsManager(StrategyOptimizedMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer) @@ -1519,8 +1514,8 @@ def setup_method(self): unique_keys_tracker = UniqueKeysTracker() unique_keys_synchronizer, clear_filter_sync, self.unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes('PLUGGABLE', ImpressionsMode.NONE, self.pluggable_storage_adapter, imp_counter, unique_keys_tracker) - impmanager = ImpressionsManager(imp_strategy, telemetry_runtime_producer) # no listener + imp_strategy, none_strategy = set_classes('PLUGGABLE', ImpressionsMode.NONE, self.pluggable_storage_adapter, imp_counter, unique_keys_tracker) + impmanager = ImpressionsManager(imp_strategy, none_strategy, telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, unique_keys_tracker=unique_keys_tracker, imp_counter=imp_counter) @@ -1666,6 +1661,381 @@ def test_mtk(self): self.factory.destroy(event) event.wait() +class InMemoryImpressionsToggleIntegrationTests(object): + """InMemory storage-based impressions toggle integration tests.""" + + def test_optimized(self): + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + + split_storage.update([splits.from_raw(splits_json['splitChange1_1']['splits'][0]), + splits.from_raw(splits_json['splitChange1_1']['splits'][1]), + splits.from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTracker(), ImpressionsCounter()) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactory('some_api_key', + storages, + True, + recorder, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + except: + pass + + try: + client = factory.client() + except: + pass + + assert client.get_treatment('user1', 'SPLIT_1') == 'off' + assert client.get_treatment('user1', 'SPLIT_2') == 'on' + assert client.get_treatment('user1', 'SPLIT_3') == 'on' + imp_storage = client._factory._get_storage('impressions') + impressions = imp_storage.pop_many(10) + assert len(impressions) == 2 + assert impressions[0].feature_name == 'SPLIT_1' + assert impressions[1].feature_name == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user1'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + + def test_debug(self): + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + + split_storage.update([splits.from_raw(splits_json['splitChange1_1']['splits'][0]), + splits.from_raw(splits_json['splitChange1_1']['splits'][1]), + splits.from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTracker(), ImpressionsCounter()) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactory('some_api_key', + storages, + True, + recorder, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + except: + pass + + try: + client = factory.client() + except: + pass + + assert client.get_treatment('user1', 'SPLIT_1') == 'off' + assert client.get_treatment('user1', 'SPLIT_2') == 'on' + assert client.get_treatment('user1', 'SPLIT_3') == 'on' + imp_storage = client._factory._get_storage('impressions') + impressions = imp_storage.pop_many(10) + assert len(impressions) == 2 + assert impressions[0].feature_name == 'SPLIT_1' + assert impressions[1].feature_name == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user1'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + + def test_none(self): + split_storage = InMemorySplitStorage() + segment_storage = InMemorySegmentStorage() + + split_storage.update([splits.from_raw(splits_json['splitChange1_1']['splits'][0]), + splits.from_raw(splits_json['splitChange1_1']['splits'][1]), + splits.from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + + telemetry_storage = InMemoryTelemetryStorage() + telemetry_producer = TelemetryStorageProducer(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorage(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyNoneMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTracker(), ImpressionsCounter()) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactory('some_api_key', + storages, + True, + recorder, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + except: + pass + + try: + client = factory.client() + except: + pass + + assert client.get_treatment('user1', 'SPLIT_1') == 'off' + assert client.get_treatment('user1', 'SPLIT_2') == 'on' + assert client.get_treatment('user1', 'SPLIT_3') == 'on' + imp_storage = client._factory._get_storage('impressions') + impressions = imp_storage.pop_many(10) + assert len(impressions) == 0 + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_1': {'user1'}, 'SPLIT_2': {'user1'}, 'SPLIT_3': {'user1'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 3 + assert imps_count[0].feature == 'SPLIT_1' + assert imps_count[0].count == 1 + assert imps_count[1].feature == 'SPLIT_2' + assert imps_count[1].count == 1 + assert imps_count[2].feature == 'SPLIT_3' + assert imps_count[2].count == 1 + +class RedisImpressionsToggleIntegrationTests(object): + """Run impression toggle tests for Redis.""" + + def test_optimized(self): + """Prepare storages with test data.""" + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') + redis_client = build(DEFAULT_CONFIG.copy()) + split_storage = RedisSplitStorage(redis_client, True) + segment_storage = RedisSegmentStorage(redis_client) + + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][0]['name']), json.dumps(splits_json['splitChange1_1']['splits'][0])) + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][1]['name']), json.dumps(splits_json['splitChange1_1']['splits'][1])) + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][2]['name']), json.dumps(splits_json['splitChange1_1']['splits'][2])) + redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, -1) + + telemetry_redis_storage = RedisTelemetryStorage(redis_client, metadata) + telemetry_producer = TelemetryStorageProducer(telemetry_redis_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': RedisImpressionsStorage(redis_client, metadata), + 'events': RedisEventsStorage(redis_client, metadata), + } + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = PipelinedRecorder(redis_client.pipeline, impmanager, + storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + factory = SplitFactory('some_api_key', + storages, + True, + recorder, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + + try: + client = factory.client() + except: + pass + + assert client.get_treatment('user1', 'SPLIT_1') == 'off' + assert client.get_treatment('user2', 'SPLIT_2') == 'on' + assert client.get_treatment('user3', 'SPLIT_3') == 'on' + time.sleep(0.2) + + imp_storage = factory._storages['impressions'] + impressions = [] + while True: + impression = redis_client.lpop(imp_storage.IMPRESSIONS_QUEUE_KEY) + if impression is None: + break + impressions.append(json.loads(impression)) + + assert len(impressions) == 2 + assert impressions[0]['i']['f'] == 'SPLIT_1' + assert impressions[1]['i']['f'] == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user3'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + self.clear_cache() + client.destroy() + + def test_debug(self): + """Prepare storages with test data.""" + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') + redis_client = build(DEFAULT_CONFIG.copy()) + split_storage = RedisSplitStorage(redis_client, True) + segment_storage = RedisSegmentStorage(redis_client) + + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][0]['name']), json.dumps(splits_json['splitChange1_1']['splits'][0])) + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][1]['name']), json.dumps(splits_json['splitChange1_1']['splits'][1])) + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][2]['name']), json.dumps(splits_json['splitChange1_1']['splits'][2])) + redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, -1) + + telemetry_redis_storage = RedisTelemetryStorage(redis_client, metadata) + telemetry_producer = TelemetryStorageProducer(telemetry_redis_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': RedisImpressionsStorage(redis_client, metadata), + 'events': RedisEventsStorage(redis_client, metadata), + } + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = PipelinedRecorder(redis_client.pipeline, impmanager, + storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + factory = SplitFactory('some_api_key', + storages, + True, + recorder, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + + try: + client = factory.client() + except: + pass + + assert client.get_treatment('user1', 'SPLIT_1') == 'off' + assert client.get_treatment('user2', 'SPLIT_2') == 'on' + assert client.get_treatment('user3', 'SPLIT_3') == 'on' + time.sleep(0.2) + + imp_storage = factory._storages['impressions'] + impressions = [] + while True: + impression = redis_client.lpop(imp_storage.IMPRESSIONS_QUEUE_KEY) + if impression is None: + break + impressions.append(json.loads(impression)) + + assert len(impressions) == 2 + assert impressions[0]['i']['f'] == 'SPLIT_1' + assert impressions[1]['i']['f'] == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user3'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + self.clear_cache() + client.destroy() + + def test_none(self): + """Prepare storages with test data.""" + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') + redis_client = build(DEFAULT_CONFIG.copy()) + split_storage = RedisSplitStorage(redis_client, True) + segment_storage = RedisSegmentStorage(redis_client) + + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][0]['name']), json.dumps(splits_json['splitChange1_1']['splits'][0])) + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][1]['name']), json.dumps(splits_json['splitChange1_1']['splits'][1])) + redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][2]['name']), json.dumps(splits_json['splitChange1_1']['splits'][2])) + redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, -1) + + telemetry_redis_storage = RedisTelemetryStorage(redis_client, metadata) + telemetry_producer = TelemetryStorageProducer(telemetry_redis_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': RedisImpressionsStorage(redis_client, metadata), + 'events': RedisEventsStorage(redis_client, metadata), + } + impmanager = ImpressionsManager(StrategyNoneMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = PipelinedRecorder(redis_client.pipeline, impmanager, + storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + factory = SplitFactory('some_api_key', + storages, + True, + recorder, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + + try: + client = factory.client() + except: + pass + + assert client.get_treatment('user1', 'SPLIT_1') == 'off' + assert client.get_treatment('user2', 'SPLIT_2') == 'on' + assert client.get_treatment('user3', 'SPLIT_3') == 'on' + time.sleep(0.2) + + imp_storage = factory._storages['impressions'] + impressions = [] + while True: + impression = redis_client.lpop(imp_storage.IMPRESSIONS_QUEUE_KEY) + if impression is None: + break + impressions.append(json.loads(impression)) + + assert len(impressions) == 0 + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_1': {'user1'}, 'SPLIT_2': {'user2'}, 'SPLIT_3': {'user3'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 3 + assert imps_count[0].feature == 'SPLIT_1' + assert imps_count[0].count == 1 + assert imps_count[1].feature == 'SPLIT_2' + assert imps_count[1].count == 1 + assert imps_count[2].feature == 'SPLIT_3' + assert imps_count[2].count == 1 + self.clear_cache() + client.destroy() + + def clear_cache(self): + """Clear redis cache.""" + keys_to_delete = [ + "SPLITIO.split.SPLIT_3", + "SPLITIO.splits.till", + "SPLITIO.split.SPLIT_2", + "SPLITIO.split.SPLIT_1", + "SPLITIO.telemetry.latencies" + ] + + redis_client = RedisAdapter(StrictRedis()) + for key in keys_to_delete: + redis_client.delete(key) + class InMemoryIntegrationAsyncTests(object): """Inmemory storage-based integration tests.""" @@ -1704,7 +2074,7 @@ async def _setup_method(self): 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer) # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. try: @@ -1870,7 +2240,7 @@ async def _setup_method(self): 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), } - impmanager = ImpressionsManager(StrategyOptimizedMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter = ImpressionsCounter()) # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. @@ -2029,7 +2399,7 @@ async def _setup_method(self): 'impressions': RedisImpressionsStorageAsync(redis_client, metadata), 'events': RedisEventsStorageAsync(redis_client, metadata), } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorderAsync(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage) self.factory = SplitFactoryAsync('some_api_key', @@ -2243,7 +2613,7 @@ async def _setup_method(self): 'impressions': RedisImpressionsStorageAsync(redis_client, metadata), 'events': RedisEventsStorageAsync(redis_client, metadata), } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorderAsync(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage) self.factory = SplitFactoryAsync('some_api_key', @@ -2280,21 +2650,21 @@ async def test_localhost_json_e2e(self): self._update_temp_file(splits_json['splitChange1_1']) await self._synchronize_now() - assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'off' assert await client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange1_2']) await self._synchronize_now() - assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'off' assert await client.get_treatment("key", "SPLIT_2", None) == 'off' self._update_temp_file(splits_json['splitChange1_3']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'control' assert await client.get_treatment("key", "SPLIT_2", None) == 'on' @@ -2303,13 +2673,13 @@ async def test_localhost_json_e2e(self): self._update_temp_file(splits_json['splitChange3_1']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange3_2']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_2", None) == 'off' # Tests 4 @@ -2317,21 +2687,21 @@ async def test_localhost_json_e2e(self): self._update_temp_file(splits_json['splitChange4_1']) await self._synchronize_now() - assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'off' assert await client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange4_2']) await self._synchronize_now() - assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'off' assert await client.get_treatment("key", "SPLIT_2", None) == 'off' self._update_temp_file(splits_json['splitChange4_3']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'control' assert await client.get_treatment("key", "SPLIT_2", None) == 'on' @@ -2340,13 +2710,13 @@ async def test_localhost_json_e2e(self): self._update_temp_file(splits_json['splitChange5_1']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange5_2']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_2", None) == 'on' # Tests 6 @@ -2354,21 +2724,21 @@ async def test_localhost_json_e2e(self): self._update_temp_file(splits_json['splitChange6_1']) await self._synchronize_now() - assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'off' assert await client.get_treatment("key", "SPLIT_2", None) == 'on' self._update_temp_file(splits_json['splitChange6_2']) await self._synchronize_now() - assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_1", "SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'off' assert await client.get_treatment("key", "SPLIT_2", None) == 'off' self._update_temp_file(splits_json['splitChange6_3']) await self._synchronize_now() - assert await self.factory.manager().split_names() == ["SPLIT_2"] + assert sorted(await self.factory.manager().split_names()) == ["SPLIT_2", "SPLIT_3"] assert await client.get_treatment("key", "SPLIT_1", None) == 'control' assert await client.get_treatment("key", "SPLIT_2", None) == 'on' @@ -2465,7 +2835,7 @@ async def _setup_method(self): 'telemetry': telemetry_pluggable_storage } - impmanager = ImpressionsManager(StrategyDebugMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_producer.get_telemetry_evaluation_producer(), @@ -2511,7 +2881,6 @@ async def _setup_method(self): async def test_get_treatment(self): """Test client.get_treatment().""" await self.setup_task -# pytest.set_trace() await _get_treatment_async(self.factory) await self.factory.destroy() @@ -2686,7 +3055,7 @@ async def _setup_method(self): 'telemetry': telemetry_pluggable_storage } - impmanager = ImpressionsManager(StrategyOptimizedMode(), telemetry_runtime_producer) # no listener + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_producer.get_telemetry_evaluation_producer(), @@ -2896,8 +3265,8 @@ async def _setup_method(self): unique_keys_tracker = UniqueKeysTrackerAsync() unique_keys_synchronizer, clear_filter_sync, self.unique_keys_task, \ clear_filter_task, impressions_count_sync, impressions_count_task, \ - imp_strategy = set_classes_async('PLUGGABLE', ImpressionsMode.NONE, self.pluggable_storage_adapter, imp_counter, unique_keys_tracker) - impmanager = ImpressionsManager(imp_strategy, telemetry_runtime_producer) # no listener + imp_strategy, none_strategy = set_classes_async('PLUGGABLE', ImpressionsMode.NONE, self.pluggable_storage_adapter, imp_counter, unique_keys_tracker) + impmanager = ImpressionsManager(imp_strategy, none_strategy, telemetry_runtime_producer) # no listener recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, unique_keys_tracker=unique_keys_tracker, imp_counter=imp_counter) @@ -3098,6 +3467,408 @@ async def _teardown_method(self): for key in keys_to_delete: await self.pluggable_storage_adapter.delete(key) +class InMemoryImpressionsToggleIntegrationAsyncTests(object): + """InMemory storage-based impressions toggle integration tests.""" + + @pytest.mark.asyncio + async def test_optimized(self): + split_storage = InMemorySplitStorageAsync() + segment_storage = InMemorySegmentStorageAsync() + + await split_storage.update([splits.from_raw(splits_json['splitChange1_1']['splits'][0]), + splits.from_raw(splits_json['splitChange1_1']['splits'][1]), + splits.from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTrackerAsync(), ImpressionsCounter()) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + except: + pass + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property + + try: + client = factory.client() + except: + pass + + assert await client.get_treatment('user1', 'SPLIT_1') == 'off' + assert await client.get_treatment('user1', 'SPLIT_2') == 'on' + assert await client.get_treatment('user1', 'SPLIT_3') == 'on' + imp_storage = client._factory._get_storage('impressions') + impressions = await imp_storage.pop_many(10) + assert len(impressions) == 2 + assert impressions[0].feature_name == 'SPLIT_1' + assert impressions[1].feature_name == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user1'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + await factory.destroy() + + @pytest.mark.asyncio + async def test_debug(self): + split_storage = InMemorySplitStorageAsync() + segment_storage = InMemorySegmentStorageAsync() + + await split_storage.update([splits.from_raw(splits_json['splitChange1_1']['splits'][0]), + splits.from_raw(splits_json['splitChange1_1']['splits'][1]), + splits.from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTrackerAsync(), ImpressionsCounter()) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + except: + pass + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property + + try: + client = factory.client() + except: + pass + + assert await client.get_treatment('user1', 'SPLIT_1') == 'off' + assert await client.get_treatment('user1', 'SPLIT_2') == 'on' + assert await client.get_treatment('user1', 'SPLIT_3') == 'on' + imp_storage = client._factory._get_storage('impressions') + impressions = await imp_storage.pop_many(10) + assert len(impressions) == 2 + assert impressions[0].feature_name == 'SPLIT_1' + assert impressions[1].feature_name == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user1'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + await factory.destroy() + + @pytest.mark.asyncio + async def test_none(self): + split_storage = InMemorySplitStorageAsync() + segment_storage = InMemorySegmentStorageAsync() + + await split_storage.update([splits.from_raw(splits_json['splitChange1_1']['splits'][0]), + splits.from_raw(splits_json['splitChange1_1']['splits'][1]), + splits.from_raw(splits_json['splitChange1_1']['splits'][2]) + ], [], -1) + + telemetry_storage = await InMemoryTelemetryStorageAsync.create() + telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer), + 'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer), + } + impmanager = ImpressionsManager(StrategyNoneMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTrackerAsync(), ImpressionsCounter()) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. + try: + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + None, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + except: + pass + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property + + try: + client = factory.client() + except: + pass + + assert await client.get_treatment('user1', 'SPLIT_1') == 'off' + assert await client.get_treatment('user1', 'SPLIT_2') == 'on' + assert await client.get_treatment('user1', 'SPLIT_3') == 'on' + imp_storage = client._factory._get_storage('impressions') + impressions = await imp_storage.pop_many(10) + assert len(impressions) == 0 + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_1': {'user1'}, 'SPLIT_2': {'user1'}, 'SPLIT_3': {'user1'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 3 + assert imps_count[0].feature == 'SPLIT_1' + assert imps_count[0].count == 1 + assert imps_count[1].feature == 'SPLIT_2' + assert imps_count[1].count == 1 + assert imps_count[2].feature == 'SPLIT_3' + assert imps_count[2].count == 1 + await factory.destroy() + +class RedisImpressionsToggleIntegrationAsyncTests(object): + """Run impression toggle tests for Redis.""" + + @pytest.mark.asyncio + async def test_optimized(self): + """Prepare storages with test data.""" + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') + redis_client = await build_async(DEFAULT_CONFIG.copy()) + split_storage = RedisSplitStorageAsync(redis_client, True) + segment_storage = RedisSegmentStorageAsync(redis_client) + + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][0]['name']), json.dumps(splits_json['splitChange1_1']['splits'][0])) + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][1]['name']), json.dumps(splits_json['splitChange1_1']['splits'][1])) + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][2]['name']), json.dumps(splits_json['splitChange1_1']['splits'][2])) + await redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, -1) + + telemetry_redis_storage = await RedisTelemetryStorageAsync.create(redis_client, metadata) + telemetry_producer = TelemetryStorageProducerAsync(telemetry_redis_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': RedisImpressionsStorageAsync(redis_client, metadata), + 'events': RedisEventsStorageAsync(redis_client, metadata), + } + impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = PipelinedRecorderAsync(redis_client.pipeline, impmanager, + storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property + + try: + client = factory.client() + except: + pass + + assert await client.get_treatment('user1', 'SPLIT_1') == 'off' + assert await client.get_treatment('user2', 'SPLIT_2') == 'on' + assert await client.get_treatment('user3', 'SPLIT_3') == 'on' + await asyncio.sleep(0.2) + + imp_storage = factory._storages['impressions'] + impressions = [] + while True: + impression = await redis_client.lpop(imp_storage.IMPRESSIONS_QUEUE_KEY) + if impression is None: + break + impressions.append(json.loads(impression)) + + assert len(impressions) == 2 + assert impressions[0]['i']['f'] == 'SPLIT_1' + assert impressions[1]['i']['f'] == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user3'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + await self.clear_cache() + await factory.destroy() + + @pytest.mark.asyncio + async def test_debug(self): + """Prepare storages with test data.""" + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') + redis_client = await build_async(DEFAULT_CONFIG.copy()) + split_storage = RedisSplitStorageAsync(redis_client, True) + segment_storage = RedisSegmentStorageAsync(redis_client) + + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][0]['name']), json.dumps(splits_json['splitChange1_1']['splits'][0])) + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][1]['name']), json.dumps(splits_json['splitChange1_1']['splits'][1])) + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][2]['name']), json.dumps(splits_json['splitChange1_1']['splits'][2])) + await redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, -1) + + telemetry_redis_storage = await RedisTelemetryStorageAsync.create(redis_client, metadata) + telemetry_producer = TelemetryStorageProducerAsync(telemetry_redis_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': RedisImpressionsStorageAsync(redis_client, metadata), + 'events': RedisEventsStorageAsync(redis_client, metadata), + } + impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = PipelinedRecorderAsync(redis_client.pipeline, impmanager, + storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property + + try: + client = factory.client() + except: + pass + + assert await client.get_treatment('user1', 'SPLIT_1') == 'off' + assert await client.get_treatment('user2', 'SPLIT_2') == 'on' + assert await client.get_treatment('user3', 'SPLIT_3') == 'on' + await asyncio.sleep(0.2) + + imp_storage = factory._storages['impressions'] + impressions = [] + while True: + impression = await redis_client.lpop(imp_storage.IMPRESSIONS_QUEUE_KEY) + if impression is None: + break + impressions.append(json.loads(impression)) + + assert len(impressions) == 2 + assert impressions[0]['i']['f'] == 'SPLIT_1' + assert impressions[1]['i']['f'] == 'SPLIT_2' + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_3': {'user3'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 1 + assert imps_count[0].feature == 'SPLIT_3' + assert imps_count[0].count == 1 + await self.clear_cache() + await factory.destroy() + + @pytest.mark.asyncio + async def test_none(self): + """Prepare storages with test data.""" + metadata = SdkMetadata('python-1.2.3', 'some_ip', 'some_name') + redis_client = await build_async(DEFAULT_CONFIG.copy()) + split_storage = RedisSplitStorageAsync(redis_client, True) + segment_storage = RedisSegmentStorageAsync(redis_client) + + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][0]['name']), json.dumps(splits_json['splitChange1_1']['splits'][0])) + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][1]['name']), json.dumps(splits_json['splitChange1_1']['splits'][1])) + await redis_client.set(split_storage._get_key(splits_json['splitChange1_1']['splits'][2]['name']), json.dumps(splits_json['splitChange1_1']['splits'][2])) + await redis_client.set(split_storage._FEATURE_FLAG_TILL_KEY, -1) + + telemetry_redis_storage = await RedisTelemetryStorageAsync.create(redis_client, metadata) + telemetry_producer = TelemetryStorageProducerAsync(telemetry_redis_storage) + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer() + + storages = { + 'splits': split_storage, + 'segments': segment_storage, + 'impressions': RedisImpressionsStorageAsync(redis_client, metadata), + 'events': RedisEventsStorageAsync(redis_client, metadata), + } + impmanager = ImpressionsManager(StrategyNoneMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener + recorder = PipelinedRecorderAsync(redis_client.pipeline, impmanager, + storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + factory = SplitFactoryAsync('some_api_key', + storages, + True, + recorder, + telemetry_producer=telemetry_producer, + telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), + ) # pylint:disable=attribute-defined-outside-init + ready_property = mocker.PropertyMock() + ready_property.return_value = True + type(factory).ready = ready_property + + try: + client = factory.client() + except: + pass + + assert await client.get_treatment('user1', 'SPLIT_1') == 'off' + assert await client.get_treatment('user2', 'SPLIT_2') == 'on' + assert await client.get_treatment('user3', 'SPLIT_3') == 'on' + await asyncio.sleep(0.2) + + imp_storage = factory._storages['impressions'] + impressions = [] + while True: + impression = await redis_client.lpop(imp_storage.IMPRESSIONS_QUEUE_KEY) + if impression is None: + break + impressions.append(json.loads(impression)) + + assert len(impressions) == 0 + assert client._recorder._unique_keys_tracker._cache == {'SPLIT_1': {'user1'}, 'SPLIT_2': {'user2'}, 'SPLIT_3': {'user3'}} + imps_count = client._recorder._imp_counter.pop_all() + assert len(imps_count) == 3 + assert imps_count[0].feature == 'SPLIT_1' + assert imps_count[0].count == 1 + assert imps_count[1].feature == 'SPLIT_2' + assert imps_count[1].count == 1 + assert imps_count[2].feature == 'SPLIT_3' + assert imps_count[2].count == 1 + await self.clear_cache() + await factory.destroy() + + async def clear_cache(self): + """Clear redis cache.""" + keys_to_delete = [ + "SPLITIO.split.SPLIT_3", + "SPLITIO.splits.till", + "SPLITIO.split.SPLIT_2", + "SPLITIO.split.SPLIT_1", + "SPLITIO.telemetry.latencies" + ] + + redis_client = await build_async(DEFAULT_CONFIG.copy()) + for key in keys_to_delete: + await redis_client.delete(key) + async def _validate_last_impressions_async(client, *to_validate): """Validate the last N impressions are present disregarding the order.""" imp_storage = client._factory._get_storage('impressions') diff --git a/tests/models/test_splits.py b/tests/models/test_splits.py index 9cd4bbfa..442a18d0 100644 --- a/tests/models/test_splits.py +++ b/tests/models/test_splits.py @@ -60,7 +60,8 @@ class SplitTests(object): 'configurations': { 'on': '{"color": "blue", "size": 13}' }, - 'sets': ['set1', 'set2'] + 'sets': ['set1', 'set2'], + 'impressionsDisabled': False } def test_from_raw(self): @@ -81,6 +82,7 @@ def test_from_raw(self): assert parsed.get_configurations_for('on') == '{"color": "blue", "size": 13}' assert parsed._configurations == {'on': '{"color": "blue", "size": 13}'} assert parsed.sets == {'set1', 'set2'} + assert parsed.impressions_disabled == False def test_get_segment_names(self, mocker): """Test fetching segment names.""" @@ -107,6 +109,7 @@ def test_to_json(self): assert as_json['algo'] == 2 assert len(as_json['conditions']) == 2 assert sorted(as_json['sets']) == ['set1', 'set2'] + assert as_json['impressionsDisabled'] is False def test_to_split_view(self): """Test SplitView creation.""" @@ -118,6 +121,7 @@ def test_to_split_view(self): assert as_split_view.traffic_type == self.raw['trafficTypeName'] assert set(as_split_view.treatments) == set(['on', 'off']) assert sorted(as_split_view.sets) == sorted(list(self.raw['sets'])) + assert as_split_view.impressions_disabled == self.raw['impressionsDisabled'] def test_incorrect_matcher(self): """Test incorrect matcher in split model parsing."""