Skip to content

Commit c6714c5

Browse files
[beatreceivers] Add otel e2e test for gcp pub-sub input (#44991) (#45401)
* Add e2e test for gcp pub-sub input (cherry picked from commit 09693ac) Co-authored-by: Khushi Jain <khushi.jain@elastic.co>
1 parent 72c25b2 commit c6714c5

File tree

5 files changed

+236
-32
lines changed

5 files changed

+236
-32
lines changed

libbeat/otelbeat/oteltest/helper.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package oteltest
19+
20+
import (
21+
"testing"
22+
23+
"github.com/google/go-cmp/cmp"
24+
"github.com/stretchr/testify/assert"
25+
26+
"github.com/elastic/elastic-agent-libs/mapstr"
27+
)
28+
29+
// AssertMapsEqual asserts that documents m1 and m2 are equal under name
30+
// flattening. Fields in ignoredFields are ignored unless they are missing
31+
// from both documents, in which case the assertion fails.
32+
func AssertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {
33+
t.Helper()
34+
35+
flatM1 := m1.Flatten()
36+
flatM2 := m2.Flatten()
37+
for _, f := range ignoredFields {
38+
hasKeyM1, _ := flatM1.HasKey(f)
39+
hasKeyM2, _ := flatM2.HasKey(f)
40+
41+
if !hasKeyM1 && !hasKeyM2 {
42+
assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f)
43+
}
44+
45+
flatM1.Delete(f)
46+
flatM2.Delete(f)
47+
}
48+
if !cmp.Equal(flatM1, flatM2) {
49+
t.Error(cmp.Diff(flatM1, flatM2))
50+
}
51+
}

libbeat/tests/compose/project.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,12 @@ func (c *Project) HostInformation(service string) (ServiceInfo, error) {
195195
}
196196

197197
if len(servicesStatus) == 0 {
198-
return nil, errors.New("no container running for service")
198+
return nil, fmt.Errorf("no container running for service: %s", service)
199199
}
200200

201201
status, ok := servicesStatus[service]
202202
if !ok || status.Host() == "" {
203-
return nil, errors.New("unknown host:port for service")
203+
return nil, fmt.Errorf("unknown host:port for service: %s", service)
204204
}
205205

