Skip to content

Commit 65c0dc2

Browse files
feat(otel): add skeleton of Beat processor (#45328)
* feat(otel): add skeleton of Beat processor * refactor: separate the OTel Beat processor from Beat processors in code * docs: add note on compatibility with receivers * docs: make the processor description clearer * docs: add README in `x-pack/otel` directory
1 parent 96794ae commit 65c0dc2

File tree

7 files changed

+1859
-1512
lines changed

7 files changed

+1859
-1512
lines changed

NOTICE.txt

Lines changed: 1723 additions & 1511 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ require (
230230
go.opentelemetry.io/collector/exporter/debugexporter v0.129.0
231231
go.opentelemetry.io/collector/otelcol v0.129.0
232232
go.opentelemetry.io/collector/pdata v1.35.0
233+
go.opentelemetry.io/collector/processor v1.35.0
234+
go.opentelemetry.io/collector/processor/processorhelper v0.129.0
233235
go.opentelemetry.io/collector/receiver v1.35.0
234236
go.uber.org/mock v0.5.0
235237
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8
@@ -451,7 +453,6 @@ require (
451453
go.opentelemetry.io/collector/pdata/xpdata v0.129.0 // indirect
452454
go.opentelemetry.io/collector/pipeline v0.129.0 // indirect
453455
go.opentelemetry.io/collector/pipeline/xpipeline v0.129.0 // indirect
454-
go.opentelemetry.io/collector/processor v1.35.0 // indirect
455456
go.opentelemetry.io/collector/processor/processortest v0.129.0 // indirect
456457
go.opentelemetry.io/collector/processor/xprocessor v0.129.0 // indirect
457458
go.opentelemetry.io/collector/receiver/receivertest v0.129.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,6 +1147,8 @@ go.opentelemetry.io/collector/pipeline/xpipeline v0.129.0 h1:JDLSoGiUg4JgahMqHXj
11471147
go.opentelemetry.io/collector/pipeline/xpipeline v0.129.0/go.mod h1:qDjE/5uvKmXRHaDzy7yMo/VwSm4njtRWzACTjf5CVjg=
11481148
go.opentelemetry.io/collector/processor v1.35.0 h1:YOfHemhhodYn4BnPjN7kWYYDhzPVqRkyHCaQ8mAlavs=
11491149
go.opentelemetry.io/collector/processor v1.35.0/go.mod h1:cWHDOpmpAaVNCc9K9j2/okZoLIuP/EpGGRNhM4JGmFM=
1150+
go.opentelemetry.io/collector/processor/processorhelper v0.129.0 h1:/B2UJ7wOc5oJlQBnzwXjqnhFJOidHbdGmFfWyhi1Iyg=
1151+
go.opentelemetry.io/collector/processor/processorhelper v0.129.0/go.mod h1:tZXfmQgvpIE/gxLS9tjX82/EBzWt+xNIE0lUmgZzZlk=
11501152
go.opentelemetry.io/collector/processor/processortest v0.129.0 h1:r5iJHdS7Ffdb2zmMVYx4ahe92PLrce5cas/AJEXivkY=
11511153
go.opentelemetry.io/collector/processor/processortest v0.129.0/go.mod h1:gdf8GzyzjGoDTA11+CPwC4jfXphtC+B7MWbWn+LIWXc=
11521154
go.opentelemetry.io/collector/processor/xprocessor v0.129.0 h1:V3Zgd+YIeu3Ij3DPlGtzdcTwpqOQIqQVcL5jdHHS7sc=

x-pack/otel/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# OpenTelemetry Collector Components in Beats
2+
3+
This is the home of OpenTelemetry Collector components like receivers, processors, exporters, extensions, connectors, etc. that are related to Beats.
4+
5+
The intended structure of this directory is to mimic the structure in the [OpenTelemetry Collector Contrib] repository, specifically:
6+
7+
- Put X receiver in `receiver/xreceiver` subdirectory,
8+
- Put Y processor in `processor/yprocessor` subdirectory,
9+
- Put Z exporter in `exporter/zexporter` subdirectory,
10+
- and so on.
11+
12+
There should be no need to put any Go files directly in this directory.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Beat Processor
2+
3+
| Status | |
4+
| --------- | ------------------- |
5+
| Stability | [development]: logs |
6+
7+
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
8+
9+
> [!NOTE]
10+
> This component is currently in development and no functionality is implemented.
11+
> Including it in a pipeline is a no-op.
12+
> The documentation describes the intended state after the functionality is implemented.
13+
14+
The Beat processor (`beat`) is an OpenTelemetry Collector processor that wraps the [Beat processors].
15+
This allows you to use Beat processorss like e.g. [add_host_metadata] anywhere in the OpenTelemetry Collector's pipeline, independently of Beat receivers.
16+
17+
> [!NOTE]
18+
> This component is only expected to work correctly with data from the Beat receivers: [Filebeat receiver], [Metricbeat receiver].
19+
> This is because it relies on the specific structure of telemetry emitted by those components.
20+
> Using it with data coming from other components is not recommended and may result in unexpected behavior.
21+
22+
## Example
23+
24+
The following [Filebeat receiver] configuration
25+
26+
```yaml
27+
receivers:
28+
filebeatreceiver:
29+
filebeat:
30+
inputs:
31+
- type: filestream
32+
id: host-logs
33+
paths:
34+
- /var/log/*.log
35+
processors:
36+
- add_host_metadata: ~
37+
output:
38+
otelconsumer:
39+
```
40+
41+
is functionally equivalent to this one, using the Beat processor:
42+
43+
```yaml
44+
receivers:
45+
filebeatreceiver:
46+
filebeat:
47+
inputs:
48+
- type: filestream
49+
id: host-logs
50+
paths:
51+
- /var/log/*.log
52+
output:
53+
otelconsumer:
54+
55+
processors:
56+
beat:
57+
processors:
58+
- add_host_metadata: ~
59+
```
60+
61+
[Beat processors]: https://www.elastic.co/docs/reference/beats/filebeat/filtering-enhancing-data#using-processors
62+
[Filebeat receiver]: https://github.com/elastic/beats/tree/main/x-pack/filebeat/fbreceiver
63+
[Metricbeat receiver]: https://github.com/elastic/beats/tree/main/x-pack/metricbeat/mbreceiver
64+
[add_host_metadata]: https://www.elastic.co/docs/reference/beats/filebeat/add-host-metadata
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package beatprocessor
6+
7+
type Config struct{}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package beatprocessor
6+
7+
import (
8+
"context"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/pdata/plog"
13+
"go.opentelemetry.io/collector/processor"
14+
"go.opentelemetry.io/collector/processor/processorhelper"
15+
)
16+
17+
const (
18+
Name = "beat"
19+
)
20+
21+
func NewFactory() processor.Factory {
22+
return processor.NewFactory(
23+
component.MustNewType(Name),
24+
createDefaultConfig,
25+
processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment),
26+
)
27+
}
28+
29+
func createDefaultConfig() component.Config {
30+
return &Config{}
31+
}
32+
33+
func createLogsProcessor(
34+
ctx context.Context,
35+
set processor.Settings,
36+
cfg component.Config,
37+
nextConsumer consumer.Logs,
38+
) (processor.Logs, error) {
39+
return processorhelper.NewLogs(
40+
ctx,
41+
set,
42+
cfg,
43+
nextConsumer,
44+
func(_ context.Context, logs plog.Logs) (plog.Logs, error) {
45+
// This is a placeholder for the actual processing logic.
46+
return logs, nil
47+
},
48+
)
49+
}

0 commit comments

Comments
 (0)