Skip to content

Commit 4a16c6a

Browse files
authored
x-pack/metricbeat/module/statsd: Skip invalid statsd packet(s) (#35075)
* x-pack/metricbeat/module/statsd: Skip invalid packet(s) and continue parsing Previously, invalid statsd metric packets would break the ingestion and further metric packets were left unparsed. Now it'd be able to skip invalid packets and continue the parsing.
1 parent e1c1f3a commit 4a16c6a

File tree

3 files changed

+100
-37
lines changed

3 files changed

+100
-37
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
202202
- Fix no error logs displayed in CloudWatch EC2, RDS and SQS metadata {issue}34985[34985] {pull}35035[35035]
203203
- Remove Beta warning from IIS application_pool metricset {pull}35480[35480]
204204
- Improve documentation for ActiveMQ module {issue}35113[35113] {pull}35558[35558]
205+
- Resolve statsd module's prematurely halting of metrics parsing upon encountering an invalid packet. {pull}35075[35075]
205206

206207
*Osquerybeat*
207208

x-pack/metricbeat/module/statsd/server/data.go

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ package server
66

77
import (
88
"bytes"
9+
"errors"
10+
"fmt"
911
"strconv"
1012
"time"
1113

12-
"github.com/pkg/errors"
13-
1414
"github.com/elastic/beats/v7/libbeat/common"
1515
"github.com/elastic/beats/v7/metricbeat/helper/server"
1616
"github.com/elastic/elastic-agent-libs/logp"
@@ -20,8 +20,7 @@ import (
2020
var errInvalidPacket = errors.New("invalid statsd packet")
2121

2222
type metricProcessor struct {
23-
registry *registry
24-
reservoirSize int
23+
registry *registry
2524
}
2625

2726
type statsdMetric struct {
@@ -32,12 +31,12 @@ type statsdMetric struct {
3231
tags map[string]string
3332
}
3433

35-
func splitTags(rawTags []byte, kvSep []byte) map[string]string {
34+
func splitTags(rawTags, kvSep []byte) map[string]string {
3635
tags := map[string]string{}
3736
for _, kv := range bytes.Split(rawTags, []byte(",")) {
3837
kvSplit := bytes.SplitN(kv, kvSep, 2)
3938
if len(kvSplit) != 2 {
40-
logger.Warnf("could not parse tags")
39+
logger.Warn("could not parse tags")
4140
continue
4241
}
4342
tags[string(kvSplit[0])] = string(kvSplit[1])
@@ -86,14 +85,16 @@ func parseSingle(b []byte) (statsdMetric, error) {
8685
return s, nil
8786
}
8887

89-
// parse will parse a statsd metric into its components
88+
// parse will parse statsd metrics into individual metric and then its components
9089
func parse(b []byte) ([]statsdMetric, error) {
91-
metrics := []statsdMetric{}
92-
for _, rawMetric := range bytes.Split(b, []byte("\n")) {
93-
if len(rawMetric) > 0 {
94-
metric, err := parseSingle(rawMetric)
90+
rawMetrics := bytes.Split(b, []byte("\n"))
91+
metrics := make([]statsdMetric, 0, len(rawMetrics))
92+
for i := range rawMetrics {
93+
if len(rawMetrics[i]) > 0 {
94+
metric, err := parseSingle(rawMetrics[i])
9595
if err != nil {
96-
return metrics, err
96+
logger.Warnf("invalid packet: %s", err)
97+
continue
9798
}
9899
metrics = append(metrics, metric)
99100
}
@@ -120,13 +121,13 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
120121
// Not all labels match
121122
// Skip and continue to next mapping
122123
if len(res) != (len(mapping.Labels) + 1) {
123-
logger.Debugf("not all labels match in statsd.mapping, skipped")
124+
logger.Debug("not all labels match in statsd.mapping, skipped")
124125
continue
125126
}
126127

127128
// Let's add the metric set fields from labels
128129
names := mapping.regex.SubexpNames()
129-
for i, _ := range res {
130+
for i := range res {
130131
for _, label := range mapping.Labels {
131132
if label.Attr != names[i] {
132133
continue
@@ -139,8 +140,6 @@ func eventMapping(metricName string, metricValue interface{}, metricSetFields ma
139140
// Let's add the metric with the value field
140141
metricSetFields[mapping.Value.Field] = metricValue
141142
}
142-
143-
return
144143
}
145144

146145
func newMetricProcessor(ttl time.Duration) *metricProcessor {
@@ -162,10 +161,10 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
162161
var err error
163162
sampleRate, err = strconv.ParseFloat(m.sampleRate, 64)
164163
if err != nil {
165-
return errors.Wrapf(err, "failed to process metric `%s` sample rate `%s`", m.name, m.sampleRate)
164+
return fmt.Errorf("failed to process metric `%s` sample rate `%s`: %w", m.name, m.sampleRate, err)
166165
}
167166
if sampleRate <= 0.0 {
168-
return errors.Errorf("sample rate of 0.0 is invalid for metric `%s`", m.name)
167+
return fmt.Errorf("sample rate of 0.0 is invalid for metric `%s`: %w", m.name, err)
169168
}
170169
}
171170

@@ -174,7 +173,7 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
174173
c := p.registry.GetOrNewCounter(m.name, m.tags)
175174
v, err := strconv.ParseInt(m.value, 10, 64)
176175
if err != nil {
177-
return errors.Wrapf(err, "failed to process counter `%s` with value `%s`", m.name, m.value)
176+
return fmt.Errorf("failed to process counter `%s` with value `%s`: %w", m.name, m.value, err)
178177
}
179178
// apply sample rate
180179
v = int64(float64(v) * (1.0 / sampleRate))
@@ -183,9 +182,8 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
183182
c := p.registry.GetOrNewGauge64(m.name, m.tags)
184183
v, err := strconv.ParseFloat(m.value, 64)
185184
if err != nil {
186-
return errors.Wrapf(err, "failed to process gauge `%s` with value `%s`", m.name, m.value)
185+
return fmt.Errorf("failed to process gauge `%s` with value `%s`: %w", m.name, m.value, err)
187186
}
188-
189187
// inc/dec or set
190188
if m.value[0] == '+' || m.value[0] == '-' {
191189
c.Inc(v)
@@ -196,14 +194,14 @@ func (p *metricProcessor) processSingle(m statsdMetric) error {
196194
c := p.registry.GetOrNewTimer(m.name, m.tags)
197195
v, err := strconv.ParseFloat(m.value, 64)
198196
if err != nil {
199-
return errors.Wrapf(err, "failed to process timer `%s` with value `%s`", m.name, m.value)
197+
return fmt.Errorf("failed to process timer `%s` with value `%s`: %w", m.name, m.value, err)
200198
}
201199
c.SampledUpdate(time.Duration(v), sampleRate)
202200
case "h": // TODO: can these be floats?
203201
c := p.registry.GetOrNewHistogram(m.name, m.tags)
204202
v, err := strconv.ParseInt(m.value, 10, 64)
205203
if err != nil {
206-
return errors.Wrapf(err, "failed to process histogram `%s` with value `%s`", m.name, m.value)
204+
return fmt.Errorf("failed to process histogram `%s` with value `%s`: %w", m.name, m.value, err)
207205
}
208206
c.Update(v)
209207
case "s":

x-pack/metricbeat/module/statsd/server/data_test.go

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package server
66

77
import (
8-
"fmt"
8+
"errors"
99
"testing"
1010
"time"
1111

@@ -820,7 +820,7 @@ func TestBuildMappings(t *testing.T) {
820820
value:
821821
field: started
822822
`,
823-
err: fmt.Errorf(`repeated label fields "repeated_label_field"`),
823+
err: errors.New(`repeated label fields "repeated_label_field"`),
824824
expected: nil,
825825
},
826826
{
@@ -833,13 +833,14 @@ func TestBuildMappings(t *testing.T) {
833833
value:
834834
field: colliding_field
835835
`,
836-
err: fmt.Errorf(`collision between label field "colliding_field" and value field "colliding_field"`),
836+
err: errors.New(`collision between label field "colliding_field" and value field "colliding_field"`),
837837
expected: nil,
838838
},
839839
} {
840840
t.Run(test.title, func(t *testing.T) {
841841
var mappings []StatsdMapping
842842
err := yaml.Unmarshal([]byte(test.input), &mappings)
843+
require.NoError(t, err)
843844
actual, err := buildMappings(mappings)
844845
for k, v := range actual {
845846
v.regex = nil
@@ -883,12 +884,36 @@ func TestParseMetrics(t *testing.T) {
883884
}},
884885
},
885886
{
886-
input: "decrement-counter:-15|c",
887-
expected: []statsdMetric{{
888-
name: "decrement-counter",
889-
metricType: "c",
890-
value: "-15",
891-
}},
887+
// All metrics are parsed except the invalid packet
888+
input: "decrement-counter:-15|c\nmeter1-1.4|m\ndecrement-counter:-20|c",
889+
expected: []statsdMetric{
890+
{
891+
name: "decrement-counter",
892+
metricType: "c",
893+
value: "-15",
894+
},
895+
{
896+
name: "decrement-counter",
897+
metricType: "c",
898+
value: "-20",
899+
},
900+
},
901+
},
902+
{
903+
// All metrics are parsed except the invalid packet
904+
input: "meter1-1.4|m\ndecrement-counter:-20|c\ntimer1:1.2|ms",
905+
expected: []statsdMetric{
906+
{
907+
name: "decrement-counter",
908+
metricType: "c",
909+
value: "-20",
910+
},
911+
{
912+
name: "timer1",
913+
metricType: "ms",
914+
value: "1.2",
915+
},
916+
},
892917
},
893918
{
894919
input: "timer1:1.2|ms",
@@ -995,12 +1020,10 @@ func TestParseMetrics(t *testing.T) {
9951020
{
9961021
input: "meter1-1.4|m",
9971022
expected: []statsdMetric{},
998-
err: errInvalidPacket,
9991023
},
10001024
{
10011025
input: "meter1:1.4-m",
10021026
expected: []statsdMetric{},
1003-
err: errInvalidPacket,
10041027
},
10051028
} {
10061029
actual, err := parse([]byte(test.input))
@@ -1016,6 +1039,47 @@ func TestParseMetrics(t *testing.T) {
10161039
}
10171040
}
10181041

1042+
func TestParseSingle(t *testing.T) {
1043+
tests := map[string]struct {
1044+
input string
1045+
err error
1046+
want statsdMetric
1047+
}{
1048+
"invalid packet #1": {input: "meter1-1.4|m", err: errInvalidPacket, want: statsdMetric{}},
1049+
"invalid packet #2": {input: "meter1:1.4-m", err: errInvalidPacket, want: statsdMetric{}},
1050+
"valid packet: counter with tags": {
1051+
input: "tags1:1|c|#k1:v1,k2:v2",
1052+
err: nil,
1053+
want: statsdMetric{
1054+
name: "tags1",
1055+
metricType: "c",
1056+
sampleRate: "",
1057+
value: "1",
1058+
tags: map[string]string{"k1": "v1", "k2": "v2"},
1059+
},
1060+
},
1061+
"valid packet: gauge": {
1062+
input: "gauge1:1.0|g",
1063+
err: nil,
1064+
want: statsdMetric{
1065+
name: "gauge1",
1066+
metricType: "g",
1067+
sampleRate: "",
1068+
value: "1.0",
1069+
tags: nil,
1070+
},
1071+
},
1072+
}
1073+
1074+
for name, tc := range tests {
1075+
t.Run(name, func(t *testing.T) {
1076+
got, err := parseSingle([]byte(tc.input))
1077+
assert.Equal(t, tc.err, err)
1078+
assert.Equal(t, tc.want, got)
1079+
})
1080+
}
1081+
}
1082+
10191083
type testUDPEvent struct {
10201084
event mapstr.M
10211085
meta server.Meta
@@ -1068,13 +1132,13 @@ func TestTagsGrouping(t *testing.T) {
10681132
}
10691133

10701134
expectedTags := []mapstr.M{
1071-
mapstr.M{
1135+
{
10721136
"labels": mapstr.M{
10731137
"k1": "v1",
10741138
"k2": "v2",
10751139
},
10761140
},
1077-
mapstr.M{
1141+
{
10781142
"labels": mapstr.M{
10791143
"k1": "v2",
10801144
"k2": "v3",
@@ -1182,6 +1246,7 @@ func TestGaugeDeltas(t *testing.T) {
11821246
"metric01": map[string]interface{}{"value": -1.0},
11831247
})
11841248
}
1249+
11851250
func TestCounter(t *testing.T) {
11861251
ms := mbtest.NewMetricSet(t, map[string]interface{}{"module": "statsd"}).(*MetricSet)
11871252
testData := []string{
@@ -1316,5 +1381,4 @@ func BenchmarkIngest(b *testing.B) {
13161381
err := ms.processor.Process(events[i%len(events)])
13171382
assert.NoError(b, err)
13181383
}
1319-
13201384
}

0 commit comments

Comments
 (0)