206206
return status, nil
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
//go:build integration && !agentbeat
6+
7+
package gcppubsub
8+
9+
import (
10+
"bytes"
11+
"context"
12+
"fmt"
13+
"strings"
14+
"testing"
15+
"text/template"
16+
"time"
17+
18+
"github.com/gofrs/uuid/v5"
19+
"github.com/stretchr/testify/require"
20+
21+
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
22+
"github.com/elastic/beats/v7/libbeat/tests/integration"
23+
24+
"github.com/elastic/elastic-agent-libs/testing/estools"
25+
)
26+
27+
func TestGCPInputOTelE2E(t *testing.T) {
28+
integration.EnsureESIsRunning(t)
29+
30+
// Create pubsub client for setting up and communicating to emulator.
31+
client, clientCancel := testSetup(t)
32+
defer func() {
33+
clientCancel()
34+
client.Close()
35+
}()
36+
37+
createTopic(t, client)
38+
createSubscription(t, "test-subscription-otel", client)
39+
createSubscription(t, "test-subscription-fb", client)
40+
const numMsgs = 10
41+
publishMessages(t, client, numMsgs)
42+
43+
host := integration.GetESURL(t, "http")
44+
user := host.User.Username()
45+
password, _ := host.User.Password()
46+
47+
// create a random uuid and make sure it doesn't contain dashes/
48+
otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4()))
49+
fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4()))
50+
51+
type options struct {
52+
Namespace string
53+
ESURL string
54+
Username string
55+
Password string
56+
Subscription string
57+
}
58+
59+
gcpConfig := `filebeat.inputs:
60+
- type: gcp-pubsub
61+
project_id: test-project-id
62+
topic: test-topic-foo
63+
subscription.name: {{ .Subscription }}
64+
credentials_file: "testdata/fake.json"
65+
66+
output:
67+
elasticsearch:
68+
hosts:
69+
- {{ .ESURL }}
70+
username: {{ .Username }}
71+
password: {{ .Password }}
72+
index: logs-integration-{{ .Namespace }}
73+
74+
queue.mem.flush.timeout: 0s
75+
setup.template.enabled: false
76+
processors:
77+
- add_host_metadata: ~
78+
- add_cloud_metadata: ~
79+
- add_docker_metadata: ~
80+
- add_kubernetes_metadata: ~
81+
`
82+
83+
// start filebeat in otel mode
84+
filebeatOTel := integration.NewBeat(
85+
t,
86+
"filebeat-otel",
87+
"../../filebeat.test",
88+
"otel",
89+
)
90+
91+
optionsValue := options{
92+
ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host),
93+
Username: user,
94+
Password: password,
95+
}
96+
97+
var configBuffer bytes.Buffer
98+
optionsValue.Namespace = otelNamespace
99+
optionsValue.Subscription = "test-subscription-otel"
100+
require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue))
101+
102+
filebeatOTel.WriteConfigFile(configBuffer.String())
103+
104+
filebeatOTel.Start()
105+
defer filebeatOTel.Stop()
106+
107+
// reset buffer
108+
configBuffer.Reset()
109+
110+
optionsValue.Namespace = fbNameSpace
111+
optionsValue.Subscription = "test-subscription-fb"
112+
require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue))
113+
114+
// start filebeat
115+
filebeat := integration.NewBeat(
116+
t,
117+
"filebeat",
118+
"../../filebeat.test",
119+
)
120+
121+
filebeat.WriteConfigFile(configBuffer.String())
122+
filebeat.Start()
123+
defer filebeat.Stop()
124+
125+
// prepare to query ES
126+
es := integration.GetESClient(t, "http")
127+
128+
rawQuery := map[string]any{
129+
"query": map[string]any{
130+
"match_phrase": map[string]any{
131+
"input.type": "gcp-pubsub",
132+
},
133+
},
134+
"sort": []map[string]any{
135+
{"@timestamp": map[string]any{"order": "asc"}},
136+
},
137+
}
138+
139+
var filebeatDocs estools.Documents
140+
var otelDocs estools.Documents
141+
var err error
142+
msg := &strings.Builder{}
143+
144+
// wait for logs to be published
145+
require.Eventuallyf(t,
146+
func() bool {
147+
msg.Reset()
148+
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
149+
defer findCancel()
150+
151+
otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+otelNamespace+"*", es)
152+
msg.WriteString(fmt.Sprintf("failed to query ES for beat documents: %v", err))
153+
154+
filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+fbNameSpace+"*", es)
155+
msg.WriteString(fmt.Sprintf("failed to query ES for beat documents: %v", err))
156+
157+
return otelDocs.Hits.Total.Value >= 1 && filebeatDocs.Hits.Total.Value >= 1
158+
},
159+
3*time.Minute, 1*time.Second, "document indexed by fb-otel: %d, by fb-classic: %d: expected atleast one document by both modes: %s", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value, msg)
160+
161+
filebeatDoc := filebeatDocs.Hits.Hits[0].Source
162+
otelDoc := otelDocs.Hits.Hits[0].Source
163+
ignoredFields := []string{
164+
// Expected to change between agentDocs and OtelDocs
165+
"@timestamp",
166+
"agent.ephemeral_id",
167+
"agent.id",
168+
"event.created",
169+
}
170+
171+
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
172+
173+
}

