Skip to content

ratelimitprocessor: Implement dynamic rate limit #683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions processor/ratelimitprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ a in-memory rate limiter, or makes use of a [gubernator](https://github.com/gube
| `throttle_interval` | Time interval for throttling. It has effects only when `type` is `gubernator`. | No | `1s` |
| `type` | Type of rate limiter. Options are `local` or `gubernator`. | No | `local` |
| `overrides` | Allows customizing rate limiting parameters for specific metadata key-value pairs. Use this to apply different rate limits to different tenants, projects, or other entities identified by metadata. Each override is keyed by a metadata value and can specify custom `rate`, `burst`, and `throttle_interval` settings that take precedence over the global configuration for matching requests. | No | |
| `dynamic_limits` | Holds the dynamic rate limiting configuration. This is only applicable when the rate limiter type is `gubernator`. | No | |

### Overrides

Expand All @@ -27,6 +28,90 @@ You can override one or more of the following fields:
| `rate` | Bucket refill rate, in tokens per second. | No | |
| `burst` | Maximum number of tokens that can be consumed. | No | |
| `throttle_interval` | Time interval for throttling. It has effect only when `type` is `gubernator`. | No | |
| `static_only` | Disables dynamic rate limiting for the override. | No | `false` |

### Dynamic Rate Limiting

| Field | Description | Required | Default |
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------|
| `enabled` | Enables the dynamic rate limiting feature. | No | `false` |
| `window_multiplier` | The factor by which the previous window rate is multiplied to get the dynamic limit. | No | `1.3` |
| `window_duration` | The time window duration for calculating traffic rates. | No | `2m` |

### Dynamic Rate Limiting Deep Dive

The dynamic rate limiting feature uses a sliding window approach to adjust the rate limit based on recent traffic patterns. This allows the processor to be more responsive to changes in traffic volume, preventing sudden spikes from overwhelming downstream services while also allowing for higher throughput when traffic is sustained.

The system maintains two time windows:

* **Current Window**: The most recent time window, defined by `window_duration`.
* **Previous Window**: The time window immediately preceding the current one.

The dynamic limit is calculated using the following simplified formula:

```text
dynamic_limit = max(static_rate, previous_rate * window_multiplier)
```

Where:

* `previous_rate`: The rate of traffic in the previous window (normalized per second).
* `static_rate`: The configured `rate` in the main configuration.
* `window_multiplier`: A factor applied to the previous window rate to determine the dynamic limit.

**Important Notes:**

* When `previous_rate` is 0 (no previous traffic), the dynamic limit defaults to the `static_rate`.
* The dynamic limit will always be at least the `static_rate`, ensuring a minimum level of throughput.
* The algorithm only records traffic in the current window when the incoming rate is within acceptable bounds to prevent runaway scaling.

Let's walk through a few examples to illustrate the behavior of the dynamic rate limiter.

Assume the following configuration:

* `window_duration`: 2m
* `window_multiplier`: 1.5
* `rate`: 1000 requests/second (this is our `static_rate`)

#### Scenario 1: Initial Traffic

1. **First 2 minutes**: 120,000 requests are received (1000 requests/second). `previous_rate` is 0.
* Since `previous_rate` is 0, the `dynamic_limit` is set to the `static_rate` of 1000.
2. **Next 2 minutes**: 90,000 requests are received (750 requests/second). `previous_rate` is 1000.
* `dynamic_limit = max(1000, 1000 * 1.5) = max(1000, 1500) = 1500`
* The `dynamic_limit` increases to 1500, allowing more traffic.

#### Scenario 2: Ramping Up Traffic

1. **Previous window rate**: 900 requests/second.
2. **Current window**: Incoming rate of 1200 requests/second.
* `dynamic_limit = max(1000, 900 * 1.5) = max(1000, 1350) = 1350`
* The `dynamic_limit` increases to 1350, allowing the increased traffic.

#### Scenario 3: Sustained High Traffic

