From 7cd34ebff80915d5e58e7c5f0eacb194c8c1424b Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 12 Mar 2025 16:56:28 -0300 Subject: [PATCH 1/5] updated redis, pluggable and localjson storages --- splitio/storage/inmemmory.py | 3 +- splitio/storage/pluggable.py | 244 ++++++++++++++++++++++++++++++++++- splitio/storage/redis.py | 236 ++++++++++++++++++++++++++++++++- splitio/sync/split.py | 104 ++++++++++----- 4 files changed, 549 insertions(+), 38 deletions(-) diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index f7af8825..98fc0543 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -109,6 +109,7 @@ def remove_flag_set(self, flag_sets, feature_flag_name, should_filter): class InMemoryRuleBasedSegmentStorage(RuleBasedSegmentsStorage): """InMemory implementation of a feature flag storage base.""" + def __init__(self): """Constructor.""" self._lock = threading.RLock() @@ -192,7 +193,7 @@ def _set_change_number(self, new_change_number): def get_segment_names(self): """ - Retrieve a list of all excluded segments names. + Retrieve a list of all rule based segments names. :return: List of segment names. :rtype: list(str) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 7f0a5287..1cb7e054 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -5,15 +5,253 @@ import threading from splitio.optional.loaders import asyncio -from splitio.models import splits, segments +from splitio.models import splits, segments, rule_based_segments from splitio.models.impressions import Impression from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS,\ MethodLatenciesAsync, MethodExceptionsAsync, TelemetryConfigAsync -from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage +from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage, RuleBasedSegmentsStorage from splitio.util.storage_helper import get_valid_flag_sets, combine_valid_flag_sets _LOGGER = logging.getLogger(__name__) +class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage): + """RedPluggable storage for rule based segments.""" + + _RB_SEGMENT_NAME_LENGTH = 23 + _TILL_LENGTH = 4 + + def __init__(self, pluggable_adapter, prefix=None): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + self._pluggable_adapter = pluggable_adapter + self._prefix = "SPLITIO.rbsegment.${segmen_name}" + self._rb_segments_till_prefix = "SPLITIO.rbsegments.till" + if prefix is not None: + self._prefix = prefix + "." + self._prefix + self._rb_segments_till_prefix = prefix + "." + self._rb_segments_till_prefix + + def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + pass + + def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + pass + + def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + pass + + def get_segment_names(self): + """ + Retrieve a list of all excluded segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + + def update(self, to_add, to_delete, new_change_number): + """ + Update rule based segment.. + + :param to_add: List of rule based segment. to add + :type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param to_delete: List of rule based segment. to delete + :type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param new_change_number: New change number. + :type new_change_number: int + """ + raise NotImplementedError('Only redis-consumer mode is supported.') + + def get_large_segment_names(self): + """ + Retrieve a list of all excluded large segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + +class PluggableRuleBasedSegmentsStorage(PluggableRuleBasedSegmentsStorageBase): + """RedPluggable storage for rule based segments.""" + + def __init__(self, pluggable_adapter, prefix=None): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + PluggableRuleBasedSegmentsStorageBase.__init__(self, pluggable_adapter, prefix) + + def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + rb_segment = self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name)) + if not rb_segment: + return None + + return rule_based_segments.from_raw(rb_segment) + + except Exception: + _LOGGER.error('Error getting rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + try: + return self._pluggable_adapter.get(self._rb_segments_till_prefix) + + except Exception: + _LOGGER.error('Error getting change number in rule based segment storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return set(segment_names).issubset(self.get_segment_names()) + + def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = [] + for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + if key[-self._TILL_LENGTH:] != 'till': + keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + return keys + + except Exception: + _LOGGER.error('Error getting rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + +class PluggableRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): + """RedPluggable storage for rule based segments.""" + + def __init__(self, pluggable_adapter, prefix=None): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + PluggableRuleBasedSegmentsStorageBase.__init__(self, pluggable_adapter, prefix) + + async def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + rb_segment = await self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name)) + if not rb_segment: + return None + + return rule_based_segments.from_raw(rb_segment) + + except Exception: + _LOGGER.error('Error getting rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + try: + return await self._pluggable_adapter.get(self._rb_segments_till_prefix) + + except Exception: + _LOGGER.error('Error getting change number in rule based segment storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return await set(segment_names).issubset(self.get_segment_names()) + + async def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = [] + for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + if key[-self._TILL_LENGTH:] != 'till': + keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + return keys + + except Exception: + _LOGGER.error('Error getting rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + class PluggableSplitStorageBase(SplitStorage): """InMemory implementation of a feature flag storage.""" @@ -90,7 +328,7 @@ def update(self, to_add, to_delete, new_change_number): :param new_change_number: New change number. :type new_change_number: int """ -# pass + pass # try: # split = self.get(feature_flag_name) # if not split: diff --git a/splitio/storage/redis.py b/splitio/storage/redis.py index 982e0213..60b532e9 100644 --- a/splitio/storage/redis.py +++ b/splitio/storage/redis.py @@ -4,10 +4,10 @@ import threading from splitio.models.impressions import Impression -from splitio.models import splits, segments +from splitio.models import splits, segments, rule_based_segments from splitio.models.telemetry import TelemetryConfig, TelemetryConfigAsync from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \ - ImpressionPipelinedStorage, TelemetryStorage, FlagSetsFilter + ImpressionPipelinedStorage, TelemetryStorage, FlagSetsFilter, RuleBasedSegmentsStorage from splitio.storage.adapters.redis import RedisAdapterException from splitio.storage.adapters.cache_trait import decorate as add_cache, DEFAULT_MAX_AGE from splitio.storage.adapters.cache_trait import LocalMemoryCache, LocalMemoryCacheAsync @@ -16,8 +16,238 @@ _LOGGER = logging.getLogger(__name__) MAX_TAGS = 10 +class RedisRuleBasedSegmentsStorage(RuleBasedSegmentsStorage): + """Redis-based storage for rule based segments.""" + + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' + + def __init__(self, redis_client): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + self._redis = redis_client + self._pipe = self._redis.pipeline + + def _get_key(self, segment_name): + """ + Use the provided feature_flag_name to build the appropriate redis key. + + :param feature_flag_name: Name of the feature flag to interact with in redis. + :type feature_flag_name: str + + :return: Redis key. + :rtype: str. + """ + return self._RB_SEGMENT_KEY.format(segment_name=segment_name) + + def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + raw = self._redis.get(self._get_key(segment_name)) + _LOGGER.debug("Fetchting rule based segment [%s] from redis" % segment_name) + _LOGGER.debug(raw) + return rule_based_segments.from_raw(json.loads(raw)) if raw is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def update(self, to_add, to_delete, new_change_number): + """ + Update rule based segment.. + + :param to_add: List of rule based segment. to add + :type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param to_delete: List of rule based segment. to delete + :type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param new_change_number: New change number. + :type new_change_number: int + """ + raise NotImplementedError('Only redis-consumer mode is supported.') + + def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + try: + stored_value = self._redis.get(self._RB_SEGMENT_TILL_KEY) + _LOGGER.debug("Fetching rule based segment Change Number from redis: %s" % stored_value) + return json.loads(stored_value) if stored_value is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment change number from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return set(segment_names).issubset(self.get_segment_names()) + + def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = self._redis.keys(self._get_key('*')) + _LOGGER.debug("Fetchting rule based segments names from redis: %s" % keys) + return [key.replace(self._get_key(''), '') for key in keys] + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return [] + + def get_large_segment_names(self): + """ + Retrieve a list of all excluded large segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + +class RedisRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): + """Redis-based storage for rule based segments.""" + + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' + + def __init__(self, redis_client): + """ + Class constructor. + + :param redis_client: Redis client or compliant interface. + :type redis_client: splitio.storage.adapters.redis.RedisAdapter + """ + self._redis = redis_client + self._pipe = self._redis.pipeline + + def _get_key(self, segment_name): + """ + Use the provided feature_flag_name to build the appropriate redis key. + + :param feature_flag_name: Name of the feature flag to interact with in redis. + :type feature_flag_name: str + + :return: Redis key. + :rtype: str. + """ + return self._RB_SEGMENT_KEY.format(segment_name=segment_name) + + async def get(self, segment_name): + """ + Retrieve a rule based segment. + + :param segment_name: Name of the segment to fetch. + :type segment_name: str + + :rtype: str + """ + try: + raw = await self._redis.get(self._get_key(segment_name)) + _LOGGER.debug("Fetchting rule based segment [%s] from redis" % segment_name) + _LOGGER.debug(raw) + return rule_based_segments.from_raw(json.loads(raw)) if raw is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def update(self, to_add, to_delete, new_change_number): + """ + Update rule based segment.. + + :param to_add: List of rule based segment. to add + :type to_add: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param to_delete: List of rule based segment. to delete + :type to_delete: list[splitio.models.rule_based_segments.RuleBasedSegment] + :param new_change_number: New change number. + :type new_change_number: int + """ + raise NotImplementedError('Only redis-consumer mode is supported.') + + async def get_change_number(self): + """ + Retrieve latest rule based segment change number. + + :rtype: int + """ + try: + stored_value = await self._redis.get(self._RB_SEGMENT_TILL_KEY) + _LOGGER.debug("Fetching rule based segment Change Number from redis: %s" % stored_value) + return json.loads(stored_value) if stored_value is not None else None + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segment change number from storage') + _LOGGER.debug('Error: ', exc_info=True) + return None + + async def contains(self, segment_names): + """ + Return whether the segments exists in rule based segment in cache. + + :param segment_names: segment name to validate. + :type segment_names: str + + :return: True if segment names exists. False otherwise. + :rtype: bool + """ + return set(segment_names).issubset(await self.get_segment_names()) + + async def get_segment_names(self): + """ + Retrieve a list of all rule based segments names. + + :return: List of segment names. + :rtype: list(str) + """ + try: + keys = await self._redis.keys(self._get_key('*')) + _LOGGER.debug("Fetchting rule based segments names from redis: %s" % keys) + return [key.replace(self._get_key(''), '') for key in keys] + + except RedisAdapterException: + _LOGGER.error('Error fetching rule based segments names from storage') + _LOGGER.debug('Error: ', exc_info=True) + return [] + + async def get_large_segment_names(self): + """ + Retrieve a list of all excluded large segments names. + + :return: List of segment names. + :rtype: list(str) + """ + pass + class RedisSplitStorageBase(SplitStorage): - """Redis-based storage base for s.""" + """Redis-based storage base for feature flags.""" _FEATURE_FLAG_KEY = 'SPLITIO.split.{feature_flag_name}' _FEATURE_FLAG_TILL_KEY = 'SPLITIO.splits.till' diff --git a/splitio/sync/split.py b/splitio/sync/split.py index d0e4690c..4d4d5a5a 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -456,10 +456,10 @@ def _make_whitelist_condition(whitelist, treatment): 'combiner': 'AND' } } - - def _sanitize_feature_flag(self, parsed): + + def _sanitize_json_elements(self, parsed): """ - implement Sanitization if neded. + Sanitize all json elements. :param parsed: feature flags, till and since elements dict :type parsed: Dict @@ -467,14 +467,14 @@ def _sanitize_feature_flag(self, parsed): :return: sanitized structure dict :rtype: Dict """ - parsed = self._sanitize_json_elements(parsed) - parsed['splits'] = self._sanitize_feature_flag_elements(parsed['splits']) - + parsed = self._satitize_json_section(parsed, 'ff') + parsed = self._satitize_json_section(parsed, 'rbs') + return parsed - def _sanitize_json_elements(self, parsed): + def _satitize_json_section(self, parsed, section_name): """ - Sanitize all json elements. + Sanitize specific json section. :param parsed: feature flags, till and since elements dict :type parsed: Dict @@ -482,15 +482,17 @@ def _sanitize_json_elements(self, parsed): :return: sanitized structure dict :rtype: Dict """ - if 'splits' not in parsed: - parsed['splits'] = [] - if 'till' not in parsed or parsed['till'] is None or parsed['till'] < -1: - parsed['till'] = -1 - if 'since' not in parsed or parsed['since'] is None or parsed['since'] < -1 or parsed['since'] > parsed['till']: - parsed['since'] = parsed['till'] + if section_name not in parsed: + parsed['ff'] = {"t": -1, "s": -1, "d": []} + if 'd' not in parsed[section_name]: + parsed[section_name]['d'] = [] + if 't' not in parsed[section_name] or parsed[section_name]['t'] is None or parsed[section_name]['t'] < -1: + parsed[section_name]['t'] = -1 + if 's' not in parsed[section_name] or parsed[section_name]['s'] is None or parsed[section_name]['s'] < -1 or parsed[section_name]['s'] > parsed[section_name]['t']: + parsed[section_name]['s'] = parsed[section_name]['t'] return parsed - + def _sanitize_feature_flag_elements(self, parsed_feature_flags): """ Sanitize all feature flags elements. @@ -523,6 +525,29 @@ def _sanitize_feature_flag_elements(self, parsed_feature_flags): sanitized_feature_flags.append(feature_flag) return sanitized_feature_flags + def _sanitize_rb_segment_elements(self, parsed_rb_segments): + """ + Sanitize all rule based segments elements. + + :param parsed_rb_segments: rule based segments array + :type parsed_rb_segments: [Dict] + + :return: sanitized structure dict + :rtype: [Dict] + """ + sanitized_rb_segments = [] + for rb_segment in parsed_rb_segments: + if 'name' not in rb_segment or rb_segment['name'].strip() == '': + _LOGGER.warning("A rule based segment in json file does not have (Name) or property is empty, skipping.") + continue + for element in [('trafficTypeName', 'user', None, None, None, None), + ('status', 'ACTIVE', None, None, ['ACTIVE', 'ARCHIVED'], None), + ('changeNumber', 0, 0, None, None, None)]: + rb_segment = util._sanitize_object_element(rb_segment, 'rule based segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], not_in_list=element[5]) + rb_segment = self._sanitize_condition(rb_segment) + sanitized_rb_segments.append(rb_segment) + return sanitized_rb_segments + def _sanitize_condition(self, feature_flag): """ Sanitize feature flag and ensure a condition type ROLLOUT and matcher exist with ALL_KEYS elements. @@ -601,7 +626,7 @@ def _convert_yaml_to_feature_flag(cls, parsed): class LocalSplitSynchronizer(LocalSplitSynchronizerBase): """Localhost mode feature_flag synchronizer.""" - def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode.LEGACY): + def __init__(self, filename, feature_flag_storage, rule_based_segment_storage, localhost_mode=LocalhostMode.LEGACY): """ Class constructor. @@ -614,6 +639,7 @@ def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode. """ self._filename = filename self._feature_flag_storage = feature_flag_storage + self._rule_based_segment_storage = rule_based_segment_storage self._localhost_mode = localhost_mode self._current_json_sha = "-1" @@ -706,18 +732,23 @@ def _synchronize_json(self): :rtype: [str] """ try: - fetched, till = self._read_feature_flags_from_json_file(self._filename) + parsed = self._read_feature_flags_from_json_file(self._filename) segment_list = set() - fecthed_sha = util._get_sha(json.dumps(fetched)) + fecthed_sha = util._get_sha(json.dumps(parsed)) if fecthed_sha == self._current_json_sha: return [] self._current_json_sha = fecthed_sha - if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL: + if self._feature_flag_storage.get_change_number() > parsed['ff']['t'] and parsed['ff']['t'] != self._DEFAULT_FEATURE_FLAG_TILL: return [] - fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in fetched] - segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, till) + fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in parsed['ff']['d']] + segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, parsed['ff']['t']) + + if self._rule_based_segment_storage.get_change_number() <= parsed['rbs']['t'] or parsed['rbs']['t'] == self._DEFAULT_FEATURE_FLAG_TILL: + fetched_rb_segments = [rule_based_segments.from_raw(rb_segment) for rb_segment in parsed['rbs']['d']] + segment_list.update(update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t'])) + return segment_list except Exception as exc: @@ -737,8 +768,11 @@ def _read_feature_flags_from_json_file(self, filename): try: with open(filename, 'r') as flo: parsed = json.load(flo) - santitized = self._sanitize_feature_flag(parsed) - return santitized['splits'], santitized['till'] + santitized = self._sanitize_json_elements(parsed) + santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff']) + santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + return santitized + except Exception as exc: _LOGGER.debug('Exception: ', exc_info=True) raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc @@ -747,7 +781,7 @@ def _read_feature_flags_from_json_file(self, filename): class LocalSplitSynchronizerAsync(LocalSplitSynchronizerBase): """Localhost mode async feature_flag synchronizer.""" - def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode.LEGACY): + def __init__(self, filename, feature_flag_storage, rule_based_segment_storage, localhost_mode=LocalhostMode.LEGACY): """ Class constructor. @@ -760,6 +794,7 @@ def __init__(self, filename, feature_flag_storage, localhost_mode=LocalhostMode. """ self._filename = filename self._feature_flag_storage = feature_flag_storage + self._rule_based_segment_storage = rule_based_segment_storage self._localhost_mode = localhost_mode self._current_json_sha = "-1" @@ -853,18 +888,23 @@ async def _synchronize_json(self): :rtype: [str] """ try: - fetched, till = await self._read_feature_flags_from_json_file(self._filename) + parsed = await self._read_feature_flags_from_json_file(self._filename) segment_list = set() - fecthed_sha = util._get_sha(json.dumps(fetched)) + fecthed_sha = util._get_sha(json.dumps(parsed)) if fecthed_sha == self._current_json_sha: return [] self._current_json_sha = fecthed_sha - if await self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL: + if await self._feature_flag_storage.get_change_number() > parsed['ff']['t'] and parsed['ff']['t'] != self._DEFAULT_FEATURE_FLAG_TILL: return [] - fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in fetched] - segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, till) + fetched_feature_flags = [splits.from_raw(feature_flag) for feature_flag in parsed['ff']['d']] + segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, parsed['ff']['t']) + + if await self._rule_based_segment_storage.get_change_number() <= parsed['rbs']['t'] or parsed['rbs']['t'] == self._DEFAULT_FEATURE_FLAG_TILL: + fetched_rb_segments = [rule_based_segments.from_raw(rb_segment) for rb_segment in parsed['rbs']['d']] + segment_list.update(await update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t'])) + return segment_list except Exception as exc: @@ -884,8 +924,10 @@ async def _read_feature_flags_from_json_file(self, filename): try: async with aiofiles.open(filename, 'r') as flo: parsed = json.loads(await flo.read()) - santitized = self._sanitize_feature_flag(parsed) - return santitized['splits'], santitized['till'] + santitized = self._sanitize_json_elements(parsed) + santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff']) + santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + return santitized except Exception as exc: _LOGGER.debug('Exception: ', exc_info=True) raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc From 4d8327c84cdb0036899fa7f1639a7980b79ae7de Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 13 Mar 2025 15:05:04 -0300 Subject: [PATCH 2/5] Updated redis, pluggable and localjson storages --- splitio/models/rule_based_segments.py | 30 ++- splitio/storage/pluggable.py | 20 +- splitio/storage/redis.py | 4 +- splitio/sync/split.py | 20 +- tests/integration/__init__.py | 4 + tests/storage/test_pluggable.py | 128 +++++++++++- tests/storage/test_redis.py | 163 ++++++++++++++- tests/sync/test_splits_synchronizer.py | 274 +++++++++++++------------ tests/sync/test_synchronizer.py | 2 - 9 files changed, 482 insertions(+), 163 deletions(-) diff --git a/splitio/models/rule_based_segments.py b/splitio/models/rule_based_segments.py index 4ff548b2..66ec7ddf 100644 --- a/splitio/models/rule_based_segments.py +++ b/splitio/models/rule_based_segments.py @@ -11,14 +11,14 @@ class RuleBasedSegment(object): """RuleBasedSegment object class.""" - def __init__(self, name, traffic_yype_Name, change_number, status, conditions, excluded): + def __init__(self, name, traffic_type_name, change_number, status, conditions, excluded): """ Class constructor. :param name: Segment name. :type name: str - :param traffic_yype_Name: traffic type name. - :type traffic_yype_Name: str + :param traffic_type_name: traffic type name. + :type traffic_type_name: str :param change_number: change number. :type change_number: str :param status: status. @@ -29,7 +29,7 @@ def __init__(self, name, traffic_yype_Name, change_number, status, conditions, e :type excluded: Excluded """ self._name = name - self._traffic_yype_Name = traffic_yype_Name + self._traffic_type_name = traffic_type_name self._change_number = change_number self._status = status self._conditions = conditions @@ -41,9 +41,9 @@ def name(self): return self._name @property - def traffic_yype_Name(self): + def traffic_type_name(self): """Return traffic type name.""" - return self._traffic_yype_Name + return self._traffic_type_name @property def change_number(self): @@ -65,6 +65,17 @@ def excluded(self): """Return excluded.""" return self._excluded + def to_json(self): + """Return a JSON representation of this rule based segment.""" + return { + 'changeNumber': self.change_number, + 'trafficTypeName': self.traffic_type_name, + 'name': self.name, + 'status': self.status, + 'conditions': [c.to_json() for c in self.conditions], + 'excluded': self.excluded.to_json() + } + def from_raw(raw_rule_based_segment): """ Parse a Rule based segment from a JSON portion of splitChanges. @@ -111,3 +122,10 @@ def get_excluded_keys(self): def get_excluded_segments(self): """Return excluded segments""" return self._segments + + def to_json(self): + """Return a JSON representation of this object.""" + return { + 'keys': self._keys, + 'segments': self._segments + } diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 1cb7e054..66fad1e5 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -17,7 +17,6 @@ class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage): """RedPluggable storage for rule based segments.""" - _RB_SEGMENT_NAME_LENGTH = 23 _TILL_LENGTH = 4 def __init__(self, pluggable_adapter, prefix=None): @@ -28,9 +27,11 @@ def __init__(self, pluggable_adapter, prefix=None): :type redis_client: splitio.storage.adapters.redis.RedisAdapter """ self._pluggable_adapter = pluggable_adapter - self._prefix = "SPLITIO.rbsegment.${segmen_name}" + self._prefix = "SPLITIO.rbsegment.{segment_name}" self._rb_segments_till_prefix = "SPLITIO.rbsegments.till" + self._rb_segment_name_length = 18 if prefix is not None: + self._rb_segment_name_length += len(prefix) + 1 self._prefix = prefix + "." + self._prefix self._rb_segments_till_prefix = prefix + "." + self._rb_segments_till_prefix @@ -163,10 +164,13 @@ def get_segment_names(self): :rtype: list(str) """ try: + _LOGGER.error(self._rb_segment_name_length) + _LOGGER.error(self._prefix) + _LOGGER.error(self._prefix[:self._rb_segment_name_length]) keys = [] - for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + for key in self._pluggable_adapter.get_keys_by_prefix(self._prefix[:self._rb_segment_name_length]): if key[-self._TILL_LENGTH:] != 'till': - keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + keys.append(key[len(self._prefix[:self._rb_segment_name_length]):]) return keys except Exception: @@ -174,7 +178,7 @@ def get_segment_names(self): _LOGGER.debug('Error: ', exc_info=True) return None -class PluggableRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): +class PluggableRuleBasedSegmentsStorageAsync(PluggableRuleBasedSegmentsStorageBase): """RedPluggable storage for rule based segments.""" def __init__(self, pluggable_adapter, prefix=None): @@ -231,7 +235,7 @@ async def contains(self, segment_names): :return: True if segment names exists. False otherwise. :rtype: bool """ - return await set(segment_names).issubset(self.get_segment_names()) + return set(segment_names).issubset(await self.get_segment_names()) async def get_segment_names(self): """ @@ -242,9 +246,9 @@ async def get_segment_names(self): """ try: keys = [] - for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]): + for key in await self._pluggable_adapter.get_keys_by_prefix(self._prefix[:self._rb_segment_name_length]): if key[-self._TILL_LENGTH:] != 'till': - keys.append(key[len(self._prefix[:-self._RB_SEGMENT_NAME_LENGTH]):]) + keys.append(key[len(self._prefix[:self._rb_segment_name_length]):]) return keys except Exception: diff --git a/splitio/storage/redis.py b/splitio/storage/redis.py index 60b532e9..e5398cf7 100644 --- a/splitio/storage/redis.py +++ b/splitio/storage/redis.py @@ -19,7 +19,7 @@ class RedisRuleBasedSegmentsStorage(RuleBasedSegmentsStorage): """Redis-based storage for rule based segments.""" - _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.{segment_name}' _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' def __init__(self, redis_client): @@ -134,7 +134,7 @@ def get_large_segment_names(self): class RedisRuleBasedSegmentsStorageAsync(RuleBasedSegmentsStorage): """Redis-based storage for rule based segments.""" - _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.${segmen_name}' + _RB_SEGMENT_KEY = 'SPLITIO.rbsegment.{segment_name}' _RB_SEGMENT_TILL_KEY = 'SPLITIO.rbsegments.till' def __init__(self, redis_client): diff --git a/splitio/sync/split.py b/splitio/sync/split.py index 4d4d5a5a..58ea900a 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -545,6 +545,7 @@ def _sanitize_rb_segment_elements(self, parsed_rb_segments): ('changeNumber', 0, 0, None, None, None)]: rb_segment = util._sanitize_object_element(rb_segment, 'rule based segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], not_in_list=element[5]) rb_segment = self._sanitize_condition(rb_segment) + rb_segment = self._remove_partition(rb_segment) sanitized_rb_segments.append(rb_segment) return sanitized_rb_segments @@ -599,6 +600,15 @@ def _sanitize_condition(self, feature_flag): }) return feature_flag + + def _remove_partition(self, rb_segment): + sanitized = [] + for condition in rb_segment['conditions']: + if 'partition' in condition: + del condition['partition'] + sanitized.append(condition) + rb_segment['conditions'] = sanitized + return rb_segment @classmethod def _convert_yaml_to_feature_flag(cls, parsed): @@ -769,8 +779,8 @@ def _read_feature_flags_from_json_file(self, filename): with open(filename, 'r') as flo: parsed = json.load(flo) santitized = self._sanitize_json_elements(parsed) - santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff']) - santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + santitized['ff']['d'] = self._sanitize_feature_flag_elements(santitized['ff']['d']) + santitized['rbs']['d'] = self._sanitize_rb_segment_elements(santitized['rbs']['d']) return santitized except Exception as exc: @@ -903,7 +913,7 @@ async def _synchronize_json(self): if await self._rule_based_segment_storage.get_change_number() <= parsed['rbs']['t'] or parsed['rbs']['t'] == self._DEFAULT_FEATURE_FLAG_TILL: fetched_rb_segments = [rule_based_segments.from_raw(rb_segment) for rb_segment in parsed['rbs']['d']] - segment_list.update(await update_rule_based_segment_storage(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t'])) + segment_list.update(await update_rule_based_segment_storage_async(self._rule_based_segment_storage, fetched_rb_segments, parsed['rbs']['t'])) return segment_list @@ -925,8 +935,8 @@ async def _read_feature_flags_from_json_file(self, filename): async with aiofiles.open(filename, 'r') as flo: parsed = json.loads(await flo.read()) santitized = self._sanitize_json_elements(parsed) - santitized['ff'] = self._sanitize_feature_flag_elements(santitized['ff']) - santitized['rbs'] = self._sanitize_rb_segment_elements(santitized['rbs']) + santitized['ff']['d'] = self._sanitize_feature_flag_elements(santitized['ff']['d']) + santitized['rbs']['d'] = self._sanitize_rb_segment_elements(santitized['rbs']['d']) return santitized except Exception as exc: _LOGGER.debug('Exception: ', exc_info=True) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index ee2475df..ab6e3293 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -47,3 +47,7 @@ "splitChange6_2": split62, "splitChange6_3": split63, } + +rbsegments_json = { + "segment1": {"changeNumber": 12, "name": "some_segment", "status": "ACTIVE","trafficTypeName": "user","excluded":{"keys":[],"segments":[]},"conditions": []} +} \ No newline at end of file diff --git a/tests/storage/test_pluggable.py b/tests/storage/test_pluggable.py index 439049e5..953a4510 100644 --- a/tests/storage/test_pluggable.py +++ b/tests/storage/test_pluggable.py @@ -1,20 +1,21 @@ """Pluggable storage test module.""" import json import threading +import copy import pytest from splitio.optional.loaders import asyncio from splitio.models.splits import Split -from splitio.models import splits, segments +from splitio.models import splits, segments, rule_based_segments from splitio.models.segments import Segment from splitio.models.impressions import Impression from splitio.models.events import Event, EventWrapper from splitio.storage.pluggable import PluggableSplitStorage, PluggableSegmentStorage, PluggableImpressionsStorage, PluggableEventsStorage, \ PluggableTelemetryStorage, PluggableEventsStorageAsync, PluggableSegmentStorageAsync, PluggableImpressionsStorageAsync,\ - PluggableSplitStorageAsync, PluggableTelemetryStorageAsync + PluggableSplitStorageAsync, PluggableTelemetryStorageAsync, PluggableRuleBasedSegmentsStorage, PluggableRuleBasedSegmentsStorageAsync from splitio.client.util import get_metadata, SdkMetadata from splitio.models.telemetry import MAX_TAGS, MethodExceptionsAndLatencies, OperationMode -from tests.integration import splits_json +from tests.integration import splits_json, rbsegments_json class StorageMockAdapter(object): def __init__(self): @@ -1372,3 +1373,124 @@ async def test_push_config_stats(self): await pluggable_telemetry_storage.record_active_and_redundant_factories(2, 1) await pluggable_telemetry_storage.push_config_stats() assert(self.mock_adapter._keys[pluggable_telemetry_storage._telemetry_config_key + "::" + pluggable_telemetry_storage._sdk_metadata] == '{"aF": 2, "rF": 1, "sT": "memory", "oM": 0, "t": []}') + +class PluggableRuleBasedSegmentStorageTests(object): + """In memory rule based segment storage test cases.""" + + def setup_method(self): + """Prepare storages with test data.""" + self.mock_adapter = StorageMockAdapter() + + def test_get(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs_name = rbsegments_json['segment1']['name'] + + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs_name), rbs1.to_json()) + assert(pluggable_rbs_storage.get(rbs_name).to_json() == rule_based_segments.from_raw(rbsegments_json['segment1']).to_json()) + assert(pluggable_rbs_storage.get('not_existing') == None) + + def test_get_change_number(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + if sprefix == 'myprefix': + prefix = 'myprefix.' + else: + prefix = '' + self.mock_adapter.set(prefix + "SPLITIO.rbsegments.till", 1234) + assert(pluggable_rbs_storage.get_change_number() == 1234) + + def test_get_segment_names(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + assert(pluggable_rbs_storage.get_segment_names() == [rbs1.name, rbs2.name]) + + def test_contains(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorage(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + + assert(pluggable_rbs_storage.contains([rbs1.name, rbs2.name])) + assert(pluggable_rbs_storage.contains([rbs2.name])) + assert(not pluggable_rbs_storage.contains(['none-exists', rbs2.name])) + assert(not pluggable_rbs_storage.contains(['none-exists', 'none-exists2'])) + +class PluggableRuleBasedSegmentStorageAsyncTests(object): + """In memory rule based segment storage test cases.""" + + def setup_method(self): + """Prepare storages with test data.""" + self.mock_adapter = StorageMockAdapterAsync() + + @pytest.mark.asyncio + async def test_get(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs_name = rbsegments_json['segment1']['name'] + + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs_name), rbs1.to_json()) + rbs = await pluggable_rbs_storage.get(rbs_name) + assert(rbs.to_json() == rule_based_segments.from_raw(rbsegments_json['segment1']).to_json()) + assert(await pluggable_rbs_storage.get('not_existing') == None) + + @pytest.mark.asyncio + async def test_get_change_number(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + if sprefix == 'myprefix': + prefix = 'myprefix.' + else: + prefix = '' + await self.mock_adapter.set(prefix + "SPLITIO.rbsegments.till", 1234) + assert(await pluggable_rbs_storage.get_change_number() == 1234) + + @pytest.mark.asyncio + async def test_get_segment_names(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + assert(await pluggable_rbs_storage.get_segment_names() == [rbs1.name, rbs2.name]) + + @pytest.mark.asyncio + async def test_contains(self): + self.mock_adapter._keys = {} + for sprefix in [None, 'myprefix']: + pluggable_rbs_storage = PluggableRuleBasedSegmentsStorageAsync(self.mock_adapter, prefix=sprefix) + rbs1 = rule_based_segments.from_raw(rbsegments_json['segment1']) + rbs2_temp = copy.deepcopy(rbsegments_json['segment1']) + rbs2_temp['name'] = 'another_segment' + rbs2 = rule_based_segments.from_raw(rbs2_temp) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs1.name), rbs1.to_json()) + await self.mock_adapter.set(pluggable_rbs_storage._prefix.format(segment_name=rbs2.name), rbs2.to_json()) + + assert(await pluggable_rbs_storage.contains([rbs1.name, rbs2.name])) + assert(await pluggable_rbs_storage.contains([rbs2.name])) + assert(not await pluggable_rbs_storage.contains(['none-exists', rbs2.name])) + assert(not await pluggable_rbs_storage.contains(['none-exists', 'none-exists2'])) diff --git a/tests/storage/test_redis.py b/tests/storage/test_redis.py index cce9a43d..04ddfc60 100644 --- a/tests/storage/test_redis.py +++ b/tests/storage/test_redis.py @@ -12,7 +12,8 @@ from splitio.optional.loaders import asyncio from splitio.storage import FlagSetsFilter from splitio.storage.redis import RedisEventsStorage, RedisEventsStorageAsync, RedisImpressionsStorage, RedisImpressionsStorageAsync, \ - RedisSegmentStorage, RedisSegmentStorageAsync, RedisSplitStorage, RedisSplitStorageAsync, RedisTelemetryStorage, RedisTelemetryStorageAsync + RedisSegmentStorage, RedisSegmentStorageAsync, RedisSplitStorage, RedisSplitStorageAsync, RedisTelemetryStorage, RedisTelemetryStorageAsync, \ + RedisRuleBasedSegmentsStorage, RedisRuleBasedSegmentsStorageAsync from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterException, build from redis.asyncio.client import Redis as aioredis from splitio.storage.adapters import redis @@ -1230,3 +1231,163 @@ async def expire(*args): await redis_telemetry.expire_keys('key', 12, 2, 2) assert(self.called) + +class RedisRuleBasedSegmentStorageTests(object): + """Redis rule based segment storage test cases.""" + + def test_get_segment(self, mocker): + """Test retrieving a rule based segment works.""" + adapter = mocker.Mock(spec=RedisAdapter) + adapter.get.return_value = '{"name": "some_segment"}' + from_raw = mocker.Mock() + mocker.patch('splitio.storage.redis.rule_based_segments.from_raw', new=from_raw) + + storage = RedisRuleBasedSegmentsStorage(adapter) + storage.get('some_segment') + + assert adapter.get.mock_calls == [mocker.call('SPLITIO.rbsegment.some_segment')] + assert from_raw.mock_calls == [mocker.call({"name": "some_segment"})] + + # Test that a missing split returns None and doesn't call from_raw + adapter.reset_mock() + from_raw.reset_mock() + adapter.get.return_value = None + result = storage.get('some_segment') + assert result is None + assert adapter.get.mock_calls == [mocker.call('SPLITIO.rbsegment.some_segment')] + assert not from_raw.mock_calls + + def test_get_changenumber(self, mocker): + """Test fetching changenumber.""" + adapter = mocker.Mock(spec=RedisAdapter) + storage = RedisRuleBasedSegmentsStorage(adapter) + adapter.get.return_value = '-1' + assert storage.get_change_number() == -1 + assert adapter.get.mock_calls == [mocker.call('SPLITIO.rbsegments.till')] + + def test_get_segment_names(self, mocker): + """Test getching rule based segment names.""" + adapter = mocker.Mock(spec=RedisAdapter) + storage = RedisRuleBasedSegmentsStorage(adapter) + adapter.keys.return_value = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + assert storage.get_segment_names() == ['segment1', 'segment2', 'segment3'] + + def test_contains(self, mocker): + """Test storage containing rule based segment names.""" + adapter = mocker.Mock(spec=RedisAdapter) + storage = RedisRuleBasedSegmentsStorage(adapter) + adapter.keys.return_value = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + assert storage.contains(['segment1', 'segment3']) + assert not storage.contains(['segment1', 'segment4']) + assert storage.contains(['segment1']) + assert not storage.contains(['segment4', 'segment5']) + +class RedisRuleBasedSegmentStorageAsyncTests(object): + """Redis rule based segment storage test cases.""" + + @pytest.mark.asyncio + async def test_get_segment(self, mocker): + """Test retrieving a rule based segment works.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + + self.redis_ret = None + self.name = None + async def get(sel, name): + self.name = name + self.redis_ret = '{"changeNumber": "12", "name": "some_segment", "status": "ACTIVE","trafficTypeName": "user","excluded":{"keys":[],"segments":[]},"conditions": []}' + return self.redis_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.get', new=get) + + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + await storage.get('some_segment') + + assert self.name == 'SPLITIO.rbsegment.some_segment' + assert self.redis_ret == '{"changeNumber": "12", "name": "some_segment", "status": "ACTIVE","trafficTypeName": "user","excluded":{"keys":[],"segments":[]},"conditions": []}' + + # Test that a missing split returns None and doesn't call from_raw + + self.name = None + async def get2(sel, name): + self.name = name + return None + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.get', new=get2) + + result = await storage.get('some_segment') + assert result is None + assert self.name == 'SPLITIO.rbsegment.some_segment' + + # Test that a missing split returns None and doesn't call from_raw + result = await storage.get('some_segment2') + assert result is None + + @pytest.mark.asyncio + async def test_get_changenumber(self, mocker): + """Test fetching changenumber.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + + self.redis_ret = None + self.name = None + async def get(sel, name): + self.name = name + self.redis_ret = '-1' + return self.redis_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.get', new=get) + + assert await storage.get_change_number() == -1 + assert self.name == 'SPLITIO.rbsegments.till' + + @pytest.mark.asyncio + async def test_get_segment_names(self, mocker): + """Test getching rule based segment names.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + + self.key = None + self.keys_ret = None + async def keys(sel, key): + self.key = key + self.keys_ret = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + return self.keys_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.keys', new=keys) + + assert await storage.get_segment_names() == ['segment1', 'segment2', 'segment3'] + + @pytest.mark.asyncio + async def test_contains(self, mocker): + """Test storage containing rule based segment names.""" + redis_mock = await aioredis.from_url("redis://localhost") + adapter = redis.RedisAdapterAsync(redis_mock, 'some_prefix') + storage = RedisRuleBasedSegmentsStorageAsync(adapter) + + self.key = None + self.keys_ret = None + async def keys(sel, key): + self.key = key + self.keys_ret = [ + 'SPLITIO.rbsegment.segment1', + 'SPLITIO.rbsegment.segment2', + 'SPLITIO.rbsegment.segment3' + ] + return self.keys_ret + mocker.patch('splitio.storage.adapters.redis.RedisAdapterAsync.keys', new=keys) + + assert await storage.contains(['segment1', 'segment3']) + assert not await storage.contains(['segment1', 'segment4']) + assert await storage.contains(['segment1']) + assert not await storage.contains(['segment4', 'segment5']) diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 2acf293f..ce1ade7e 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -8,14 +8,14 @@ from splitio.util.backoff import Backoff from splitio.api import APIException from splitio.api.commons import FetchOptions -from splitio.storage import SplitStorage +from splitio.storage import SplitStorage, RuleBasedSegmentsStorage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySplitStorageAsync, InMemoryRuleBasedSegmentStorage, InMemoryRuleBasedSegmentStorageAsync from splitio.storage import FlagSetsFilter from splitio.models.splits import Split from splitio.models.rule_based_segments import RuleBasedSegment from splitio.sync.split import SplitSynchronizer, SplitSynchronizerAsync, LocalSplitSynchronizer, LocalSplitSynchronizerAsync, LocalhostMode from splitio.optional.loaders import aiofiles, asyncio -from tests.integration import splits_json +from tests.integration import splits_json, rbsegments_json splits_raw = [{ 'changeNumber': 123, @@ -861,12 +861,13 @@ async def get_changes(*args, **kwargs): class LocalSplitsSynchronizerTests(object): """Split synchronizer test cases.""" - splits = copy.deepcopy(splits_raw) + payload = copy.deepcopy(json_body) def test_synchronize_splits_error(self, mocker): """Test that if fetching splits fails at some_point, the task will continue running.""" storage = mocker.Mock(spec=SplitStorage) - split_synchronizer = LocalSplitSynchronizer("/incorrect_file", storage) + rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) + split_synchronizer = LocalSplitSynchronizer("/incorrect_file", storage, rbs_storage) with pytest.raises(Exception): split_synchronizer.synchronize_splits(1) @@ -874,74 +875,75 @@ def test_synchronize_splits_error(self, mocker): def test_synchronize_splits(self, mocker): """Test split sync.""" storage = InMemorySplitStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage() - till = 123 def read_splits_from_json_file(*args, **kwargs): - return self.splits, till + return self.payload - split_synchronizer = LocalSplitSynchronizer("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizer("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_splits_from_json_file split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name' # Should sync when changenumber is not changed - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should not sync when changenumber is less than stored - till = 122 - self.splits[0]['killed'] = False + self.payload["ff"]["t"] = 122 + self.payload["ff"]["d"][0]['killed'] = False split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should sync when changenumber is higher than stored - till = 124 + self.payload["ff"]["t"] = 1675095324999 split_synchronizer._current_json_sha = "-1" split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == False # Should sync when till is default (-1) - till = -1 + self.payload["ff"]["t"] = -1 split_synchronizer._current_json_sha = "-1" - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True split_synchronizer.synchronize_splits() - inserted_split = storage.get(self.splits[0]['name']) + inserted_split = storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == True def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorage(['set1', 'set2']) - - split = self.splits[0].copy() + rbs_storage = InMemoryRuleBasedSegmentStorage() + + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'new_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizer("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizer("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file split_synchronizer.synchronize_splits() @@ -959,30 +961,31 @@ def read_feature_flags_from_json_file(*args, **kwargs): def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorage() + rbs_storage = InMemoryRuleBasedSegmentStorage() - split = self.splits[0].copy() + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'third_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizer("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizer("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file split_synchronizer.synchronize_splits() @@ -1000,95 +1003,73 @@ def read_feature_flags_from_json_file(*args, **kwargs): def test_reading_json(self, mocker): """Test reading json file.""" f = open("./splits.json", "w") - json_body = {'splits': [{ - 'changeNumber': 123, - 'trafficTypeName': 'user', - 'name': 'some_name', - 'trafficAllocation': 100, - 'trafficAllocationSeed': 123456, - 'seed': 321654, - 'status': 'ACTIVE', - 'killed': False, - 'defaultTreatment': 'off', - 'algo': 2, - 'conditions': [ - { - 'partitions': [ - {'treatment': 'on', 'size': 50}, - {'treatment': 'off', 'size': 50} - ], - 'contitionType': 'WHITELIST', - 'label': 'some_label', - 'matcherGroup': { - 'matchers': [ - { - 'matcherType': 'WHITELIST', - 'whitelistMatcherData': { - 'whitelist': ['k1', 'k2', 'k3'] - }, - 'negate': False, - } - ], - 'combiner': 'AND' - } - } - ], - 'sets': ['set1'] - }], - "till":1675095324253, - "since":-1, - } - - f.write(json.dumps(json_body)) + f.write(json.dumps(self.payload)) f.close() storage = InMemorySplitStorage() - split_synchronizer = LocalSplitSynchronizer("./splits.json", storage, LocalhostMode.JSON) + rbs_storage = InMemoryRuleBasedSegmentStorage() + split_synchronizer = LocalSplitSynchronizer("./splits.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer.synchronize_splits() - inserted_split = storage.get(json_body['splits'][0]['name']) + inserted_split = storage.get(self.payload['ff']['d'][0]['name']) assert isinstance(inserted_split, Split) - assert inserted_split.name == 'some_name' + assert inserted_split.name == self.payload['ff']['d'][0]['name'] + + inserted_rbs = rbs_storage.get(self.payload['rbs']['d'][0]['name']) + assert isinstance(inserted_rbs, RuleBasedSegment) + assert inserted_rbs.name == self.payload['rbs']['d'][0]['name'] os.remove("./splits.json") def test_json_elements_sanitization(self, mocker): """Test sanitization.""" - split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) # check no changes if all elements exist with valid values - parsed = {"splits": [], "since": -1, "till": -1} + parsed = {"ff": {"d": [], "s": -1, "t": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed) == parsed) # check set since to -1 when is None parsed2 = parsed.copy() - parsed2['since'] = None + parsed2['ff']['s'] = None assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check no changes if since > -1 parsed2 = parsed.copy() - parsed2['since'] = 12 + parsed2['ff']['s'] = 12 assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check set till to -1 when is None parsed2 = parsed.copy() - parsed2['till'] = None + parsed2['ff']['t'] = None assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check add since when missing - parsed2 = {"splits": [], "till": -1} + parsed2 = {"ff": {"d": [], "t": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check add till when missing - parsed2 = {"splits": [], "since": -1} + parsed2 = {"ff": {"d": [], "s": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) # check add splits when missing - parsed2 = {"since": -1, "till": -1} + parsed2 = {"ff": {"s": -1, "t": -1}, "rbs": {"d": [], "s": -1, "t": -1}} assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) - def test_split_elements_sanitization(self, mocker): + # check add since when missing + parsed2 = {"ff": {"d": [], "t": -1}, "rbs": {"d": [], "t": -1}} + assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) + + # check add till when missing + parsed2 = {"ff": {"d": [], "s": -1}, "rbs": {"d": [], "s": -1}} + assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) + + # check add splits when missing + parsed2 = {"ff": {"s": -1, "t": -1}, "rbs": {"s": -1, "t": -1}} + assert (split_synchronizer._sanitize_json_elements(parsed2) == parsed) + + def test_elements_sanitization(self, mocker): """Test sanitization.""" - split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) + split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) # No changes when split structure is good assert (split_synchronizer._sanitize_feature_flag_elements(splits_json["splitChange1_1"]["splits"]) == splits_json["splitChange1_1"]["splits"]) @@ -1183,7 +1164,21 @@ def test_split_elements_sanitization(self, mocker): split[0]['algo'] = 1 assert (split_synchronizer._sanitize_feature_flag_elements(split)[0]['algo'] == 2) - def test_split_condition_sanitization(self, mocker): + # test 'status' is set to ACTIVE when None + rbs = copy.deepcopy(json_body["rbs"]["d"]) + rbs[0]['status'] = None + assert (split_synchronizer._sanitize_rb_segment_elements(rbs)[0]['status'] == 'ACTIVE') + + # test 'changeNumber' is set to 0 when invalid + rbs = copy.deepcopy(json_body["rbs"]["d"]) + rbs[0]['changeNumber'] = -2 + assert (split_synchronizer._sanitize_rb_segment_elements(rbs)[0]['changeNumber'] == 0) + + rbs = copy.deepcopy(json_body["rbs"]["d"]) + del rbs[0]['conditions'] + assert (len(split_synchronizer._sanitize_rb_segment_elements(rbs)[0]['conditions']) == 1) + + def test_condition_sanitization(self, mocker): """Test sanitization.""" split_synchronizer = LocalSplitSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) @@ -1218,13 +1213,14 @@ def test_split_condition_sanitization(self, mocker): class LocalSplitsSynchronizerAsyncTests(object): """Split synchronizer test cases.""" - splits = copy.deepcopy(splits_raw) + payload = copy.deepcopy(json_body) @pytest.mark.asyncio async def test_synchronize_splits_error(self, mocker): """Test that if fetching splits fails at some_point, the task will continue running.""" storage = mocker.Mock(spec=SplitStorage) - split_synchronizer = LocalSplitSynchronizerAsync("/incorrect_file", storage) + rbs_storage = mocker.Mock(spec=RuleBasedSegmentsStorage) + split_synchronizer = LocalSplitSynchronizerAsync("/incorrect_file", storage, rbs_storage) with pytest.raises(Exception): await split_synchronizer.synchronize_splits(1) @@ -1233,75 +1229,76 @@ async def test_synchronize_splits_error(self, mocker): async def test_synchronize_splits(self, mocker): """Test split sync.""" storage = InMemorySplitStorageAsync() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() - till = 123 async def read_splits_from_json_file(*args, **kwargs): - return self.splits, till + return self.payload - split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_splits_from_json_file await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name' # Should sync when changenumber is not changed - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should not sync when changenumber is less than stored - till = 122 - self.splits[0]['killed'] = False + self.payload["ff"]["t"] = 122 + self.payload["ff"]["d"][0]['killed'] = False await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed # Should sync when changenumber is higher than stored - till = 124 + self.payload["ff"]["t"] = 1675095324999 split_synchronizer._current_json_sha = "-1" await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == False # Should sync when till is default (-1) - till = -1 + self.payload["ff"]["t"] = -1 split_synchronizer._current_json_sha = "-1" - self.splits[0]['killed'] = True + self.payload["ff"]["d"][0]['killed'] = True await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(self.splits[0]['name']) + inserted_split = await storage.get(self.payload["ff"]["d"][0]['name']) assert inserted_split.killed == True @pytest.mark.asyncio async def test_sync_flag_sets_with_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorageAsync(['set1', 'set2']) - - split = self.splits[0].copy() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 async def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'new_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file await split_synchronizer.synchronize_splits() @@ -1320,30 +1317,30 @@ async def read_feature_flags_from_json_file(*args, **kwargs): async def test_sync_flag_sets_without_config_sets(self, mocker): """Test split sync with flag sets.""" storage = InMemorySplitStorageAsync() - - split = self.splits[0].copy() + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + + split = self.payload["ff"]["d"][0].copy() split['name'] = 'second' - splits1 = [self.splits[0].copy(), split] - splits2 = self.splits.copy() - splits3 = self.splits.copy() - splits4 = self.splits.copy() + splits1 = [self.payload["ff"]["d"][0].copy(), split] + splits2 = self.payload["ff"]["d"].copy() + splits3 = self.payload["ff"]["d"].copy() + splits4 = self.payload["ff"]["d"].copy() self.called = 0 async def read_feature_flags_from_json_file(*args, **kwargs): self.called += 1 if self.called == 1: - return splits1, 123 + return {"ff": {"d": splits1, "t": 123, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 2: - splits2[0]['sets'] = ['set3'] - return splits2, 124 + return {"ff": {"d": splits2, "t": 124, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} elif self.called == 3: splits3[0]['sets'] = ['set1'] - return splits3, 12434 + return {"ff": {"d": splits3, "t": 12434, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} splits4[0]['sets'] = ['set6'] splits4[0]['name'] = 'third_split' - return splits4, 12438 + return {"ff": {"d": splits4, "t": 12438, "s": -1}, "rbs": {"d": [], "t": -1, "s": -1}} - split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, LocalhostMode.JSON) + split_synchronizer = LocalSplitSynchronizerAsync("split.json", storage, rbs_storage, LocalhostMode.JSON) split_synchronizer._read_feature_flags_from_json_file = read_feature_flags_from_json_file await split_synchronizer.synchronize_splits() @@ -1362,13 +1359,18 @@ async def read_feature_flags_from_json_file(*args, **kwargs): async def test_reading_json(self, mocker): """Test reading json file.""" async with aiofiles.open("./splits.json", "w") as f: - await f.write(json.dumps(json_body)) + await f.write(json.dumps(self.payload)) storage = InMemorySplitStorageAsync() - split_synchronizer = LocalSplitSynchronizerAsync("./splits.json", storage, LocalhostMode.JSON) + rbs_storage = InMemoryRuleBasedSegmentStorageAsync() + split_synchronizer = LocalSplitSynchronizerAsync("./splits.json", storage, rbs_storage, LocalhostMode.JSON) await split_synchronizer.synchronize_splits() - inserted_split = await storage.get(json_body['splits'][0]['name']) + inserted_split = await storage.get(self.payload['ff']['d'][0]['name']) assert isinstance(inserted_split, Split) - assert inserted_split.name == 'some_name' + assert inserted_split.name == self.payload['ff']['d'][0]['name'] + + inserted_rbs = await rbs_storage.get(self.payload['rbs']['d'][0]['name']) + assert isinstance(inserted_rbs, RuleBasedSegment) + assert inserted_rbs.name == self.payload['rbs']['d'][0]['name'] os.remove("./splits.json") diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index b2ef9fa0..1e89af66 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -671,7 +671,6 @@ def test_start_periodic_data_recording(self, mocker): assert len(impression_count_task.start.mock_calls) == 1 assert len(event_task.start.mock_calls) == 1 - class RedisSynchronizerTests(object): def test_start_periodic_data_recording(self, mocker): impression_count_task = mocker.Mock(spec=ImpressionsCountSyncTask) @@ -744,7 +743,6 @@ def stop_mock(event): assert len(unique_keys_task.stop.mock_calls) == 1 assert len(clear_filter_task.stop.mock_calls) == 1 - class RedisSynchronizerAsyncTests(object): @pytest.mark.asyncio async def test_start_periodic_data_recording(self, mocker): From 2cbc6474bc6ec5f41615f9f3f0df3ae50c4603cb Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:47:13 -0300 Subject: [PATCH 3/5] Update splitio/storage/pluggable.py Co-authored-by: Emiliano Sanchez --- splitio/storage/pluggable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 66fad1e5..1ac12bd2 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -15,7 +15,7 @@ _LOGGER = logging.getLogger(__name__) class PluggableRuleBasedSegmentsStorageBase(RuleBasedSegmentsStorage): - """RedPluggable storage for rule based segments.""" + """Pluggable storage for rule based segments.""" _TILL_LENGTH = 4 From d0b2c6729f9f0f18a271da2d9472d6a3652bc638 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:47:20 -0300 Subject: [PATCH 4/5] Update splitio/storage/pluggable.py Co-authored-by: Emiliano Sanchez --- splitio/storage/pluggable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 1ac12bd2..20d4d437 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -98,7 +98,7 @@ def get_large_segment_names(self): pass class PluggableRuleBasedSegmentsStorage(PluggableRuleBasedSegmentsStorageBase): - """RedPluggable storage for rule based segments.""" + """Pluggable storage for rule based segments.""" def __init__(self, pluggable_adapter, prefix=None): """ From cc990a9bedcb7de033ac8f07b8ee416023a5bebe Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany <41021307+chillaq@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:47:27 -0300 Subject: [PATCH 5/5] Update splitio/storage/pluggable.py Co-authored-by: Emiliano Sanchez --- splitio/storage/pluggable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitio/storage/pluggable.py b/splitio/storage/pluggable.py index 20d4d437..c27a92fd 100644 --- a/splitio/storage/pluggable.py +++ b/splitio/storage/pluggable.py @@ -179,7 +179,7 @@ def get_segment_names(self): return None class PluggableRuleBasedSegmentsStorageAsync(PluggableRuleBasedSegmentsStorageBase): - """RedPluggable storage for rule based segments.""" + """Pluggable storage for rule based segments.""" def __init__(self, pluggable_adapter, prefix=None): """