Skip to content

Commit d6ff82b

Browse files
Handle leak of process info in hostfs provider for add_session_metadata (#42398)
* handle leak in hostfs provider for sessionmd * add metrics, clean up * fix tests * add process reaper for dropped exit events * remove test code * linter * more testing, fix mock provider * fix error checks * clean up, add session maps to reaper, expand metrics * fix tests * fix tests * format * docs
1 parent d05a070 commit d6ff82b

File tree

14 files changed

+714
-148
lines changed

14 files changed

+714
-148
lines changed

x-pack/auditbeat/processors/sessionmd/add_session_metadata.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"reflect"
1313
"strconv"
14+
"sync/atomic"
1415

1516
"github.com/elastic/beats/v7/libbeat/beat"
1617
"github.com/elastic/beats/v7/libbeat/processors"
@@ -23,6 +24,7 @@ import (
2324
cfg "github.com/elastic/elastic-agent-libs/config"
2425
"github.com/elastic/elastic-agent-libs/logp"
2526
"github.com/elastic/elastic-agent-libs/mapstr"
27+
"github.com/elastic/elastic-agent-libs/monitoring"
2628
)
2729

2830
const (
@@ -37,6 +39,9 @@ func InitializeModule() {
3739
processors.RegisterPlugin(processorName, New)
3840
}
3941

42+
// instanceID assigns a uniqueID to every instance of the metrics handler for the procfs DB
43+
var instanceID atomic.Uint32
44+
4045
type addSessionMetadata struct {
4146
ctx context.Context
4247
cancel context.CancelFunc
@@ -56,9 +61,17 @@ func New(cfg *cfg.C) (beat.Processor, error) {
5661

5762
logger := logp.NewLogger(logName)
5863

64+
id := int(instanceID.Add(1))
65+
regName := "processor.add_session_metadata.processdb"
66+
// if more than one instance of the DB is running, start to increment the metrics keys.
67+
if id > 1 {
68+
regName = fmt.Sprintf("%s.%d", regName, id)
69+
}
70+
metricsReg := monitoring.Default.NewRegistry(regName)
71+
5972
ctx, cancel := context.WithCancel(context.Background())
6073
reader := procfs.NewProcfsReader(*logger)
61-
db, err := processdb.NewDB(reader, *logger)
74+
db, err := processdb.NewDB(ctx, metricsReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses)
6275
if err != nil {
6376
cancel()
6477
return nil, fmt.Errorf("failed to create DB: %w", err)
@@ -182,7 +195,7 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {
182195
fullProcess, err = p.db.GetProcess(pid)
183196
if err != nil {
184197
e := fmt.Errorf("pid %v not found in db: %w", pid, err)
185-
p.logger.Debugw("PID not found in provider", "pid", pid, "error", err)
198+
p.logger.Debugf("PID %d not found in provider: %s", pid, err)
186199
return nil, e
187200
}
188201
}

x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
package sessionmd
88

99
import (
10+
"context"
1011
"testing"
12+
"time"
1113

1214
"github.com/google/go-cmp/cmp"
1315
"github.com/stretchr/testify/require"
@@ -18,6 +20,7 @@ import (
1820
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
1921
"github.com/elastic/elastic-agent-libs/logp"
2022
"github.com/elastic/elastic-agent-libs/mapstr"
23+
"github.com/elastic/elastic-agent-libs/monitoring"
2124
)
2225

2326
var (
@@ -337,10 +340,12 @@ var (
337340
)
338341

339342
func TestEnrich(t *testing.T) {
343+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15)
344+
defer cancel()
340345
for _, tt := range enrichTests {
341346
t.Run(tt.testName, func(t *testing.T) {
342347
reader := procfs.NewMockReader()
343-
db, err := processdb.NewDB(reader, *logger)
348+
db, err := processdb.NewDB(ctx, monitoring.NewRegistry(), reader, logger, time.Second*30, false)
344349
require.Nil(t, err)
345350

346351
for _, ev := range tt.mockProcesses {

x-pack/auditbeat/processors/sessionmd/config.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,28 @@
66

77
package sessionmd
88

9+
import "time"
10+
911
// Config for add_session_metadata processor.
1012
type config struct {
11-
Backend string `config:"backend"`
13+
// Backend specifies the data source for the processor. Possible values are `auto`, `procfs`, and `kernel_tracing`
14+
Backend string `config:"backend"`
15+
// PIDField specifies the event field used to locate the process ID
1216
PIDField string `config:"pid_field"`
17+
/// DBReaperPeriod specifies the interval of how often the backing process DB should remove orphaned and exited events.
18+
// Only valid for the `procfs` backend, or if `auto` falls back to `procfs`
19+
DBReaperPeriod time.Duration `config:"db_reaper_period"`
20+
// ReapProcesses, if enabled, will tell the process DB reaper thread to also remove orphaned process exec events, in addition to orphaned exit events and compleated process events.
21+
// This can result in data loss if auditbeat is running in an environment where it can't properly talk to procfs, but it can also reduce the memory footprint of auditbeat.
22+
// Only valid for the `procfs` backend.
23+
ReapProcesses bool `config:"reap_processes"`
1324
}
1425

1526
func defaultConfig() config {
1627
return config{
17-
Backend: "auto",
18-
PIDField: "process.pid",
28+
Backend: "auto",
29+
PIDField: "process.pid",
30+
DBReaperPeriod: time.Second * 30,
31+
ReapProcesses: false,
1932
}
2033
}

x-pack/auditbeat/processors/sessionmd/docs/add_session_metadata.asciidoc

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ auditbeat.modules:
7070
- module: auditd
7171
processors:
7272
- add_session_metadata:
73-
backend: "auto"
73+
backend: "auto"
7474
-------------------------------------
7575
+
7676
. Add audit rules in the modules configuration section of `auditbeat.yml` or the
@@ -96,3 +96,25 @@ auditbeat.modules:
9696
-------------------------------------
9797
sudo systemctl restart auditbeat
9898
-------------------------------------
99+
100+
===== Configuring the Process Database
101+
102+
When using the `procfs` backend, `add_session_metadata` will use an in-memory database to store and match events as they arrive to the processor.
103+
This processor has a number of additional config values:
104+
105+
[source,yaml]
106+
-------------------------------------
107+
auditbeat.modules:
108+
- module: auditd
109+
processors:
110+
- add_session_metadata:
111+
backend: "procfs"
112+
reap_processes: false
113+
db_reaper_period: 30s
114+
-------------------------------------
115+
116+
* `reap_processes` tells the database to remove orphan `execve` and `execveat` process events for which no matching `exit_group` event is found.
117+
This may result in incomplete data, but will reduce memory usage under high load. The default is `false`.
118+
* `db_reaper_period` specifies the time interval of the reaper process that will regularly remove exited and orphaned processes from the database.
119+
Setting this value lower my result in incomplete data, but will reduce memory pressure. Setting this to a higher value may help on systems with high load, but will increase memory usage.
120+
The default is `30s.`

x-pack/auditbeat/processors/sessionmd/processdb/db.go

Lines changed: 92 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
package processdb
88

99
import (
10-
"container/heap"
10+
"context"
1111
"encoding/base64"
1212
"errors"
1313
"fmt"
@@ -26,6 +26,7 @@ import (
2626
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/timeutils"
2727
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
2828
"github.com/elastic/elastic-agent-libs/logp"
29+
"github.com/elastic/elastic-agent-libs/monitoring"
2930
)
3031

3132
type EntryType string
@@ -68,9 +69,12 @@ type Process struct {
6869
CTTY tty.TTYDev
6970
Argv []string
7071
Cwd string
71-
Env map[string]string
7272
Filename string
7373
ExitCode int32
74+
75+
// procfsLookupFail is true if procfs couldn't find a matching PID in /proc.
76+
procfsLookupFail bool
77+
insertTime time.Time
7478
}
7579

7680
var (
@@ -175,22 +179,59 @@ type DB struct {
175179
entryLeaderRelationships map[uint32]uint32
176180
procfs procfs.Reader
177181
stopChan chan struct{}
178-
removalCandidates rcHeap
179-
}
180-
181-
func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
182+
// map of processes that we can remove during the next reaper run, if the exit event is older than `removalCandidateTimeout`
183+
removalMap map[uint32]removalCandidate
184+
ctx context.Context
185+
186+
// used for metrics reporting
187+
stats *Stats
188+
189+
// knobs for the reaper thread follows
190+
191+
// how often the reaper checks for expired or orphaned events.
192+
// A negative value disables the reaper.
193+
reaperPeriod time.Duration
194+
// Tells the reaper to remove orphaned process exec events.
195+
// If true, exec events for which no /proc entry can be found will be removed after their insertion time has passed `orphanTimeout`.
196+
// If disabled, the reaper will only remove exec events if they are matched with a exit event.
197+
reapProcesses bool
198+
// The duration after which we'll reap an orphaned process exec event for which no /proc data exists. Measured from the time the event is inserted.
199+
processReapAfter time.Duration
200+
}
201+
202+
// NewDB creates a new DB for tracking processes.
203+
//
204+
// - metrics: monitoring registry for exporting DB metrics
205+
// - reader: handler for /proc data and events.
206+
// - reaperPeriod: tells the reaper to update its tracking of deat and orphaned processes every at every `n` period.
207+
// - reapProcesses: optionally tell the reaper to also reap orphan processes from the DB, if no matching exit event can be found.
208+
// May result in data loss if the DB in under load and events do not arrive in a timely fashion.
209+
func NewDB(ctx context.Context, metrics *monitoring.Registry, reader procfs.Reader, logger *logp.Logger, reaperPeriod time.Duration, reapProcesses bool) (*DB, error) {
182210
once.Do(initialize)
183211
if initError != nil {
184212
return &DB{}, initError
185213
}
186214
db := DB{
187-
logger: logp.NewLogger("processdb"),
215+
logger: logger.Named("processdb"),
188216
processes: make(map[uint32]Process),
189217
entryLeaders: make(map[uint32]EntryType),
190218
entryLeaderRelationships: make(map[uint32]uint32),
191219
procfs: reader,
192220
stopChan: make(chan struct{}),
193-
removalCandidates: make(rcHeap, 0),
221+
removalMap: make(map[uint32]removalCandidate),
222+
reaperPeriod: reaperPeriod,
223+
stats: NewStats(metrics),
224+
reapProcesses: reapProcesses,
225+
processReapAfter: time.Minute * 10,
226+
ctx: ctx,
227+
}
228+
229+
if db.reaperPeriod > 0 {
230+
logger.Infof("starting processDB reaper with interval %s", db.reaperPeriod)
231+
}
232+
233+
if db.reapProcesses {
234+
logger.Info("WARNING: reaping orphaned processes. May result in data loss.")
194235
}
195236
db.startReaper()
196237
return &db, nil
@@ -260,18 +301,34 @@ func (db *DB) insertProcess(process Process) {
260301
}
261302
}
262303

304+
// InsertExec adds an exec event
263305
func (db *DB) InsertExec(exec types.ProcessExecEvent) {
264306
db.mutex.Lock()
265307
defer db.mutex.Unlock()
266308

267309
proc := Process{
268-
PIDs: pidInfoFromProto(exec.PIDs),
269-
Creds: credInfoFromProto(exec.Creds),
270-
CTTY: ttyDevFromProto(exec.CTTY),
271-
Argv: exec.Argv,
272-
Cwd: exec.CWD,
273-
Env: exec.Env,
274-
Filename: exec.Filename,
310+
PIDs: pidInfoFromProto(exec.PIDs),
311+
Creds: credInfoFromProto(exec.Creds),
312+
CTTY: ttyDevFromProto(exec.CTTY),
313+
Argv: exec.Argv,
314+
Cwd: exec.CWD,
315+
Filename: exec.Filename,
316+
procfsLookupFail: exec.ProcfsLookupFail,
317+
insertTime: time.Now(),
318+
}
319+
if proc.procfsLookupFail {
320+
db.stats.procfsLookupFail.Add(1)
321+
}
322+
323+
// check to see if an orphaned exit event maps to this exec event.
324+
// the out-of-order problem where we get the exit before the exec usually happens under load.
325+
// if we don't track orphaned processes like this, we'll never scrub them from the DB.
326+
if evt, ok := db.removalMap[proc.PIDs.Tgid]; ok {
327+
proc.ExitCode = evt.exitCode
328+
db.stats.resolvedOrphanExits.Add(1)
329+
db.logger.Debugf("resolved orphan exit for pid %d", proc.PIDs.Tgid)
330+
evt.startTime = proc.PIDs.StartTimeNS
331+
db.removalMap[proc.PIDs.Tgid] = evt
275332
}
276333

277334
db.processes[exec.PIDs.Tgid] = proc
@@ -286,7 +343,7 @@ func (db *DB) createEntryLeader(pid uint32, entryType EntryType) {
286343
db.logger.Debugf("created entry leader %d: %s, name: %s", pid, string(entryType), db.processes[pid].Filename)
287344
}
288345

289-
// pid returned is a pointer type because its possible for no
346+
// pid returned is a pointer type because it is possible no matching PID is found.
290347
func (db *DB) evaluateEntryLeader(p Process) *uint32 {
291348
pid := p.PIDs.Tgid
292349

@@ -387,6 +444,7 @@ func (db *DB) evaluateEntryLeader(p Process) *uint32 {
387444
return nil
388445
}
389446

447+
// InsertSetsid adds a set SID event
390448
func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) {
391449
db.mutex.Lock()
392450
defer db.mutex.Unlock()
@@ -401,23 +459,28 @@ func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) {
401459
}
402460
}
403461

462+
// InsertExit adds a process exit event
404463
func (db *DB) InsertExit(exit types.ProcessExitEvent) {
405464
db.mutex.Lock()
406465
defer db.mutex.Unlock()
407-
408466
pid := exit.PIDs.Tgid
467+
newRemoval := removalCandidate{
468+
pid: pid,
469+
exitTime: time.Now(),
470+
exitCode: exit.ExitCode,
471+
}
472+
409473
process, ok := db.processes[pid]
410474
if !ok {
411-
db.logger.Debugf("could not insert exit, pid %v not found in db", pid)
412-
return
475+
newRemoval.orphanTime = time.Now()
476+
db.logger.Debugf("pid %v for exit event not found in db, adding as orphan", pid)
477+
} else {
478+
// If we already have the process, add our exit info
479+
process.ExitCode = exit.ExitCode
480+
db.processes[pid] = process
481+
newRemoval.startTime = process.PIDs.StartTimeNS
413482
}
414-
process.ExitCode = exit.ExitCode
415-
db.processes[pid] = process
416-
heap.Push(&db.removalCandidates, removalCandidate{
417-
pid: pid,
418-
startTime: process.PIDs.StartTimeNS,
419-
exitTime: time.Now(),
420-
})
483+
db.removalMap[pid] = newRemoval
421484
}
422485

423486
func fullProcessFromDBProcess(p Process) types.Process {
@@ -610,8 +673,10 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
610673

611674
process, ok := db.processes[pid]
612675
if !ok {
676+
db.stats.failedToFindProcessCount.Add(1)
613677
return types.Process{}, errors.New("process not found")
614678
}
679+
db.stats.servedProcessCount.Add(1)
615680

616681
ret := fullProcessFromDBProcess(process)
617682

@@ -651,6 +716,7 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
651716
}
652717
} else {
653718
db.logger.Debugf("failed to find entry leader for %d (%s)", pid, db.processes[pid].Filename)
719+
db.stats.entryLeaderLookupFail.Add(1)
654720
}
655721

656722
db.setEntityID(&ret)
@@ -695,7 +761,6 @@ func (db *DB) ScrapeProcfs() []uint32 {
695761
CTTY: ttyDevFromProto(procInfo.CTTY),
696762
Argv: procInfo.Argv,
697763
Cwd: procInfo.Cwd,
698-
Env: procInfo.Env,
699764
Filename: procInfo.Filename,
700765
}
701766

0 commit comments

Comments
 (0)