Skip to content

[8.18](backport #42398) Handle leak of process info in hostfs provider for add_session_metadata #45322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ package sessionmd
import (
"context"
"fmt"
"math"
"reflect"
"strconv"
"sync/atomic"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
Expand All @@ -23,6 +25,7 @@ import (
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

const (
Expand All @@ -37,6 +40,9 @@ func InitializeModule() {
processors.RegisterPlugin(processorName, New)
}

// instanceID assigns a uniqueID to every instance of the metrics handler for the procfs DB
var instanceID atomic.Uint32

type addSessionMetadata struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -56,9 +62,17 @@ func New(cfg *cfg.C) (beat.Processor, error) {

logger := logp.NewLogger(logName)

id := int(instanceID.Add(1))
regName := "processor.add_session_metadata.processdb"
// if more than one instance of the DB is running, start to increment the metrics keys.
if id > 1 {
regName = fmt.Sprintf("%s.%d", regName, id)
}
metricsReg := monitoring.Default.NewRegistry(regName)

ctx, cancel := context.WithCancel(context.Background())
reader := procfs.NewProcfsReader(*logger)
db, err := processdb.NewDB(reader, *logger)
db, err := processdb.NewDB(ctx, metricsReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create DB: %w", err)
Expand Down Expand Up @@ -182,7 +196,7 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {
fullProcess, err = p.db.GetProcess(pid)
if err != nil {
e := fmt.Errorf("pid %v not found in db: %w", pid, err)
p.logger.Debugw("PID not found in provider", "pid", pid, "error", err)
p.logger.Debugf("PID %d not found in provider: %s", pid, err)
return nil, e
}
}
Expand Down Expand Up @@ -210,22 +224,24 @@ func pidToUInt32(value interface{}) (pid uint32, err error) {
switch v := value.(type) {
case string:
nr, err := strconv.Atoi(v)
if err != nil {
if err != nil || nr < 0 || nr > math.MaxUint32 {
return 0, fmt.Errorf("error converting string to integer: %w", err)
}
pid = uint32(nr)
case uint32:
pid = v
case int, int8, int16, int32, int64:
pid64 := reflect.ValueOf(v).Int()
if pid = uint32(pid64); int64(pid) != pid64 {
if pid64 < 0 || pid64 > math.MaxUint32 {
return 0, fmt.Errorf("integer out of range: %d", pid64)
}
pid = uint32(pid64)
case uint, uintptr, uint8, uint16, uint64:
pidu64 := reflect.ValueOf(v).Uint()
if pid = uint32(pidu64); uint64(pid) != pidu64 {
if pidu64 > math.MaxUint32 {
return 0, fmt.Errorf("integer out of range: %d", pidu64)
}
pid = uint32(pidu64)
default:
return 0, fmt.Errorf("not an integer or string, but %T", v)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
package sessionmd

import (
"context"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

var (
Expand Down Expand Up @@ -337,10 +340,12 @@ var (
)

func TestEnrich(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15)
defer cancel()
for _, tt := range enrichTests {
t.Run(tt.testName, func(t *testing.T) {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
db, err := processdb.NewDB(ctx, monitoring.NewRegistry(), reader, logger, time.Second*30, false)
require.Nil(t, err)

for _, ev := range tt.mockProcesses {
Expand Down
19 changes: 16 additions & 3 deletions x-pack/auditbeat/processors/sessionmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,28 @@

package sessionmd

import "time"

// Config for add_session_metadata processor.
type config struct {
Backend string `config:"backend"`
// Backend specifies the data source for the processor. Possible values are `auto`, `procfs`, and `kernel_tracing`
Backend string `config:"backend"`
// PIDField specifies the event field used to locate the process ID
PIDField string `config:"pid_field"`
/// DBReaperPeriod specifies the interval of how often the backing process DB should remove orphaned and exited events.
// Only valid for the `procfs` backend, or if `auto` falls back to `procfs`
DBReaperPeriod time.Duration `config:"db_reaper_period"`
// ReapProcesses, if enabled, will tell the process DB reaper thread to also remove orphaned process exec events, in addition to orphaned exit events and compleated process events.
// This can result in data loss if auditbeat is running in an environment where it can't properly talk to procfs, but it can also reduce the memory footprint of auditbeat.
// Only valid for the `procfs` backend.
ReapProcesses bool `config:"reap_processes"`
}

func defaultConfig() config {
return config{
Backend: "auto",
PIDField: "process.pid",
Backend: "auto",
PIDField: "process.pid",
DBReaperPeriod: time.Second * 30,
ReapProcesses: false,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ auditbeat.modules:
- module: auditd
processors:
- add_session_metadata:
backend: "auto"
backend: "auto"
-------------------------------------
+
. Add audit rules in the modules configuration section of `auditbeat.yml` or the
Expand All @@ -96,3 +96,25 @@ auditbeat.modules:
-------------------------------------
sudo systemctl restart auditbeat
-------------------------------------

===== Configuring the Process Database

When using the `procfs` backend, `add_session_metadata` will use an in-memory database to store and match events as they arrive to the processor.
This processor has a number of additional config values:

[source,yaml]
-------------------------------------
auditbeat.modules:
- module: auditd
processors:
- add_session_metadata:
backend: "procfs"
reap_processes: false
db_reaper_period: 30s
-------------------------------------

* `reap_processes` tells the database to remove orphan `execve` and `execveat` process events for which no matching `exit_group` event is found.
This may result in incomplete data, but will reduce memory usage under high load. The default is `false`.
* `db_reaper_period` specifies the time interval of the reaper process that will regularly remove exited and orphaned processes from the database.
Setting this value lower my result in incomplete data, but will reduce memory pressure. Setting this to a higher value may help on systems with high load, but will increase memory usage.
The default is `30s.`
Loading
Loading