From 30ba2fa5e9e0112ca16f95f994bf04da441deb31 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Wed, 9 Apr 2025 16:21:52 +0800 Subject: [PATCH] feat: support `Vec` for `DynRecord` --- bindings/js/Cargo.toml | 2 +- bindings/js/src/utils.rs | 17 +- bindings/python/src/utils.rs | 1 + src/lib.rs | 18 +- src/record/runtime/array.rs | 772 ++++++++++++++++++++++++++++++- src/record/runtime/mod.rs | 166 ++++++- src/record/runtime/record.rs | 299 +++++++++++- src/record/runtime/record_ref.rs | 166 ++++++- src/record/runtime/value.rs | 572 ++++++++++++++++++++--- src/transaction.rs | 8 +- tests/dyn_record.rs | 293 ++++++++++++ tests/wasm.rs | 57 ++- 12 files changed, 2239 insertions(+), 132 deletions(-) create mode 100644 tests/dyn_record.rs diff --git a/bindings/js/Cargo.toml b/bindings/js/Cargo.toml index 594e15b3..d8f94d75 100644 --- a/bindings/js/Cargo.toml +++ b/bindings/js/Cargo.toml @@ -26,7 +26,7 @@ wasm-bindgen-futures = { version = "0.4.45" } wasm-bindgen-test = "0.3.9" wasm-streams = "0.4.2" web-sys = { version = "0.3", features = ["console"] } -fusio = { version = "0.3.7", default-features = false, features = [ +fusio = { version = "0.3.8", default-features = false, features = [ "dyn", "bytes", "opfs", diff --git a/bindings/js/src/utils.rs b/bindings/js/src/utils.rs index 4c9d2835..3a69edf4 100644 --- a/bindings/js/src/utils.rs +++ b/bindings/js/src/utils.rs @@ -11,7 +11,7 @@ fn to_col_value(value: T, primary: bool) -> Arc Arc { +fn none_value(datatype: &DataType) -> Arc { match datatype { DataType::UInt8 => Arc::new(Option::::None), DataType::UInt16 => Arc::new(Option::::None), @@ -24,6 +24,7 @@ fn none_value(datatype: DataType) -> Arc { DataType::String => Arc::new(Option::::None), DataType::Boolean => Arc::new(Option::::None), DataType::Bytes => Arc::new(Option::>::None), + DataType::List(_) => unimplemented!(), } } @@ -33,9 +34,9 @@ pub(crate) fn parse_key(desc: &ValueDesc, key: JsValue, primary: bool) -> Result true => return Err(format!("{} can not be null", &desc.name).into()), false => { return Ok(Value::new( - desc.datatype, + desc.datatype.clone(), desc.name.clone(), - none_value(desc.datatype), + none_value(&desc.datatype), desc.is_nullable, )) } @@ -55,10 +56,11 @@ pub(crate) fn parse_key(desc: &ValueDesc, key: JsValue, primary: bool) -> Result DataType::Bytes => { to_col_value::>(key.dyn_into::().unwrap().to_vec(), primary) } + DataType::List(_) => unimplemented!(), }; Ok(Value::new( - desc.datatype, + desc.datatype.clone(), desc.name.clone(), value, desc.is_nullable, @@ -148,6 +150,7 @@ pub(crate) fn to_record(cols: &Vec, primary_key_index: usize) -> JsValue .map(|v| Uint8Array::from(v.as_slice()).into()) .unwrap_or(JsValue::NULL), }, + DataType::List(_) => unimplemented!(), }; Reflect::set(&obj, &col.desc.name.as_str().into(), &value).unwrap(); @@ -169,7 +172,7 @@ mod tests { { let desc = ValueDesc::new("id".to_string(), DataType::UInt64, false); let key_col = parse_key(&desc, JsValue::from(19), true).unwrap(); - assert_eq!(key_col.datatype(), DataType::UInt64); + assert_eq!(key_col.datatype(), &DataType::UInt64); assert_eq!(key_col.value.as_ref().downcast_ref::(), Some(&19_u64)); } { @@ -180,7 +183,7 @@ mod tests { { let desc = ValueDesc::new("name".to_string(), DataType::String, false); let key_col = parse_key(&desc, JsValue::from("Hello tonbo"), false).unwrap(); - assert_eq!(key_col.datatype(), DataType::String); + assert_eq!(key_col.datatype(), &DataType::String); assert_eq!( key_col.value.as_ref().downcast_ref::>(), Some(&Some("Hello tonbo".to_string())) @@ -189,7 +192,7 @@ mod tests { { let desc = ValueDesc::new("data".to_string(), DataType::Bytes, false); let key_col = parse_key(&desc, JsValue::from(b"Hello tonbo".to_vec()), false).unwrap(); - assert_eq!(key_col.datatype(), DataType::Bytes); + assert_eq!(key_col.datatype(), &DataType::Bytes); assert_eq!( key_col.value.as_ref().downcast_ref::>>(), Some(&Some(b"Hello tonbo".to_vec())) diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs index 0999efd5..560f10c6 100644 --- a/bindings/python/src/utils.rs +++ b/bindings/python/src/utils.rs @@ -118,6 +118,7 @@ pub(crate) fn to_dict(py: Python, primary_key_index: usize, record: Vec) .unwrap(); } } + TonboDataType::List(_) => unimplemented!(), } } dict diff --git a/src/lib.rs b/src/lib.rs index b08ad219..48015ad9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1901,7 +1901,7 @@ pub(crate) mod tests { let columns = entry.value().unwrap().columns; let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int64); + assert_eq!(primary_key_col.datatype(), &DataType::Int64); assert_eq!(primary_key_col.desc.name, "id".to_string()); assert_eq!( *primary_key_col @@ -1913,7 +1913,7 @@ pub(crate) mod tests { ); let col = columns.get(2).unwrap(); - assert_eq!(col.datatype(), DataType::Int16); + assert_eq!(col.datatype(), &DataType::Int16); assert_eq!(col.desc.name, "height".to_string()); let height = *col.value.as_ref().downcast_ref::>().unwrap(); if i < 45 { @@ -1928,28 +1928,28 @@ pub(crate) mod tests { } let col = columns.get(3).unwrap(); - assert_eq!(col.datatype(), DataType::Int32); + assert_eq!(col.datatype(), &DataType::Int32); assert_eq!(col.desc.name, "weight".to_string()); let weight = col.value.as_ref().downcast_ref::>(); assert!(weight.is_some()); assert_eq!(*weight.unwrap(), None); let col = columns.get(4).unwrap(); - assert_eq!(col.datatype(), DataType::String); + assert_eq!(col.datatype(), &DataType::String); assert_eq!(col.desc.name, "name".to_string()); let name = col.value.as_ref().downcast_ref::>(); assert!(name.is_some()); assert_eq!(name.unwrap(), &None); let col = columns.get(6).unwrap(); - assert_eq!(col.datatype(), DataType::Boolean); + assert_eq!(col.datatype(), &DataType::Boolean); assert_eq!(col.desc.name, "enabled".to_string()); let enabled = col.value.as_ref().downcast_ref::>(); assert!(enabled.is_some()); assert_eq!(*enabled.unwrap(), None); let col = columns.get(7).unwrap(); - assert_eq!(col.datatype(), DataType::Bytes); + assert_eq!(col.datatype(), &DataType::Bytes); assert_eq!(col.desc.name, "bytes".to_string()); let bytes = col.value.as_ref().downcast_ref::>>(); assert!(bytes.is_some()); @@ -2077,7 +2077,7 @@ pub(crate) mod tests { let columns = entry.value().unwrap().columns; let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int64); + assert_eq!(primary_key_col.datatype(), &DataType::Int64); assert_eq!(primary_key_col.desc.name, "id".to_string()); assert_eq!(*cast_arc_value!(primary_key_col.value, i64), i); @@ -2097,7 +2097,7 @@ pub(crate) mod tests { let columns = entry.value().unwrap().columns; let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int64); + assert_eq!(primary_key_col.datatype(), &DataType::Int64); assert_eq!(primary_key_col.desc.name, "id".to_string()); assert_eq!(*cast_arc_value!(primary_key_col.value, i64), i); @@ -2117,7 +2117,7 @@ pub(crate) mod tests { let columns = entry.value().unwrap().columns; let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int64); + assert_eq!(primary_key_col.datatype(), &DataType::Int64); assert_eq!(primary_key_col.desc.name, "id".to_string()); assert_eq!(*cast_arc_value!(primary_key_col.value, i64), i); diff --git a/src/record/runtime/array.rs b/src/record/runtime/array.rs index b618e338..1865aff2 100644 --- a/src/record/runtime/array.rs +++ b/src/record/runtime/array.rs @@ -3,8 +3,9 @@ use std::{any::Any, mem, sync::Arc}; use arrow::{ array::{ Array, ArrayBuilder, ArrayRef, ArrowPrimitiveType, BooleanArray, BooleanBufferBuilder, - BooleanBuilder, GenericBinaryArray, GenericBinaryBuilder, PrimitiveArray, PrimitiveBuilder, - StringArray, StringBuilder, UInt32Builder, + BooleanBuilder, GenericBinaryArray, GenericBinaryBuilder, GenericListArray, + GenericListBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, + UInt32Builder, }, datatypes::{ Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema, UInt16Type, UInt32Type, @@ -12,7 +13,7 @@ use arrow::{ }, }; -use super::{record::DynRecord, record_ref::DynRecordRef, value::Value, DataType}; +use super::{record::DynRecord, record_ref::DynRecordRef, value::Value, DataType, ValueDesc}; use crate::{ cast_arc_value, inmem::immutable::{ArrowArrays, Builder}, @@ -91,6 +92,104 @@ impl ArrowArrays for DynRecordImmutableArrays { capacity, 0, ))); } + DataType::List(field) => match &field.datatype { + DataType::UInt8 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::UInt16 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::UInt32 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::UInt64 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::Int8 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::Int16 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::Int32 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::Int64 => { + let bd = PrimitiveBuilder::::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, 0, + ) + .with_field(field.arrow_field()), + )) + } + DataType::String => { + let bd = StringBuilder::with_capacity(capacity, 0); + builders.push(Box::new( + GenericListBuilder::::with_capacity(bd, 0) + .with_field(field.arrow_field()), + )) + } + DataType::Boolean => { + let bd = BooleanBuilder::with_capacity(capacity); + builders.push(Box::new( + GenericListBuilder::::with_capacity(bd, 0) + .with_field(field.arrow_field()), + )) + } + DataType::Bytes => { + let bd = GenericBinaryBuilder::with_capacity(capacity, 0); + builders.push(Box::new( + GenericListBuilder::>::with_capacity( + bd, capacity, + ) + .with_field(field.arrow_field()), + )) + } + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + }, } datatypes.push(datatype); } @@ -223,9 +322,159 @@ impl ArrowArrays for DynRecordImmutableArrays { Arc::new(Some(v)) } } + DataType::List(desc) => { + let array = cast_arc_value!(col.value, GenericListArray).value(offset); + + match &desc.datatype { + DataType::UInt8 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::UInt16 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::UInt32 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::UInt64 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::Int8 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::Int16 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::Int32 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::Int64 => { + let v = array + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let parts = v.into_parts(); + if primary_key_index == idx { + Arc::new(parts.1.to_vec()) + } else { + Arc::new(Some(parts.1.to_vec())) + } + } + DataType::String => { + let data = array.as_any().downcast_ref::().unwrap(); + let v = data + .iter() + .map(|v| v.unwrap().to_string()) + .collect::>(); + if primary_key_index == idx { + Arc::new(v) + } else { + Arc::new(Some(v)) + } + } + DataType::Boolean => { + let data = array.as_any().downcast_ref::().unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_key_index == idx { + Arc::new(v) + } else { + Arc::new(Some(v)) + } + } + DataType::Bytes => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + + let v = data + .iter() + .map(|v| v.unwrap().to_vec()) + .collect::>>(); + if primary_key_index == idx { + Arc::new(v) + } else { + Arc::new(Some(v)) + } + } + DataType::List(_) => { + unimplemented!("Vec> is not supporte yet") + } + } + } }; - columns.push(Value::new(datatype, name, value, nullable)); + columns.push(Value::new(datatype.clone(), name, value, nullable)); } else { columns.push(col.clone()); } @@ -387,6 +636,123 @@ impl Builder for DynRecordBuilder { None => bd.append_value(vec![]), } } + DataType::List(field) => match &field.datatype { + DataType::UInt8 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::UInt16 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::UInt32 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::UInt64 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::Int8 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::Int16 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::Int32 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::Int64 => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value(vec![]), + } + } + DataType::String => { + let bd = Self::as_builder_mut::< + GenericListBuilder, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => { + bd.append_value(value.iter().map(|v| Some(v.clone()))) + } + None if col.is_nullable() => bd.append_null(), + None => bd.append(true), + } + } + DataType::Boolean => { + let bd = Self::as_builder_mut::< + GenericListBuilder, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>) { + Some(value) => bd.append_value(value.iter().map(|v| Some(*v))), + None if col.is_nullable() => bd.append_null(), + None => bd.append_value([]), + } + } + DataType::Bytes => { + let bd = Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()); + match cast_arc_value!(col.value, Option>>) { + Some(value) => { + bd.append_value(value.iter().map(|v| Some(v.clone()))) + } + None if col.is_nullable() => bd.append_null(), + None => bd.append(true), + } + } + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + }, } } } @@ -445,6 +811,53 @@ impl Builder for DynRecordBuilder { Self::as_builder_mut::>(builder.as_mut()) .append_value(Vec::::default()); } + DataType::List(field) => match field.datatype { + DataType::UInt8 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::UInt16 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::UInt32 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::UInt64 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::Int8 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::Int16 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::Int32 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::Int64 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::String => Self::as_builder_mut::< + GenericListBuilder, + >(builder.as_mut()) + .append(true), + DataType::Boolean => Self::as_builder_mut::< + GenericListBuilder, + >(builder.as_mut()) + .append(true), + DataType::Bytes => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append(true), + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + }, } } } @@ -500,6 +913,88 @@ impl Builder for DynRecordBuilder { Self::as_builder::>(builder.as_ref()) .values_slice(), ), + DataType::List(field) => { + match field.datatype { + DataType::UInt8 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::UInt16 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::UInt32 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::UInt64 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::Int8 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::Int16 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::Int32 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::Int64 => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::String => mem::size_of_val( + Self::as_builder::>( + builder.as_ref(), + ) + .values_ref() + .values_slice(), + ), + DataType::Boolean => mem::size_of_val( + Self::as_builder::>( + builder.as_ref(), + ) + .values_ref() + .values_slice(), + ), + DataType::Bytes => mem::size_of_val( + Self::as_builder::< + GenericListBuilder>, + >(builder.as_ref()) + .values_ref() + .values_slice(), + ), + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + } + } } }) } @@ -658,6 +1153,88 @@ impl Builder for DynRecordBuilder { )); array_refs.push(value); } + DataType::List(desc) => { + let value = match desc.datatype { + DataType::UInt8 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::UInt16 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::UInt32 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::UInt64 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::Int8 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::Int16 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::Int32 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::Int64 => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::String => Arc::new( + Self::as_builder_mut::>( + builder.as_mut(), + ) + .finish(), + ), + DataType::Boolean => Arc::new( + Self::as_builder_mut::>( + builder.as_mut(), + ) + .finish(), + ), + DataType::Bytes => Arc::new( + Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .finish(), + ), + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + }; + columns.push(Value::new( + DataType::List(Arc::new(ValueDesc::new( + desc.name.clone(), + desc.datatype.clone(), + desc.is_nullable, + ))), + field.name().to_owned(), + value.clone(), + is_nullable, + )); + array_refs.push(value); + } }; } @@ -725,6 +1302,91 @@ impl DynRecordBuilder { .append_value(*cast_arc_value!(col.value, bool)), DataType::Bytes => Self::as_builder_mut::>(builder.as_mut()) .append_value(cast_arc_value!(col.value, Vec)), + DataType::List(field) => { + match field.datatype { + DataType::UInt8 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value(cast_arc_value!(col.value, Vec).iter().map(|v| Some(*v))), + DataType::UInt16 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::UInt32 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::UInt64 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::Int8 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value(cast_arc_value!(col.value, Vec).iter().map(|v| Some(*v))), + DataType::Int16 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::Int32 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::Int64 => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::String => Self::as_builder_mut::< + GenericListBuilder, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(v.as_str())), + ), + DataType::Boolean => Self::as_builder_mut::< + GenericListBuilder, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec) + .iter() + .map(|v| Some(*v)), + ), + DataType::Bytes => Self::as_builder_mut::< + GenericListBuilder>, + >(builder.as_mut()) + .append_value( + cast_arc_value!(col.value, Vec>) + .iter() + .map(|v| Some(v.as_slice())), + ), + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + } + } }; } @@ -746,12 +1408,17 @@ impl DynRecordBuilder { #[cfg(test)] mod tests { + use std::sync::Arc; + use parquet::arrow::ProjectionMask; use crate::{ dyn_record, dyn_schema, inmem::immutable::{ArrowArrays, Builder}, - record::{DynRecordImmutableArrays, DynRecordRef, Record, RecordRef, Schema}, + record::{ + DataType, DynRecord, DynRecordImmutableArrays, DynRecordRef, DynSchema, Record, + RecordRef, Schema, Value, ValueDesc, + }, }; #[tokio::test] @@ -806,4 +1473,99 @@ mod tests { ); } } + + #[tokio::test] + async fn test_build_array_list() { + let schema = DynSchema::new( + vec![ + ValueDesc::new("id".into(), DataType::UInt32, false), + ValueDesc::new( + "bools".into(), + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + true, + ), + ValueDesc::new( + "bytes".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, false))), + true, + ), + ValueDesc::new( + "none".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Int64, false))), + true, + ), + ValueDesc::new( + "strs".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::String, false))), + false, + ), + ], + 0, + ); + + let record = DynRecord::new( + vec![ + Value::new(DataType::UInt32, "id".into(), Arc::new(1_u32), false), + Value::new( + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + "bools".to_string(), + Arc::new(Some(vec![true, false, false, false, true])), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, false))), + "bytes".to_string(), + Arc::new(Some(vec![ + vec![1_u8, 2, 3, 4], + vec![11, 22, 23, 24], + vec![31, 32, 33, 34], + ])), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Int64, false))), + "none".to_string(), + Arc::new(None::>), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::String, false))), + "strs".to_string(), + Arc::new(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ]), + false, + ), + ], + 0, + ); + let mut builder = DynRecordImmutableArrays::builder(schema.arrow_schema().clone(), 5); + let key = crate::timestamp::Timestamped { + ts: 0.into(), + value: record.key(), + }; + + builder.push(key.clone(), Some(record.as_record_ref())); + builder.push(key.clone(), None); + builder.push(key.clone(), Some(record.as_record_ref())); + let arrays = builder.finish(None); + + let res = arrays.get(0, &ProjectionMask::all()); + assert!(res.is_some()); + let res2 = res.unwrap(); + assert!(res2.is_some()); + + dbg!(res2.clone().unwrap().columns.len()); + dbg!(res2.unwrap().columns); + } } diff --git a/src/record/runtime/mod.rs b/src/record/runtime/mod.rs index 7e8bd599..6175424c 100644 --- a/src/record/runtime/mod.rs +++ b/src/record/runtime/mod.rs @@ -4,14 +4,19 @@ mod record_ref; mod schema; mod value; +use std::sync::Arc; + pub use array::*; -use arrow::datatypes::DataType as ArrowDataType; +use arrow::datatypes::{DataType as ArrowDataType, Field}; +use fusio_log::{Decode, Encode}; pub use record::*; pub use record_ref::*; pub use schema::*; pub use value::*; -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +type ValueDescRef = Arc; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum DataType { UInt8, UInt16, @@ -24,6 +29,7 @@ pub enum DataType { String, Boolean, Bytes, + List(ValueDescRef), } impl From<&ArrowDataType> for DataType { @@ -40,11 +46,167 @@ impl From<&ArrowDataType> for DataType { ArrowDataType::Utf8 => DataType::String, ArrowDataType::Boolean => DataType::Boolean, ArrowDataType::Binary => DataType::Bytes, + ArrowDataType::List(field) => DataType::List(ValueDescRef::new(field.as_ref().into())), _ => todo!(), } } } +impl From<&DataType> for ArrowDataType { + fn from(datatype: &DataType) -> Self { + match datatype { + DataType::UInt8 => ArrowDataType::UInt8, + DataType::UInt16 => ArrowDataType::UInt16, + DataType::UInt32 => ArrowDataType::UInt32, + DataType::UInt64 => ArrowDataType::UInt64, + DataType::Int8 => ArrowDataType::Int8, + DataType::Int16 => ArrowDataType::Int16, + DataType::Int32 => ArrowDataType::Int32, + DataType::Int64 => ArrowDataType::Int64, + DataType::String => ArrowDataType::Utf8, + DataType::Boolean => ArrowDataType::Boolean, + DataType::Bytes => ArrowDataType::Binary, + DataType::List(field) => ArrowDataType::List(Arc::new(Field::new( + &field.name, + (&field.datatype).into(), + field.is_nullable, + ))), + } + } +} + +impl From for DataType { + fn from(value: u8) -> Self { + match value { + 0 => DataType::UInt8, + 1 => DataType::UInt16, + 2 => DataType::UInt32, + 3 => DataType::UInt64, + 4 => DataType::Int8, + 5 => DataType::Int16, + 6 => DataType::Int32, + 7 => DataType::Int64, + 8 => DataType::String, + 9 => DataType::Boolean, + 10 => DataType::Bytes, + _ => panic!("can not construct `DataType` from {}", value), + } + } +} + +impl Encode for DataType { + type Error = fusio::Error; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> + where + W: fusio::Write, + { + match self { + DataType::UInt8 => 0_u8.encode(writer).await?, + DataType::UInt16 => 1_u8.encode(writer).await?, + DataType::UInt32 => 2_u8.encode(writer).await?, + DataType::UInt64 => 3_u8.encode(writer).await?, + DataType::Int8 => 4_u8.encode(writer).await?, + DataType::Int16 => 5_u8.encode(writer).await?, + DataType::Int32 => 6_u8.encode(writer).await?, + DataType::Int64 => 7_u8.encode(writer).await?, + DataType::String => 8_u8.encode(writer).await?, + DataType::Boolean => 9_u8.encode(writer).await?, + DataType::Bytes => 10_u8.encode(writer).await?, + DataType::List(desc) => { + 11_u8.encode(writer).await?; + desc.is_nullable.encode(writer).await?; + match desc.datatype { + DataType::UInt8 => 0_u8.encode(writer).await?, + DataType::UInt16 => 1_u8.encode(writer).await?, + DataType::UInt32 => 2_u8.encode(writer).await?, + DataType::UInt64 => 3_u8.encode(writer).await?, + DataType::Int8 => 4_u8.encode(writer).await?, + DataType::Int16 => 5_u8.encode(writer).await?, + DataType::Int32 => 6_u8.encode(writer).await?, + DataType::Int64 => 7_u8.encode(writer).await?, + DataType::String => 8_u8.encode(writer).await?, + DataType::Boolean => 9_u8.encode(writer).await?, + DataType::Bytes => 10_u8.encode(writer).await?, + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + } + } + }; + Ok(()) + } + + fn size(&self) -> usize { + match self { + DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::String + | DataType::Boolean + | DataType::Bytes => 1, + DataType::List(_) => 3, + } + } +} + +impl Decode for DataType { + type Error = fusio::Error; + + async fn decode(reader: &mut R) -> Result + where + R: fusio::SeqRead, + { + let tag = u8::decode(reader).await?; + Ok(match tag { + 0 => DataType::UInt8, + 1 => DataType::UInt16, + 2 => DataType::UInt32, + 3 => DataType::UInt64, + 4 => DataType::Int8, + 5 => DataType::Int16, + 6 => DataType::Int32, + 7 => DataType::Int64, + 8 => DataType::String, + 9 => DataType::Boolean, + 10 => DataType::Bytes, + 11 => { + let is_nullable = bool::decode(reader).await?; + let inner_datatype = u8::decode(reader).await?; + DataType::List(Arc::new(ValueDesc::new( + "".into(), + inner_datatype.into(), + is_nullable, + ))) + } + _ => panic!("invalid datatype tag"), + }) + } +} + +/// Cast the `Arc` to the rust native Vec. +#[macro_export] +macro_rules! arrow_array_to_native { + ($value:expr, $arrow_ty:expr, $native_ty:expr) => { + let arrow_array = $value.as_any().downcast_ref::<$arrow_ty>().unwrap(); + arrow_array + .iter() + .map(|v| v.unwrap().to_string()) + .collect::<$native_ty>() + }; +} + +/// Cast the `Arc` to the rust native Vec. +#[macro_export] +macro_rules! arrow_array_iter { + ($value:expr, $arrow_ty:ty) => { + $value.as_any().downcast_ref::<$arrow_ty>().unwrap().iter() + }; +} + /// Cast the `Arc` to the value of given type. #[macro_export] macro_rules! cast_arc_value { diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 1f3a6941..60abaee9 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use fusio::SeqRead; use fusio_log::{Decode, Encode}; -use super::{schema::DynSchema, DataType, DynRecordRef, Value}; +use super::{schema::DynSchema, wrapped_value, DataType, DynRecordRef, Value}; use crate::{ cast_arc_value, record::{Record, RecordDecodeError}, @@ -57,6 +57,62 @@ impl Decode for DynRecord { DataType::Bytes => { Arc::new(cast_arc_value!(col.value, Option>).clone().unwrap()) } + DataType::List(desc) => match desc.datatype { + DataType::UInt8 => { + Arc::new(cast_arc_value!(col.value, Option>).clone().unwrap()) + } + DataType::UInt16 => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::UInt32 => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::UInt64 => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::Int8 => { + Arc::new(cast_arc_value!(col.value, Option>).clone().unwrap()) + } + DataType::Int16 => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::Int32 => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::Int64 => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::String => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::Boolean => Arc::new( + cast_arc_value!(col.value, Option>) + .clone() + .unwrap(), + ), + DataType::Bytes => Arc::new( + cast_arc_value!(col.value, Option>>) + .clone() + .unwrap(), + ), + DataType::List(_) => { + unimplemented!("Vec> is not supporte yet") + } + }, }; } values.push(col); @@ -79,29 +135,14 @@ impl Record for DynRecord { for (idx, col) in self.values.iter().enumerate() { let datatype = col.datatype(); let is_nullable = col.is_nullable(); - let mut value = col.value.clone(); - if idx != self.primary_index && !is_nullable { - value = match datatype { - DataType::UInt8 => Arc::new(Some(*cast_arc_value!(col.value, u8))), - DataType::UInt16 => Arc::new(Some(*cast_arc_value!(col.value, u16))), - DataType::UInt32 => Arc::new(Some(*cast_arc_value!(col.value, u32))), - DataType::UInt64 => Arc::new(Some(*cast_arc_value!(col.value, u64))), - DataType::Int8 => Arc::new(Some(*cast_arc_value!(col.value, i8))), - DataType::Int16 => Arc::new(Some(*cast_arc_value!(col.value, i16))), - DataType::Int32 => Arc::new(Some(*cast_arc_value!(col.value, i32))), - DataType::Int64 => Arc::new(Some(*cast_arc_value!(col.value, i64))), - DataType::String => { - Arc::new(Some(cast_arc_value!(col.value, String).to_owned())) - } - DataType::Boolean => Arc::new(Some(*cast_arc_value!(col.value, bool))), - DataType::Bytes => { - Arc::new(Some(cast_arc_value!(col.value, Vec).to_owned())) - } - }; - } + let value = if idx != self.primary_index && !is_nullable { + wrapped_value(datatype, &col.value) + } else { + col.value.clone() + }; columns.push(Value::new( - datatype, + datatype.clone(), col.desc.name.to_owned(), value, is_nullable, @@ -159,10 +200,15 @@ macro_rules! dyn_record { #[cfg(test)] pub(crate) mod test { - use std::sync::Arc; + use std::{io::Cursor, sync::Arc}; + + use fusio_log::{Decode, Encode}; use super::{DynRecord, DynSchema}; - use crate::dyn_schema; + use crate::{ + dyn_schema, + record::{DataType, Record, Value, ValueDesc}, + }; #[allow(unused)] pub(crate) fn test_dyn_item_schema() -> DynSchema { @@ -202,4 +248,209 @@ pub(crate) mod test { } items } + + #[test] + fn test_dyn_record_ref() { + { + let desc = ValueDesc::new("".into(), DataType::String, false); + let record = DynRecord::new( + vec![ + Value::new(DataType::UInt32, "id".into(), Arc::new(1_u32), false), + Value::new( + DataType::List(Arc::new(desc.clone())), + "strs".to_string(), + Arc::new(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ]), + false, + ), + Value::new( + DataType::List(Arc::new(desc.clone())), + "strs_option".to_string(), + Arc::new(Some(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ])), + true, + ), + ], + 0, + ); + + let record_ref = record.as_record_ref(); + assert_eq!( + record_ref.columns.get(1).unwrap(), + &Value::new( + DataType::List(Arc::new(desc.clone())), + "strs".to_string(), + Arc::new(Some(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ])), + false, + ) + ); + assert_eq!( + record_ref.columns.get(2).unwrap(), + &Value::new( + DataType::List(Arc::new(desc.clone())), + "strs".to_string(), + Arc::new(Some(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ])), + true, + ) + ) + } + { + let record = DynRecord::new( + vec![ + Value::new(DataType::UInt32, "id".into(), Arc::new(1_u32), false), + Value::new( + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + "bools".to_string(), + Arc::new(vec![true, false, false, true]), + false, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, true))), + "bytes".to_string(), + Arc::new(Some(vec![ + vec![0_u8, 1, 2, 3], + vec![10, 11, 12, 13], + vec![20, 21, 22, 23], + ])), + true, + ), + ], + 0, + ); + + let record_ref = record.as_record_ref(); + assert_eq!( + record_ref.columns.get(1).unwrap(), + &Value::new( + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + "bools".to_string(), + Arc::new(Some(vec![true, false, false, true])), + false, + ) + ); + assert_eq!( + record_ref.columns.get(2).unwrap(), + &Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, true))), + "bytes".to_string(), + Arc::new(Some(vec![ + vec![0_u8, 1, 2, 3], + vec![10, 11, 12, 13], + vec![20, 21, 22, 23], + ])), + true, + ) + ) + } + } + + #[tokio::test] + async fn test_encode_encode_dyn_record_list() { + use tokio::io::AsyncSeekExt; + + { + let desc = ValueDesc::new("".into(), DataType::String, false); + let record = DynRecord::new( + vec![ + Value::new(DataType::UInt32, "id".into(), Arc::new(1_u32), false), + Value::new( + DataType::List(Arc::new(desc.clone())), + "strs".to_string(), + Arc::new(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ]), + false, + ), + Value::new( + DataType::List(Arc::new(desc.clone())), + "strs_option".to_string(), + Arc::new(Some(vec![ + "abc".to_string(), + "xyz".to_string(), + "tonbo".to_string(), + ])), + true, + ), + ], + 0, + ); + + let mut source = vec![]; + let mut cursor = Cursor::new(&mut source); + let record_ref = record.as_record_ref(); + record_ref.encode(&mut cursor).await.unwrap(); + + cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let decoded = DynRecord::decode(&mut cursor).await.unwrap(); + assert_eq!(decoded.values, record.values); + } + + { + let record = DynRecord::new( + vec![ + Value::new(DataType::UInt32, "id".into(), Arc::new(1_u32), false), + Value::new( + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + "bools".to_string(), + Arc::new(vec![true, false, false, false, true]), + false, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, false))), + "bytes".to_string(), + Arc::new(Some(vec![ + vec![1_u8, 2, 3, 4], + vec![11, 22, 23, 24], + vec![31, 32, 33, 34], + ])), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Int64, true))), + "none".to_string(), + Arc::new(None::>), + true, + ), + ], + 0, + ); + + let mut source = vec![]; + let mut cursor = Cursor::new(&mut source); + let record_ref = record.as_record_ref(); + record_ref.encode(&mut cursor).await.unwrap(); + + cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let decoded = DynRecord::decode(&mut cursor).await.unwrap(); + assert_eq!(decoded.values, record.values); + } + } } diff --git a/src/record/runtime/record_ref.rs b/src/record/runtime/record_ref.rs index 5a168453..d6056082 100644 --- a/src/record/runtime/record_ref.rs +++ b/src/record/runtime/record_ref.rs @@ -1,7 +1,10 @@ use std::{any::Any, marker::PhantomData, mem, sync::Arc}; use arrow::{ - array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}, + array::{ + Array, ArrayRef, ArrowPrimitiveType, AsArray, BooleanArray, GenericBinaryArray, + PrimitiveArray, StringArray, + }, datatypes::{ Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema, UInt16Type, UInt32Type, UInt64Type, UInt8Type, @@ -90,10 +93,10 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { let mut columns = vec![]; - for (idx, field) in full_schema.flattened_fields().iter().enumerate().skip(2) { + for (idx, field) in full_schema.fields().iter().enumerate().skip(2) { let datatype = DataType::from(field.data_type()); let schema = record_batch.schema(); - let flattened_fields = schema.flattened_fields(); + let flattened_fields = schema.fields(); let batch_field = flattened_fields .iter() .enumerate() @@ -108,7 +111,7 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { } let col = record_batch.column(batch_field.unwrap().0); let is_nullable = field.is_nullable(); - let value = match datatype { + let value = match &datatype { DataType::UInt8 => Self::primitive_value::( col, offset, @@ -197,7 +200,146 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { Arc::new(value) as Arc } } + DataType::List(desc) => { + let array = col.as_list::().value(offset); + match &desc.datatype { + DataType::UInt8 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::UInt16 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::UInt32 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::UInt64 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::Int8 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::Int16 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::Int32 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::Int64 => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::String => { + let data = array.as_any().downcast_ref::().unwrap(); + let v = data + .iter() + .map(|v| v.unwrap().to_string()) + .collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::Boolean => { + let data = array.as_any().downcast_ref::().unwrap(); + let v = data.iter().map(|v| v.unwrap()).collect::>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::Bytes => { + let data = array + .as_any() + .downcast_ref::>() + .unwrap(); + let v = data + .iter() + .map(|v| v.unwrap().to_vec()) + .collect::>>(); + if primary_index == idx - 2 { + Arc::new(v) as Arc + } else { + Arc::new((!array.is_null(offset)).then_some(v)) + } + } + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + } + } }; + columns.push(Value::new( datatype, field.name().to_owned(), @@ -230,6 +372,22 @@ impl<'r> RecordRef<'r> for DynRecordRef<'r> { DataType::String => col.value = Arc::>::new(None), DataType::Boolean => col.value = Arc::>::new(None), DataType::Bytes => col.value = Arc::>>::new(None), + DataType::List(field) => { + col.value = match &field.datatype { + DataType::UInt8 => Arc::>>::new(None), + DataType::UInt16 => Arc::>>::new(None), + DataType::UInt32 => Arc::>>::new(None), + DataType::UInt64 => Arc::>>::new(None), + DataType::Int8 => Arc::>>::new(None), + DataType::Int16 => Arc::>>::new(None), + DataType::Int32 => Arc::>>::new(None), + DataType::Int64 => Arc::>>::new(None), + DataType::String => Arc::>>::new(None), + DataType::Boolean => Arc::>>::new(None), + DataType::Bytes => Arc::>>>::new(None), + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + }; + } }; } } diff --git a/src/record/runtime/value.rs b/src/record/runtime/value.rs index 00ca53e2..0fc0ace7 100644 --- a/src/record/runtime/value.rs +++ b/src/record/runtime/value.rs @@ -1,17 +1,22 @@ -use std::{any::Any, fmt::Debug, hash::Hash, sync::Arc}; +use core::hash::{Hash, Hasher}; +use std::{any::Any, fmt::Debug, sync::Arc}; use arrow::{ array::{ - BooleanArray, GenericBinaryArray, Int16Array, Int32Array, Int64Array, Int8Array, - StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, + Array, BooleanArray, GenericBinaryArray, Int16Array, Int32Array, Int64Array, Int8Array, + ListArray, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, - datatypes::{DataType as ArrowDataType, Field}, + buffer::OffsetBuffer, + datatypes::{DataType as ArrowDataType, Field, FieldRef}, }; use fusio::{SeqRead, Write}; use fusio_log::{Decode, DecodeError, Encode}; use super::DataType; -use crate::record::{Key, KeyRef}; +use crate::{ + cast_arc_value, + record::{Key, KeyRef}, +}; #[derive(Debug, Clone)] pub struct ValueDesc { @@ -20,6 +25,48 @@ pub struct ValueDesc { pub name: String, } +impl PartialEq for ValueDesc { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.datatype == other.datatype + && self.is_nullable == other.is_nullable + } +} +impl Eq for ValueDesc {} + +impl PartialOrd for ValueDesc { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ValueDesc { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.name + .cmp(&other.name) + .then_with(|| self.datatype.cmp(&other.datatype)) + .then_with(|| self.is_nullable.cmp(&other.is_nullable)) + } +} + +impl Hash for ValueDesc { + fn hash(&self, state: &mut H) { + self.name.hash(state); + self.datatype.hash(state); + self.is_nullable.hash(state); + } +} + +impl From<&Field> for ValueDesc { + fn from(field: &Field) -> Self { + Self { + datatype: field.data_type().into(), + is_nullable: field.is_nullable(), + name: field.name().to_owned(), + } + } +} + impl ValueDesc { pub fn new(name: String, datatype: DataType, is_nullable: bool) -> Self { Self { @@ -30,7 +77,7 @@ impl ValueDesc { } pub(crate) fn arrow_field(&self) -> Field { - let arrow_type = match self.datatype { + let arrow_type = match &self.datatype { DataType::UInt8 => ArrowDataType::UInt8, DataType::UInt16 => ArrowDataType::UInt16, DataType::UInt32 => ArrowDataType::UInt32, @@ -42,6 +89,29 @@ impl ValueDesc { DataType::String => ArrowDataType::Utf8, DataType::Boolean => ArrowDataType::Boolean, DataType::Bytes => ArrowDataType::Binary, + DataType::List(desc) => { + let array_ty = match &desc.datatype { + DataType::UInt8 => ArrowDataType::UInt8, + DataType::UInt16 => ArrowDataType::UInt16, + DataType::UInt32 => ArrowDataType::UInt32, + DataType::UInt64 => ArrowDataType::UInt64, + DataType::Int8 => ArrowDataType::Int8, + DataType::Int16 => ArrowDataType::Int16, + DataType::Int32 => ArrowDataType::Int32, + DataType::Int64 => ArrowDataType::Int64, + DataType::String => ArrowDataType::Utf8, + DataType::Boolean => ArrowDataType::Boolean, + DataType::Bytes => ArrowDataType::Binary, + DataType::List(_) => { + unimplemented!("`Vec> is not supported now") + } + }; + ArrowDataType::List(FieldRef::new(Field::new( + &desc.name, + array_ty, + desc.is_nullable, + ))) + } }; Field::new(&self.name, arrow_type, self.is_nullable) } @@ -67,7 +137,7 @@ impl Value { } pub(crate) fn with_none_value(datatype: DataType, name: String, is_nullable: bool) -> Self { - match datatype { + match &datatype { DataType::UInt8 => Self::new(datatype, name, Arc::>::new(None), is_nullable), DataType::UInt16 => { Self::new(datatype, name, Arc::>::new(None), is_nullable) @@ -103,11 +173,80 @@ impl Value { Arc::>>::new(None), is_nullable, ), + DataType::List(desc) => match &desc.datatype { + DataType::UInt8 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::UInt16 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::UInt32 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::UInt64 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::Int8 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::Int16 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::Int32 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::Int64 => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::String => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::Boolean => Self::new( + datatype, + name, + Arc::>>::new(None), + is_nullable, + ), + DataType::Bytes => Self::new( + datatype, + name, + Arc::>>>::new(None), + is_nullable, + ), + DataType::List(_) => unimplemented!("Vec> is not supporte yet"), + }, } } - pub fn datatype(&self) -> DataType { - self.desc.datatype + pub fn datatype(&self) -> &DataType { + &self.desc.datatype } pub fn is_nullable(&self) -> bool { @@ -119,6 +258,42 @@ impl Value { } } +/// transform `Arc` to `Arc>` +pub(crate) fn wrapped_value( + datatype: &DataType, + value: &Arc, +) -> Arc { + match datatype { + DataType::UInt8 => Arc::new(Some(*cast_arc_value!(value, u8))), + DataType::UInt16 => Arc::new(Some(*cast_arc_value!(value, u16))), + DataType::UInt32 => Arc::new(Some(*cast_arc_value!(value, u32))), + DataType::UInt64 => Arc::new(Some(*cast_arc_value!(value, u64))), + DataType::Int8 => Arc::new(Some(*cast_arc_value!(value, i8))), + DataType::Int16 => Arc::new(Some(*cast_arc_value!(value, i16))), + DataType::Int32 => Arc::new(Some(*cast_arc_value!(value, i32))), + DataType::Int64 => Arc::new(Some(*cast_arc_value!(value, i64))), + DataType::String => Arc::new(Some(cast_arc_value!(value, String).to_owned())), + DataType::Boolean => Arc::new(Some(*cast_arc_value!(value, bool))), + DataType::Bytes => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::List(desc) => match desc.datatype { + DataType::UInt8 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::UInt16 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::UInt32 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::UInt64 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::Int8 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::Int16 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::Int32 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::Int64 => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::String => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::Boolean => Arc::new(Some(cast_arc_value!(value, Vec).to_owned())), + DataType::Bytes => Arc::new(Some(cast_arc_value!(value, Vec>).to_owned())), + DataType::List(_) => { + unimplemented!("Vec> is not supporte yet") + } + }, + } +} + impl Eq for Value {} impl PartialOrd for Value { @@ -138,6 +313,17 @@ macro_rules! implement_col { .downcast_ref::<$Type>() .cmp(&other.value.downcast_ref::<$Type>()), )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => self + .value + .downcast_ref::>() + .cmp(&other.value.downcast_ref::>()), + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + } } } } @@ -148,11 +334,40 @@ macro_rules! implement_col { && self.is_nullable() == other.is_nullable() && match self.datatype() { $( - DataType::$DataType => self + DataType::$DataType => { + if let Some(v) = self .value - .downcast_ref::<$Type>() - .eq(&other.value.downcast_ref::<$Type>()), + .downcast_ref::<$Type>() { + v.eq(other.value.downcast_ref::<$Type>().unwrap()) + } else { + self.value.downcast_ref::>().unwrap().eq(other.value.downcast_ref::>().unwrap()) + } + } )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => { + if let Some(v) = self + .value + .downcast_ref::>() { + v.eq(other.value.downcast_ref::>().unwrap()) + } else { + self.value + .downcast_ref::>>() + .expect(stringify!("unexpected datatype, can not convert to " Vec<$Type>)) + .eq(other.value.downcast_ref::>>() + .expect(stringify!("unexpected datatype, can not convert to " Vec<$Type>))) + } + // self + // .value + // .downcast_ref::>() + // .eq(&other.value.downcast_ref::>()), + } + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + } } } } @@ -163,6 +378,14 @@ macro_rules! implement_col { $( DataType::$DataType => self.value.downcast_ref::<$Type>().hash(state), )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => self.value.downcast_ref::>().hash(state), + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + } } } } @@ -179,11 +402,29 @@ macro_rules! implement_col { } else { debug_struct.field( "value", - self.value.as_ref().downcast_ref::>().unwrap(), + self.value.as_ref().downcast_ref::>().expect("can not convert value to {}"), ); } } )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => { + debug_struct.field("datatype", &stringify!(Vec<$Type>)); + if let Some(value) = self.value.as_ref().downcast_ref::>() { + debug_struct.field("value", value); + } else { + debug_struct.field( + "value", + self.value.as_ref().downcast_ref::>>().unwrap(), + ); + } + } + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + } } debug_struct.field("nullable", &self.is_nullable()).finish() } @@ -233,6 +474,31 @@ macro_rules! implement_key_col { .downcast_ref::>() .expect("unexpected datatype, expected bytes"), )), + DataType::List(desc) => { + let arr: Arc = match &desc.datatype { + $( + DataType::$DataType => { + Arc::new($Array::from_iter_values(cast_arc_value!(self.value, Vec<$Type>).clone())) + } + )* + DataType::Boolean => { + Arc::new(BooleanArray::from(cast_arc_value!(self.value, Vec).clone())) + }, + DataType::String => { + Arc::new(StringArray::from(cast_arc_value!(self.value, Vec).clone())) + }, + DataType::Bytes => { + Arc::new(GenericBinaryArray::::from_vec(vec![cast_arc_value!(self.value, Vec)])) + }, + DataType::List(_) => unimplemented!("Vec> is not supported yet") + }; + Arc::new(ListArray::new( + Arc::new(Field::new("", (&desc.datatype).into(), desc.is_nullable)), + OffsetBuffer::from_lengths([arr.len()]), + arr, + None + )) + }, } } } @@ -256,12 +522,11 @@ macro_rules! implement_decode_col { where R: SeqRead, { - let tag = u8::decode(reader).await?; - let datatype = Self::tag_to_datatype(tag); + let datatype = DataType::decode(reader).await?; let is_nullable = bool::decode(reader).await?; let is_some = !bool::decode(reader).await?; let value = - match datatype { + match &datatype { $( DataType::$DataType => match is_some { true => Arc::new(Option::<$Type>::decode(reader).await.map_err( @@ -274,6 +539,23 @@ macro_rules! implement_decode_col { false => Arc::new(<$Type>::decode(reader).await?) as Arc, }, )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => match is_some { + true => Arc::new(Option::>::decode(reader).await.map_err( + |err| match err { + DecodeError::Io(error) => fusio::Error::Io(error), + DecodeError::Fusio(error) => error, + DecodeError::Inner(error) => fusio::Error::Other(Box::new(error)), + }, + )?) as Arc, + false => Arc::new(>::decode(reader).await?) as Arc, + }, + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + }, }; let name = String::decode(reader).await?; Ok(Value::new( @@ -296,7 +578,7 @@ macro_rules! implement_encode_col { where W: Write, { - Self::tag(self.datatype()).encode(writer).await?; + self.datatype().encode(writer).await?; self.is_nullable().encode(writer).await?; match self.datatype() { $( @@ -316,13 +598,35 @@ macro_rules! implement_encode_col { } } )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => { + if let Some(value) = self.value.as_ref().downcast_ref::>() { + true.encode(writer).await?; + value.encode(writer).await? + } else { + false.encode(writer).await?; + self.value + .as_ref() + .downcast_ref::>>() + .unwrap() + .encode(writer) + .await + .map_err(|err| fusio::Error::Other(Box::new(err)))?; + } + } + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + }, }; self.desc.name.encode(writer).await?; Ok(()) } fn size(&self) -> usize { - 3 + self.desc.name.size() + match self.desc.datatype { + 2 + self.desc.name.size() + self.datatype().size() + match self.datatype() { $( DataType::$DataType => { if let Some(value) = self.value.as_ref().downcast_ref::<$Type>() { @@ -336,65 +640,43 @@ macro_rules! implement_encode_col { } } )* + DataType::List(field) => { + match &field.datatype { + $( + DataType::$DataType => { + if let Some(value) = self.value.as_ref().downcast_ref::>() { + value.size() + } else { + self.value + .as_ref() + .downcast_ref::>>() + .unwrap() + .size() + } + } + )* + DataType::List(_) => unimplemented!("Vec> is not supported yet") + } + }, } } } } } -impl Value { - fn tag(datatype: DataType) -> u8 { - match datatype { - DataType::UInt8 => 0, - DataType::UInt16 => 1, - DataType::UInt32 => 2, - DataType::UInt64 => 3, - DataType::Int8 => 4, - DataType::Int16 => 5, - DataType::Int32 => 6, - DataType::Int64 => 7, - DataType::String => 8, - DataType::Boolean => 9, - DataType::Bytes => 10, - } - } - - fn tag_to_datatype(tag: u8) -> DataType { - match tag { - 0 => DataType::UInt8, - 1 => DataType::UInt16, - 2 => DataType::UInt32, - 3 => DataType::UInt64, - 4 => DataType::Int8, - 5 => DataType::Int16, - 6 => DataType::Int32, - 7 => DataType::Int64, - 8 => DataType::String, - 9 => DataType::Boolean, - 10 => DataType::Bytes, - _ => panic!("invalid datatype tag"), - } - } -} - impl From<&ValueDesc> for Field { fn from(col: &ValueDesc) -> Self { - match col.datatype { - DataType::UInt8 => Field::new(&col.name, ArrowDataType::UInt8, col.is_nullable), - DataType::UInt16 => Field::new(&col.name, ArrowDataType::UInt16, col.is_nullable), - DataType::UInt32 => Field::new(&col.name, ArrowDataType::UInt32, col.is_nullable), - DataType::UInt64 => Field::new(&col.name, ArrowDataType::UInt64, col.is_nullable), - DataType::Int8 => Field::new(&col.name, ArrowDataType::Int8, col.is_nullable), - DataType::Int16 => Field::new(&col.name, ArrowDataType::Int16, col.is_nullable), - DataType::Int32 => Field::new(&col.name, ArrowDataType::Int32, col.is_nullable), - DataType::Int64 => Field::new(&col.name, ArrowDataType::Int64, col.is_nullable), - DataType::String => Field::new(&col.name, ArrowDataType::Utf8, col.is_nullable), - DataType::Boolean => Field::new(&col.name, ArrowDataType::Boolean, col.is_nullable), - DataType::Bytes => Field::new(&col.name, ArrowDataType::Binary, col.is_nullable), - } + col.arrow_field() } } +#[macro_export] +macro_rules! value { + ($name: expr, $ty: expr, $nullable: expr, $value: expr) => {{ + $crate::record::Value::new($ty, $name.into(), std::sync::Arc::new($value), $nullable) + }}; +} + macro_rules! for_datatype { ($macro:tt $(, $x:tt)*) => { $macro! { @@ -421,3 +703,161 @@ implement_key_col!( for_datatype! { implement_col } for_datatype! { implement_decode_col } for_datatype! { implement_encode_col } + +#[cfg(test)] +mod tests { + use std::{io::Cursor, sync::Arc}; + + use arrow::{ + array::{AsArray, BooleanArray, PrimitiveArray, StringArray}, + datatypes::UInt64Type, + }; + use fusio_log::{Decode, Encode}; + + use crate::record::{DataType, Key, Value, ValueDesc}; + + #[tokio::test] + async fn test_encode_decode_list() { + use tokio::io::AsyncSeekExt; + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt64, false))), + "u64s".into(), + Arc::new(vec![1_u64, 2, 3, 4]), + false, + ); + + let mut source = vec![]; + let mut cursor = Cursor::new(&mut source); + v.encode(&mut cursor).await.unwrap(); + + cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let decoded = Value::decode(&mut cursor).await.unwrap(); + assert_eq!(v, decoded); + } + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt64, false))), + "u64s".into(), + Arc::new(Some(vec![1_u64, 2, 3, 4])), + true, + ); + + let mut source = vec![]; + let mut cursor = Cursor::new(&mut source); + v.encode(&mut cursor).await.unwrap(); + + cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let decoded = Value::decode(&mut cursor).await.unwrap(); + assert_eq!(v, decoded); + } + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, false))), + "bytes".into(), + Arc::new(Some(vec![ + vec![1_u8, 2, 3, 4], + vec![72, 83, 94], + vec![112, 113, 124], + ])), + true, + ); + + let mut source = vec![]; + let mut cursor = Cursor::new(&mut source); + v.encode(&mut cursor).await.unwrap(); + + cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let decoded = Value::decode(&mut cursor).await.unwrap(); + assert_eq!(v, decoded); + } + } + + #[test] + fn test_key_ref_list() { + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt64, false))), + "u64s".into(), + Arc::new(vec![1_u64, 2, 3, 4]), + false, + ); + + let vref = v.as_key_ref(); + assert_eq!(v, vref); + } + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt64, true))), + "u64s".into(), + Arc::new(Some(vec![1_u64, 2, 3, 4])), + false, + ); + + let vref = v.as_key_ref(); + assert_eq!(v, vref); + } + } + + #[test] + fn test_key_list_datum() { + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt64, false))), + "u64s".into(), + Arc::new(vec![1_u64, 2, 3, 4]), + false, + ); + let datum = v.to_arrow_datum(); + assert_eq!( + datum + .get() + .0 + .as_list::() + .value(0) + .as_primitive::(), + &PrimitiveArray::from_iter([1_u64, 2, 3, 4]) + ); + } + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::String, false))), + "u64s".into(), + Arc::new(vec![ + "1_u64".to_string(), + "2".to_string(), + "3".to_string(), + "4".to_string(), + ]), + false, + ); + let datum = v.to_arrow_datum(); + assert_eq!( + datum.get().0.as_list::().value(0).as_string(), + &StringArray::from(vec![ + "1_u64".to_string(), + "2".to_string(), + "3".to_string(), + "4".to_string() + ]) + ); + } + { + let v = Value::new( + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + "u64s".into(), + Arc::new(vec![true, false, false, true]), + false, + ); + let datum = v.to_arrow_datum(); + assert_eq!( + datum.get().0.as_list::().value(0).as_boolean(), + &BooleanArray::from(vec![true, false, false, true]) + ); + } + } +} diff --git a/src/transaction.rs b/src/transaction.rs index 3b6f2a92..b10ab50a 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -925,7 +925,7 @@ mod tests { assert_eq!(record_ref.columns.len(), 3); let col = record_ref.columns.first().unwrap(); - assert_eq!(col.datatype(), DataType::Int8); + assert_eq!(col.datatype(), &DataType::Int8); let name = col.value.as_ref().downcast_ref::(); assert!(name.is_some()); assert_eq!(*name.unwrap(), 1); @@ -954,21 +954,21 @@ mod tests { dbg!(columns.clone()); let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int8); + assert_eq!(primary_key_col.datatype(), &DataType::Int8); assert_eq!( *primary_key_col.value.as_ref().downcast_ref::().unwrap(), 1 ); let col = columns.get(1).unwrap(); - assert_eq!(col.datatype(), DataType::Int16); + assert_eq!(col.datatype(), &DataType::Int16); assert_eq!( *col.value.as_ref().downcast_ref::>().unwrap(), Some(180) ); let col = columns.get(2).unwrap(); - assert_eq!(col.datatype(), DataType::Int32); + assert_eq!(col.datatype(), &DataType::Int32); let weight = col.value.as_ref().downcast_ref::>(); assert!(weight.is_some()); assert_eq!(*weight.unwrap(), Some(56_i32)); diff --git a/tests/dyn_record.rs b/tests/dyn_record.rs new file mode 100644 index 00000000..af14bf28 --- /dev/null +++ b/tests/dyn_record.rs @@ -0,0 +1,293 @@ +#[cfg(all(test, feature = "tokio"))] +mod tests { + use std::{ops::Bound, sync::Arc}; + + use futures::StreamExt; + use parquet::arrow::{ArrowSchemaConverter, ProjectionMask}; + use tempfile::TempDir; + use tonbo::{ + executor::tokio::TokioExecutor, + record::{DataType, DynRecord, DynSchema, Record, RecordRef, Schema, Value, ValueDesc}, + DbOption, Path, Projection, DB, + }; + + fn test_dyn_item_schema() -> DynSchema { + DynSchema::new( + vec![ + ValueDesc::new("id".into(), DataType::UInt32, false), + ValueDesc::new( + "bools".into(), + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + true, + ), + ValueDesc::new( + "bytes".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, false))), + true, + ), + ValueDesc::new( + "i64s".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Int64, false))), + true, + ), + ValueDesc::new( + "u16s".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt16, false))), + false, + ), + ValueDesc::new( + "strs".into(), + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::String, false))), + false, + ), + ], + 0, + ) + } + + fn generate_record(i: usize) -> DynRecord { + DynRecord::new( + vec![ + Value::new(DataType::UInt32, "id".into(), Arc::new(i as u32), false), + Value::new( + DataType::List(Arc::new(ValueDesc::new( + "".into(), + DataType::Boolean, + false, + ))), + "bools".to_string(), + Arc::new(Some(vec![true, i % 3 == 0, i % 4 == 0, i % 5 == 0, false])), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Bytes, false))), + "bytes".to_string(), + Arc::new(Some(vec![ + format!("{i}@tonbo").as_bytes().to_vec(), + format!("{}@tonbo", i).as_bytes().to_vec(), + format!("{}@tonbo", i * 2).as_bytes().to_vec(), + format!("{}@tonbo", i * 3).as_bytes().to_vec(), + ])), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::Int64, false))), + "i64s".to_string(), + Arc::new(Some(vec![ + 1231_i64, + 201 * i as i64, + 379 * i as i64, + 493 * i as i64, + ])), + true, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::UInt16, false))), + "u16s".to_string(), + Arc::new(vec![1_u16, 2, 3, 4]), + false, + ), + Value::new( + DataType::List(Arc::new(ValueDesc::new("".into(), DataType::String, false))), + "strs".to_string(), + Arc::new(vec![ + format!("{i}@tonbo"), + format!("{}@tonbo", i * 3), + format!("{}@tonbo", i * 4), + format!("{}@tonbo", i * 7), + ]), + false, + ), + ], + 0, + ) + } + + fn test_dyn_items() -> Vec { + let mut items = Vec::with_capacity(50); + for i in 0..50 { + items.push(generate_record(i)); + } + items + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_read_write_dyn_record_mem() { + let temp_dir = TempDir::new().unwrap(); + + let dyn_schema = test_dyn_item_schema(); + let arrow_schema = dyn_schema.arrow_schema().clone(); + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &dyn_schema, + ); + + let db: DB = + DB::new(option, TokioExecutor::current(), dyn_schema) + .await + .unwrap(); + + for (i, item) in test_dyn_items().into_iter().enumerate() { + if i == 28 { + db.remove(item.key()).await.unwrap(); + } else { + db.insert(item).await.unwrap(); + } + } + + // test get + { + let tx = db.transaction().await; + + for i in 0..50 { + let key = Value::new( + DataType::UInt32, + "id".to_string(), + Arc::new(i as u32), + false, + ); + let option1 = tx.get(&key, Projection::All).await.unwrap(); + if i == 28 { + assert!(option1.is_none()); + continue; + } + let entry = option1.unwrap(); + let record_ref = entry.get(); + + let expected = generate_record(i); + assert_eq!(record_ref.columns, expected.as_record_ref().columns); + } + } + + // test scan + { + let tx = db.transaction().await; + let lower = Value::new(DataType::UInt32, "id".to_owned(), Arc::new(0_u32), false); + let upper = Value::new(DataType::UInt32, "id".to_owned(), Arc::new(49_u32), false); + let mut scan = tx + .scan((Bound::Included(&lower), Bound::Included(&upper))) + .projection(&["id", "bools", "bytes", "strs"]) + .take() + .await + .unwrap(); + + let mut i = 0_i64; + while let Some(entry) = scan.next().await.transpose().unwrap() { + if i == 28 { + assert!(entry.value().is_none()); + i += 1; + continue; + } + let record_ref = entry.value().unwrap(); + let expected = generate_record(i as usize); + let mut expected_ref = expected.as_record_ref(); + let mask = ProjectionMask::roots( + &ArrowSchemaConverter::new().convert(&arrow_schema).unwrap(), + [0, 1, 2, 3, 4, 7], + ); + expected_ref.projection(&mask); + + assert_eq!(record_ref.columns, expected_ref.columns); + i += 1; + } + assert_eq!(i, 50); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_read_write_dyn_record_from_disk() { + let temp_dir = TempDir::new().unwrap(); + + let dyn_schema = test_dyn_item_schema(); + let arrow_schema = dyn_schema.arrow_schema().clone(); + let option = DbOption::new( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + &dyn_schema, + ) + .immutable_chunk_num(1) + .major_threshold_with_sst_size(2) + .level_sst_magnification(1) + .max_sst_file_size(1024) + .major_default_oldest_table_num(1); + + let db: DB = + DB::new(option, TokioExecutor::current(), dyn_schema) + .await + .unwrap(); + + for (i, item) in test_dyn_items().into_iter().enumerate() { + if i == 28 { + db.remove(item.key()).await.unwrap(); + } else { + db.insert(item).await.unwrap(); + } + if i == 5 { + db.flush().await.unwrap(); + } + } + db.flush().await.unwrap(); + + // test get + { + let tx = db.transaction().await; + + for i in 0..50 { + let key = Value::new( + DataType::UInt32, + "id".to_string(), + Arc::new(i as u32), + false, + ); + let option1 = tx.get(&key, Projection::All).await.unwrap(); + if i == 28 { + assert!(option1.is_none()); + continue; + } + let entry = option1.unwrap(); + let record_ref = entry.get(); + // dbg!(record_ref.columns.clone()); + + let expected = generate_record(i); + assert_eq!(record_ref.columns, expected.as_record_ref().columns); + } + } + // test scan + { + let tx = db.transaction().await; + let lower = Value::new(DataType::UInt32, "id".to_owned(), Arc::new(0_u32), false); + let upper = Value::new(DataType::UInt32, "id".to_owned(), Arc::new(49_u32), false); + let mut scan = tx + .scan((Bound::Included(&lower), Bound::Included(&upper))) + .projection(&["id", "bools", "bytes", "strs"]) + .take() + .await + .unwrap(); + + let mut i = 0_i64; + while let Some(entry) = scan.next().await.transpose().unwrap() { + if i == 28 { + assert!(entry.value().is_none()); + i += 1; + continue; + } + let record_ref = entry.value().unwrap(); + let expected = generate_record(i as usize); + let mut expected_ref = expected.as_record_ref(); + // dbg!(record_ref.columns.clone()); + let mask = ProjectionMask::roots( + &ArrowSchemaConverter::new().convert(&arrow_schema).unwrap(), + [0, 1, 2, 3, 4, 7], + ); + expected_ref.projection(&mask); + + assert_eq!(record_ref.columns, expected_ref.columns); + i += 1; + } + assert_eq!(i, 50); + } + } +} diff --git a/tests/wasm.rs b/tests/wasm.rs index 2b84fc69..e8caef9a 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -7,7 +7,7 @@ mod tests { use futures::StreamExt; use tonbo::{ executor::opfs::OpfsExecutor, - record::{DataType, DynRecord, DynSchema, Record, Value, ValueDesc}, + record::{DataType, DynRecord, DynSchema, Record, RecordRef, Schema, Value, ValueDesc}, DbOption, Projection, DB, }; use wasm_bindgen_test::wasm_bindgen_test; @@ -183,7 +183,7 @@ mod tests { let columns = entry.value().unwrap().columns; let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int64); + assert_eq!(primary_key_col.datatype(), &DataType::Int64); assert_eq!(primary_key_col.desc.name, "id".to_string()); assert_eq!( *primary_key_col @@ -195,21 +195,21 @@ mod tests { ); let col = columns.get(1).unwrap(); - assert_eq!(col.datatype(), DataType::Int8); + assert_eq!(col.datatype(), &DataType::Int8); assert_eq!(col.desc.name, "age".to_string()); let age = col.value.as_ref().downcast_ref::>(); assert!(age.is_some()); assert_eq!(age.unwrap(), &None); let col = columns.get(2).unwrap(); - assert_eq!(col.datatype(), DataType::String); + assert_eq!(col.datatype(), &DataType::String); assert_eq!(col.desc.name, "name".to_string()); let name = col.value.as_ref().downcast_ref::>(); assert!(name.is_some()); assert_eq!(name.unwrap(), &Some(i.to_string())); let col = columns.get(4).unwrap(); - assert_eq!(col.datatype(), DataType::Bytes); + assert_eq!(col.datatype(), &DataType::Bytes); assert_eq!(col.desc.name, "bytes".to_string()); let bytes = col.value.as_ref().downcast_ref::>>(); assert!(bytes.is_some()); @@ -258,7 +258,6 @@ mod tests { let tx = db.transaction().await; let mut scan = tx .scan((Bound::Unbounded, Bound::Unbounded)) - .projection(&["id", "age", "name"]) .take() .await .unwrap(); @@ -274,6 +273,44 @@ mod tests { } } } + { + use parquet::arrow::{ArrowSchemaConverter, ProjectionMask}; + // test projection + + let mut sort_items = BTreeMap::new(); + for item in test_dyn_items() { + sort_items.insert(item.key(), item); + } + + let tx = db.transaction().await; + let mut scan = tx + .scan((Bound::Unbounded, Bound::Unbounded)) + .projection(&["id", "age", "name"]) + .take() + .await + .unwrap(); + + while let Some(entry) = scan.next().await.transpose().unwrap() { + let columns1 = entry.value().unwrap().columns; + let (_, record) = sort_items.pop_first().unwrap(); + + let schema = test_dyn_item_schema(); + let mask = ProjectionMask::roots( + &ArrowSchemaConverter::new() + .convert(schema.arrow_schema()) + .unwrap(), + [2, 3, 4], + ); + let mut record_ref = record.as_record_ref(); + record_ref.projection(&mask); + let columns2 = record_ref.columns; + + assert_eq!(columns1.len(), columns2.len()); + for i in 0..columns1.len() { + assert_eq!(columns1.get(i), columns2.get(i)); + } + } + } db.flush_wal().await.unwrap(); drop(db); remove("opfs_dir").await; @@ -350,7 +387,7 @@ mod tests { let columns = entry.value().unwrap().columns; let primary_key_col = columns.first().unwrap(); - assert_eq!(primary_key_col.datatype(), DataType::Int64); + assert_eq!(primary_key_col.datatype(), &DataType::Int64); assert_eq!(primary_key_col.desc.name, "id".to_string()); assert_eq!( *primary_key_col @@ -362,21 +399,21 @@ mod tests { ); let col = columns.get(1).unwrap(); - assert_eq!(col.datatype(), DataType::Int8); + assert_eq!(col.datatype(), &DataType::Int8); assert_eq!(col.desc.name, "age".to_string()); let age = col.value.as_ref().downcast_ref::>(); assert!(age.is_some()); assert_eq!(age.unwrap(), &Some(i as i8)); let col = columns.get(2).unwrap(); - assert_eq!(col.datatype(), DataType::String); + assert_eq!(col.datatype(), &DataType::String); assert_eq!(col.desc.name, "name".to_string()); let name = col.value.as_ref().downcast_ref::>(); assert!(name.is_some()); assert_eq!(name.unwrap(), &Some(i.to_string())); let col = columns.get(4).unwrap(); - assert_eq!(col.datatype(), DataType::Bytes); + assert_eq!(col.datatype(), &DataType::Bytes); assert_eq!(col.desc.name, "bytes".to_string()); let bytes = col.value.as_ref().downcast_ref::>>(); assert!(bytes.unwrap().is_none());