Skip to content

core: zstd: Implement dispatcher methods for zstd decompression #10697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#define FLB_COMPRESSION_ALGORITHM_NONE 0
#define FLB_COMPRESSION_ALGORITHM_GZIP 1
#define FLB_COMPRESSION_ALGORITHM_ZSTD 2

#define FLB_DECOMPRESSOR_STATE_FAILED -1
#define FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER 0
Expand Down
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@

#include <fluent-bit/flb_info.h>
#include <zstd.h>
#include <zstd_errors.h>

struct flb_decompression_context;

size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *out_len);
size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t *out_len);

int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
void *output_buffer,
size_t *output_length);
void *flb_zstd_decompression_context_create(void);
void flb_zstd_decompression_context_destroy(void *context);
#endif
26 changes: 23 additions & 3 deletions src/flb_compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_compression.h>

static size_t flb_decompression_context_get_read_buffer_offset(
Expand Down Expand Up @@ -131,7 +132,12 @@ void flb_decompression_context_destroy(struct flb_decompression_context *context
}

if (context->inner_context != NULL) {
flb_gzip_decompression_context_destroy(context->inner_context);
if (context->algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
flb_gzip_decompression_context_destroy(context->inner_context);
}
else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
flb_zstd_decompression_context_destroy(context->inner_context);
}

context->inner_context = NULL;
}
Expand Down Expand Up @@ -178,6 +184,9 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
context->inner_context = flb_gzip_decompression_context_create();
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
context->inner_context = flb_zstd_decompression_context_create();
}
else {
flb_error("invalid compression algorithm : %d", algorithm);

Expand All @@ -197,9 +206,14 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
}

context->input_buffer_size = input_buffer_size;
context->read_buffer = context->read_buffer;
context->read_buffer = context->input_buffer;
context->algorithm = algorithm;
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_BODY;
}

return context;
}
Expand All @@ -215,6 +229,12 @@ int flb_decompress(struct flb_decompression_context *context,
output_length);

}
else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
return flb_zstd_decompressor_dispatch(context,
output_buffer,
output_length);

}
}

return FLB_DECOMPRESSOR_FAILURE;
Expand Down
8 changes: 5 additions & 3 deletions src/flb_gzip.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,11 @@ static int flb_gzip_decompressor_process_header(

/* Minimal length: header + crc32 */
if (context->input_buffer_length < FLB_GZIP_HEADER_SIZE) {
flb_error("[gzip] unexpected content length");

return FLB_DECOMPRESSOR_FAILURE;
/*
* This is not a fatal error; it's the expected condition when waiting
* for more data. Return INSUFFICIENT_DATA without logging an error.
*/
return FLB_DECOMPRESSOR_INSUFFICIENT_DATA;
}

memcpy(&inner_context->gzip_header,
Expand Down
102 changes: 102 additions & 0 deletions src/flb_zstd.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include <fluent-bit/flb_compression.h>
#include <fluent-bit/flb_zstd.h>

struct flb_zstd_decompression_context {
ZSTD_DCtx *dctx;
};

#define FLB_ZSTD_DEFAULT_CHUNK 64 * 1024 /* 64 KB buffer */

Expand Down Expand Up @@ -162,3 +165,102 @@ size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t
return 0;
}

int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
void *output_buffer,
size_t *output_length)
{
struct flb_zstd_decompression_context *zstd_ctx;
size_t compressed_frame_size;
size_t decompressed_size;
size_t original_output_length;
size_t error_code;

if (context == NULL || context->inner_context == NULL || output_length == NULL) {
return FLB_DECOMPRESSOR_FAILURE;
}

zstd_ctx = (struct flb_zstd_decompression_context *) context->inner_context;
original_output_length = *output_length;
*output_length = 0;

if (context->input_buffer_length == 0) {
return FLB_DECOMPRESSOR_SUCCESS;
}

compressed_frame_size = ZSTD_findFrameCompressedSize(context->read_buffer,
context->input_buffer_length);

error_code = ZSTD_getErrorCode(compressed_frame_size);

/*
* Distinguish between recoverable and fatal errors.
* If we get srcSize_wrong, it just means we need more data to find the
* end of the frame. This is expected in a streaming scenario.
*/
if (error_code == ZSTD_error_srcSize_wrong) {
/* Not an error, just need more data. Return success with 0 bytes produced. */
return FLB_DECOMPRESSOR_SUCCESS;
}

/* Check for any other, truly fatal error from finding the frame. */
if (ZSTD_isError(compressed_frame_size)) {
flb_error("[zstd] frame is corrupted: %s",
ZSTD_getErrorName(compressed_frame_size));
context->state = FLB_DECOMPRESSOR_STATE_FAILED;
return FLB_DECOMPRESSOR_FAILURE;
}

/* We have a full frame. Decompress it in one shot using the robust API. */
decompressed_size = ZSTD_decompressDCtx(zstd_ctx->dctx,
output_buffer,
original_output_length,
context->read_buffer,
compressed_frame_size);

if (ZSTD_isError(decompressed_size)) {
flb_error("[zstd] decompression failed: %s",
ZSTD_getErrorName(decompressed_size));
context->state = FLB_DECOMPRESSOR_STATE_FAILED;
return FLB_DECOMPRESSOR_FAILURE;
}

/* Success. Update our pointers and report the decompressed size. */
context->read_buffer += compressed_frame_size;
context->input_buffer_length -= compressed_frame_size;
*output_length = decompressed_size;

return FLB_DECOMPRESSOR_SUCCESS;
}

