Skip to content

Commit 173ede6

Browse files
committed
Refactor serialization into chunk functions
1 parent fe42aef commit 173ede6

File tree

5 files changed

+186
-172
lines changed

5 files changed

+186
-172
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
4343

4444
set(SPARROW_IPC_HEADERS
4545
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
46+
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
4647
${SPARROW_IPC_INCLUDE_DIR}/serialize_primitive_array.hpp
4748
${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp
4849
${SPARROW_IPC_INCLUDE_DIR}/utils.hpp
4950
)
5051

5152
set(SPARROW_IPC_SRC
53+
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
5254
${SPARROW_IPC_SOURCE_DIR}/utils.cpp
5355
)
5456

include/serialize.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <optional>
4+
#include <vector>
5+
6+
#include "sparrow.hpp"
7+
8+
#include "config/config.hpp"
9+
10+
namespace sparrow_ipc
11+
{
12+
namespace details
13+
{
14+
// Serializes the Arrow IPC Schema message describing a single array into `final_buffer`.
//
// NOTE(review): the definition is not part of this header (presumably src/serialize.cpp);
// the notes below are derived from the inline code this function replaced in
// serialize_null_array / serialize_primitive_array and should be confirmed against it.
//
// arrow_schema  Arrow C Data Interface schema of the array. The replaced code read its
//               `name`, `format` and `flags` (NULLABLE bit) to build one flatbuffer Field.
// metadata      Optional key/value metadata; callers pass `arr.metadata()`. When set, the
//               replaced code copied every pair into the Field's custom metadata.
// final_buffer  Output buffer. The replaced code resized it to exactly
//               sizeof(uint32_t) + <schema flatbuffer size> and wrote a 4-byte length prefix
//               at offset 0 followed by the flatbuffer, i.e. it overwrote the buffer from
//               the start - presumably this must be the first message written (TODO confirm).
SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional<sparrow::key_value_view>& metadata, std::vector<uint8_t>& final_buffer);
15+
// Serializes the Arrow IPC RecordBatch message for a single array and appends it to `final_buffer`.
//
// NOTE(review): the definition is not part of this header (presumably src/serialize.cpp);
// the notes below are derived from the inline code this function replaced in
// serialize_null_array and should be confirmed against the implementation.
//
// arrow_arr     Arrow C Data Interface array. The replaced code used `length` and
//               `null_count` to build a single FieldNode and emitted an empty buffer list
//               with bodyLength 0 (the null-array case). Whether arrays that own data
//               buffers are supported cannot be determined from this header - TODO confirm.
// final_buffer  Output buffer, extended in place. The replaced code appended a 4-byte
//               metadata length followed by the flatbuffer, zero-padded up to
//               utils::align_to_8(metadata length); existing contents were preserved.
SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, std::vector<uint8_t>& final_buffer);
16+
}
17+
}

include/serialize_null_array.hpp

Lines changed: 4 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "Message_generated.h"
1515
#include "Schema_generated.h"
1616

17+
#include "serialize.hpp"
1718
#include "utils.hpp"
1819

1920
namespace sparrow_ipc
@@ -28,116 +29,18 @@ namespace sparrow_ipc
2829
// making its message body zero-length.
2930
std::vector<uint8_t> serialize_null_array(sparrow::null_array& arr)
3031
{
31-
// Use the Arrow C Data Interface to get a generic description of the array.
32-
// For a null_array, the ArrowArray struct will report n_buffers = 0.
3332
auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr);
3433
auto& arrow_arr = *arrow_arr_ptr;
3534
auto& arrow_schema = *arrow_schema_ptr;
3635

3736
std::vector<uint8_t> final_buffer;
38-
3937
// I - Serialize the Schema message
40-
// This part is almost identical to how a primitive_array's schema is serialized.
41-
{
42-
flatbuffers::FlatBufferBuilder schema_builder;
43-
44-
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
45-
if (arrow_schema.name)
46-
{
47-
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
48-
}
49-
50-
// For null_array, the format string is "n".
51-
auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format);
52-
53-
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
54-
fb_metadata_offset = 0;
55-
56-
if (arr.metadata())
57-
{
58-
sparrow::key_value_view metadata_view = *(arr.metadata());
59-
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
60-
kv_offsets.reserve(metadata_view.size());
61-
auto mv_it = metadata_view.cbegin();
62-
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
63-
{
64-
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
65-
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
66-
kv_offsets.push_back(
67-
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
68-
}
69-
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
70-
}
71-
72-
auto fb_field = org::apache::arrow::flatbuf::CreateField(
73-
schema_builder,
74-
fb_name_offset,
75-
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
76-
type_enum,
77-
type_offset,
78-
0, // dictionary
79-
0, // children
80-
fb_metadata_offset);
81-
82-
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
83-
auto fb_fields = schema_builder.CreateVector(fields_vec);
84-
85-
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
86-
87-
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
88-
schema_builder,
89-
org::apache::arrow::flatbuf::MetadataVersion::V5,
90-
org::apache::arrow::flatbuf::MessageHeader::Schema,
91-
schema_offset.Union(),
92-
0 // bodyLength
93-
);
94-
schema_builder.Finish(schema_message_offset);
95-
96-
uint32_t schema_len = schema_builder.GetSize();
97-
final_buffer.resize(sizeof(uint32_t) + schema_len);
98-
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
99-
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
100-
}
38+
details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer);
10139

10240
// II - Serialize the RecordBatch message
103-
{
104-
flatbuffers::FlatBufferBuilder batch_builder;
105-
106-
// The FieldNode describes the layout (length and null count).
107-
// For a null_array, length and null_count are always equal.
108-
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
109-
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
110-
111-
// A null_array has no buffers. The ArrowArray struct reports n_buffers = 0,
112-
// so we create an empty vector of buffers for the Flatbuffers message.
113-
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs<org::apache::arrow::flatbuf::Buffer>({});
114-
115-
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
116-
117-
// The bodyLength is 0 because there are no data buffers.
118-
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
119-
batch_builder,
120-
org::apache::arrow::flatbuf::MetadataVersion::V5,
121-
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
122-
record_batch_offset.Union(),
123-
0 // bodyLength
124-
);
125-
batch_builder.Finish(batch_message_offset);
126-
127-
uint32_t batch_meta_len = batch_builder.GetSize();
128-
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len);
129-
130-
size_t current_size = final_buffer.size();
131-
// Resize for the RecordBatch metadata. There is no body to append.
132-
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len);
133-
uint8_t* dst = final_buffer.data() + current_size;
134-
135-
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
136-
dst += sizeof(uint32_t);
137-
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
138-
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
139-
}
41+
details::serialize_record_batch_message(arrow_arr, final_buffer);
14042

43+
// Return the final buffer containing the complete IPC stream
14144
return final_buffer;
14245
}
14346

include/serialize_primitive_array.hpp

Lines changed: 3 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "Message_generated.h"
1313
#include "Schema_generated.h"
1414

15+
#include "serialize.hpp"
1516
#include "utils.hpp"
1617

1718
namespace sparrow_ipc
@@ -45,77 +46,7 @@ namespace sparrow_ipc
4546
std::vector<uint8_t> final_buffer;
4647

4748
// I - Serialize the Schema message
48-
// An Arrow IPC stream must start with a Schema message
49-
{
50-
// Create a new builder for the Schema message's metadata
51-
flatbuffers::FlatBufferBuilder schema_builder;
52-
53-
// Create the Field metadata, which describes a single column (or array)
54-
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
55-
if (arrow_schema.name)
56-
{
57-
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
58-
}
59-
60-
// Determine the Flatbuffer type information from the C schema's format string
61-
auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format);
62-
63-
// Handle metadata
64-
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
65-
fb_metadata_offset = 0;
66-
67-
if (arr.metadata())
68-
{
69-
sparrow::key_value_view metadata_view = *(arr.metadata());
70-
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
71-
kv_offsets.reserve(metadata_view.size());
72-
auto mv_it = metadata_view.cbegin();
73-
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
74-
{
75-
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
76-
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
77-
kv_offsets.push_back(
78-
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
79-
}
80-
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
81-
}
82-
83-
// Build the Field object
84-
auto fb_field = org::apache::arrow::flatbuf::CreateField(
85-
schema_builder,
86-
fb_name_offset,
87-
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
88-
type_enum,
89-
type_offset,
90-
0, // dictionary
91-
0, // children
92-
fb_metadata_offset);
93-
94-
// A Schema contains a vector of fields. For this primitive array, there is only one
95-
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
96-
auto fb_fields = schema_builder.CreateVector(fields_vec);
97-
98-
// Build the Schema object from the vector of fields
99-
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
100-
101-
// Wrap the Schema in a top-level Message, which is the standard IPC envelope
102-
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
103-
schema_builder,
104-
org::apache::arrow::flatbuf::MetadataVersion::V5,
105-
org::apache::arrow::flatbuf::MessageHeader::Schema,
106-
schema_offset.Union(),
107-
0
108-
);
109-
schema_builder.Finish(schema_message_offset);
110-
111-
// Assemble the Schema message bytes
112-
uint32_t schema_len = schema_builder.GetSize(); // Get the size of the serialized metadata
113-
final_buffer.resize(sizeof(uint32_t) + schema_len); // Resize the buffer to hold the message
114-
// Copy the metadata into the buffer, after the 4-byte length prefix
115-
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
116-
// Write the 4-byte metadata length at the beginning of the message
117-
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
118-
}
49+
details::serialize_schema_message(arrow_schema, arr.metadata(), final_buffer);
11950

12051
// II - Serialize the RecordBatch message
12152
// After the Schema, we send the RecordBatch containing the actual data
@@ -261,6 +192,7 @@ namespace sparrow_ipc
261192
metadata->reserve(fb_metadata->size());
262193
for (const auto& kv : *fb_metadata)
263194
{
195+
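// TODO: use str() instead of c_str(). c_str() drops the stored length, so a key or value containing an embedded '\0' would be truncated.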
264196
metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str());
265197
}
266198
}

0 commit comments

Comments
 (0)