Skip to content

Commit d7e7efb

Browse files
[libbeat] Cache processor docs and memory fixes. (backport #38561) (#39001)
* [libbeat] Cache processor docs and memory fixes. (#38561) Change cache processor documentation from write_period to write_interval. Fix expires heap cleanup on partial file writes: popping the expiries heap completely on every partial write caused a panic on the following put operations at the time of expiry check. Fix expires infinite growth when large TTLs and recurring keys are cached: when TTLs are large and keys are recurrent, we kept track of an arbitrary amount of expiry heap entries for the same key, causing memory to grow. (cherry picked from commit b1e4abc) * Update CHANGELOG.next.asciidoc --------- Co-authored-by: Marc Guasch <marc-gr@users.noreply.github.com>
1 parent 9411751 commit d7e7efb

File tree

6 files changed

+220
-9
lines changed

6 files changed

+220
-9
lines changed

CHANGELOG.next.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
4141

4242
*Affecting all Beats*
4343

44+
- Change cache processor documentation from `write_period` to `write_interval`. {pull}38561[38561]
45+
- Fix cache processor expiries heap cleanup on partial file writes. {pull}38561[38561]
46+
- Fix cache processor expiries infinite growth when a large TTL is used and recurring keys are cached. {pull}38561[38561]
4447

4548
*Auditbeat*
4649

libbeat/processors/cache/docs/cache.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ One of `backend.memory.id` or `backend.file.id` must be provided.
5454
`backend.capacity`:: The number of elements that can be stored in the cache. `put` operations that would cause the capacity to be exceeded will result in evictions of the oldest elements. Values at or below zero indicate no limit. The capacity should not be lower than the number of elements that are expected to be referenced when processing the input as evicted elements are lost. The default is `0`, no limit.
5555
`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
5656
`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
57-
`backend.file.write_period`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_period` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.
57+
`backend.file.write_interval`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.
5858

5959
One of `put`, `get` or `delete` must be provided.
6060

libbeat/processors/cache/file_store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,8 @@ func (c *fileStore) writeState(final bool) {
283283
enc := json.NewEncoder(f)
284284
enc.SetEscapeHTML(false)
285285
now := time.Now()
286-
for c.expiries.Len() != 0 {
287-
e := c.expiries.pop()
286+
for i := 0; i < c.expiries.Len(); i++ {
287+
e := c.expiries[i]
288288
if e.Expires.Before(now) {
289289
// Don't write expired elements.
290290
continue

libbeat/processors/cache/file_store_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,109 @@ var fileStoreTests = []struct {
352352
{Key: "three", Value: 3.0},
353353
},
354354
},
355+
{
356+
name: "periodic_write",
357+
cfg: config{
358+
Store: &storeConfig{
359+
File: &fileConfig{ID: "test"},
360+
Capacity: 1000,
361+
Effort: 10,
362+
},
363+
Get: &getConfig{},
364+
},
365+
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
366+
id: "test",
367+
cache: map[string]*CacheEntry{},
368+
refs: 1,
369+
// TTL, capacity and effort are set only by put.
370+
ttl: -1,
371+
cap: -1,
372+
effort: -1,
373+
}},
374+
steps: []fileStoreTestSteps{
375+
0: {
376+
doTo: func(s *fileStore) error {
377+
putCfg := config{
378+
Store: &storeConfig{
379+
File: &fileConfig{ID: "test"},
380+
Capacity: 1000,
381+
Effort: 10,
382+
},
383+
Put: &putConfig{
384+
TTL: ptrTo(time.Second),
385+
},
386+
}
387+
s.add(putCfg)
388+
return nil
389+
},
390+
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
391+
id: "test",
392+
cache: map[string]*CacheEntry{},
393+
refs: 2,
394+
dirty: false,
395+
ttl: time.Second,
396+
cap: 1000,
397+
effort: 10,
398+
}},
399+
},
400+
1: {
401+
doTo: func(s *fileStore) error {
402+
s.Put("one", 1)
403+
s.Put("two", 2)
404+
s.Put("three", 3)
405+
return nil
406+
},
407+
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
408+
id: "test",
409+
cache: map[string]*CacheEntry{
410+
"one": {Key: "one", Value: int(1), index: 0},
411+
"two": {Key: "two", Value: int(2), index: 1},
412+
"three": {Key: "three", Value: int(3), index: 2},
413+
},
414+
expiries: expiryHeap{
415+
{Key: "one", Value: int(1), index: 0},
416+
{Key: "two", Value: int(2), index: 1},
417+
{Key: "three", Value: int(3), index: 2},
418+
},
419+
refs: 2,
420+
dirty: true,
421+
ttl: time.Second,
422+
cap: 1000,
423+
effort: 10,
424+
}},
425+
},
426+
2: {
427+
doTo: func(s *fileStore) error {
428+
s.writeState(false)
429+
return nil
430+
},
431+
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
432+
id: "test",
433+
cache: map[string]*CacheEntry{
434+
"one": {Key: "one", Value: int(1), index: 0},
435+
"two": {Key: "two", Value: int(2), index: 1},
436+
"three": {Key: "three", Value: int(3), index: 2},
437+
},
438+
expiries: expiryHeap{
439+
{Key: "one", Value: int(1), index: 0},
440+
{Key: "two", Value: int(2), index: 1},
441+
{Key: "three", Value: int(3), index: 2},
442+
},
443+
refs: 2,
444+
dirty: false,
445+
ttl: time.Second,
446+
cap: 1000,
447+
effort: 10,
448+
}},
449+
},
450+
},
451+
wantPersisted: []*CacheEntry{
452+
// Numeric values are float due to JSON round-trip.
453+
{Key: "one", Value: 1.0},
454+
{Key: "two", Value: 2.0},
455+
{Key: "three", Value: 3.0},
456+
},
457+
},
355458
}
356459

357460
func TestFileStore(t *testing.T) {

libbeat/processors/cache/mem_store.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,13 +172,21 @@ func (c *memStore) Put(key string, val any) error {
172172
defer c.mu.Unlock()
173173
now := time.Now()
174174
c.evictExpired(now)
175-
e := &CacheEntry{
176-
Key: key,
177-
Value: val,
178-
Expires: now.Add(c.ttl),
175+
// If the key is being overwritten we update its existing entry in place and
176+
// fix its heap position; this prevents the expiries heap from growing with large TTLs and recurring keys.
177+
if prev, found := c.cache[key]; found {
178+
prev.Value = val
179+
prev.Expires = now.Add(c.ttl)
180+
heap.Fix(&c.expiries, prev.index)
181+
} else {
182+
e := &CacheEntry{
183+
Key: key,
184+
Value: val,
185+
Expires: now.Add(c.ttl),
186+
}
187+
c.cache[key] = e
188+
heap.Push(&c.expiries, e)
179189
}
180-
c.cache[key] = e
181-
heap.Push(&c.expiries, e)
182190
c.dirty = true
183191
return nil
184192
}

libbeat/processors/cache/mem_store_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,103 @@ var memStoreTests = []struct {
335335
},
336336
},
337337
},
338+
{
339+
name: "re-hit",
340+
cfg: config{
341+
Store: &storeConfig{
342+
Memory: &memConfig{"test"},
343+
Capacity: 1000,
344+
Effort: 10,
345+
},
346+
Get: &getConfig{},
347+
},
348+
want: &memStore{
349+
id: "test",
350+
cache: map[string]*CacheEntry{},
351+
refs: 1,
352+
// TTL, capacity and effort are set only by put.
353+
ttl: -1,
354+
cap: -1,
355+
effort: -1,
356+
},
357+
steps: []memStoreTestSteps{
358+
0: {
359+
doTo: func(s *memStore) error {
360+
putCfg := config{
361+
Store: &storeConfig{
362+
Memory: &memConfig{"test"},
363+
Capacity: 1000,
364+
Effort: 10,
365+
},
366+
Put: &putConfig{
367+
TTL: ptrTo(10 * time.Minute),
368+
},
369+
}
370+
s.add(putCfg)
371+
return nil
372+
},
373+
want: &memStore{
374+
id: "test",
375+
cache: map[string]*CacheEntry{},
376+
refs: 2,
377+
dirty: false,
378+
ttl: 10 * time.Minute,
379+
cap: 1000,
380+
effort: 10,
381+
},
382+
},
383+
1: {
384+
doTo: func(s *memStore) error {
385+
s.Put("one", 1)
386+
s.Put("two", 2)
387+
s.Put("three", 3)
388+
return nil
389+
},
390+
want: &memStore{
391+
id: "test",
392+
cache: map[string]*CacheEntry{
393+
"one": {Key: "one", Value: int(1), index: 0},
394+
"two": {Key: "two", Value: int(2), index: 1},
395+
"three": {Key: "three", Value: int(3), index: 2},
396+
},
397+
expiries: expiryHeap{
398+
{Key: "one", Value: int(1), index: 0},
399+
{Key: "two", Value: int(2), index: 1},
400+
{Key: "three", Value: int(3), index: 2},
401+
},
402+
refs: 2,
403+
dirty: true,
404+
ttl: 10 * time.Minute,
405+
cap: 1000,
406+
effort: 10,
407+
},
408+
},
409+
2: {
410+
doTo: func(s *memStore) error {
411+
s.Put("one", 1)
412+
return nil
413+
},
414+
want: &memStore{
415+
id: "test",
416+
cache: map[string]*CacheEntry{
417+
"one": {Key: "one", Value: int(1), index: 1},
418+
"two": {Key: "two", Value: int(2), index: 0},
419+
"three": {Key: "three", Value: int(3), index: 2},
420+
},
421+
expiries: expiryHeap{
422+
{Key: "two", Value: int(2), index: 0},
423+
{Key: "one", Value: int(1), index: 1},
424+
{Key: "three", Value: int(3), index: 2},
425+
},
426+
refs: 2,
427+
dirty: true,
428+
ttl: 10 * time.Minute,
429+
cap: 1000,
430+
effort: 10,
431+
},
432+
},
433+
},
434+
},
338435
}
339436

340437
func TestMemStore(t *testing.T) {

0 commit comments

Comments
 (0)