Skip to content

Commit e25750b

Browse files
[8.17](backport #44824) [filebeat][ABS] - Added missing "text/csv" content-type filter support (#44826)
Currently content-type "text/csv" did not have official filtering support even though we support csv decoding. This made blobs having this content type defined were not picked up for processing. This PR as a "bugfix", adds support for explicit "text/csv" content-type filtering in the scheduler. (cherry picked from commit fba3749) --------- Co-authored-by: Shourie Ganguly <shourie.ganguly@elastic.co>
1 parent d8e6d40 commit e25750b

File tree

6 files changed

+66
-1
lines changed

6 files changed

+66
-1
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
119119
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
120120
- If a Filestream input fails to be created, its ID is removed from the list of running input IDs {pull}44697[44697]
121121
- Ensure DEPROVISIONED Okta entities are published by Okta entityanalytics provider. {issue}12658[12658] {pull}44719[44719]
122+
- Added missing "text/csv" content-type filter support in azureblobsortorage input. {issue}44596[44596] {pull}44824[44824]
122123
- Fix unexpected EOF detection and improve memory usage. {pull}44813[44813]
123124

124125
*Heartbeat*

x-pack/filebeat/input/azureblobstorage/input_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636
beatsNdJSONContainer = "beatsndjsoncontainer"
3737
beatsGzJSONContainer = "beatsgzjsoncontainer"
3838
beatsJSONWithArrayContainer = "beatsjsonwitharraycontainer"
39+
beatsCSVContainer = "beatscsvcontainer"
3940
)
4041

4142
func Test_StorageClient(t *testing.T) {
@@ -471,6 +472,28 @@ func Test_StorageClient(t *testing.T) {
471472
mock.Beatscontainer_2_blob_data3_json: true,
472473
},
473474
},
475+
{
476+
name: "ReadCSV",
477+
baseConfig: map[string]interface{}{
478+
"account_name": "beatsblobnew",
479+
"auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==",
480+
"max_workers": 1,
481+
"poll": true,
482+
"poll_interval": "10s",
483+
"decoding.codec.csv.enabled": true,
484+
"decoding.codec.csv.comma": " ",
485+
"containers": []map[string]interface{}{
486+
{
487+
"name": beatsCSVContainer,
488+
},
489+
},
490+
},
491+
mockHandler: mock.AzureStorageFileServer,
492+
expected: map[string]bool{
493+
mock.BeatsFilesContainer_csv[0]: true,
494+
mock.BeatsFilesContainer_csv[1]: true,
495+
},
496+
},
474497
}
475498
for _, tt := range tests {
476499
t.Run(tt.name, func(t *testing.T) {
@@ -540,7 +563,7 @@ func Test_StorageClient(t *testing.T) {
540563
var err error
541564
val, err = got.Fields.GetValue("message")
542565
assert.NoError(t, err)
543-
assert.True(t, tt.expected[val.(string)])
566+
assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")])
544567
assert.Equal(t, tt.expectedError, err)
545568
receivedCount += 1
546569
if receivedCount == len(tt.expected) {

x-pack/filebeat/input/azureblobstorage/mock/data_files.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
beatsNdJSONContainer = "beatsndjsoncontainer"
1111
beatsGzJSONContainer = "beatsgzjsoncontainer"
1212
beatsJSONWithArrayContainer = "beatsjsonwitharraycontainer"
13+
beatsCSVContainer = "beatscsvcontainer"
1314
)
1415

1516
var fileContainers = map[string]bool{
@@ -18,6 +19,7 @@ var fileContainers = map[string]bool{
1819
beatsNdJSONContainer: true,
1920
beatsGzJSONContainer: true,
2021
beatsJSONWithArrayContainer: true,
22+
beatsCSVContainer: true,
2123
}
2224

2325
var availableFileBlobs = map[string]map[string]bool{
@@ -38,6 +40,9 @@ var availableFileBlobs = map[string]map[string]bool{
3840
"array-at-root.json": true,
3941
"nested-arrays.json": true,
4042
},
43+
beatsCSVContainer: {
44+
"txn1.csv": true,
45+
},
4146
}
4247

4348
var fetchFilesContainer = map[string]string{
@@ -197,6 +202,30 @@ var fetchFilesContainer = map[string]string{
197202
</Blobs>
198203
<NextMarker />
199204
</EnumerationResults>`,
205+
beatsCSVContainer: `<?xml version="1.0" encoding="utf-8"?>
206+
<EnumerationResults ServiceEndpoint="https://127.0.0.1/" ContainerName="beatscsvcontainer">
207+
<Blobs>
208+
<Blob>
209+
<Name>txn1.csv</Name>
210+
<Properties>
211+
<Last-Modified>Wed, 14 Sep 2022 12:12:28 GMT</Last-Modified>
212+
<Etag>0x8DA964A64516C82</Etag>
213+
<Content-Length>643</Content-Length>
214+
<Content-Type>text/csv</Content-Type>
215+
<Content-Encoding />
216+
<Content-Language />
217+
<Content-MD5>UjQX73kQRTHx+UyXZDvVkg==</Content-MD5>
218+
<Cache-Control />
219+
<Content-Disposition />
220+
<BlobType>BlockBlob</BlobType>
221+
<LeaseStatus>unlocked</LeaseStatus>
222+
<LeaseState>available</LeaseState>
223+
</Properties>
224+
<Metadata />
225+
</Blob>
226+
</Blobs>
227+
<NextMarker />
228+
</EnumerationResults>`,
200229
}
201230

202231
var BeatsFilesContainer_multiline_json = []string{
@@ -231,3 +260,8 @@ var BeatsFilesContainer_multiline_json_gz = []string{
231260
"{\n \"@timestamp\": \"2021-05-25T17:25:42.806Z\",\n \"log.level\": \"error\",\n \"message\": \"error making http request\"\n}",
232261
"{\n \"@timestamp\": \"2021-05-25T17:25:51.391Z\",\n \"log.level\": \"info\",\n \"message\": \"available disk space 44.3gb\"\n}",
233262
}
263+
264+
var BeatsFilesContainer_csv = []string{
265+
"{\"id\":\"1\",\"name\":\"Alice\",\"email\":\"alice@example.com\",\"status\":\"active\"}",
266+
"{\"id\":\"2\",\"name\":\"Bob\",\"email\":\"bob@example.com\",\"status\":\"inactive\"}",
267+
}

x-pack/filebeat/input/azureblobstorage/mock/mock.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ func AzureStorageFileServer() http.Handler {
7878
w.Header().Set(contentType, jsonType)
7979
case "log.ndjson":
8080
w.Header().Set(contentType, "application/x-ndjson")
81+
case "txn1.csv":
82+
w.Header().Set(contentType, "text/csv")
8183
}
8284
w.Write(data)
8385
return
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
id name email status
2+
1 Alice alice@example.com active
3+
2 Bob bob@example.com inactive

x-pack/filebeat/input/azureblobstorage/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const (
3737
octetType = "application/octet-stream"
3838
ndJsonType = "application/x-ndjson"
3939
gzType = "application/x-gzip"
40+
csvType = "text/csv"
4041
encodingGzip = "gzip"
4142
)
4243

@@ -59,4 +60,5 @@ var allowedContentTypes = map[string]bool{
5960
octetType: true,
6061
ndJsonType: true,
6162
gzType: true,
63+
csvType: true,
6264
}

0 commit comments

Comments
 (0)