Skip to content

Commit 092f0ea

Browse files
[filebeat][streaming] - Added support for TLS & forward proxy configs for websockets (#41934) (#41985)
(cherry picked from commit fd81074) Co-authored-by: ShourieG <shourie.ganguly@elastic.co>
1 parent 2ae64cc commit 092f0ea

File tree

9 files changed

+402
-6
lines changed

9 files changed

+402
-6
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
321321
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
322322
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
323323
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
324+
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]
324325
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
325326
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
326327
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]

x-pack/filebeat/docs/inputs/input-streaming.asciidoc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,28 @@ The minimum time to wait between retries. This ensures that retries are spaced o
349349

350350
The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.
351351

352+
[float]
353+
=== `timeout`
354+
Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds.
355+
356+
[float]
357+
==== `proxy_url`
358+
This specifies the forward proxy URL to use for the connection. The `proxy_url` configuration is optional and can be used to configure the proxy settings for the connection. The `proxy_url` default value is set by `http.ProxyFromEnvironment` which reads the `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables.
359+
360+
[float]
361+
==== `proxy_headers`
362+
This specifies the headers to be sent to the proxy server. The `proxy_headers` configuration is optional and can be used to configure the headers to be sent to the proxy server.
363+
364+
[float]
365+
==== `ssl`
366+
This specifies the SSL configuration for the connection. The `ssl` configuration is optional and can be used to configure the SSL settings for the connection. The `ssl` configuration has the following subfields:
367+
368+
- `certificate_authorities`: A list of root certificates to use for verifying the server's certificate.
369+
- `certificate`: The (PEM encoded) certificate to use for client authentication.
370+
- `key`: The (PEM encoded) private key to use for client authentication.
371+
372+
If this is a self-signed certificate, the `certificate_authorities` field should be set to the certificate itself.
373+
352374
[float]
353375
=== Metrics
354376

x-pack/filebeat/input/streaming/config.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ type config struct {
4141
Redact *redact `config:"redact"`
4242
// Retry is the configuration for retrying failed connections.
4343
Retry *retry `config:"retry"`
44-
44+
// Transport is the common the transport config.
4545
Transport httpcommon.HTTPTransportSettings `config:",inline"`
46-
4746
// CrowdstrikeAppID is the value used to set the
4847
// appId request parameter in the FalconHose stream
4948
// discovery request.
@@ -166,3 +165,11 @@ func checkURLScheme(c config) error {
166165
return fmt.Errorf("unknown stream type: %s", c.Type)
167166
}
168167
}
168+
169+
func defaultConfig() config {
170+
return config{
171+
Transport: httpcommon.HTTPTransportSettings{
172+
Timeout: 180 * time.Second,
173+
},
174+
}
175+
}

x-pack/filebeat/input/streaming/input_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage
3434
}
3535

3636
func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
37-
src := &source{cfg: config{}}
37+
src := &source{cfg: defaultConfig()}
3838
if err := cfg.Unpack(&src.cfg); err != nil {
3939
return nil, nil, err
4040
}

x-pack/filebeat/input/streaming/input_test.go

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package streaming
66

77
import (
88
"context"
9+
"crypto/tls"
910
"errors"
1011
"fmt"
1112
"net/http"
@@ -41,6 +42,7 @@ type WebSocketHandler func(*testing.T, *websocket.Conn, []string)
4142
var inputTests = []struct {
4243
name string
4344
server func(*testing.T, WebSocketHandler, map[string]interface{}, []string)
45+
proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server
4446
handler WebSocketHandler
4547
config map[string]interface{}
4648
response []string
@@ -450,6 +452,140 @@ var inputTests = []struct {
450452
},
451453
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
452454
},
455+
{
456+
name: "single_event_tls",
457+
server: webSocketServerWithTLS(httptest.NewUnstartedServer),
458+
handler: defaultHandler,
459+
config: map[string]interface{}{
460+
"program": `
461+
bytes(state.response).decode_json().as(inner_body,{
462+
"events": [inner_body],
463+
})`,
464+
"ssl": map[string]interface{}{
465+
"enabled": true,
466+
"certificate_authorities": []string{"testdata/certs/ca.crt"},
467+
"certificate": "testdata/certs/cert.pem",
468+
"key": "testdata/certs/key.pem",
469+
},
470+
},
471+
response: []string{`
472+
{
473+
"pps": {
474+
"agent": "example.proofpoint.com",
475+
"cid": "mmeng_uivm071"
476+
},
477+
"ts": "2017-08-17T14:54:12.949180-07:00",
478+
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<user1@example.com> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
479+
"sm": {
480+
"tls": {
481+
"verify": "NONE"
482+
},
483+
"stat": "Sent",
484+
"qid": "v7HLqYbx029423",
485+
"dsn": "2.0.0",
486+
"mailer": "*file*",
487+
"to": [
488+
"/dev/null"
489+
],
490+
"ctladdr": "<user1@example.com> (8/0)",
491+
"delay": "00:00:00",
492+
"xdelay": "00:00:00",
493+
"pri": 35342
494+
},
495+
"id": "ZeYGULpZmL5N0151HN1OyA"
496+
}`},
497+
want: []map[string]interface{}{
498+
{
499+
"pps": map[string]interface{}{
500+
"agent": "example.proofpoint.com",
501+
"cid": "mmeng_uivm071",
502+
},
503+
"ts": "2017-08-17T14:54:12.949180-07:00",
504+
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<user1@example.com> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
505+
"sm": map[string]interface{}{
506+
"tls": map[string]interface{}{
507+
"verify": "NONE",
508+
},
509+
"stat": "Sent",
510+
"qid": "v7HLqYbx029423",
511+
"dsn": "2.0.0",
512+
"mailer": "*file*",
513+
"to": []interface{}{
514+
"/dev/null",
515+
},
516+
"ctladdr": "<user1@example.com> (8/0)",
517+
"delay": "00:00:00",
518+
"xdelay": "00:00:00",
519+
"pri": float64(35342),
520+
},
521+
"id": "ZeYGULpZmL5N0151HN1OyA",
522+
},
523+
},
524+
},
525+
{
526+
name: "basic_proxy_forwarding",
527+
proxyServer: newWebSocketProxyTestServer,
528+
handler: defaultHandler,
529+
config: map[string]interface{}{
530+
"program": `
531+
bytes(state.response).decode_json().as(inner_body,{
532+
"events": [inner_body],
533+
})`,
534+
},
535+
response: []string{`
536+
{
537+
"pps": {
538+
"agent": "example.proofpoint.com",
539+
"cid": "mmeng_uivm071"
540+
},
541+
"ts": "2017-08-17T14:54:12.949180-07:00",
542+
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<user1@example.com> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
543+
"sm": {
544+
"tls": {
545+
"verify": "NONE"
546+
},
547+
"stat": "Sent",
548+
"qid": "v7HLqYbx029423",
549+
"dsn": "2.0.0",
550+
"mailer": "*file*",
551+
"to": [
552+
"/dev/null"
553+
],
554+
"ctladdr": "<user1@example.com> (8/0)",
555+
"delay": "00:00:00",
556+
"xdelay": "00:00:00",
557+
"pri": 35342
558+
},
559+
"id": "ZeYGULpZmL5N0151HN1OyA"
560+
}`},
561+
want: []map[string]interface{}{
562+
{
563+
"pps": map[string]interface{}{
564+
"agent": "example.proofpoint.com",
565+
"cid": "mmeng_uivm071",
566+
},
567+
"ts": "2017-08-17T14:54:12.949180-07:00",
568+
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<user1@example.com> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
569+
"sm": map[string]interface{}{
570+
"tls": map[string]interface{}{
571+
"verify": "NONE",
572+
},
573+
"stat": "Sent",
574+
"qid": "v7HLqYbx029423",
575+
"dsn": "2.0.0",
576+
"mailer": "*file*",
577+
"to": []interface{}{
578+
"/dev/null",
579+
},
580+
"ctladdr": "<user1@example.com> (8/0)",
581+
"delay": "00:00:00",
582+
"xdelay": "00:00:00",
583+
"pri": float64(35342),
584+
},
585+
"id": "ZeYGULpZmL5N0151HN1OyA",
586+
},
587+
},
588+
},
453589
}
454590

455591
var urlEvalTests = []struct {
@@ -560,6 +696,9 @@ func TestInput(t *testing.T) {
560696
if test.server != nil {
561697
test.server(t, test.handler, test.config, test.response)
562698
}
699+
if test.proxyServer != nil {
700+
test.proxyServer(t, test.handler, test.config, test.response)
701+
}
563702

564703
cfg := conf.MustNewConfigFrom(test.config)
565704

@@ -771,6 +910,46 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t
771910
}
772911
}
773912

913+
// webSocketServerWithTLS simulates a WebSocket server with TLS based authentication.
914+
func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) {
915+
return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) {
916+
server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
917+
upgrader := websocket.Upgrader{
918+
CheckOrigin: func(r *http.Request) bool {
919+
return true
920+
},
921+
}
922+
923+
conn, err := upgrader.Upgrade(w, r, nil)
924+
if err != nil {
925+
t.Fatalf("error upgrading connection to WebSocket: %v", err)
926+
return
927+
}
928+
929+
handler(t, conn, response)
930+
}))
931+
//nolint:gosec // there is no need to use a secure cert for testing
932+
server.TLS = &tls.Config{
933+
Certificates: []tls.Certificate{generateSelfSignedCert(t)},
934+
}
935+
server.StartTLS()
936+
937+
if config["url"] == nil {
938+
config["url"] = "ws" + server.URL[4:]
939+
}
940+
t.Cleanup(server.Close)
941+
}
942+
}
943+
944+
// generateSelfSignedCert returns a self-signed certificate for testing purposes based on the dummy certs in the testdata directory
945+
func generateSelfSignedCert(t *testing.T) tls.Certificate {
946+
cert, err := tls.LoadX509KeyPair("testdata/certs/cert.pem", "testdata/certs/key.pem")
947+
if err != nil {
948+
t.Fatalf("failed to generate self-signed cert: %v", err)
949+
}
950+
return cert
951+
}
952+
774953
// defaultHandler is a default handler for WebSocket connections.
775954
func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
776955
for _, r := range response {
@@ -780,3 +959,73 @@ func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
780959
}
781960
}
782961
}
962+
963+
// webSocketTestServer creates a WebSocket target server that communicates with the proxy handler.
964+
func webSocketTestServer(t *testing.T, handler WebSocketHandler, response []string) *httptest.Server {
965+
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
966+
upgrader := websocket.Upgrader{
967+
CheckOrigin: func(r *http.Request) bool {
968+
return true
969+
},
970+
}
971+
conn, err := upgrader.Upgrade(w, r, nil)
972+
if err != nil {
973+
t.Fatalf("failed to upgrade WebSocket connection: %v", err)
974+
return
975+
}
976+
handler(t, conn, response)
977+
}))
978+
}
979+
980+
// webSocketProxyHandler forwards WebSocket connections to the target server.
981+
//
982+
//nolint:errcheck //we can safely ignore errors checks here
983+
func webSocketProxyHandler(targetURL string) http.HandlerFunc {
984+
return func(w http.ResponseWriter, r *http.Request) {
985+
defer r.Response.Body.Close()
986+
//nolint:bodyclose // we can ignore the body close here
987+
targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil)
988+
if err != nil {
989+
http.Error(w, "failed to connect to backend WebSocket server", http.StatusBadGateway)
990+
return
991+
}
992+
defer targetConn.Close()
993+
994+
upgrader := websocket.Upgrader{
995+
CheckOrigin: func(r *http.Request) bool {
996+
return true
997+
},
998+
}
999+
clientConn, err := upgrader.Upgrade(w, r, nil)
1000+
if err != nil {
1001+
http.Error(w, "failed to upgrade client connection", http.StatusInternalServerError)
1002+
return
1003+
}
1004+
defer clientConn.Close()
1005+
// forward messages between client and target server
1006+
go func() {
1007+
for {
1008+
messageType, message, err := targetConn.ReadMessage()
1009+
if err != nil {
1010+
break
1011+
}
1012+
clientConn.WriteMessage(messageType, message)
1013+
}
1014+
}()
1015+
for {
1016+
messageType, message, err := clientConn.ReadMessage()
1017+
if err != nil {
1018+
break
1019+
}
1020+
targetConn.WriteMessage(messageType, message)
1021+
}
1022+
}
1023+
}
1024+
1025+
// newWebSocketProxyTestServer creates a proxy server forwarding WebSocket traffic.
1026+
func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) *httptest.Server {
1027+
backendServer := webSocketTestServer(t, handler, response)
1028+
config["url"] = "ws" + backendServer.URL[4:]
1029+
config["proxy_url"] = "ws" + backendServer.URL[4:]
1030+
return httptest.NewServer(webSocketProxyHandler(config["url"].(string)))
1031+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL
3+
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
4+
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0
5+
MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
6+
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
7+
AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7
8+
HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB
9+
twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm
10+
vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN
11+
LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL
12+
sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD
13+
VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn
14+
cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB
15+
AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd
16+
UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25
17+
TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU
18+
COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3
19+
z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV
20+
5h0bTbbPOyEIe5ydEIbr5kA=
21+
-----END CERTIFICATE-----

0 commit comments

Comments
 (0)