Skip to content

Commit e2d3177

Browse files
[9.0](backport #44576) [metricbeat][kafka] fix panic when fetching consumergroup member assignments (#44872)
* [metricbeat][kafka] fix panic when fetching consumergroup member assignments (#44576) * [metricbeat][kafka] fix panic when fetching consumergroup member assignments * add changelog entry * mage fmt * fix linter * update new changelog * fix behaviour in case of sarama error in MemberDescription conversion * fix linter * move changelog entry to appropriate version section * remove entries from docs/release-notes/index.md --------- Co-authored-by: Ishleen Kaur <102962586+ishleenk17@users.noreply.github.com> (cherry picked from commit 91de405) # Conflicts: # metricbeat/module/kafka/broker_test.go * resolve merge conflicts * remove unrelated changelog entries --------- Co-authored-by: stefans-elastic <stefan.stas@elastic.co>
1 parent eea7f9d commit e2d3177

File tree

3 files changed

+114
-15
lines changed

3 files changed

+114
-15
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
155155
- Fix Azure Monitor metric timespan to restore Storage Account PT1H metrics {issue}40376[40376] {pull}40367[40367]
156156
- Remove excessive info-level logs in cgroups setup {pull}40491[40491]
157157
- Add missing ECS Cloud fields in GCP `metrics` metricset when using `exclude_labels: true` {issue}40437[40437] {pull}40467[40467]
158+
- Fix panic in kafka consumergroup member assignment fetching when there are 0 members in consumer group. {pull}44576[44576]
158159
- Upgrade `go.mongodb.org/mongo-driver` from `v1.14.0` to `v1.17.4` to fix connection leaks in MongoDB module {pull}44769[44769]
159160

160161
*Osquerybeat*

metricbeat/module/kafka/broker.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (b *Broker) Connect() error {
138138
other := finder.findBroker(brokerAddress(b.broker), meta.Brokers)
139139
if other == nil { // no broker found
140140
closeBroker(b.broker)
141-
return fmt.Errorf("No advertised broker with address %v found", b.Addr())
141+
return fmt.Errorf("no advertised broker with address %v found", b.Addr())
142142
}
143143

144144
debugf("found matching broker %v with id %v", other.Addr(), other.ID())
@@ -244,21 +244,11 @@ func (b *Broker) DescribeGroups(
244244

245245
members := map[string]MemberDescription{}
246246
for memberID, memberDescr := range descr.Members {
247-
assignment, err := memberDescr.GetMemberAssignment()
247+
memberDescription, err := fromSaramaGroupMemberDescription(memberDescr)
248248
if err != nil {
249-
members[memberID] = MemberDescription{
250-
ClientID: memberDescr.ClientId,
251-
ClientHost: memberDescr.ClientHost,
252-
Err: err,
253-
}
254249
continue
255250
}
256-
257-
members[memberID] = MemberDescription{
258-
ClientID: memberDescr.ClientId,
259-
ClientHost: memberDescr.ClientHost,
260-
Topics: assignment.Topics,
261-
}
251+
members[memberID] = memberDescription
262252
}
263253
groups[descr.GroupId] = GroupDescription{Members: members}
264254
}
@@ -538,6 +528,31 @@ func (m *brokerFinder) lookupHosts(ips []net.IP) []string {
538528
return hosts
539529
}
540530

531+
func fromSaramaGroupMemberDescription(memberDescr *sarama.GroupMemberDescription) (MemberDescription, error) {
532+
if memberDescr == nil {
533+
return MemberDescription{}, errors.New("nil GroupMemberDescription")
534+
}
535+
536+
assignment, err := memberDescr.GetMemberAssignment()
537+
if err != nil {
538+
return MemberDescription{ //nolint:nilerr // in this case we should return no error and the error is reported in MemberDescription
539+
ClientID: memberDescr.ClientId,
540+
ClientHost: memberDescr.ClientHost,
541+
Err: err,
542+
}, nil
543+
}
544+
545+
assignmentTopics := make(map[string][]int32)
546+
if assignment != nil {
547+
assignmentTopics = assignment.Topics
548+
}
549+
return MemberDescription{
550+
ClientID: memberDescr.ClientId,
551+
ClientHost: memberDescr.ClientHost,
552+
Topics: assignmentTopics,
553+
}, nil
554+
}
555+
541556
func anyIPsMatch(as, bs []net.IP) bool {
542557
for _, a := range as {
543558
for _, b := range bs {

metricbeat/module/kafka/broker_test.go

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package kafka
1919

2020
import (
21+
"errors"
2122
"net"
2223
"testing"
2324

24-
"errors"
25-
2625
"github.com/stretchr/testify/assert"
26+
"github.com/stretchr/testify/require"
27+
28+
"github.com/elastic/sarama"
2729
)
2830

2931
type dummyNet struct{}
@@ -158,3 +160,84 @@ func TestFindMatchingAddress(t *testing.T) {
158160
})
159161
}
160162
}
163+
164+
func Test_getMember(t *testing.T) {
165+
tests := []struct {
166+
name string
167+
inputMemberDescr *sarama.GroupMemberDescription
168+
169+
expectedErrMsg string
170+
expectedResult MemberDescription
171+
}{
172+
{
173+
name: "success",
174+
inputMemberDescr: &sarama.GroupMemberDescription{
175+
ClientId: "test-client",
176+
ClientHost: "test-host",
177+
MemberAssignment: []byte{0, 0, 0, 0, 0, 1, 0, 10, 116, 101, 115, 116, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, 255, 255, 255, 255},
178+
},
179+
180+
expectedErrMsg: "",
181+
expectedResult: MemberDescription{
182+
Err: nil,
183+
ClientID: "test-client",
184+
ClientHost: "test-host",
185+
Topics: map[string][]int32{
186+
"test-topic": {0},
187+
},
188+
},
189+
},
190+
{
191+
name: "nil sarama GroupMemberDescription",
192+
inputMemberDescr: nil,
193+
194+
expectedErrMsg: "nil GroupMemberDescription",
195+
expectedResult: MemberDescription{},
196+
},
197+
{
198+
name: "0 members in the group",
199+
inputMemberDescr: &sarama.GroupMemberDescription{
200+
ClientId: "test-client",
201+
ClientHost: "test-host",
202+
MemberAssignment: nil,
203+
},
204+
205+
expectedErrMsg: "",
206+
expectedResult: MemberDescription{
207+
Err: nil,
208+
ClientID: "test-client",
209+
ClientHost: "test-host",
210+
Topics: map[string][]int32{},
211+
},
212+
},
213+
{
214+
name: "ignore sarama error",
215+
inputMemberDescr: &sarama.GroupMemberDescription{
216+
ClientId: "test-client",
217+
ClientHost: "test-host",
218+
MemberAssignment: []byte{1, 2, 3},
219+
},
220+
221+
expectedErrMsg: "",
222+
expectedResult: MemberDescription{
223+
Err: errors.New("kafka: insufficient data to decode packet, more bytes expected"),
224+
ClientID: "test-client",
225+
ClientHost: "test-host",
226+
Topics: nil,
227+
},
228+
},
229+
}
230+
231+
for _, tt := range tests {
232+
t.Run(tt.name, func(t *testing.T) {
233+
result, err := fromSaramaGroupMemberDescription(tt.inputMemberDescr)
234+
if tt.expectedErrMsg == "" {
235+
require.NoError(t, err)
236+
237+
assert.Equal(t, tt.expectedResult, result)
238+
} else {
239+
assert.Error(t, err, tt.expectedErrMsg)
240+
}
241+
})
242+
}
243+
}

0 commit comments

Comments
 (0)