From 17615b14f164a30230d7444d1e756a427755d850 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Wed, 16 Apr 2025 18:55:21 -0300 Subject: [PATCH 1/2] Added the ability to post unique keys in batches --- go.mod | 2 +- go.sum | 4 ++-- splitio/client/factory.go | 26 ++++++++++++++------------ splitio/conf/sdkconf.go | 4 ++++ splitio/conf/util.go | 29 +++++++++++++++++++++++++++++ splitio/impressions/builder.go | 12 +++++++----- splitio/impressions/builder_test.go | 12 ++++++++---- 7 files changed, 65 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index a90d25b..b904b06 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/splitio/go-client/v6 go 1.18 require ( - github.com/splitio/go-split-commons/v6 v6.1.0 + github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414232405-62906f35f3b0 github.com/splitio/go-toolkit/v5 v5.4.0 ) diff --git a/go.sum b/go.sum index a087f70..ecf08f4 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/splitio/go-split-commons/v6 v6.1.0 h1:k3mwr12DF6gbEaV8XXU/tSAQlPkIEuzIgTEneYhGg2I= -github.com/splitio/go-split-commons/v6 v6.1.0/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY= +github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414232405-62906f35f3b0 h1:Fvn5XPPQmsh1Zq0+3f9zq/bn2wTtKacdEudAf3JeL9o= +github.com/splitio/go-split-commons/v6 v6.1.1-0.20250414232405-62906f35f3b0/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= diff --git a/splitio/client/factory.go b/splitio/client/factory.go index 52634a5..7d664cd 100644 --- a/splitio/client/factory.go +++ b/splitio/client/factory.go @@ -59,6 +59,7 @@ type sdkStorages struct { runtimeTelemetry storage.TelemetryRuntimeProducer evaluationTelemetry storage.TelemetryEvaluationProducer impressionsCount storage.ImpressionsCountProducer + uniqueKeysStorage storage.UniqueKeysStorage } // SplitFactory struct is responsible for instantiating and storing instances of client and manager. @@ -283,7 +284,7 @@ func setupInMemoryFactory( advanced.StreamingEnabled = false } - inMememoryFullQueue := make(chan string, 2) // Size 2: So that it's able to accept one event from each resource simultaneously. + inMememoryFullQueue := make(chan string, 3) // Size 3: So that it's able to accept one event from each resource simultaneously. flagSetFilter := flagsets.NewFlagSetFilter(advanced.FlagSetsFilter) splitsStorage := mutexmap.NewMMSplitStorage(flagSetFilter) @@ -291,6 +292,7 @@ func setupInMemoryFactory( telemetryStorage, err := inmemory.NewTelemetryStorage() impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, inMememoryFullQueue, logger, telemetryStorage) eventsStorage := mutexqueue.NewMQEventsStorage(cfg.Advanced.EventsQueueSize, inMememoryFullQueue, logger, telemetryStorage) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(advanced.UniqueKeysQueueSize, inMememoryFullQueue, logger) if err != nil { return nil, err } @@ -302,7 +304,7 @@ func setupInMemoryFactory( SplitUpdater: split.NewSplitUpdater(splitsStorage, splitAPI.SplitFetcher, logger, telemetryStorage, dummyHC, flagSetFilter), SegmentUpdater: segment.NewSegmentUpdater(splitsStorage, segmentsStorage, splitAPI.SegmentFetcher, logger, telemetryStorage, dummyHC), EventRecorder: event.NewEventRecorderSingle(eventsStorage, splitAPI.EventRecorder, logger, metadata, telemetryStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryStorage, splitAPI.TelemetryRecorder, splitsStorage, segmentsStorage, logger, metadata, telemetryStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryStorage, splitAPI.TelemetryRecorder, splitsStorage, segmentsStorage, logger, metadata, telemetryStorage, uniqueKeysStorage), } splitTasks := synchronizer.SplitTasks{ SplitSyncTask: tasks.NewFetchSplitsTask(workers.SplitUpdater, cfg.TaskPeriods.SplitSync, logger), @@ -320,13 +322,14 @@ func setupInMemoryFactory( initTelemetry: telemetryStorage, evaluationTelemetry: telemetryStorage, runtimeTelemetry: telemetryStorage, + uniqueKeysStorage: uniqueKeysStorage, } if cfg.ImpressionsMode == "" { cfg.ImpressionsMode = config.ImpressionsModeOptimized } - impressionManager, err := impressions.BuildInMemoryManager(cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, storages.runtimeTelemetry, storages.impressionsConsumer) + impressionManager, err := impressions.BuildInMemoryManager(cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, storages.runtimeTelemetry, storages.impressionsConsumer, uniqueKeysStorage) if err != nil { return nil, err } @@ -394,11 +397,8 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L inMememoryFullQueue := make(chan string, 2) // Size 2: So that it's able to accept one event from each resource simultaneously. impressionStorage := redis.NewImpressionStorage(redisClient, metadata, logger) - if len(cfg.Advanced.FlagSetsFilter) != 0 { - cfg.Advanced.FlagSetsFilter = []string{} - logger.Warning("FlagSets filter is not applicable for Consumer modes where the SDK does not keep rollout data in sync. FlagSet filter was discarded") - } - flagSetFilter := flagsets.NewFlagSetFilter([]string{}) + advanced := conf.NormalizeRedisSDKConf(cfg.Advanced, logger) + flagSetFilter := flagsets.NewFlagSetFilter(advanced.FlagSetsFilter) storages := sdkStorages{ splits: redis.NewSplitStorage(redisClient, logger, flagSetFilter), @@ -410,17 +410,19 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L evaluationTelemetry: telemetryStorage, impressionsCount: redis.NewImpressionsCountStorage(redisClient, logger), runtimeTelemetry: runtimeTelemetry, + uniqueKeysStorage: mutexqueue.NewMQUniqueKeysStorage(advanced.UniqueKeysQueueSize, inMememoryFullQueue, logger), } splitTasks := synchronizer.SplitTasks{} - workers := synchronizer.Workers{} - advanced := config.AdvancedConfig{} + workers := synchronizer.Workers{ + TelemetryRecorder: telemetry.NewSynchronizerRedis(telemetryStorage, logger, storages.uniqueKeysStorage), + } if cfg.ImpressionsMode == "" { cfg.ImpressionsMode = config.ImpressionsModeDebug } - impressionManager, err := impressions.BuildRedisManager(cfg, logger, &splitTasks, storages.initTelemetry, storages.impressionsCount, storages.runtimeTelemetry) + impressionManager, err := impressions.BuildRedisManager(cfg, logger, &splitTasks, storages.initTelemetry, storages.impressionsCount, storages.runtimeTelemetry, storages.uniqueKeysStorage) if err != nil { return nil, err } @@ -444,7 +446,7 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L operationMode: conf.RedisConsumer, storages: storages, readinessSubscriptors: make(map[int]chan int), - telemetrySync: telemetry.NewSynchronizerRedis(telemetryStorage, logger), + telemetrySync: workers.TelemetryRecorder, impressionManager: impressionManager, syncManager: syncManager, } diff --git a/splitio/conf/sdkconf.go b/splitio/conf/sdkconf.go index b431761..2978f1c 100644 --- a/splitio/conf/sdkconf.go +++ b/splitio/conf/sdkconf.go @@ -95,6 +95,8 @@ type AdvancedConfig struct { ImpressionsBulkSize int64 StreamingEnabled bool FlagSetsFilter []string + UniqueKeysQueueSize int64 + UniqueKeysBulkSize int64 } // Default returns a config struct with all the default values @@ -158,6 +160,8 @@ func Default() *SplitSdkConfig { ImpressionsQueueSize: 10000, ImpressionsBulkSize: 5000, StreamingEnabled: true, + UniqueKeysQueueSize: 10000, + UniqueKeysBulkSize: 5000, }, } } diff --git a/splitio/conf/util.go b/splitio/conf/util.go index 29c94c1..2e49008 100644 --- a/splitio/conf/util.go +++ b/splitio/conf/util.go @@ -5,14 +5,43 @@ import ( "github.com/splitio/go-split-commons/v6/conf" "github.com/splitio/go-split-commons/v6/flagsets" + "github.com/splitio/go-toolkit/v5/logging" ) +const ( + defaultUniqueKeysQueueSize = 2000 + defaultUniqueKeysBulkSize = 1000 +) + +func NormalizeRedisSDKConf(sdkConfig AdvancedConfig, logger logging.LoggerInterface) conf.AdvancedConfig { + config, _ := NormalizeSDKConf(sdkConfig) + + if sdkConfig.UniqueKeysQueueSize == 0 { + config.UniqueKeysQueueSize = defaultUniqueKeysQueueSize + } + if sdkConfig.UniqueKeysBulkSize == 0 { + config.UniqueKeysBulkSize = defaultUniqueKeysBulkSize + } + if len(sdkConfig.FlagSetsFilter) != 0 { + config.FlagSetsFilter = []string{} + logger.Warning("FlagSets filter is not applicable for Consumer modes where the SDK does not keep rollout data in sync. FlagSet filter was discarded") + } + + return config +} + // NormalizeSDKConf compares against SDK Config to set defaults func NormalizeSDKConf(sdkConfig AdvancedConfig) (conf.AdvancedConfig, []error) { config := conf.GetDefaultAdvancedConfig() if sdkConfig.HTTPTimeout > 0 { config.HTTPTimeout = sdkConfig.HTTPTimeout } + if sdkConfig.UniqueKeysQueueSize > 0 { + config.UniqueKeysQueueSize = sdkConfig.UniqueKeysQueueSize + } + if sdkConfig.UniqueKeysBulkSize > 0 { + config.UniqueKeysBulkSize = sdkConfig.UniqueKeysBulkSize + } if sdkConfig.EventsBulkSize > 0 { config.EventsBulkSize = sdkConfig.EventsBulkSize } diff --git a/splitio/impressions/builder.go b/splitio/impressions/builder.go index 01c8b52..67c71ab 100644 --- a/splitio/impressions/builder.go +++ b/splitio/impressions/builder.go @@ -38,16 +38,17 @@ func BuildInMemoryManager( splitAPI *api.SplitAPI, telemetryStorage storage.TelemetryRuntimeProducer, impressionStorage storage.ImpressionStorageConsumer, + uniqueKeysStorage storage.UniqueKeysStorage, ) (provisional.ImpressionManager, error) { listenerEnabled := cfg.Advanced.ImpressionListener != nil impressionsCounter := strategy.NewImpressionsCounter() filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability) - uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter) + uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, telemetryStorage) splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskInMemory) - splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(workers.TelemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskInMemory, logger) + splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(workers.TelemetryRecorder, uniqueKeysPeriodTaskInMemory, logger, advanced.UniqueKeysBulkSize) splitTasks.CleanFilterTask = tasks.NewCleanFilterTask(filter, logger, bfCleaningPeriod) noneStrategy := strategy.NewNoneImpl(impressionsCounter, uniqueKeysTracker, listenerEnabled) @@ -85,18 +86,19 @@ func BuildRedisManager( telemetryConfigStorage storage.TelemetryConfigProducer, impressionsCountStorage storage.ImpressionsCountProducer, telemetryRuntimeStorage storage.TelemetryRuntimeProducer, + uniqueKeysStorage storage.UniqueKeysStorage, ) (provisional.ImpressionManager, error) { listenerEnabled := cfg.Advanced.ImpressionListener != nil impressionsCounter := strategy.NewImpressionsCounter() filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability) - uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter) + uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) - telemetryRecorder := telemetry.NewSynchronizerRedis(telemetryConfigStorage, logger) + telemetryRecorder := telemetry.NewSynchronizerRedis(telemetryConfigStorage, logger, uniqueKeysStorage) impressionsCountRecorder := impressionscount.NewRecorderRedis(impressionsCounter, impressionsCountStorage, logger) splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(impressionsCountRecorder, logger, impressionsCountPeriodTaskRedis) - splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(telemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskRedis, logger) + splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(telemetryRecorder, uniqueKeysPeriodTaskRedis, logger, cfg.Advanced.UniqueKeysBulkSize) splitTasks.CleanFilterTask = tasks.NewCleanFilterTask(filter, logger, bfCleaningPeriod) noneStrategy := strategy.NewNoneImpl(impressionsCounter, uniqueKeysTracker, listenerEnabled) diff --git a/splitio/impressions/builder_test.go b/splitio/impressions/builder_test.go index 771b0d2..8d56b6b 100644 --- a/splitio/impressions/builder_test.go +++ b/splitio/impressions/builder_test.go @@ -26,8 +26,9 @@ func TestBuildInMemoryWithNone(t *testing.T) { splitAPI := api.NewSplitAPI("apikey", advanced, logger, metadata) telemetryStorage, _ := inmemory.NewTelemetryStorage() impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 2), logger, telemetryStorage) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(cfg.Advanced.UniqueKeysQueueSize, make(chan string), logger) - impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage) + impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage, uniqueKeysStorage) if err != nil { t.Error("err should be nil. ", err.Error()) } @@ -67,8 +68,9 @@ func TestBuildInMemoryWithDebug(t *testing.T) { splitAPI := api.NewSplitAPI("apikey", advanced, logger, metadata) telemetryStorage, _ := inmemory.NewTelemetryStorage() impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 2), logger, telemetryStorage) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(cfg.Advanced.UniqueKeysQueueSize, make(chan string), logger) - impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage) + impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage, uniqueKeysStorage) if err != nil { t.Error("err should be nil. ", err.Error()) } @@ -107,8 +109,9 @@ func TestBuildInMemoryWithOptimized(t *testing.T) { splitAPI := api.NewSplitAPI("apikey", advanced, logger, metadata) telemetryStorage, _ := inmemory.NewTelemetryStorage() impressionsStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 2), logger, telemetryStorage) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(cfg.Advanced.UniqueKeysQueueSize, make(chan string), logger) - impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage) + impManager, err := BuildInMemoryManager(&cfg, advanced, logger, &splitTasks, &workers, metadata, splitAPI, telemetryStorage, impressionsStorage, uniqueKeysStorage) if err != nil { t.Error("err should be nil. ", err.Error()) } @@ -143,8 +146,9 @@ func TestBuildRedisWithNone(t *testing.T) { splitTasks := synchronizer.SplitTasks{} runtimeTelemetry := mocks.MockTelemetryStorage{} impressionCountStorage := mocks.MockImpressionsCountStorage{} + uniqueKeysStorage := mocks.MockUniqueKeysStorage{} - impManager, err := BuildRedisManager(&cfg, logger, &splitTasks, runtimeTelemetry, impressionCountStorage, runtimeTelemetry) + impManager, err := BuildRedisManager(&cfg, logger, &splitTasks, runtimeTelemetry, impressionCountStorage, runtimeTelemetry, uniqueKeysStorage) if err != nil { t.Error("err should be nil. ", err.Error()) } From 7cc2f66b472783514ce2d1fa47dab46ac4ea11da Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Wed, 16 Apr 2025 18:55:55 -0300 Subject: [PATCH 2/2] updated version --- splitio/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitio/version.go b/splitio/version.go index 7309a07..cefbb5f 100644 --- a/splitio/version.go +++ b/splitio/version.go @@ -1,4 +1,4 @@ package splitio // Version contains a string with the split sdk version -const Version = "6.7.0" +const Version = "6.7.1.rc.1"