1. **Previous window rate**: 1500 requests/second.
2. **Current window**: Incoming rate of 1600 requests/second.
* `dynamic_limit = max(1000, 1500 * 1.5) = max(1000, 2250) = 2250`
* The `dynamic_limit` continues to increase as the high traffic is sustained.

#### Scenario 4: Traffic Spike Protection

1. **Previous window rate**: 1000 requests/second.
2. **Current window**: Sudden spike to 3000 requests/second.
* `dynamic_limit = max(1000, 1000 * 1.5) = max(1000, 1500) = 1500`
* The dynamic limit only allows 1500 requests/second, protecting against the spike.
* The excess traffic (1500+ requests/second) is throttled, preventing the spike from being recorded.

#### Scenario 5: Traffic Reduction

1. **Previous window rate**: 2000 requests/second.
2. **Current window**: Reduced to 500 requests/second.
* `dynamic_limit = max(1000, 2000 * 1.5) = max(1000, 3000) = 3000`
* The `dynamic_limit` remains high initially, providing a buffer for traffic variations.
* As the reduced traffic continues, the limit will gradually decrease in subsequent windows.

This mechanism allows the rate limiter to adapt to sustained increases in traffic while the `window_multiplier` provides protection against sudden spikes that could destabilize the system.

### Examples

Expand Down Expand Up @@ -89,3 +174,20 @@ processors:
burst: 20000
throttle_interval: 10s # has effect only when `type` is `gubernator`
```

Example when using as a distributed rate limiter (Gubernator) with dynamic rate limiting:

```yaml
processors:
ratelimiter:
rate: 100
# NOTE burst isn't used when dynamic limits are enabled.
burst: 200
type: gubernator
strategy: bytes
throttle_behavior: error
dynamic_limits:
enabled: true
window_multiplier: 1.5
window_duration: 1m
```
61 changes: 61 additions & 0 deletions processor/ratelimitprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,43 @@ type Config struct {
// Embed the rate limit settings
RateLimitSettings `mapstructure:",squash"`

// DynamicRateLimiting holds the dynamic rate limiting configuration.
// This is only applicable when the rate limiter type is "gubernator".
DynamicRateLimiting `mapstructure:"dynamic_limits"`

// Overrides holds a list of overrides for the rate limiter.
//
// Defaults to empty
Overrides map[string]RateLimitOverrides `mapstructure:"overrides"`
}

// DynamicRateLimiting defines settings for dynamic rate limiting.
type DynamicRateLimiting struct {
// Enabled tells the processor to use dynamic rate limiting.
Enabled bool `mapstructure:"enabled"`
// WindowMultiplier is the factor by which the previous window rate is
// multiplied to get the dynamic part of the limit. Defaults to 1.5.
WindowMultiplier float64 `mapstructure:"window_multiplier"`
// WindowDuration defines the time window for the Exponentially Weighted Moving Average.
// A common value would be "5m" for a 5-minute window.
WindowDuration time.Duration `mapstructure:"window_duration"`
}

// Validate checks the DynamicRateLimiting configuration.
func (d *DynamicRateLimiting) Validate() error {
if !d.Enabled {
return nil
}
var errs []error
if d.WindowMultiplier < 1 {
errs = append(errs, errors.New("window_multiplier must be greater than or equal to 1"))
}
if d.WindowDuration <= 0 {
errs = append(errs, errors.New("window_duration must be greater than zero"))
}
return errors.Join(errs...)
}

// RateLimitSettings holds the core rate limiting configuration.
type RateLimitSettings struct {
// Strategy holds the rate limiting strategy.
Expand All @@ -67,9 +98,14 @@ type RateLimitSettings struct {
//
// Defaults to 1s
ThrottleInterval time.Duration `mapstructure:"throttle_interval"`

disableDynamic bool `mapstructure:"-"`
}

type RateLimitOverrides struct {
// Rate holds the override rate limit.
StaticOnly bool `mapstructure:"static_only"`

// Rate holds bucket refill rate, in tokens per second.
Rate *int `mapstructure:"rate"`

Expand All @@ -87,6 +123,19 @@ type RateLimitOverrides struct {
// Strategy identifies the rate-limiting strategy: requests, records, or bytes.
type Strategy string

func (s Strategy) String() string {
switch s {
case StrategyRateLimitRequests:
return "requests_per_sec"
case StrategyRateLimitRecords:
return "records_per_sec"
case StrategyRateLimitBytes:
return "bytes_per_sec"
default:
return string(s) // NOTE(marclop) shouldn't happen due to validation.
}
}

const (
// StrategyRateLimitRequests identifies the strategy for
// rate limiting by request.
Expand Down Expand Up @@ -144,6 +193,10 @@ func createDefaultConfig() component.Config {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: DefaultThrottleInterval,
},
DynamicRateLimiting: DynamicRateLimiting{
WindowMultiplier: 1.3,
WindowDuration: 2 * time.Minute,
},
}
}

Expand All @@ -163,6 +216,9 @@ func resolveRateLimitSettings(cfg *Config, uniqueKey string) RateLimitSettings {
if override.ThrottleInterval != nil {
result.ThrottleInterval = *override.ThrottleInterval
}
if override.StaticOnly {
result.disableDynamic = true
}
}
return result
}
Expand Down Expand Up @@ -210,6 +266,11 @@ func (config *Config) Validate() error {
if err := config.RateLimitSettings.Validate(); err != nil {
errs = append(errs, err)
}
if config.Type == GubernatorRateLimiter {
if err := config.DynamicRateLimiting.Validate(); err != nil {
errs = append(errs, err)
}
}
for key, override := range config.Overrides {
if err := override.Validate(); err != nil {
errs = append(errs, fmt.Errorf("override %q: %w", key, err))
Expand Down
52 changes: 51 additions & 1 deletion processor/ratelimitprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
)

var defaultDynamicRateLimiting = DynamicRateLimiting{
WindowMultiplier: 1.3,
WindowDuration: 2 * time.Minute,
}

func TestLoadConfig(t *testing.T) {
grpcClientConfig := configgrpc.NewDefaultClientConfig()
grpcClientConfig.Endpoint = "localhost:1081"
Expand All @@ -51,6 +56,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
},
},
{
Expand All @@ -64,6 +70,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
},
},
{
Expand All @@ -77,6 +84,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
},
},
{
Expand All @@ -90,7 +98,8 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
MetadataKeys: []string{"project_id"},
DynamicRateLimiting: defaultDynamicRateLimiting,
MetadataKeys: []string{"project_id"},
},
},
{
Expand All @@ -104,6 +113,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
Overrides: map[string]RateLimitOverrides{
"project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292": {
Rate: ptr(300),
Expand All @@ -123,6 +133,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
Overrides: map[string]RateLimitOverrides{
"project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292": {
Rate: ptr(300),
Expand All @@ -141,6 +152,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
Overrides: map[string]RateLimitOverrides{
"project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292": {
Burst: ptr(400),
Expand All @@ -159,6 +171,7 @@ func TestLoadConfig(t *testing.T) {
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
Overrides: map[string]RateLimitOverrides{
"project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292": {
Rate: ptr(400),
Expand All @@ -167,6 +180,43 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
name: "overrides_static_only",
expected: &Config{
Type: LocalRateLimiter,
RateLimitSettings: RateLimitSettings{
Rate: 100,
Burst: 200,
Strategy: StrategyRateLimitBytes,
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
Overrides: map[string]RateLimitOverrides{
"project-id:e678ebd7-3a15-43dd-a95c-1cf0639a6292": {
StaticOnly: true,
},
},
},
},
{
name: "dynamic_rate_limit",
expected: &Config{
Type: GubernatorRateLimiter,
RateLimitSettings: RateLimitSettings{
Rate: 100,
Burst: 200, // Unused for dynamic.
Strategy: StrategyRateLimitBytes,
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
},
DynamicRateLimiting: DynamicRateLimiting{
Enabled: true,
WindowMultiplier: 1.5,
WindowDuration: time.Minute,
},
},
},
{
name: "invalid_rate",
expectedErr: "rate must be greater than zero",
Expand Down
Loading