diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a8fdcc..ed8bff3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -# [Unreleased] +## [Unreleased] + +### Fixed +- Named schema resolution outside of union variants. + +### Updated +- Documentation. ## 0.2.0 - 2020-10-10 diff --git a/src/reader.rs b/src/reader.rs index ca9f70b..aeed588 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -485,7 +485,7 @@ pub(crate) fn decode_with_resolution( pub(crate) fn decode( schema: &Variant, reader: &mut R, - r_cxt: &Registry, + w_cxt: &Registry, ) -> Result { let value = match schema { Variant::Null => Value::Null, @@ -538,7 +538,7 @@ pub(crate) fn decode( let mut it = Vec::with_capacity(block_count as usize); for _ in 0..block_count { - let decoded = decode(&**items, reader, r_cxt)?; + let decoded = decode(&**items, reader, w_cxt)?; it.push(decoded); } @@ -550,7 +550,7 @@ pub(crate) fn decode( let mut hm = HashMap::new(); for _ in 0..block_count { let key = decode_string(reader)?; - let value = decode(values, reader, r_cxt)?; + let value = decode(values, reader, w_cxt)?; hm.insert(key, value); } @@ -560,7 +560,7 @@ pub(crate) fn decode( let mut v = IndexMap::with_capacity(fields.len()); for (field_name, field) in fields { let field_name = field_name.to_string(); - let field_value = decode(&field.ty, reader, r_cxt)?; + let field_value = decode(&field.ty, reader, w_cxt)?; let field_value = FieldValue::new(field_value); v.insert(field_name, field_value); } @@ -571,15 +571,22 @@ pub(crate) fn decode( }; Value::Record(rec) } + Variant::Fixed { size, .. } => { + let mut buf = vec![0; *size]; + reader + .read_exact(&mut buf) + .map_err(AvrowErr::DecodeFailed)?; + Value::Fixed(buf) + } Variant::Union { variants } => { let variant_idx: i64 = reader.read_varint().map_err(AvrowErr::DecodeFailed)?; - decode(&variants[variant_idx as usize], reader, r_cxt)? + decode(&variants[variant_idx as usize], reader, w_cxt)? } Variant::Named(schema_name) => { - let schema_variant = r_cxt + let schema_variant = w_cxt .get(schema_name) .ok_or(AvrowErr::NamedSchemaNotFound)?; - decode(schema_variant, reader, r_cxt)? + decode(schema_variant, reader, w_cxt)? } a => { return Err(AvrowErr::DecodeFailed(Error::new( diff --git a/src/value.rs b/src/value.rs index 90b9d79..9589682 100644 --- a/src/value.rs +++ b/src/value.rs @@ -71,7 +71,7 @@ impl Record { } /// Creates a record from a [BTreeMap](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html) by consuming it. - /// The values in btree must implement Into. The name provided must match with the name in the record + /// The values in BTreeMap must implement Into. The name provided must match with the name in the record /// schema being provided to the writer. pub fn from_btree + Ord + Display, V: Into>( name: &str, @@ -242,9 +242,13 @@ impl Value { .write_f64::(*d) .map_err(AvrowErr::EncodeFailed)?; } + (ref value, Variant::Named(name)) => { + if let Some(schema) = cxt.get(name) { + value.encode(writer, schema, cxt)?; + } + } // Match with union happens first than more specific match arms (ref value, Variant::Union { variants, .. }) => { - // the get index function returns the index if the value's schema is in the variants of the union let (union_idx, schema) = resolve_union(&value, &variants, cxt)?; let union_idx = union_idx as i32; writer @@ -301,7 +305,6 @@ impl Value { .write_varint(idx as i32) .map_err(AvrowErr::EncodeFailed)?; } else { - // perf issues on creating error objects? return Err(AvrowErr::SchemaDataMismatch); } } @@ -639,6 +642,44 @@ mod tests { let _r = Record::from_btree("test", rec).unwrap(); } + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct SomeRecord { + one: Vec, + two: Vec, + } + + #[test] + fn named_schema_resolves() { + let schema = r##" + { + "type": "record", + "name": "SomeRecord", + "aliases": ["MyRecord"], + "fields" : [ + {"name": "one", "type":{"type": "fixed", "size": 5, "name": "md5"}}, + {"name": "two", "type":"md5"} + ] + } + "##; + + let schema = crate::Schema::from_str(schema).unwrap(); + let mut writer = crate::Writer::with_codec(&schema, vec![], crate::Codec::Null).unwrap(); + + let value = SomeRecord { + one: vec![0u8, 1, 2, 3, 4], + two: vec![0u8, 1, 2, 3, 4], + }; + + writer.serialize(&value).unwrap(); + + let output = writer.into_inner().unwrap(); + let reader = crate::Reader::new(output.as_slice()).unwrap(); + for i in reader { + let r: SomeRecord = from_value(&i).unwrap(); + assert_eq!(r, value); + } + } + #[derive(Debug, Serialize, Deserialize)] struct Mentees { id: i32, diff --git a/src/writer.rs b/src/writer.rs index 6ab09d4..e3a7a11 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1,17 +1,17 @@ //! The Writer is the primary interface for writing values in avro encoded format. use crate::codec::Codec; -use crate::schema::Schema; -use crate::value::Value; use crate::config::{DEFAULT_FLUSH_INTERVAL, MAGIC_BYTES, SYNC_MARKER_SIZE}; use crate::error::{AvrowErr, AvrowResult}; use crate::schema::Registry; +use crate::schema::Schema; use crate::schema::Variant; use crate::serde_avro; use crate::util::{encode_long, encode_raw_bytes}; use crate::value::Map; -use serde::Serialize; +use crate::value::Value; use rand::{thread_rng, Rng}; +use serde::Serialize; use std::collections::HashMap; use std::default::Default; use std::io::Write;