void *flb_zstd_decompression_context_create(void)
{
struct flb_zstd_decompression_context *context;

context = flb_calloc(1, sizeof(struct flb_zstd_decompression_context));

if (context == NULL) {
flb_errno();
return NULL;
}

context->dctx = ZSTD_createDCtx();
if (context->dctx == NULL) {
flb_error("[zstd] could not create decompression context");
flb_free(context);
return NULL;
}

return (void *) context;
}

void flb_zstd_decompression_context_destroy(void *context)
{
struct flb_zstd_decompression_context *zstd_ctx = context;

if (zstd_ctx != NULL) {
if (zstd_ctx->dctx != NULL) {
ZSTD_freeDCtx(zstd_ctx->dctx);
}
flb_free(zstd_ctx);
}
}
156 changes: 156 additions & 0 deletions tests/internal/zstd.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_compression.h>
#include "flb_tests_internal.h"

/* try a small string */
Expand Down Expand Up @@ -163,12 +164,167 @@ static void test_decompress_unknown_size()
flb_free(decompressed_data);
}

static void append_to_context(struct flb_decompression_context *ctx, const void *data, size_t size)
{
uint8_t *append_ptr;
size_t available_space;

available_space = flb_decompression_context_get_available_space(ctx);

if (size > available_space) {
size_t required_size = ctx->input_buffer_length + size;
flb_decompression_context_resize_buffer(ctx, required_size);
}

/* Get pointer to the write location */
append_ptr = flb_decompression_context_get_append_buffer(ctx);
TEST_CHECK(append_ptr != NULL);

/* Copy the data */
memcpy(append_ptr, data, size);

ctx->input_buffer_length += size;
}

static void *compress_with_checksum(const void *original_data, size_t original_len,
size_t *compressed_len)
{
ZSTD_CCtx* cctx;
void *compressed_buffer;
size_t bound;
size_t ret;

/* Create a compression context */
cctx = ZSTD_createCCtx();
TEST_CHECK(cctx != NULL);

/*
* THIS IS THE KEY: Explicitly enable content checksums in the frame.
*/
ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
TEST_CHECK(!ZSTD_isError(ret));

/* Compress the data */
bound = ZSTD_compressBound(original_len);
compressed_buffer = flb_malloc(bound);
TEST_CHECK(compressed_buffer != NULL);

*compressed_len = ZSTD_compress2(cctx,
compressed_buffer, bound,
original_data, original_len);

TEST_CHECK(!ZSTD_isError(*compressed_len));

ZSTD_freeCCtx(cctx);

return compressed_buffer;
}

void test_zstd_streaming_decompress_multi_chunk(void)
{
struct flb_decompression_context *ctx;
char *output_buf;
size_t output_len;
size_t total_written = 0;
int ret;
size_t chunk1_size = 0;
size_t chunk2_size = 0;
size_t chunk3_size = 0;
char *original_text = "zstd streaming is a feature that must be tested with multiple, uneven chunks!";
size_t original_len;
void *compressed_buf = NULL;
size_t compressed_len = 0;

original_len = strlen(original_text);
compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len);
TEST_CHECK(compressed_buf != NULL);

ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len);
TEST_CHECK(ctx != NULL);
output_buf = flb_malloc(original_len + 1);

chunk1_size = compressed_len / 3;
chunk2_size = compressed_len / 2;
chunk3_size = compressed_len - chunk1_size - chunk2_size;

append_to_context(ctx, compressed_buf, chunk1_size);
output_len = original_len;
ret = flb_decompress(ctx, output_buf, &output_len);
TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS);
total_written += output_len;

append_to_context(ctx, (char *)compressed_buf + chunk1_size, chunk2_size);
output_len = original_len - total_written;
ret = flb_decompress(ctx, output_buf + total_written, &output_len);
TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS);
total_written += output_len;

append_to_context(ctx, (char *)compressed_buf + chunk1_size + chunk2_size, chunk3_size);
output_len = original_len - total_written;
ret = flb_decompress(ctx, output_buf + total_written, &output_len);
TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS);
total_written += output_len;

TEST_CHECK(total_written == original_len);
TEST_CHECK(memcmp(original_text, output_buf, original_len) == 0);

flb_free(compressed_buf);
flb_free(output_buf);
flb_decompression_context_destroy(ctx);
}

/* In tests/internal/zstd.c */

void test_zstd_streaming_decompress_corrupted_data(void)
{
struct flb_decompression_context *ctx;
char *output_buf;
size_t output_len;
int ret;
char *original_text = "this test ensures corrupted data with a checksum fails";
size_t original_len = strlen(original_text);
void *compressed_buf = NULL;
size_t compressed_len = 0;
char *corrupted_input;

compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len);
TEST_CHECK(compressed_buf != NULL);

ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len);
TEST_CHECK(ctx != NULL);

/* Create a corrupted copy of the input */
corrupted_input = flb_malloc(compressed_len);
TEST_CHECK(corrupted_input != NULL);
memcpy(corrupted_input, compressed_buf, compressed_len);
/* Corrupt a byte in the middle */
corrupted_input[compressed_len / 2]++;

append_to_context(ctx, corrupted_input, compressed_len);

output_buf = flb_malloc(original_len + 1);
output_len = original_len;

ret = flb_decompress(ctx, output_buf, &output_len);

TEST_CHECK(ret == FLB_DECOMPRESSOR_FAILURE);
TEST_CHECK(ctx->state == FLB_DECOMPRESSOR_STATE_FAILED);

flb_free(compressed_buf);
flb_free(corrupted_input);
flb_free(output_buf);
flb_decompression_context_destroy(ctx);
}


TEST_LIST = {
{ "compress_small_string", test_compress_small_string },
{ "decompress_small_string", test_decompress_small_string },
{ "compress_empty_input", test_compress_empty_input },
{ "decompress_invalid_data", test_decompress_invalid_data },
{ "compress_decompress_large_data", test_compress_decompress_large_data },
{ "decompress_unknown_size", test_decompress_unknown_size },
{ "streaming_decompress_multi_chunk", test_zstd_streaming_decompress_multi_chunk },
{ "streaming_decompress_corrupted_data", test_zstd_streaming_decompress_corrupted_data },
{ NULL, NULL }
};
Loading