x-pack/filebeat/input/gcppubsub/pubsub_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) {
4545

4646
var host string
4747
if isInDockerIntegTestEnv() {
48-
// We're running inside out integration test environment so
48+
// We're running inside of integration test environment so
4949
// make sure that that googlepubsub container is running.
5050
host = compose.EnsureUp(t, "googlepubsub").Host()
5151
os.Setenv("PUBSUB_EMULATOR_HOST", host)
@@ -167,11 +167,11 @@ func publishMessages(t *testing.T, client *pubsub.Client, numMsgs int) []string
167167
return messageIDs
168168
}
169169

170-
func createSubscription(t *testing.T, client *pubsub.Client) {
170+
func createSubscription(t *testing.T, subscription string, client *pubsub.Client) {
171171
ctx, cancel := context.WithCancel(context.Background())
172172
defer cancel()
173173

174-
sub := client.Subscription(emulatorSubscription)
174+
sub := client.Subscription(subscription)
175175
exists, err := sub.Exists(ctx)
176176
if err != nil {
177177
t.Fatalf("failed to check if sub exists: %v", err)
@@ -180,7 +180,7 @@ func createSubscription(t *testing.T, client *pubsub.Client) {
180180
return
181181
}
182182

183-
sub, err = client.CreateSubscription(ctx, emulatorSubscription, pubsub.SubscriptionConfig{
183+
sub, err = client.CreateSubscription(ctx, subscription, pubsub.SubscriptionConfig{
184184
Topic: client.Topic(emulatorTopic),
185185
})
186186
if err != nil {
@@ -369,7 +369,7 @@ func TestSubscriptionExists(t *testing.T) {
369369

370370
runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
371371
createTopic(t, client)
372-
createSubscription(t, client)
372+
createSubscription(t, emulatorSubscription, client)
373373
publishMessages(t, client, 5)
374374

375375
var group errgroup.Group
@@ -443,7 +443,7 @@ func TestEndToEndACK(t *testing.T) {
443443

444444
runTestWithACKer(t, cfg, halfAcker, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
445445
createTopic(t, client)
446-
createSubscription(t, client)
446+
createSubscription(t, emulatorSubscription, client)
447447

448448
group, _ := errgroup.WithContext(context.Background())
449449
group.Go(input.run)

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@ import (
1818
"text/template"
1919
"time"
2020

21-
"github.com/google/go-cmp/cmp"
2221
"github.com/stretchr/testify/assert"
2322
"github.com/stretchr/testify/require"
2423

2524
"github.com/gofrs/uuid/v5"
2625

26+
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
2727
"github.com/elastic/beats/v7/libbeat/tests/integration"
28-
"github.com/elastic/elastic-agent-libs/mapstr"
2928
"github.com/elastic/elastic-agent-libs/testing/estools"
3029
)
3130

@@ -126,7 +125,7 @@ http.port: %d
126125
"log.file.path",
127126
}
128127

129-
assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
128+
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
130129
assertMonitoring(t, 5066)
131130
}
132131

@@ -248,7 +247,7 @@ processors:
248247
"event.created",
249248
}
250249

251-
assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
250+
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
252251
}
253252

254253
func writeEventsToLogFile(t *testing.T, filename string, numEvents int) {
@@ -272,25 +271,6 @@ func writeEventsToLogFile(t *testing.T, filename string, numEvents int) {
272271
}
273272
}
274273

275-
func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {
276-
t.Helper()
277-
278-
flatM1 := m1.Flatten()
279-
flatM2 := m2.Flatten()
280-
for _, f := range ignoredFields {
281-
hasKeyM1, _ := flatM1.HasKey(f)
282-
hasKeyM2, _ := flatM2.HasKey(f)
283-
284-
if !hasKeyM1 && !hasKeyM2 {
285-
assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f)
286-
}
287-
288-
flatM1.Delete(f)
289-
flatM2.Delete(f)
290-
}
291-
require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal")
292-
}
293-
294274
func assertMonitoring(t *testing.T, port int) {
295275
address := fmt.Sprintf("http://localhost:%d", port)
296276
r, err := http.Get(address) //nolint:noctx,bodyclose,gosec // fine for tests
@@ -472,7 +452,7 @@ http.port: %d
472452
"log.file.path",
473453
}
474454

475-
assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
455+
oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
476456
assertMonitoring(t, otelConfig.MonitoringPort)
477457
assertMonitoring(t, 5067) // filebeat
478458
}

0 commit comments

Comments
 (0)