From 1f302c4f4aaaa49fd914a5e582f603da8cc6080e Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Thu, 7 Aug 2025 11:17:23 +0200 Subject: [PATCH 01/14] Add null_array serialization --- CMakeLists.txt | 1 + include/serialize_null_array.hpp | 215 ++++++++++++++++++++++++++++ tests/CMakeLists.txt | 3 +- tests/test_serialize_null_array.cpp | 74 ++++++++++ 4 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 include/serialize_null_array.hpp create mode 100644 tests/test_serialize_null_array.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d27a2db..a7d8f29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,7 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp ${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp + ${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/utils.hpp ) diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp new file mode 100644 index 0000000..6570e37 --- /dev/null +++ b/include/serialize_null_array.hpp @@ -0,0 +1,215 @@ +#pragma once + +// TODO check needs of all these below +#include +#include +#include +#include +#include +#include + +#include "sparrow.hpp" + +// TODO check needs of these two +#include "Message_generated.h" +#include "Schema_generated.h" + +#include "utils.hpp" + +namespace sparrow_ipc +{ + // TODO move to cpp if not templated + // TODO add comments and review + + // This function serializes a sparrow::null_array into a byte vector compliant + // with the Apache Arrow IPC Streaming Format. It mirrors the structure of + // serialize_primitive_array but is optimized for null_array's properties. + // A null_array is represented by metadata only (Schema, RecordBatch) and has no data buffers, + // making its message body zero-length. + std::vector serialize_null_array(sparrow::null_array& arr) + { + // Use the Arrow C Data Interface to get a generic description of the array. + // For a null_array, the ArrowArray struct will report n_buffers = 0. + auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); + auto& arrow_arr = *arrow_arr_ptr; + auto& arrow_schema = *arrow_schema_ptr; + + std::vector final_buffer; + + // I - Serialize the Schema message + // This part is almost identical to how a primitive_array's schema is serialized. + { + flatbuffers::FlatBufferBuilder schema_builder; + + flatbuffers::Offset fb_name_offset = 0; + if (arrow_schema.name) + { + fb_name_offset = schema_builder.CreateString(arrow_schema.name); + } + + // For null_array, the format string is "n". + auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); + + flatbuffers::Offset>> + fb_metadata_offset = 0; + + if (arr.metadata()) + { + sparrow::key_value_view metadata_view = *(arr.metadata()); + std::vector> kv_offsets; +// kv_offsets.reserve(metadata_view.size()); +// for (const auto& pair : metadata_view) +// { +// auto key_offset = schema_builder.CreateString(std::string(pair.first)); +// auto value_offset = schema_builder.CreateString(std::string(pair.second)); +// kv_offsets.push_back( +// org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); +// } + auto mv_it = metadata_view.cbegin(); + for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) + { + auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); + auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); + kv_offsets.push_back( + org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); + } + fb_metadata_offset = schema_builder.CreateVector(kv_offsets); + } + + auto fb_field = org::apache::arrow::flatbuf::CreateField( + schema_builder, + fb_name_offset, + (arrow_schema.flags & static_cast(sparrow::ArrowFlag::NULLABLE)) != 0, + type_enum, + type_offset, + 0, // dictionary + 0, // children + fb_metadata_offset); + + std::vector> fields_vec = {fb_field}; + auto fb_fields = schema_builder.CreateVector(fields_vec); + + auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); + + auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( + schema_builder, + org::apache::arrow::flatbuf::MetadataVersion::V5, + org::apache::arrow::flatbuf::MessageHeader::Schema, + schema_offset.Union(), + 0 // bodyLength + ); + schema_builder.Finish(schema_message_offset); + + uint32_t schema_len = schema_builder.GetSize(); + final_buffer.resize(sizeof(uint32_t) + schema_len); + memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); + *(reinterpret_cast(final_buffer.data())) = schema_len; + } + + // II - Serialize the RecordBatch message + { + flatbuffers::FlatBufferBuilder batch_builder; + + // The FieldNode describes the layout (length and null count). + // For a null_array, length and null_count are always equal. + org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); + auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); + + // A null_array has no buffers. The ArrowArray struct reports n_buffers = 0, + // so we create an empty vector of buffers for the Flatbuffers message. + auto fb_buffers_vector = batch_builder.CreateVectorOfStructs({}); + + auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); + + // The bodyLength is 0 because there are no data buffers. + auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( + batch_builder, + org::apache::arrow::flatbuf::MetadataVersion::V5, + org::apache::arrow::flatbuf::MessageHeader::RecordBatch, + record_batch_offset.Union(), + 0 // bodyLength + ); + batch_builder.Finish(batch_message_offset); + + uint32_t batch_meta_len = batch_builder.GetSize(); + int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); + + size_t current_size = final_buffer.size(); + // Resize for the RecordBatch metadata. There is no body to append. + final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len); + uint8_t* dst = final_buffer.data() + current_size; + + *(reinterpret_cast(dst)) = batch_meta_len; + dst += sizeof(uint32_t); + memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len); + memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); + } + + return final_buffer; + } + + // This function deserializes a byte vector into a sparrow::null_array. + // It reads the Schema and RecordBatch messages to extract the array's length, + // name, and metadata, then constructs a null_array. + sparrow::null_array deserialize_null_array(const std::vector& buffer) + { + const uint8_t* buf_ptr = buffer.data(); + size_t current_offset = 0; + + // I - Deserialize the Schema message + uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); + current_offset += sizeof(uint32_t); + auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); + if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) + { + throw std::runtime_error("Expected Schema message at the start of the buffer."); + } + auto flatbuffer_schema = static_cast(schema_message->header()); + auto fields = flatbuffer_schema->fields(); + if (fields->size() != 1) + { + throw std::runtime_error("Expected schema with exactly one field for null_array."); + } + auto field = fields->Get(0); + if (field->type_type() != org::apache::arrow::flatbuf::Type::Null) + { + throw std::runtime_error("Expected Null type in schema."); + } + + std::optional name; + if (auto fb_name = field->name()) + { + name = std::string(fb_name->c_str(), fb_name->size()); + } + + std::optional> metadata; + if (auto fb_metadata = field->custom_metadata()) + { + if (fb_metadata->size() > 0) + { + metadata = std::vector(); + metadata->reserve(fb_metadata->size()); + for (const auto& kv : *fb_metadata) + { + metadata->emplace_back(kv->key()->str(), kv->value()->str()); + } + } + } + + current_offset += schema_meta_len; + + // II - Deserialize the RecordBatch message + uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); + current_offset += sizeof(uint32_t); + auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); + if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) + { + throw std::runtime_error("Expected RecordBatch message, but got a different type."); + } + auto record_batch = static_cast(batch_message->header()); + + // The body is empty, so we don't need to read any further. + // Construct the null_array from the deserialized metadata. + return sparrow::null_array(record_batch->length(), name, metadata); + } +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eafa5d7..63f2857 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,8 +2,7 @@ cmake_minimum_required(VERSION 3.28) set(test_target "test_sparrow_ipc_lib") -add_executable(${test_target} main.cpp test_primitive_array.cpp test_utils.cpp) - +add_executable(${test_target} main.cpp test_utils.cpp test_primitive_array.cpp test_serialize_null_array.cpp) target_link_libraries(${test_target} PRIVATE sparrow-ipc diff --git a/tests/test_serialize_null_array.cpp b/tests/test_serialize_null_array.cpp new file mode 100644 index 0000000..fa64e03 --- /dev/null +++ b/tests/test_serialize_null_array.cpp @@ -0,0 +1,74 @@ +#include "doctest/doctest.h" +#include "sparrow.hpp" + +#include "serialize_null_array.hpp" + +namespace sparrow_ipc +{ + namespace sp = sparrow; + // TODO have a generic compare_metadata in tests/helpers.hpp, cpp + // taking pa and na + void compare_metadata(sp::null_array& na1, sp::null_array& na2) + { + if (!na1.metadata().has_value()) + { + CHECK(!na2.metadata().has_value()); + return; + } + + CHECK(na2.metadata().has_value()); + sp::key_value_view kvs1_view = *(na1.metadata()); + sp::key_value_view kvs2_view = *(na2.metadata()); + + CHECK_EQ(kvs1_view.size(), kvs2_view.size()); + auto kvs1_it = kvs1_view.cbegin(); + auto kvs2_it = kvs2_view.cbegin(); + for (auto i = 0; i < kvs1_view.size(); ++i) + { + CHECK_EQ(*kvs1_it, *kvs2_it); + ++kvs1_it; + ++kvs2_it; + } + } + + TEST_CASE("Serialize and deserialize null_array") + { + const std::size_t size = 10; + const std::string_view name = "my_null_array"; + + const std::vector metadata_vec = {{"key1", "value1"}, {"key2", "value2"}}; + const std::optional> metadata = metadata_vec; + + sp::null_array arr(size, name, metadata); + + auto buffer = serialize_null_array(arr); + auto deserialized_arr = deserialize_null_array(buffer); + + CHECK_EQ(deserialized_arr.size(), arr.size()); + REQUIRE(deserialized_arr.name().has_value()); + CHECK_EQ(deserialized_arr.name().value(), arr.name().value()); + + REQUIRE(deserialized_arr.metadata().has_value()); + compare_metadata(arr, deserialized_arr); + + // Check the deserialized object is a null_array + const auto& arrow_proxy = sp::detail::array_access::get_arrow_proxy(deserialized_arr); + CHECK_EQ(arrow_proxy.format(), "n"); + CHECK_EQ(arrow_proxy.n_children(), 0); + CHECK_EQ(arrow_proxy.flags(), std::unordered_set{sp::ArrowFlag::NULLABLE}); + CHECK_EQ(arrow_proxy.name(), name); + CHECK_EQ(arrow_proxy.dictionary(), nullptr); + CHECK_EQ(arrow_proxy.buffers().size(), 0); + } + + TEST_CASE("Serialize and deserialize null_array with no name and no metadata") + { + const std::size_t size = 100; + sp::null_array arr(size); + auto buffer = serialize_null_array(arr); + auto deserialized_arr = deserialize_null_array(buffer); + CHECK_EQ(deserialized_arr.size(), arr.size()); + CHECK_FALSE(deserialized_arr.name().has_value()); + CHECK_FALSE(deserialized_arr.metadata().has_value()); + } +} From 9a9abb0be7566708cfcdfb6711f009e3e086eac4 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Fri, 8 Aug 2025 14:10:32 +0200 Subject: [PATCH 02/14] Move things around and renaming --- tests/CMakeLists.txt | 13 +++++++- tests/include/sparrow_ipc_tests_helpers.hpp | 33 +++++++++++++++++++ ....cpp => test_null_array_serialization.cpp} | 24 +------------- ...=> test_primitive_array_serialization.cpp} | 27 ++------------- 4 files changed, 48 insertions(+), 49 deletions(-) create mode 100644 tests/include/sparrow_ipc_tests_helpers.hpp rename tests/{test_serialize_null_array.cpp => test_null_array_serialization.cpp} (71%) rename tests/{test_primitive_array.cpp => test_primitive_array_serialization.cpp} (90%) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 63f2857..e184974 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,7 +2,17 @@ cmake_minimum_required(VERSION 3.28) set(test_target "test_sparrow_ipc_lib") -add_executable(${test_target} main.cpp test_utils.cpp test_primitive_array.cpp test_serialize_null_array.cpp) +set( + SPARROW_IPC_TESTS_SRC + include/sparrow_ipc_tests_helpers.hpp + # TODO move all the files below under src? + main.cpp + test_utils.cpp + test_primitive_array_serialization.cpp + test_null_array_serialization.cpp +) + +add_executable(${test_target} ${SPARROW_IPC_TESTS_SRC}) target_link_libraries(${test_target} PRIVATE sparrow-ipc @@ -24,6 +34,7 @@ endif() target_include_directories(${test_target} PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_BINARY_DIR}/generated ${CMAKE_SOURCE_DIR}/include ) diff --git a/tests/include/sparrow_ipc_tests_helpers.hpp b/tests/include/sparrow_ipc_tests_helpers.hpp new file mode 100644 index 0000000..01b9040 --- /dev/null +++ b/tests/include/sparrow_ipc_tests_helpers.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include "sparrow.hpp" +#include "doctest/doctest.h" + +namespace sparrow_ipc +{ + namespace sp = sparrow; + + template + void compare_metadata(T1& arr1, T2& arr2) + { + if (!arr1.metadata().has_value()) + { + CHECK(!arr2.metadata().has_value()); + return; + } + + CHECK(arr2.metadata().has_value()); + sp::key_value_view kvs1_view = arr1.metadata().value(); + sp::key_value_view kvs2_view = arr2.metadata().value(); + + CHECK_EQ(kvs1_view.size(), kvs2_view.size()); + auto kvs1_it = kvs1_view.cbegin(); + auto kvs2_it = kvs2_view.cbegin(); + for (auto i = 0; i < kvs1_view.size(); ++i) + { + CHECK_EQ(*kvs1_it, *kvs2_it); + ++kvs1_it; + ++kvs2_it; + } + } +} diff --git a/tests/test_serialize_null_array.cpp b/tests/test_null_array_serialization.cpp similarity index 71% rename from tests/test_serialize_null_array.cpp rename to tests/test_null_array_serialization.cpp index fa64e03..d1b809c 100644 --- a/tests/test_serialize_null_array.cpp +++ b/tests/test_null_array_serialization.cpp @@ -2,34 +2,12 @@ #include "sparrow.hpp" #include "serialize_null_array.hpp" +#include "sparrow_ipc_tests_helpers.hpp" namespace sparrow_ipc { namespace sp = sparrow; - // TODO have a generic compare_metadata in tests/helpers.hpp, cpp - // taking pa and na - void compare_metadata(sp::null_array& na1, sp::null_array& na2) - { - if (!na1.metadata().has_value()) - { - CHECK(!na2.metadata().has_value()); - return; - } - - CHECK(na2.metadata().has_value()); - sp::key_value_view kvs1_view = *(na1.metadata()); - sp::key_value_view kvs2_view = *(na2.metadata()); - CHECK_EQ(kvs1_view.size(), kvs2_view.size()); - auto kvs1_it = kvs1_view.cbegin(); - auto kvs2_it = kvs2_view.cbegin(); - for (auto i = 0; i < kvs1_view.size(); ++i) - { - CHECK_EQ(*kvs1_it, *kvs2_it); - ++kvs1_it; - ++kvs2_it; - } - } TEST_CASE("Serialize and deserialize null_array") { diff --git a/tests/test_primitive_array.cpp b/tests/test_primitive_array_serialization.cpp similarity index 90% rename from tests/test_primitive_array.cpp rename to tests/test_primitive_array_serialization.cpp index 4944f61..37d6650 100644 --- a/tests/test_primitive_array.cpp +++ b/tests/test_primitive_array_serialization.cpp @@ -7,6 +7,7 @@ #include "sparrow.hpp" #include "serialize.hpp" +#include "sparrow_ipc_tests_helpers.hpp" namespace sparrow_ipc { @@ -119,30 +120,6 @@ namespace sparrow_ipc } } - template - void compare_metadata(sp::primitive_array& pa1, sp::primitive_array& pa2) - { - if (!pa1.metadata().has_value()) - { - CHECK(!pa2.metadata().has_value()); - return; - } - - CHECK(pa2.metadata().has_value()); - sp::key_value_view kvs1_view = *(pa1.metadata()); - sp::key_value_view kvs2_view = *(pa2.metadata()); - - CHECK_EQ(kvs1_view.size(), kvs2_view.size()); - auto kvs1_it = kvs1_view.cbegin(); - auto kvs2_it = kvs2_view.cbegin(); - for (auto i = 0; i < kvs1_view.size(); ++i) - { - CHECK_EQ(*kvs1_it, *kvs2_it); - ++kvs1_it; - ++kvs2_it; - } - } - template void compare_primitive_arrays(sp::primitive_array& ar, sp::primitive_array& deserialized_ar) { @@ -161,7 +138,7 @@ namespace sparrow_ipc // compare_values(ar, deserialized_ar); compare_bitmap(ar, deserialized_ar); - compare_metadata(ar, deserialized_ar); + compare_metadata(ar, deserialized_ar); } TEST_CASE_TEMPLATE_DEFINE("Serialize and Deserialize primitive_array", T, primitive_array_types) From 4e84aeb4906d015d4830ca20875d7a0823418684 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Fri, 8 Aug 2025 16:16:40 +0200 Subject: [PATCH 03/14] Renaming --- CMakeLists.txt | 2 +- include/{serialize.hpp => serialize_primitive_array.hpp} | 1 - tests/include/sparrow_ipc_tests_helpers.hpp | 2 +- tests/test_primitive_array_serialization.cpp | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) rename include/{serialize.hpp => serialize_primitive_array.hpp} (99%) diff --git a/CMakeLists.txt b/CMakeLists.txt index a7d8f29..1a94560 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,7 +49,7 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp - ${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp + ${SPARROW_IPC_INCLUDE_DIR}/serialize_primitive_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/utils.hpp ) diff --git a/include/serialize.hpp b/include/serialize_primitive_array.hpp similarity index 99% rename from include/serialize.hpp rename to include/serialize_primitive_array.hpp index 8397a36..a629527 100644 --- a/include/serialize.hpp +++ b/include/serialize_primitive_array.hpp @@ -16,7 +16,6 @@ namespace sparrow_ipc { - //TODO split serialize/deserialize fcts in two different files or just rename the current one? template std::vector serialize_primitive_array(sparrow::primitive_array& arr); diff --git a/tests/include/sparrow_ipc_tests_helpers.hpp b/tests/include/sparrow_ipc_tests_helpers.hpp index 01b9040..e1fc1a1 100644 --- a/tests/include/sparrow_ipc_tests_helpers.hpp +++ b/tests/include/sparrow_ipc_tests_helpers.hpp @@ -1,7 +1,7 @@ #pragma once -#include "sparrow.hpp" #include "doctest/doctest.h" +#include "sparrow.hpp" namespace sparrow_ipc { diff --git a/tests/test_primitive_array_serialization.cpp b/tests/test_primitive_array_serialization.cpp index 37d6650..bc660ca 100644 --- a/tests/test_primitive_array_serialization.cpp +++ b/tests/test_primitive_array_serialization.cpp @@ -6,7 +6,7 @@ #include "doctest/doctest.h" #include "sparrow.hpp" -#include "serialize.hpp" +#include "serialize_primitive_array.hpp" #include "sparrow_ipc_tests_helpers.hpp" namespace sparrow_ipc From 066c10877f1278fff06b6da47f8fb13f82a0dd64 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Fri, 8 Aug 2025 16:41:42 +0200 Subject: [PATCH 04/14] Remove commented code --- include/serialize_null_array.hpp | 9 +-------- include/serialize_primitive_array.hpp | 2 +- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index 6570e37..ae43757 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -57,14 +57,7 @@ namespace sparrow_ipc { sparrow::key_value_view metadata_view = *(arr.metadata()); std::vector> kv_offsets; -// kv_offsets.reserve(metadata_view.size()); -// for (const auto& pair : metadata_view) -// { -// auto key_offset = schema_builder.CreateString(std::string(pair.first)); -// auto value_offset = schema_builder.CreateString(std::string(pair.second)); -// kv_offsets.push_back( -// org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); -// } + kv_offsets.reserve(metadata_view.size()); auto mv_it = metadata_view.cbegin(); for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) { diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index a629527..7f5f51c 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -68,7 +68,7 @@ namespace sparrow_ipc { sparrow::key_value_view metadata_view = *(arr.metadata()); std::vector> kv_offsets; - + kv_offsets.reserve(metadata_view.size()); auto mv_it = metadata_view.cbegin(); for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) { From 1ef9f2c2585366ba5f50822ad744fe06facd9e0a Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 11 Aug 2025 17:55:35 +0200 Subject: [PATCH 05/14] Refactor serialization into chunk functions --- CMakeLists.txt | 2 + include/serialize.hpp | 17 +++ include/serialize_null_array.hpp | 105 +---------------- include/serialize_primitive_array.hpp | 74 +----------- src/serialize.cpp | 160 ++++++++++++++++++++++++++ 5 files changed, 186 insertions(+), 172 deletions(-) create mode 100644 include/serialize.hpp create mode 100644 src/serialize.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a94560..393c5d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,12 +49,14 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp + ${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp ${SPARROW_IPC_INCLUDE_DIR}/serialize_primitive_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/utils.hpp ) set(SPARROW_IPC_SRC + ${SPARROW_IPC_SOURCE_DIR}/serialize.cpp ${SPARROW_IPC_SOURCE_DIR}/utils.cpp ) diff --git a/include/serialize.hpp b/include/serialize.hpp new file mode 100644 index 0000000..b47aca0 --- /dev/null +++ b/include/serialize.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +#include "sparrow.hpp" + +#include "config/config.hpp" + +namespace sparrow_ipc +{ + namespace details + { + SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer); + SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector& final_buffer); + } +} diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index ae43757..947d88e 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -14,6 +14,7 @@ #include "Message_generated.h" #include "Schema_generated.h" +#include "serialize.hpp" #include "utils.hpp" namespace sparrow_ipc @@ -28,116 +29,18 @@ namespace sparrow_ipc // making its message body zero-length. std::vector serialize_null_array(sparrow::null_array& arr) { - // Use the Arrow C Data Interface to get a generic description of the array. - // For a null_array, the ArrowArray struct will report n_buffers = 0. auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); auto& arrow_arr = *arrow_arr_ptr; auto& arrow_schema = *arrow_schema_ptr; std::vector final_buffer; - // I - Serialize the Schema message - // This part is almost identical to how a primitive_array's schema is serialized. - { - flatbuffers::FlatBufferBuilder schema_builder; - - flatbuffers::Offset fb_name_offset = 0; - if (arrow_schema.name) - { - fb_name_offset = schema_builder.CreateString(arrow_schema.name); - } - - // For null_array, the format string is "n". - auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); - - flatbuffers::Offset>> - fb_metadata_offset = 0; - - if (arr.metadata()) - { - sparrow::key_value_view metadata_view = *(arr.metadata()); - std::vector> kv_offsets; - kv_offsets.reserve(metadata_view.size()); - auto mv_it = metadata_view.cbegin(); - for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) - { - auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); - auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); - kv_offsets.push_back( - org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); - } - fb_metadata_offset = schema_builder.CreateVector(kv_offsets); - } - - auto fb_field = org::apache::arrow::flatbuf::CreateField( - schema_builder, - fb_name_offset, - (arrow_schema.flags & static_cast(sparrow::ArrowFlag::NULLABLE)) != 0, - type_enum, - type_offset, - 0, // dictionary - 0, // children - fb_metadata_offset); - - std::vector> fields_vec = {fb_field}; - auto fb_fields = schema_builder.CreateVector(fields_vec); - - auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); - - auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( - schema_builder, - org::apache::arrow::flatbuf::MetadataVersion::V5, - org::apache::arrow::flatbuf::MessageHeader::Schema, - schema_offset.Union(), - 0 // bodyLength - ); - schema_builder.Finish(schema_message_offset); - - uint32_t schema_len = schema_builder.GetSize(); - final_buffer.resize(sizeof(uint32_t) + schema_len); - memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); - *(reinterpret_cast(final_buffer.data())) = schema_len; - } + details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); // II - Serialize the RecordBatch message - { - flatbuffers::FlatBufferBuilder batch_builder; - - // The FieldNode describes the layout (length and null count). - // For a null_array, length and null_count are always equal. - org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); - auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); - - // A null_array has no buffers. The ArrowArray struct reports n_buffers = 0, - // so we create an empty vector of buffers for the Flatbuffers message. - auto fb_buffers_vector = batch_builder.CreateVectorOfStructs({}); - - auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); - - // The bodyLength is 0 because there are no data buffers. - auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( - batch_builder, - org::apache::arrow::flatbuf::MetadataVersion::V5, - org::apache::arrow::flatbuf::MessageHeader::RecordBatch, - record_batch_offset.Union(), - 0 // bodyLength - ); - batch_builder.Finish(batch_message_offset); - - uint32_t batch_meta_len = batch_builder.GetSize(); - int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); - - size_t current_size = final_buffer.size(); - // Resize for the RecordBatch metadata. There is no body to append. - final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len); - uint8_t* dst = final_buffer.data() + current_size; - - *(reinterpret_cast(dst)) = batch_meta_len; - dst += sizeof(uint32_t); - memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len); - memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); - } + details::serialize_record_batch_message(arrow_arr, final_buffer); + // Return the final buffer containing the complete IPC stream return final_buffer; } diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index 7f5f51c..860550c 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -12,6 +12,7 @@ #include "Message_generated.h" #include "Schema_generated.h" +#include "serialize.hpp" #include "utils.hpp" namespace sparrow_ipc @@ -45,77 +46,7 @@ namespace sparrow_ipc std::vector final_buffer; // I - Serialize the Schema message - // An Arrow IPC stream must start with a Schema message - { - // Create a new builder for the Schema message's metadata - flatbuffers::FlatBufferBuilder schema_builder; - - // Create the Field metadata, which describes a single column (or array) - flatbuffers::Offset fb_name_offset = 0; - if (arrow_schema.name) - { - fb_name_offset = schema_builder.CreateString(arrow_schema.name); - } - - // Determine the Flatbuffer type information from the C schema's format string - auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); - - // Handle metadata - flatbuffers::Offset>> - fb_metadata_offset = 0; - - if (arr.metadata()) - { - sparrow::key_value_view metadata_view = *(arr.metadata()); - std::vector> kv_offsets; - kv_offsets.reserve(metadata_view.size()); - auto mv_it = metadata_view.cbegin(); - for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) - { - auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); - auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); - kv_offsets.push_back( - org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); - } - fb_metadata_offset = schema_builder.CreateVector(kv_offsets); - } - - // Build the Field object - auto fb_field = org::apache::arrow::flatbuf::CreateField( - schema_builder, - fb_name_offset, - (arrow_schema.flags & static_cast(sparrow::ArrowFlag::NULLABLE)) != 0, - type_enum, - type_offset, - 0, // dictionary - 0, // children - fb_metadata_offset); - - // A Schema contains a vector of fields. For this primitive array, there is only one - std::vector> fields_vec = {fb_field}; - auto fb_fields = schema_builder.CreateVector(fields_vec); - - // Build the Schema object from the vector of fields - auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); - - // Wrap the Schema in a top-level Message, which is the standard IPC envelope - auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( - schema_builder, - org::apache::arrow::flatbuf::MetadataVersion::V5, - org::apache::arrow::flatbuf::MessageHeader::Schema, - schema_offset.Union(), - 0 - ); - schema_builder.Finish(schema_message_offset); - - // Assemble the Schema message bytes - uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata - final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message - // Copy the metadata into the buffer, after the 4-byte length prefix - memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); - // Write the 4-byte metadata length at the beginning of the message - *(reinterpret_cast(final_buffer.data())) = schema_len; - } + details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); // II - Serialize the RecordBatch message // After the Schema, we send the RecordBatch containing the actual data @@ -261,6 +192,7 @@ namespace sparrow_ipc metadata->reserve(fb_metadata->size()); for (const auto& kv : *fb_metadata) { + // TODO use str() instead of c_str() metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str()); } } diff --git a/src/serialize.cpp b/src/serialize.cpp new file mode 100644 index 0000000..89c12f6 --- /dev/null +++ b/src/serialize.cpp @@ -0,0 +1,160 @@ +// TODO check need of all these includes +#include +#include +#include +#include + +#include "Message_generated.h" +#include "Schema_generated.h" + +#include "serialize.hpp" +#include "utils.hpp" + +namespace sparrow_ipc +{ + namespace details + { + void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer) + { + // Create a new builder for the Schema message's metadata + flatbuffers::FlatBufferBuilder schema_builder; + + flatbuffers::Offset fb_name_offset = 0; + if (arrow_schema.name) + { + fb_name_offset = schema_builder.CreateString(arrow_schema.name); + } + + // Determine the Flatbuffer type information from the C schema's format string + auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); + + // Handle metadata + flatbuffers::Offset>> + fb_metadata_offset = 0; + + if (metadata) + { + sparrow::key_value_view metadata_view = metadata.value(); + std::vector> kv_offsets; + kv_offsets.reserve(metadata_view.size()); + auto mv_it = metadata_view.cbegin(); + for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) + { + auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); + auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); + kv_offsets.push_back( + org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); + } + fb_metadata_offset = schema_builder.CreateVector(kv_offsets); + } + + // Build the Field object + auto fb_field = org::apache::arrow::flatbuf::CreateField( + schema_builder, + fb_name_offset, + (arrow_schema.flags & static_cast(sparrow::ArrowFlag::NULLABLE)) != 0, + type_enum, + type_offset, + 0, // dictionary + 0, // children + fb_metadata_offset); + + // A Schema contains a vector of fields + std::vector> fields_vec = {fb_field}; + auto fb_fields = schema_builder.CreateVector(fields_vec); + + // Build the Schema object from the vector of fields + auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); + + // Wrap the Schema in a top-level Message, which is the standard IPC envelope + auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( + schema_builder, + org::apache::arrow::flatbuf::MetadataVersion::V5, + org::apache::arrow::flatbuf::MessageHeader::Schema, + schema_offset.Union(), + 0 + ); + schema_builder.Finish(schema_message_offset); + + // Assemble the Schema message bytes + uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata + final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message + // Copy the metadata into the buffer, after the 4-byte length prefix + memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); + // Write the 4-byte metadata length at the beginning of the message + *(reinterpret_cast(final_buffer.data())) = schema_len; + } + + void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector& final_buffer) + { + // Create a new builder for the RecordBatch message's metadata + flatbuffers::FlatBufferBuilder batch_builder; + + // arrow_arr.buffers[0] is the validity bitmap + // arrow_arr.buffers[1] is the data buffer +// const uint8_t* validity_bitmap = reinterpret_cast(arrow_arr.buffers[0]); +// const uint8_t* data_buffer = reinterpret_cast(arrow_arr.buffers[1]); + + // Calculate the size of the validity and data buffers +// int64_t validity_size = (arrow_arr.length + 7) / 8; +// int64_t data_size = arrow_arr.length * sizeof(T); +// int64_t body_len = validity_size + data_size; // The total size of the message body + int64_t body_len = 0; // NULL ARRAY + + // Create the FieldNode, which describes the layout of the array data + org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); + // A RecordBatch contains a vector of nodes and a vector of buffers + auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); +// std::vector buffers_vec = {validity_buffer_struct, data_buffer_struct}; + std::vector buffers_vec = {}; // NULL ARRAY + auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); + + // Build the RecordBatch metadata object + auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); + + // Wrap the RecordBatch in a top-level Message + auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( + batch_builder, + org::apache::arrow::flatbuf::MetadataVersion::V5, + org::apache::arrow::flatbuf::MessageHeader::RecordBatch, + record_batch_offset.Union(), + body_len + ); + batch_builder.Finish(batch_message_offset); + + // III - Append the RecordBatch message to the final buffer + uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata + int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length + + size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) + // Resize the buffer to append the new message + final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len); + uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start + + // Write the 4-byte metadata length for the RecordBatch message + *(reinterpret_cast(dst)) = batch_meta_len; + dst += sizeof(uint32_t); + // Copy the RecordBatch metadata into the buffer + memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len); + // Add padding to align the body to an 8-byte boundary + memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); + +// dst += aligned_batch_meta_len; +// // Copy the actual data buffers (the message body) into the buffer +// if (validity_bitmap) +// { +// memcpy(dst, validity_bitmap, validity_size); +// } +// else +// { +// // If validity_bitmap is null, it means there are no nulls +// memset(dst, 0xFF, validity_size); +// } +// dst += validity_size; +// if (data_buffer) +// { +// memcpy(dst, data_buffer, data_size); +// } + } + } // namespace details +} // namespace sparrow-ipc From 5e40fcc546147fcd8ba8958835812d3784bc771c Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 12 Aug 2025 13:47:19 +0200 Subject: [PATCH 06/14] Make serialize_record_batch_message generic --- include/serialize.hpp | 2 +- include/serialize_null_array.hpp | 2 +- include/serialize_primitive_array.hpp | 77 +++------------------------ src/serialize.cpp | 62 ++++++++++----------- 4 files changed, 40 insertions(+), 103 deletions(-) diff --git a/include/serialize.hpp b/include/serialize.hpp index b47aca0..a4ca78e 100644 --- a/include/serialize.hpp +++ b/include/serialize.hpp @@ -12,6 +12,6 @@ namespace sparrow_ipc namespace details { SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer); - SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector& final_buffer); + SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); } } diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index 947d88e..36ae8fd 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -38,7 +38,7 @@ namespace sparrow_ipc details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); // II - Serialize the RecordBatch message - details::serialize_record_batch_message(arrow_arr, final_buffer); + details::serialize_record_batch_message(arrow_arr, {}, final_buffer); // Return the final buffer containing the complete IPC stream return final_buffer; diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index 860550c..d28b18a 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -50,77 +50,12 @@ namespace sparrow_ipc // II - Serialize the RecordBatch message // After the Schema, we send the RecordBatch containing the actual data - { - // Create a new builder for the RecordBatch message's metadata - flatbuffers::FlatBufferBuilder batch_builder; - - // arrow_arr.buffers[0] is the validity bitmap - // arrow_arr.buffers[1] is the data buffer - const uint8_t* validity_bitmap = reinterpret_cast(arrow_arr.buffers[0]); - const uint8_t* data_buffer = reinterpret_cast(arrow_arr.buffers[1]); - - // Calculate the size of the validity and data buffers - int64_t validity_size = (arrow_arr.length + 7) / 8; - int64_t data_size = arrow_arr.length * sizeof(T); - int64_t body_len = validity_size + data_size; // The total size of the message body - - // Create Flatbuffer descriptions for the data buffers - org::apache::arrow::flatbuf::Buffer validity_buffer_struct(0, validity_size); - org::apache::arrow::flatbuf::Buffer data_buffer_struct(validity_size, data_size); - // Create the FieldNode, which describes the layout of the array data - org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); - - // A RecordBatch contains a vector of nodes and a vector of buffers - auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); - std::vector buffers_vec = {validity_buffer_struct, data_buffer_struct}; - auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); - - // Build the RecordBatch metadata object - auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); - - // Wrap the RecordBatch in a top-level Message - auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( - batch_builder, - org::apache::arrow::flatbuf::MetadataVersion::V5, - org::apache::arrow::flatbuf::MessageHeader::RecordBatch, - record_batch_offset.Union(), - body_len - ); - batch_builder.Finish(batch_message_offset); - - // III - Append the RecordBatch message to the final buffer - uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata - int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length - - size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) - // Resize the buffer to append the new message - final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len); - uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start - - // Write the 4-byte metadata length for the RecordBatch message - *(reinterpret_cast(dst)) = batch_meta_len; - dst += sizeof(uint32_t); - // Copy the RecordBatch metadata into the buffer - memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len); - // Add padding to align the body to an 8-byte boundary - memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); - dst += aligned_batch_meta_len; - // Copy the actual data buffers (the message body) into the buffer - if (validity_bitmap) - { - memcpy(dst, validity_bitmap, validity_size); - } - else - { - // If validity_bitmap is null, it means there are no nulls - memset(dst, 0xFF, validity_size); - } - dst += validity_size; - if (data_buffer) - { - memcpy(dst, data_buffer, data_size); - } - } + + // Calculate the size of the validity and data buffers + int64_t validity_size = (arrow_arr.length + 7) / 8; + int64_t data_size = arrow_arr.length * sizeof(T); + std::vector buffers_sizes = {validity_size, data_size}; + details::serialize_record_batch_message(arrow_arr, buffers_sizes, final_buffer); // Return the final buffer containing the complete IPC stream return final_buffer; diff --git a/src/serialize.cpp b/src/serialize.cpp index 89c12f6..0adcbe3 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -85,28 +85,25 @@ namespace sparrow_ipc *(reinterpret_cast(final_buffer.data())) = schema_len; } - void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector& final_buffer) + void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer) { // Create a new builder for the RecordBatch message's metadata flatbuffers::FlatBufferBuilder batch_builder; - // arrow_arr.buffers[0] is the validity bitmap - // arrow_arr.buffers[1] is the data buffer -// const uint8_t* validity_bitmap = reinterpret_cast(arrow_arr.buffers[0]); -// const uint8_t* data_buffer = reinterpret_cast(arrow_arr.buffers[1]); - - // Calculate the size of the validity and data buffers -// int64_t validity_size = (arrow_arr.length + 7) / 8; -// int64_t data_size = arrow_arr.length * sizeof(T); -// int64_t body_len = validity_size + data_size; // The total size of the message body - int64_t body_len = 0; // NULL ARRAY + std::vector buffers_vec; + int64_t current_offset = 0; + int64_t body_len = 0; // The total size of the message body + for (const auto& size : buffers_sizes) + { + buffers_vec.emplace_back(current_offset, size); + current_offset += size; + } + body_len = current_offset; // Create the FieldNode, which describes the layout of the array data org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); // A RecordBatch contains a vector of nodes and a vector of buffers auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); -// std::vector buffers_vec = {validity_buffer_struct, data_buffer_struct}; - std::vector buffers_vec = {}; // NULL ARRAY auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); // Build the RecordBatch metadata object @@ -122,7 +119,7 @@ namespace sparrow_ipc ); batch_builder.Finish(batch_message_offset); - // III - Append the RecordBatch message to the final buffer + // Append the RecordBatch message to the final buffer uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length @@ -139,22 +136,27 @@ namespace sparrow_ipc // Add padding to align the body to an 8-byte boundary memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len); -// dst += aligned_batch_meta_len; -// // Copy the actual data buffers (the message body) into the buffer -// if (validity_bitmap) -// { -// memcpy(dst, validity_bitmap, validity_size); -// } -// else -// { -// // If validity_bitmap is null, it means there are no nulls -// memset(dst, 0xFF, validity_size); -// } -// dst += validity_size; -// if (data_buffer) -// { -// memcpy(dst, data_buffer, data_size); -// } + dst += aligned_batch_meta_len; + // Copy the actual data buffers (the message body) into the buffer + for (size_t i = 0; i < buffers_sizes.size(); ++i) + { + // arrow_arr.buffers[0] is the validity bitmap + // arrow_arr.buffers[1] is the actual data buffer + const uint8_t* data_buffer = reinterpret_cast(arrow_arr.buffers[i]); + if (data_buffer) + { + memcpy(dst, data_buffer, buffers_sizes[i]); + } + else + { + // If validity_bitmap is null, it means there are no nulls + if (i == 0) + { + memset(dst, 0xFF, buffers_sizes[i]); + } + } + dst += buffers_sizes[i]; + } } } // namespace details } // namespace sparrow-ipc From 47441a0f6ff115ffa76dfecc73be6283489aad58 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 12 Aug 2025 16:02:48 +0200 Subject: [PATCH 07/14] Add deserialize_schema_message --- include/serialize.hpp | 12 +++++---- include/serialize_null_array.hpp | 39 +-------------------------- include/serialize_primitive_array.hpp | 38 +++----------------------- src/serialize.cpp | 38 ++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 78 deletions(-) diff --git a/include/serialize.hpp b/include/serialize.hpp index a4ca78e..3866c83 100644 --- a/include/serialize.hpp +++ b/include/serialize.hpp @@ -9,9 +9,11 @@ namespace sparrow_ipc { - namespace details - { - SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer); - SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); - } + namespace details + { + SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer); + SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); + + SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata); + } } diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index 36ae8fd..ca27f08 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -53,46 +53,9 @@ namespace sparrow_ipc size_t current_offset = 0; // I - Deserialize the Schema message - uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); - current_offset += sizeof(uint32_t); - auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); - if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) - { - throw std::runtime_error("Expected Schema message at the start of the buffer."); - } - auto flatbuffer_schema = static_cast(schema_message->header()); - auto fields = flatbuffer_schema->fields(); - if (fields->size() != 1) - { - throw std::runtime_error("Expected schema with exactly one field for null_array."); - } - auto field = fields->Get(0); - if (field->type_type() != org::apache::arrow::flatbuf::Type::Null) - { - throw std::runtime_error("Expected Null type in schema."); - } - std::optional name; - if (auto fb_name = field->name()) - { - name = std::string(fb_name->c_str(), fb_name->size()); - } - std::optional> metadata; - if (auto fb_metadata = field->custom_metadata()) - { - if (fb_metadata->size() > 0) - { - metadata = std::vector(); - metadata->reserve(fb_metadata->size()); - for (const auto& kv : *fb_metadata) - { - metadata->emplace_back(kv->key()->str(), kv->value()->str()); - } - } - } - - current_offset += schema_meta_len; + details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); // II - Deserialize the RecordBatch message uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index d28b18a..703188b 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -67,20 +67,9 @@ namespace sparrow_ipc size_t current_offset = 0; // I - Deserialize the Schema message - uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); - current_offset += sizeof(uint32_t); - auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); - if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) - { - throw std::runtime_error("Expected Schema message at the start of the buffer."); - } - auto flatbuffer_schema = static_cast(schema_message->header()); - auto fields = flatbuffer_schema->fields(); - if (fields->size() != 1) - { - throw std::runtime_error("Expected schema with exactly one field for primitive_array."); - } - current_offset += schema_meta_len; + std::optional name; + std::optional> metadata; + details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); // II - Deserialize the RecordBatch message uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); @@ -110,27 +99,6 @@ namespace sparrow_ipc uint8_t* data_buffer_copy = new uint8_t[data_len]; memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len); - // Get name - std::optional name; - const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name(); - if (fb_name_flatbuffer) - { - name = std::string(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size()); - } - - // Handle metadata - std::optional> metadata; - auto fb_metadata = fields->Get(0)->custom_metadata(); - if (fb_metadata && !fb_metadata->empty()) - { - metadata = std::vector(); - metadata->reserve(fb_metadata->size()); - for (const auto& kv : *fb_metadata) - { - // TODO use str() instead of c_str() - metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str()); - } - } auto data = sparrow::u8_buffer(reinterpret_cast(data_buffer_copy), node_meta->length()); auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length()); diff --git a/src/serialize.cpp b/src/serialize.cpp index 0adcbe3..938f052 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -158,5 +158,43 @@ namespace sparrow_ipc dst += buffers_sizes[i]; } } + + void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata) + { + uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); + current_offset += sizeof(uint32_t); + auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); + if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) + { + throw std::runtime_error("Expected Schema message at the start of the buffer."); + } + auto flatbuffer_schema = static_cast(schema_message->header()); + auto fields = flatbuffer_schema->fields(); + if (fields->size() != 1) + { + throw std::runtime_error("Expected schema with exactly one field."); + } + + auto field = fields->Get(0); + + // Get name + if (const auto fb_name = field->name()) + { + name = fb_name->str(); + } + + // Handle metadata + auto fb_metadata = field->custom_metadata(); + if (fb_metadata && !fb_metadata->empty()) + { + metadata = std::vector(); + metadata->reserve(fb_metadata->size()); + for (const auto& kv : *fb_metadata) + { + metadata->emplace_back(kv->key()->str(), kv->value()->str()); + } + } + current_offset += schema_meta_len; + } } // namespace details } // namespace sparrow-ipc From e11f65ba8c0b27b544c040de08140b663dd4bb1e Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 12 Aug 2025 17:00:51 +0200 Subject: [PATCH 08/14] Add deserialize_record_batch_message --- include/serialize.hpp | 1 + include/serialize_null_array.hpp | 9 +-------- include/serialize_primitive_array.hpp | 9 ++------- src/serialize.cpp | 12 ++++++++++++ 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/include/serialize.hpp b/include/serialize.hpp index 3866c83..fec4e50 100644 --- a/include/serialize.hpp +++ b/include/serialize.hpp @@ -15,5 +15,6 @@ namespace sparrow_ipc SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata); + SPARROW_IPC_API const org::apache::arrow::flatbuf::RecordBatch* deserialize_record_batch_message(const uint8_t* buf_ptr, size_t& current_offset); } } diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index ca27f08..0fd6913 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -58,14 +58,7 @@ namespace sparrow_ipc details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); // II - Deserialize the RecordBatch message - uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); - current_offset += sizeof(uint32_t); - auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); - if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) - { - throw std::runtime_error("Expected RecordBatch message, but got a different type."); - } - auto record_batch = static_cast(batch_message->header()); + const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); // The body is empty, so we don't need to read any further. // Construct the null_array from the deserialized metadata. diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index 703188b..0fe0d83 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -73,13 +73,8 @@ namespace sparrow_ipc // II - Deserialize the RecordBatch message uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); - current_offset += sizeof(uint32_t); - auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); - if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) - { - throw std::runtime_error("Expected RecordBatch message, but got a different type."); - } - auto record_batch = static_cast(batch_message->header()); + const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); + current_offset += utils::align_to_8(batch_meta_len); const uint8_t* body_ptr = buf_ptr + current_offset; diff --git a/src/serialize.cpp b/src/serialize.cpp index 938f052..c799239 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -196,5 +196,17 @@ namespace sparrow_ipc } current_offset += schema_meta_len; } + + const org::apache::arrow::flatbuf::RecordBatch* deserialize_record_batch_message(const uint8_t* buf_ptr, size_t& current_offset) + { + current_offset += sizeof(uint32_t); + auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); + if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) + { + throw std::runtime_error("Expected RecordBatch message, but got a different type."); + } + return static_cast(batch_message->header()); + } + } // namespace details } // namespace sparrow-ipc From ce2885b0af3ce13678635c13286b68e91d449a4e Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 12 Aug 2025 17:34:35 +0200 Subject: [PATCH 09/14] Move includes around --- include/serialize.hpp | 5 +++++ include/serialize_null_array.hpp | 16 ---------------- include/serialize_primitive_array.hpp | 10 ---------- src/serialize.cpp | 6 ------ 4 files changed, 5 insertions(+), 32 deletions(-) diff --git a/include/serialize.hpp b/include/serialize.hpp index fec4e50..02f6734 100644 --- a/include/serialize.hpp +++ b/include/serialize.hpp @@ -1,10 +1,15 @@ #pragma once +#include #include +#include #include #include "sparrow.hpp" +#include "Message_generated.h" +#include "Schema_generated.h" + #include "config/config.hpp" namespace sparrow_ipc diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index 0fd6913..a9a5591 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -1,26 +1,10 @@ #pragma once -// TODO check needs of all these below -#include -#include -#include -#include -#include -#include - -#include "sparrow.hpp" - -// TODO check needs of these two -#include "Message_generated.h" -#include "Schema_generated.h" - #include "serialize.hpp" -#include "utils.hpp" namespace sparrow_ipc { // TODO move to cpp if not templated - // TODO add comments and review // This function serializes a sparrow::null_array into a byte vector compliant // with the Apache Arrow IPC Streaming Format. It mirrors the structure of diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index 0fe0d83..29fb34b 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -1,16 +1,6 @@ #pragma once -#include #include -#include -#include -#include -#include - -#include "sparrow.hpp" - -#include "Message_generated.h" -#include "Schema_generated.h" #include "serialize.hpp" #include "utils.hpp" diff --git a/src/serialize.cpp b/src/serialize.cpp index c799239..fdab0c9 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -1,11 +1,5 @@ -// TODO check need of all these includes -#include #include #include -#include - -#include "Message_generated.h" -#include "Schema_generated.h" #include "serialize.hpp" #include "utils.hpp" From 8238187bb63c712b5f6cdf6a709074df96fa2b1d Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 12 Aug 2025 17:43:32 +0200 Subject: [PATCH 10/14] Move defintions to serialize_null_array.cpp --- CMakeLists.txt | 1 + include/serialize_null_array.hpp | 47 ++------------------------------ src/serialize_null_array.cpp | 43 +++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 44 deletions(-) create mode 100644 src/serialize_null_array.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 393c5d6..4cd4ac1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ set(SPARROW_IPC_HEADERS set(SPARROW_IPC_SRC ${SPARROW_IPC_SOURCE_DIR}/serialize.cpp + ${SPARROW_IPC_SOURCE_DIR}/serialize_null_array.cpp ${SPARROW_IPC_SOURCE_DIR}/utils.cpp ) diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index a9a5591..b2a6985 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -1,51 +1,10 @@ #pragma once +#include "config/config.hpp" #include "serialize.hpp" namespace sparrow_ipc { - // TODO move to cpp if not templated - - // This function serializes a sparrow::null_array into a byte vector compliant - // with the Apache Arrow IPC Streaming Format. It mirrors the structure of - // serialize_primitive_array but is optimized for null_array's properties. - // A null_array is represented by metadata only (Schema, RecordBatch) and has no data buffers, - // making its message body zero-length. - std::vector serialize_null_array(sparrow::null_array& arr) - { - auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); - auto& arrow_arr = *arrow_arr_ptr; - auto& arrow_schema = *arrow_schema_ptr; - - std::vector final_buffer; - // I - Serialize the Schema message - details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); - - // II - Serialize the RecordBatch message - details::serialize_record_batch_message(arrow_arr, {}, final_buffer); - - // Return the final buffer containing the complete IPC stream - return final_buffer; - } - - // This function deserializes a byte vector into a sparrow::null_array. - // It reads the Schema and RecordBatch messages to extract the array's length, - // name, and metadata, then constructs a null_array. - sparrow::null_array deserialize_null_array(const std::vector& buffer) - { - const uint8_t* buf_ptr = buffer.data(); - size_t current_offset = 0; - - // I - Deserialize the Schema message - std::optional name; - std::optional> metadata; - details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); - - // II - Deserialize the RecordBatch message - const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); - - // The body is empty, so we don't need to read any further. - // Construct the null_array from the deserialized metadata. - return sparrow::null_array(record_batch->length(), name, metadata); - } + SPARROW_IPC_API std::vector serialize_null_array(sparrow::null_array& arr); + SPARROW_IPC_API sparrow::null_array deserialize_null_array(const std::vector& buffer); } diff --git a/src/serialize_null_array.cpp b/src/serialize_null_array.cpp new file mode 100644 index 0000000..69d9e27 --- /dev/null +++ b/src/serialize_null_array.cpp @@ -0,0 +1,43 @@ +#include "serialize_null_array.hpp" + +namespace sparrow_ipc +{ + // A null_array is represented by metadata only (Schema, RecordBatch) and has no data buffers, + // making its message body zero-length. + std::vector serialize_null_array(sparrow::null_array& arr) + { + auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); + auto& arrow_arr = *arrow_arr_ptr; + auto& arrow_schema = *arrow_schema_ptr; + + std::vector final_buffer; + // I - Serialize the Schema message + details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); + + // II - Serialize the RecordBatch message + details::serialize_record_batch_message(arrow_arr, {}, final_buffer); + + // Return the final buffer containing the complete IPC stream + return final_buffer; + } + + // This reads the Schema and RecordBatch messages to extract the array's length, + // name, and metadata, then constructs a null_array. + sparrow::null_array deserialize_null_array(const std::vector& buffer) + { + const uint8_t* buf_ptr = buffer.data(); + size_t current_offset = 0; + + // I - Deserialize the Schema message + std::optional name; + std::optional> metadata; + details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); + + // II - Deserialize the RecordBatch message + const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); + + // The body is empty, so we don't need to read any further. + // Construct the null_array from the deserialized metadata. + return sparrow::null_array(record_batch->length(), name, metadata); + } +} From 54efe1542d3b30902f00cb04e35e3fd3181f4dc7 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Tue, 19 Aug 2025 17:42:11 +0200 Subject: [PATCH 11/14] Do a const pass --- include/serialize_primitive_array.hpp | 24 ++-- include/utils.hpp | 2 +- src/serialize.cpp | 50 ++++---- src/serialize_null_array.cpp | 7 +- src/utils.cpp | 116 +++++++++---------- tests/test_null_array_serialization.cpp | 8 +- tests/test_primitive_array_serialization.cpp | 26 ++--- 7 files changed, 117 insertions(+), 116 deletions(-) diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index 29fb34b..b339b2a 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -28,9 +28,9 @@ namespace sparrow_ipc // - Correctly populating the Flatbuffer-defined metadata for both messages. // Get arrow structures - auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); - auto& arrow_arr = *arrow_arr_ptr; - auto& arrow_schema = *arrow_schema_ptr; + const auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); + const auto& arrow_arr = *arrow_arr_ptr; + const auto& arrow_schema = *arrow_schema_ptr; // This will be the final buffer holding the complete IPC stream. std::vector final_buffer; @@ -42,9 +42,9 @@ namespace sparrow_ipc // After the Schema, we send the RecordBatch containing the actual data // Calculate the size of the validity and data buffers - int64_t validity_size = (arrow_arr.length + 7) / 8; - int64_t data_size = arrow_arr.length * sizeof(T); - std::vector buffers_sizes = {validity_size, data_size}; + const int64_t validity_size = (arrow_arr.length + 7) / 8; + const int64_t data_size = arrow_arr.length * sizeof(T); + const std::vector buffers_sizes = {validity_size, data_size}; details::serialize_record_batch_message(arrow_arr, buffers_sizes, final_buffer); // Return the final buffer containing the complete IPC stream @@ -62,21 +62,21 @@ namespace sparrow_ipc details::deserialize_schema_message(buf_ptr, current_offset, name, metadata); // II - Deserialize the RecordBatch message - uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); + const uint32_t batch_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); const auto* record_batch = details::deserialize_record_batch_message(buf_ptr, current_offset); current_offset += utils::align_to_8(batch_meta_len); const uint8_t* body_ptr = buf_ptr + current_offset; // Extract metadata from the RecordBatch - auto buffers_meta = record_batch->buffers(); - auto nodes_meta = record_batch->nodes(); - auto node_meta = nodes_meta->Get(0); + const auto buffers_meta = record_batch->buffers(); + const auto nodes_meta = record_batch->nodes(); + const auto node_meta = nodes_meta->Get(0); // The body contains the validity bitmap and the data buffer concatenated // We need to copy this data into memory owned by the new ArrowArray - int64_t validity_len = buffers_meta->Get(0)->length(); - int64_t data_len = buffers_meta->Get(1)->length(); + const int64_t validity_len = buffers_meta->Get(0)->length(); + const int64_t data_len = buffers_meta->Get(1)->length(); uint8_t* validity_buffer_copy = new uint8_t[validity_len]; memcpy(validity_buffer_copy, body_ptr + buffers_meta->Get(0)->offset(), validity_len); diff --git a/include/utils.hpp b/include/utils.hpp index 78cb64b..60eae81 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -14,7 +14,7 @@ namespace sparrow_ipc namespace utils { // Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies - SPARROW_IPC_API int64_t align_to_8(int64_t n); + SPARROW_IPC_API int64_t align_to_8(const int64_t n); // Creates a Flatbuffers type from a format string // This function maps a sparrow data type to the corresponding Flatbuffers type diff --git a/src/serialize.cpp b/src/serialize.cpp index fdab0c9..a7ad415 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -20,7 +20,7 @@ namespace sparrow_ipc } // Determine the Flatbuffer type information from the C schema's format string - auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); + const auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format); // Handle metadata flatbuffers::Offset>> @@ -28,14 +28,14 @@ namespace sparrow_ipc if (metadata) { - sparrow::key_value_view metadata_view = metadata.value(); + const sparrow::key_value_view metadata_view = metadata.value(); std::vector> kv_offsets; kv_offsets.reserve(metadata_view.size()); auto mv_it = metadata_view.cbegin(); for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) { - auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); - auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); + const auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); + const auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); kv_offsets.push_back( org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); } @@ -43,7 +43,7 @@ namespace sparrow_ipc } // Build the Field object - auto fb_field = org::apache::arrow::flatbuf::CreateField( + const auto fb_field = org::apache::arrow::flatbuf::CreateField( schema_builder, fb_name_offset, (arrow_schema.flags & static_cast(sparrow::ArrowFlag::NULLABLE)) != 0, @@ -54,14 +54,14 @@ namespace sparrow_ipc fb_metadata_offset); // A Schema contains a vector of fields - std::vector> fields_vec = {fb_field}; - auto fb_fields = schema_builder.CreateVector(fields_vec); + const std::vector> fields_vec = {fb_field}; + const auto fb_fields = schema_builder.CreateVector(fields_vec); // Build the Schema object from the vector of fields - auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); + const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields); // Wrap the Schema in a top-level Message, which is the standard IPC envelope - auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( + const auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage( schema_builder, org::apache::arrow::flatbuf::MetadataVersion::V5, org::apache::arrow::flatbuf::MessageHeader::Schema, @@ -71,7 +71,7 @@ namespace sparrow_ipc schema_builder.Finish(schema_message_offset); // Assemble the Schema message bytes - uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata + const uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message // Copy the metadata into the buffer, after the 4-byte length prefix memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); @@ -95,16 +95,16 @@ namespace sparrow_ipc body_len = current_offset; // Create the FieldNode, which describes the layout of the array data - org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); + const org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count); // A RecordBatch contains a vector of nodes and a vector of buffers - auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); - auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); + const auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1); + const auto fb_buffers_vector = batch_builder.CreateVectorOfStructs(buffers_vec); // Build the RecordBatch metadata object - auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); + const auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector); // Wrap the RecordBatch in a top-level Message - auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( + const auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage( batch_builder, org::apache::arrow::flatbuf::MetadataVersion::V5, org::apache::arrow::flatbuf::MessageHeader::RecordBatch, @@ -114,10 +114,10 @@ namespace sparrow_ipc batch_builder.Finish(batch_message_offset); // Append the RecordBatch message to the final buffer - uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata - int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length + const uint32_t batch_meta_len = batch_builder.GetSize(); // Get the size of the batch metadata + const int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len); // Calculate the padded length - size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) + const size_t current_size = final_buffer.size(); // Get the current size (which is the end of the Schema message) // Resize the buffer to append the new message final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len + body_len); uint8_t* dst = final_buffer.data() + current_size; // Get a pointer to where the new message will start @@ -155,21 +155,21 @@ namespace sparrow_ipc void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata) { - uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); + const uint32_t schema_meta_len = *(reinterpret_cast(buf_ptr + current_offset)); current_offset += sizeof(uint32_t); - auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); + const auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema) { throw std::runtime_error("Expected Schema message at the start of the buffer."); } - auto flatbuffer_schema = static_cast(schema_message->header()); - auto fields = flatbuffer_schema->fields(); + const auto flatbuffer_schema = static_cast(schema_message->header()); + const auto fields = flatbuffer_schema->fields(); if (fields->size() != 1) { throw std::runtime_error("Expected schema with exactly one field."); } - auto field = fields->Get(0); + const auto field = fields->Get(0); // Get name if (const auto fb_name = field->name()) @@ -178,7 +178,7 @@ namespace sparrow_ipc } // Handle metadata - auto fb_metadata = field->custom_metadata(); + const auto fb_metadata = field->custom_metadata(); if (fb_metadata && !fb_metadata->empty()) { metadata = std::vector(); @@ -194,7 +194,7 @@ namespace sparrow_ipc const org::apache::arrow::flatbuf::RecordBatch* deserialize_record_batch_message(const uint8_t* buf_ptr, size_t& current_offset) { current_offset += sizeof(uint32_t); - auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); + const auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset); if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch) { throw std::runtime_error("Expected RecordBatch message, but got a different type."); diff --git a/src/serialize_null_array.cpp b/src/serialize_null_array.cpp index 69d9e27..dc57608 100644 --- a/src/serialize_null_array.cpp +++ b/src/serialize_null_array.cpp @@ -6,9 +6,10 @@ namespace sparrow_ipc // making its message body zero-length. std::vector serialize_null_array(sparrow::null_array& arr) { - auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); - auto& arrow_arr = *arrow_arr_ptr; - auto& arrow_schema = *arrow_schema_ptr; + // TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures + const auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); + const auto& arrow_arr = *arrow_arr_ptr; + const auto& arrow_schema = *arrow_schema_ptr; std::vector final_buffer; // I - Serialize the Schema message diff --git a/src/utils.cpp b/src/utils.cpp index 54275a0..a538700 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -15,7 +15,7 @@ namespace sparrow_ipc std::optional parse_format(std::string_view format_str, std::string_view sep) { // Find the position of the delimiter - auto sep_pos = format_str.find(sep); + const auto sep_pos = format_str.find(sep); if (sep_pos == std::string_view::npos) { return std::nullopt; @@ -24,7 +24,7 @@ namespace sparrow_ipc std::string_view substr_str(format_str.data() + sep_pos + 1, format_str.size() - sep_pos - 1); int32_t substr_size = 0; - auto [ptr, ec] = std::from_chars(substr_str.data(), substr_str.data() + substr_str.size(), substr_size); + const auto [ptr, ec] = std::from_chars(substr_str.data(), substr_str.data() + substr_str.size(), substr_size); if (ec != std::errc() || ptr != substr_str.data() + substr_str.size()) { @@ -36,29 +36,29 @@ namespace sparrow_ipc // Creates a Flatbuffers Decimal type from a format string // The format string is expected to be in the format "d:precision,scale" std::pair> - get_flatbuffer_decimal_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str, int32_t bitWidth) + get_flatbuffer_decimal_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str, const int32_t bitWidth) { // Decimal requires precision and scale. We need to parse the format_str. // Format: "d:precision,scale" - auto scale = parse_format(format_str, ","); + const auto scale = parse_format(format_str, ","); if (!scale.has_value()) { throw std::runtime_error("Failed to parse Decimal " + std::to_string(bitWidth) + " scale from format string: " + std::string(format_str)); } - size_t comma_pos = format_str.find(','); - auto precision = parse_format(format_str.substr(0, comma_pos), ":"); + const size_t comma_pos = format_str.find(','); + const auto precision = parse_format(format_str.substr(0, comma_pos), ":"); if (!precision.has_value()) { throw std::runtime_error("Failed to parse Decimal " + std::to_string(bitWidth) + " precision from format string: " + std::string(format_str)); } - auto decimal_type = org::apache::arrow::flatbuf::CreateDecimal(builder, precision.value(), scale.value(), bitWidth); + const auto decimal_type = org::apache::arrow::flatbuf::CreateDecimal(builder, precision.value(), scale.value(), bitWidth); return {org::apache::arrow::flatbuf::Type::Decimal, decimal_type.Union()}; } } namespace utils { - int64_t align_to_8(int64_t n) + int64_t align_to_8(const int64_t n) { return (n + 7) & -8; } @@ -66,248 +66,248 @@ namespace sparrow_ipc std::pair> get_flatbuffer_type(flatbuffers::FlatBufferBuilder& builder, std::string_view format_str) { - auto type = sparrow::format_to_data_type(format_str); + const auto type = sparrow::format_to_data_type(format_str); switch (type) { case sparrow::data_type::NA: { - auto null_type = org::apache::arrow::flatbuf::CreateNull(builder); + const auto null_type = org::apache::arrow::flatbuf::CreateNull(builder); return {org::apache::arrow::flatbuf::Type::Null, null_type.Union()}; } case sparrow::data_type::BOOL: { - auto bool_type = org::apache::arrow::flatbuf::CreateBool(builder); + const auto bool_type = org::apache::arrow::flatbuf::CreateBool(builder); return {org::apache::arrow::flatbuf::Type::Bool, bool_type.Union()}; } case sparrow::data_type::UINT8: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 8, false); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 8, false); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::INT8: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 8, true); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 8, true); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::UINT16: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 16, false); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 16, false); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::INT16: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 16, true); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 16, true); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::UINT32: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 32, false); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 32, false); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::INT32: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 32, true); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 32, true); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::UINT64: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 64, false); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 64, false); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::INT64: { - auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 64, true); + const auto int_type = org::apache::arrow::flatbuf::CreateInt(builder, 64, true); return {org::apache::arrow::flatbuf::Type::Int, int_type.Union()}; } case sparrow::data_type::HALF_FLOAT: { - auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( + const auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( builder, org::apache::arrow::flatbuf::Precision::HALF); return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; } case sparrow::data_type::FLOAT: { - auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( + const auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( builder, org::apache::arrow::flatbuf::Precision::SINGLE); return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; } case sparrow::data_type::DOUBLE: { - auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( + const auto fp_type = org::apache::arrow::flatbuf::CreateFloatingPoint( builder, org::apache::arrow::flatbuf::Precision::DOUBLE); return {org::apache::arrow::flatbuf::Type::FloatingPoint, fp_type.Union()}; } case sparrow::data_type::STRING: { - auto string_type = org::apache::arrow::flatbuf::CreateUtf8(builder); + const auto string_type = org::apache::arrow::flatbuf::CreateUtf8(builder); return {org::apache::arrow::flatbuf::Type::Utf8, string_type.Union()}; } case sparrow::data_type::LARGE_STRING: { - auto large_string_type = org::apache::arrow::flatbuf::CreateLargeUtf8(builder); + const auto large_string_type = org::apache::arrow::flatbuf::CreateLargeUtf8(builder); return {org::apache::arrow::flatbuf::Type::LargeUtf8, large_string_type.Union()}; } case sparrow::data_type::BINARY: { - auto binary_type = org::apache::arrow::flatbuf::CreateBinary(builder); + const auto binary_type = org::apache::arrow::flatbuf::CreateBinary(builder); return {org::apache::arrow::flatbuf::Type::Binary, binary_type.Union()}; } case sparrow::data_type::LARGE_BINARY: { - auto large_binary_type = org::apache::arrow::flatbuf::CreateLargeBinary(builder); + const auto large_binary_type = org::apache::arrow::flatbuf::CreateLargeBinary(builder); return {org::apache::arrow::flatbuf::Type::LargeBinary, large_binary_type.Union()}; } case sparrow::data_type::STRING_VIEW: { - auto string_view_type = org::apache::arrow::flatbuf::CreateUtf8View(builder); + const auto string_view_type = org::apache::arrow::flatbuf::CreateUtf8View(builder); return {org::apache::arrow::flatbuf::Type::Utf8View, string_view_type.Union()}; } case sparrow::data_type::BINARY_VIEW: { - auto binary_view_type = org::apache::arrow::flatbuf::CreateBinaryView(builder); + const auto binary_view_type = org::apache::arrow::flatbuf::CreateBinaryView(builder); return {org::apache::arrow::flatbuf::Type::BinaryView, binary_view_type.Union()}; } case sparrow::data_type::DATE_DAYS: { - auto date_type = org::apache::arrow::flatbuf::CreateDate(builder, org::apache::arrow::flatbuf::DateUnit::DAY); + const auto date_type = org::apache::arrow::flatbuf::CreateDate(builder, org::apache::arrow::flatbuf::DateUnit::DAY); return {org::apache::arrow::flatbuf::Type::Date, date_type.Union()}; } case sparrow::data_type::DATE_MILLISECONDS: { - auto date_type = org::apache::arrow::flatbuf::CreateDate(builder, org::apache::arrow::flatbuf::DateUnit::MILLISECOND); + const auto date_type = org::apache::arrow::flatbuf::CreateDate(builder, org::apache::arrow::flatbuf::DateUnit::MILLISECOND); return {org::apache::arrow::flatbuf::Type::Date, date_type.Union()}; } case sparrow::data_type::TIMESTAMP_SECONDS: { - auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::TIMESTAMP_MILLISECONDS: { - auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::TIMESTAMP_MICROSECONDS: { - auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::TIMESTAMP_NANOSECONDS: { - auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND); + const auto timestamp_type = org::apache::arrow::flatbuf::CreateTimestamp(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND); return {org::apache::arrow::flatbuf::Type::Timestamp, timestamp_type.Union()}; } case sparrow::data_type::DURATION_SECONDS: { - auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::DURATION_MILLISECONDS: { - auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::DURATION_MICROSECONDS: { - auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::DURATION_NANOSECONDS: { - auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND); + const auto duration_type = org::apache::arrow::flatbuf::CreateDuration(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND); return {org::apache::arrow::flatbuf::Type::Duration, duration_type.Union()}; } case sparrow::data_type::INTERVAL_MONTHS: { - auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::YEAR_MONTH); + const auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::YEAR_MONTH); return {org::apache::arrow::flatbuf::Type::Interval, interval_type.Union()}; } case sparrow::data_type::INTERVAL_DAYS_TIME: { - auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::DAY_TIME); + const auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::DAY_TIME); return {org::apache::arrow::flatbuf::Type::Interval, interval_type.Union()}; } case sparrow::data_type::INTERVAL_MONTHS_DAYS_NANOSECONDS: { - auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::MONTH_DAY_NANO); + const auto interval_type = org::apache::arrow::flatbuf::CreateInterval(builder, org::apache::arrow::flatbuf::IntervalUnit::MONTH_DAY_NANO); return {org::apache::arrow::flatbuf::Type::Interval, interval_type.Union()}; } case sparrow::data_type::TIME_SECONDS: { - auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND, 32); + const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::SECOND, 32); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::TIME_MILLISECONDS: { - auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND, 32); + const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::MILLISECOND, 32); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::TIME_MICROSECONDS: { - auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND, 64); + const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::MICROSECOND, 64); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::TIME_NANOSECONDS: { - auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND, 64); + const auto time_type = org::apache::arrow::flatbuf::CreateTime(builder, org::apache::arrow::flatbuf::TimeUnit::NANOSECOND, 64); return {org::apache::arrow::flatbuf::Type::Time, time_type.Union()}; } case sparrow::data_type::LIST: { - auto list_type = org::apache::arrow::flatbuf::CreateList(builder); + const auto list_type = org::apache::arrow::flatbuf::CreateList(builder); return {org::apache::arrow::flatbuf::Type::List, list_type.Union()}; } case sparrow::data_type::LARGE_LIST: { - auto large_list_type = org::apache::arrow::flatbuf::CreateLargeList(builder); + const auto large_list_type = org::apache::arrow::flatbuf::CreateLargeList(builder); return {org::apache::arrow::flatbuf::Type::LargeList, large_list_type.Union()}; } case sparrow::data_type::LIST_VIEW: { - auto list_view_type = org::apache::arrow::flatbuf::CreateListView(builder); + const auto list_view_type = org::apache::arrow::flatbuf::CreateListView(builder); return {org::apache::arrow::flatbuf::Type::ListView, list_view_type.Union()}; } case sparrow::data_type::LARGE_LIST_VIEW: { - auto large_list_view_type = org::apache::arrow::flatbuf::CreateLargeListView(builder); + const auto large_list_view_type = org::apache::arrow::flatbuf::CreateLargeListView(builder); return {org::apache::arrow::flatbuf::Type::LargeListView, large_list_view_type.Union()}; } case sparrow::data_type::FIXED_SIZED_LIST: { // FixedSizeList requires listSize. We need to parse the format_str. // Format: "+w:size" - auto list_size = parse_format(format_str, ":"); + const auto list_size = parse_format(format_str, ":"); if (!list_size.has_value()) { throw std::runtime_error("Failed to parse FixedSizeList size from format string: " + std::string(format_str)); } - auto fixed_size_list_type = org::apache::arrow::flatbuf::CreateFixedSizeList(builder, list_size.value()); + const auto fixed_size_list_type = org::apache::arrow::flatbuf::CreateFixedSizeList(builder, list_size.value()); return {org::apache::arrow::flatbuf::Type::FixedSizeList, fixed_size_list_type.Union()}; } case sparrow::data_type::STRUCT: { - auto struct_type = org::apache::arrow::flatbuf::CreateStruct_(builder); + const auto struct_type = org::apache::arrow::flatbuf::CreateStruct_(builder); return {org::apache::arrow::flatbuf::Type::Struct_, struct_type.Union()}; } case sparrow::data_type::MAP: { - auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); // not sorted keys + const auto map_type = org::apache::arrow::flatbuf::CreateMap(builder, false); // not sorted keys return {org::apache::arrow::flatbuf::Type::Map, map_type.Union()}; } case sparrow::data_type::DENSE_UNION: { - auto union_type = org::apache::arrow::flatbuf::CreateUnion(builder, org::apache::arrow::flatbuf::UnionMode::Dense, 0); + const auto union_type = org::apache::arrow::flatbuf::CreateUnion(builder, org::apache::arrow::flatbuf::UnionMode::Dense, 0); return {org::apache::arrow::flatbuf::Type::Union, union_type.Union()}; } case sparrow::data_type::SPARSE_UNION: { - auto union_type = org::apache::arrow::flatbuf::CreateUnion(builder, org::apache::arrow::flatbuf::UnionMode::Sparse, 0); + const auto union_type = org::apache::arrow::flatbuf::CreateUnion(builder, org::apache::arrow::flatbuf::UnionMode::Sparse, 0); return {org::apache::arrow::flatbuf::Type::Union, union_type.Union()}; } case sparrow::data_type::RUN_ENCODED: { - auto run_end_encoded_type = org::apache::arrow::flatbuf::CreateRunEndEncoded(builder); + const auto run_end_encoded_type = org::apache::arrow::flatbuf::CreateRunEndEncoded(builder); return {org::apache::arrow::flatbuf::Type::RunEndEncoded, run_end_encoded_type.Union()}; } case sparrow::data_type::DECIMAL32: @@ -330,13 +330,13 @@ namespace sparrow_ipc { // FixedSizeBinary requires byteWidth. We need to parse the format_str. // Format: "w:size" - auto byte_width = parse_format(format_str, ":"); + const auto byte_width = parse_format(format_str, ":"); if (!byte_width.has_value()) { throw std::runtime_error("Failed to parse FixedWidthBinary size from format string: " + std::string(format_str)); } - auto fixed_width_binary_type = org::apache::arrow::flatbuf::CreateFixedSizeBinary(builder, byte_width.value()); + const auto fixed_width_binary_type = org::apache::arrow::flatbuf::CreateFixedSizeBinary(builder, byte_width.value()); return {org::apache::arrow::flatbuf::Type::FixedSizeBinary, fixed_width_binary_type.Union()}; } default: diff --git a/tests/test_null_array_serialization.cpp b/tests/test_null_array_serialization.cpp index d1b809c..d3b06f0 100644 --- a/tests/test_null_array_serialization.cpp +++ b/tests/test_null_array_serialization.cpp @@ -19,8 +19,8 @@ namespace sparrow_ipc sp::null_array arr(size, name, metadata); - auto buffer = serialize_null_array(arr); - auto deserialized_arr = deserialize_null_array(buffer); + const auto buffer = serialize_null_array(arr); + const auto deserialized_arr = deserialize_null_array(buffer); CHECK_EQ(deserialized_arr.size(), arr.size()); REQUIRE(deserialized_arr.name().has_value()); @@ -43,8 +43,8 @@ namespace sparrow_ipc { const std::size_t size = 100; sp::null_array arr(size); - auto buffer = serialize_null_array(arr); - auto deserialized_arr = deserialize_null_array(buffer); + const auto buffer = serialize_null_array(arr); + const auto deserialized_arr = deserialize_null_array(buffer); CHECK_EQ(deserialized_arr.size(), arr.size()); CHECK_FALSE(deserialized_arr.name().has_value()); CHECK_FALSE(deserialized_arr.metadata().has_value()); diff --git a/tests/test_primitive_array_serialization.cpp b/tests/test_primitive_array_serialization.cpp index bc660ca..463e40f 100644 --- a/tests/test_primitive_array_serialization.cpp +++ b/tests/test_primitive_array_serialization.cpp @@ -84,8 +84,8 @@ namespace sparrow_ipc { CHECK_NE(lhs.buffers[i], rhs.buffers[i]); } - auto lhs_buffers = reinterpret_cast(lhs.buffers); - auto rhs_buffers = reinterpret_cast(rhs.buffers); + const auto lhs_buffers = reinterpret_cast(lhs.buffers); + const auto rhs_buffers = reinterpret_cast(rhs.buffers); for (size_t i = 0; i < static_cast(lhs.length); ++i) { @@ -94,7 +94,7 @@ namespace sparrow_ipc } template - void compare_values(sp::primitive_array& pa1, sp::primitive_array& pa2) + void compare_values(const sp::primitive_array& pa1, const sp::primitive_array& pa2) { CHECK_EQ(pa1.size(), pa1.size()); for (size_t i = 0; i < pa1.size(); ++i) @@ -104,7 +104,7 @@ namespace sparrow_ipc } template - void compare_bitmap(sp::primitive_array& pa1, sp::primitive_array& pa2) + void compare_bitmap(const sp::primitive_array& pa1, const sp::primitive_array& pa2) { const auto pa1_bitmap = pa1.bitmap(); const auto pa2_bitmap = pa2.bitmap(); @@ -123,8 +123,8 @@ namespace sparrow_ipc template void compare_primitive_arrays(sp::primitive_array& ar, sp::primitive_array& deserialized_ar) { - auto [arrow_array_ar, arrow_schema_ar] = sp::get_arrow_structures(ar); - auto [arrow_array_deserialized_ar, arrow_schema_deserialized_ar] = sp::get_arrow_structures(deserialized_ar); + const auto [arrow_array_ar, arrow_schema_ar] = sp::get_arrow_structures(ar); + const auto [arrow_array_deserialized_ar, arrow_schema_deserialized_ar] = sp::get_arrow_structures(deserialized_ar); // Check ArrowSchema equality REQUIRE_NE(arrow_schema_ar, nullptr); @@ -164,7 +164,7 @@ namespace sparrow_ipc sp::primitive_array ar = create_primitive_array(); - std::vector serialized_data = serialize_primitive_array(ar); + const std::vector serialized_data = serialize_primitive_array(ar); CHECK(serialized_data.size() > 0); @@ -178,7 +178,7 @@ namespace sparrow_ipc TEST_CASE("Serialize and Deserialize primitive_array - int with nulls") { // Data buffer - sp::u8_buffer data_buffer = {100, 200, 300, 400, 500}; + const sp::u8_buffer data_buffer = {100, 200, 300, 400, 500}; // Validity bitmap: 100 (valid), 200 (valid), 300 (null), 400 (valid), 500 (null) sp::validity_bitmap validity(5, true); // All valid initially @@ -187,7 +187,7 @@ namespace sparrow_ipc sp::primitive_array ar(std::move(data_buffer), std::move(validity)); - std::vector serialized_data = serialize_primitive_array(ar); + const std::vector serialized_data = serialize_primitive_array(ar); CHECK(serialized_data.size() > 0); @@ -199,13 +199,13 @@ namespace sparrow_ipc TEST_CASE("Serialize and Deserialize primitive_array - with name and metadata") { // Data buffer - sp::u8_buffer data_buffer = {1, 2, 3}; + const sp::u8_buffer data_buffer = {1, 2, 3}; // Validity bitmap: All valid - sp::validity_bitmap validity(3, true); + const sp::validity_bitmap validity(3, true); // Custom metadata - std::vector metadata = { + const std::vector metadata = { {"key1", "value1"}, {"key2", "value2"} }; @@ -217,7 +217,7 @@ namespace sparrow_ipc std::make_optional(std::vector{{"key1", "value1"}, {"key2", "value2"}}) ); - std::vector serialized_data = serialize_primitive_array(ar); + const std::vector serialized_data = serialize_primitive_array(ar); CHECK(serialized_data.size() > 0); From 57b11133492a54b43e43820dab5528bb039dcf1a Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Wed, 20 Aug 2025 10:13:34 +0200 Subject: [PATCH 12/14] Use range loop --- src/serialize.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/serialize.cpp b/src/serialize.cpp index a7ad415..01c2715 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -31,11 +31,10 @@ namespace sparrow_ipc const sparrow::key_value_view metadata_view = metadata.value(); std::vector> kv_offsets; kv_offsets.reserve(metadata_view.size()); - auto mv_it = metadata_view.cbegin(); - for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it) + for (const auto& [key, value] : metadata_view) { - const auto key_offset = schema_builder.CreateString(std::string((*mv_it).first)); - const auto value_offset = schema_builder.CreateString(std::string((*mv_it).second)); + const auto key_offset = schema_builder.CreateString(std::string(key)); + const auto value_offset = schema_builder.CreateString(std::string(value)); kv_offsets.push_back( org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset)); } From cfaee8f292e583ccd7a0bd0d3d4e3cac797dda60 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Wed, 20 Aug 2025 11:25:14 +0200 Subject: [PATCH 13/14] Return final buffer instead of arg Add/move comments --- include/serialize.hpp | 2 +- include/serialize_null_array.hpp | 1 + include/serialize_primitive_array.hpp | 6 ++---- src/serialize.cpp | 5 ++++- src/serialize_null_array.cpp | 4 +--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/include/serialize.hpp b/include/serialize.hpp index 02f6734..c4ac16e 100644 --- a/include/serialize.hpp +++ b/include/serialize.hpp @@ -16,7 +16,7 @@ namespace sparrow_ipc { namespace details { - SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer); + SPARROW_IPC_API std::vector serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata); SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata); diff --git a/include/serialize_null_array.hpp b/include/serialize_null_array.hpp index b2a6985..269184a 100644 --- a/include/serialize_null_array.hpp +++ b/include/serialize_null_array.hpp @@ -5,6 +5,7 @@ namespace sparrow_ipc { + // TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures SPARROW_IPC_API std::vector serialize_null_array(sparrow::null_array& arr); SPARROW_IPC_API sparrow::null_array deserialize_null_array(const std::vector& buffer); } diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index b339b2a..13f6606 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -7,6 +7,7 @@ namespace sparrow_ipc { + // TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures template std::vector serialize_primitive_array(sparrow::primitive_array& arr); @@ -32,11 +33,8 @@ namespace sparrow_ipc const auto& arrow_arr = *arrow_arr_ptr; const auto& arrow_schema = *arrow_schema_ptr; - // This will be the final buffer holding the complete IPC stream. - std::vector final_buffer; - // I - Serialize the Schema message - details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); + auto final_buffer = details::serialize_schema_message(arrow_schema, arr.metadata()); // II - Serialize the RecordBatch message // After the Schema, we send the RecordBatch containing the actual data diff --git a/src/serialize.cpp b/src/serialize.cpp index 01c2715..81c651b 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -8,7 +8,7 @@ namespace sparrow_ipc { namespace details { - void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata, std::vector& final_buffer) + std::vector serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata) { // Create a new builder for the Schema message's metadata flatbuffers::FlatBufferBuilder schema_builder; @@ -71,11 +71,14 @@ namespace sparrow_ipc // Assemble the Schema message bytes const uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata + // This will be the final buffer holding the complete IPC stream. + std::vector final_buffer; final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message // Copy the metadata into the buffer, after the 4-byte length prefix memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len); // Write the 4-byte metadata length at the beginning of the message *(reinterpret_cast(final_buffer.data())) = schema_len; + return final_buffer; } void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer) diff --git a/src/serialize_null_array.cpp b/src/serialize_null_array.cpp index dc57608..a927db7 100644 --- a/src/serialize_null_array.cpp +++ b/src/serialize_null_array.cpp @@ -6,14 +6,12 @@ namespace sparrow_ipc // making its message body zero-length. std::vector serialize_null_array(sparrow::null_array& arr) { - // TODO Use `arr` as const after fixing the issue upstream in sparrow::get_arrow_structures const auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr); const auto& arrow_arr = *arrow_arr_ptr; const auto& arrow_schema = *arrow_schema_ptr; - std::vector final_buffer; // I - Serialize the Schema message - details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer); + auto final_buffer = details::serialize_schema_message(arrow_schema, arr.metadata()); // II - Serialize the RecordBatch message details::serialize_record_batch_message(arrow_arr, {}, final_buffer); From 22ded7e668b06a8420750e049d6d59d680518e92 Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Wed, 20 Aug 2025 14:59:22 +0200 Subject: [PATCH 14/14] Get metadata directly from arrow schema --- include/serialize.hpp | 2 +- include/serialize_primitive_array.hpp | 2 +- src/serialize.cpp | 6 +++--- src/serialize_null_array.cpp | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/serialize.hpp b/include/serialize.hpp index c4ac16e..2dbf148 100644 --- a/include/serialize.hpp +++ b/include/serialize.hpp @@ -16,7 +16,7 @@ namespace sparrow_ipc { namespace details { - SPARROW_IPC_API std::vector serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata); + SPARROW_IPC_API std::vector serialize_schema_message(const ArrowSchema& arrow_schema); SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector& buffers_sizes, std::vector& final_buffer); SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional& name, std::optional>& metadata); diff --git a/include/serialize_primitive_array.hpp b/include/serialize_primitive_array.hpp index 13f6606..e3fa799 100644 --- a/include/serialize_primitive_array.hpp +++ b/include/serialize_primitive_array.hpp @@ -34,7 +34,7 @@ namespace sparrow_ipc const auto& arrow_schema = *arrow_schema_ptr; // I - Serialize the Schema message - auto final_buffer = details::serialize_schema_message(arrow_schema, arr.metadata()); + auto final_buffer = details::serialize_schema_message(arrow_schema); // II - Serialize the RecordBatch message // After the Schema, we send the RecordBatch containing the actual data diff --git a/src/serialize.cpp b/src/serialize.cpp index 81c651b..0c76678 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -8,7 +8,7 @@ namespace sparrow_ipc { namespace details { - std::vector serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional& metadata) + std::vector serialize_schema_message(const ArrowSchema& arrow_schema) { // Create a new builder for the Schema message's metadata flatbuffers::FlatBufferBuilder schema_builder; @@ -26,9 +26,9 @@ namespace sparrow_ipc flatbuffers::Offset>> fb_metadata_offset = 0; - if (metadata) + if (arrow_schema.metadata) { - const sparrow::key_value_view metadata_view = metadata.value(); + const auto metadata_view = sparrow::key_value_view(arrow_schema.metadata); std::vector> kv_offsets; kv_offsets.reserve(metadata_view.size()); for (const auto& [key, value] : metadata_view) diff --git a/src/serialize_null_array.cpp b/src/serialize_null_array.cpp index a927db7..230a9db 100644 --- a/src/serialize_null_array.cpp +++ b/src/serialize_null_array.cpp @@ -11,7 +11,7 @@ namespace sparrow_ipc const auto& arrow_schema = *arrow_schema_ptr; // I - Serialize the Schema message - auto final_buffer = details::serialize_schema_message(arrow_schema, arr.metadata()); + auto final_buffer = details::serialize_schema_message(arrow_schema); // II - Serialize the RecordBatch message details::serialize_record_batch_message(arrow_arr, {}, final_buffer);