Skip to content

core: zstd: Implement dispatcher methods for zstd decompression #10697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Aug 6, 2025

For robust handling of zstd compressed buffers and chunks, we need to implement a dispatch mechanism of zstd decompressions.

This is because decompression of zstd should be easily exceeded the limit of buffers and massive usage of memory.
So, we need to implement streaming decompression for zstd, too.

The motivation why we wanted to implement this feature is implementing zstd compression on input and output forward for forward protocol.
At this time, Fluentd had already implemeneted such feature and the later part of works we need to implement to use this core feature to handle robost zstd decompressions in in_forward plugin.

ref: fluent/fluentd#4758


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Added support for Zstandard (ZSTD) streaming decompression for incremental handling of compressed streams.
  • Bug Fixes
    • Improved detection and handling of corrupted ZSTD-compressed data with checksum validation and clear failure signaling.
    • Treat short/partial gzip headers as non-fatal (await more data) instead of logging an error.
  • Tests
    • Added tests for multi-chunk ZSTD streaming decompression and checksum-based corruption detection.
  • Chores
    • Updated compression identifiers and context management to enable ZSTD decompression.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Copy link

coderabbitai bot commented Aug 6, 2025

Walkthrough

Added ZSTD support: new algorithm constant, opaque ZSTD decompression context and lifecycle/dispatch APIs, integrated ZSTD into core decompression flow, implemented streaming frame-wise ZSTD decompressor, and added tests for multi-chunk streaming and checksum-based corruption detection.

Changes

Cohort / File(s) Change Summary
Compression Algorithm Constant
include/fluent-bit/flb_compression.h
Added #define FLB_COMPRESSION_ALGORITHM_ZSTD 2.
ZSTD Public API
include/fluent-bit/flb_zstd.h
Declared opaque struct flb_decompression_context; and added declarations for flb_zstd_decompression_context_create, flb_zstd_decompression_context_destroy, and flb_zstd_decompressor_dispatch.
Core Decompression Integration
src/flb_compression.c
Integrated ZSTD branches into decompression context create/destroy/dispatch; initialized ZSTD inner context and set appropriate initial state; fixed read_buffer initialization.
ZSTD Implementation
src/flb_zstd.c
Implemented flb_zstd_decompression_context with ZSTD_DCtx, create/destroy functions, and flb_zstd_decompressor_dispatch to find frame sizes, handle incomplete frames, decompress frames, advance input, and report fatal errors.
GZIP behavior tweak
src/flb_gzip.c
Treat short input header as insufficient data (non-fatal) instead of logging an error; returns NEEDS_MORE_DATA when header is incomplete.
Tests — ZSTD Streaming
tests/internal/zstd.c
Added helpers and two tests: multi-chunk streaming decompression with checksum and corruption-detection test; registered tests and included required headers.

Sequence Diagram(s)

sequenceDiagram
    participant Test as Test Case
    participant Core as flb_decompression_context
    participant ZSTD as flb_zstd_decompression_context
    participant Lib as ZSTD Library

    Test->>Core: create context (algorithm=ZSTD)
    Core->>ZSTD: flb_zstd_decompression_context_create()
    ZSTD->>Lib: ZSTD_createDCtx()
    Lib-->>ZSTD: DCtx

    loop for each input chunk
        Test->>Core: append chunk to input buffer
        Test->>Core: flb_decompress()
        Core->>ZSTD: flb_zstd_decompressor_dispatch()
        ZSTD->>Lib: ZSTD_findFrameCompressedSize()
        alt full frame available
            ZSTD->>Lib: ZSTD_decompressDCtx()
            Lib-->>ZSTD: decompressed data
            ZSTD-->>Core: return success, advance input
        else need more input
            ZSTD-->>Core: return INSUFFICIENT_DATA
        else corrupted frame
            ZSTD-->>Core: return FATAL_ERROR
        end
    end

    Test->>Core: destroy context
    Core->>ZSTD: flb_zstd_decompression_context_destroy()
    ZSTD->>Lib: ZSTD_freeDCtx()
    Lib-->>ZSTD: freed
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

I nibble frames in bouncy rows,
Checksum blossoms where each chunk goes.
Chops and stitches, byte by byte,
Reassembled, snug and right.
A rabbit cheers — decompress tonight! 🐇✨

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch cosmo0920-implement-decompression-method-dispatcher-for-zstd

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
src/flb_compression.c (1)

134-138: Missing ZSTD context cleanup in destructor

The destructor only handles GZIP decompression context cleanup but not ZSTD. This will cause memory leaks when ZSTD contexts are destroyed.

Apply this fix:

     if (context->inner_context != NULL) {
-        flb_gzip_decompression_context_destroy(context->inner_context);
+        if (context->algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
+            flb_gzip_decompression_context_destroy(context->inner_context);
+        }
+        else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
+            flb_zstd_decompression_context_destroy(context->inner_context);
+        }
 
         context->inner_context = NULL;
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1d4c4dc and 417cb44.

📒 Files selected for processing (5)
  • include/fluent-bit/flb_compression.h (1 hunks)
  • include/fluent-bit/flb_zstd.h (1 hunks)
  • src/flb_compression.c (4 hunks)
  • src/flb_zstd.c (2 hunks)
  • tests/internal/zstd.c (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (27)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (8)
include/fluent-bit/flb_compression.h (1)

29-29: LGTM!

The new constant follows the sequential pattern and properly extends the compression algorithm enumeration.

include/fluent-bit/flb_zstd.h (1)

26-35: LGTM!

The forward declaration and new function signatures properly extend the zstd API with streaming decompression support. The opaque pointer pattern for context management is appropriate.

src/flb_compression.c (1)

182-184: LGTM!

The ZSTD algorithm integration is properly implemented with appropriate context creation, state initialization, and decompressor dispatch.

Also applies to: 209-211, 227-232

tests/internal/zstd.c (1)

167-317: LGTM!

Comprehensive test coverage for streaming decompression including multi-chunk processing and corruption detection. Proper memory cleanup in all test paths.

src/flb_zstd.c (4)

27-29: LGTM!

Clean and simple context structure encapsulating the ZSTD decompression context.


168-233: LGTM!

Well-implemented streaming decompression dispatcher with proper error handling:

  • Distinguishes between recoverable (need more data) and fatal errors
  • Handles frame boundary detection correctly
  • Updates buffer pointers appropriately after decompression
  • Sets failure state on errors for proper error propagation

235-253: LGTM!

Context creation properly handles allocation failures for both the wrapper structure and the internal ZSTD context.


255-265: LGTM!

Destructor properly checks for null pointers before freeing resources.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/flb_gzip.c (1)

691-692: Nit: Update the stale comment (“header + crc32”) to reflect header-only parsing here.

The logic now only requires the fixed 10-byte header at this point; CRC/footer is handled later. Suggest clarifying the comment to prevent confusion for future readers.

Apply this diff:

-    /* Minimal length: header + crc32 */
+    /* Minimal length for header parsing: GZIP fixed header (10 bytes).
+     * CRC/footer will be processed after the body. */
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between fe55e04 and 47bdc62.

📒 Files selected for processing (1)
  • src/flb_gzip.c (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (28)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (1)
src/flb_gzip.c (1)

693-697: Good change: partial-header now returns INSUFFICIENT_DATA (non-fatal), enabling true streaming.

This aligns with the streaming decompressor semantics and avoids noisy error logs while waiting for more bytes. Keeping the state unchanged (still EXPECTING_HEADER) is correct and ensures idempotent re-entry once more data arrives.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants