Skip to content

Commit

Permalink
AVRO-4055: Validate records properly when parsing a schema
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Sep 27, 2024
1 parent b656664 commit c55dee6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 23 deletions.
3 changes: 3 additions & 0 deletions avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ pub enum Error {
#[error("Invalid namespace {0}. It must match the regex '{1}'")]
InvalidNamespace(String, &'static str),

#[error("Invalid schema: The record type should be inside the type field")]
InvalidSchemaRecord(String),

#[error("Duplicate enum symbol {0}")]
EnumSymbolDuplicate(String),

Expand Down
80 changes: 75 additions & 5 deletions avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ impl RecordField {
validate_record_field_name(&name)?;

// TODO: "type" = "<record name>"
let schema = parser.parse_complex(field, &enclosing_record.namespace)?;
let schema = parser.parse_complex(field, &enclosing_record.namespace, true)?;

let default = field.get("default").cloned();
Self::resolve_default_value(
Expand Down Expand Up @@ -1232,7 +1232,7 @@ impl Parser {
fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
match *value {
Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
Value::Object(ref data) => self.parse_complex(data, enclosing_namespace),
Value::Object(ref data) => self.parse_complex(data, enclosing_namespace, false),
Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
_ => Err(Error::ParseSchemaFromValidJson),
}
Expand Down Expand Up @@ -1294,6 +1294,11 @@ impl Parser {
return Ok(resolving_schema.clone());
}

// For good error reporting we add this check
if name.name == *"record" {
return Err(Error::InvalidSchemaRecord(name.to_string()));
}

let value = self
.input_schemas
.remove(&fully_qualified_name)
Expand Down Expand Up @@ -1353,6 +1358,7 @@ impl Parser {
&mut self,
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
from_field: bool,
) -> AvroResult<Schema> {
// Try to parse this as a native complex type.
fn parse_as_native_complex(
Expand Down Expand Up @@ -1542,14 +1548,19 @@ impl Parser {
}
match complex.get("type") {
Some(Value::String(t)) => match t.as_str() {
"record" => self.parse_record(complex, enclosing_namespace),
"record" => {
if from_field {
return self.fetch_schema_ref(t, enclosing_namespace);
}
self.parse_record(complex, enclosing_namespace)
}
"enum" => self.parse_enum(complex, enclosing_namespace),
"array" => self.parse_array(complex, enclosing_namespace),
"map" => self.parse_map(complex, enclosing_namespace),
"fixed" => self.parse_fixed(complex, enclosing_namespace),
other => self.parse_known_schema(other, enclosing_namespace),
},
Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace),
Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace, false),
Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
None => Err(Error::GetComplexTypeField),
Expand Down Expand Up @@ -4932,7 +4943,7 @@ mod tests {
"type": "Bar"
}
]
}
}
"#;

#[derive(
Expand Down Expand Up @@ -6821,4 +6832,63 @@ mod tests {
assert_eq!("92f2ccef718c6754", fp_rabin.to_string());
Ok(())
}

#[test]
fn avro_4055_should_fail_to_parse_invalid_schema() -> TestResult {
// This is invalid because the record type should be inside the type field.
let invalid_schema_str = r#"
{
"type": "record",
"name": "SampleSchema",
"fields": [
{
"name": "order",
"type": "record",
"fields": [
{
"name": "order_number",
"type": ["null", "string"],
"default": null
},
{ "name": "order_date", "type": "string" }
]
}
]
}"#;

let schema = Schema::parse_str(invalid_schema_str);
println!("{:?}", schema);
assert!(schema.is_err());
assert_eq!(
schema.unwrap_err().to_string(),
"Invalid schema: The record type should be inside the type field"
);

let valid_schema = r#"
{
"type": "record",
"name": "SampleSchema",
"fields": [
{
"name": "order",
"type": {
"type": "record",
"name": "Order",
"fields": [
{
"name": "order_number",
"type": ["null", "string"],
"default": null
},
{ "name": "order_date", "type": "string" }
]
}
}
]
}"#;
let schema = Schema::parse_str(valid_schema);
assert!(schema.is_ok());

Ok(())
}
}
37 changes: 19 additions & 18 deletions avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2771,35 +2771,36 @@ Field with name '"b"' is not a member of the map items"#,
use crate::ser::Serializer;
use serde::Serialize;

let schema_str = r#"{
let schema_str = r#"
{
"type": "record",
"name": "NamespacedMessage",
[NAMESPACE]
"fields": [
{
"type": "record",
"name": "field_a",
"fields": [
{
"name": "enum_a",
"type": {
"type": {
"type": "record",
"name": "NestedMessage",
"fields": [
{
"name": "enum_a",
"type": {
"type": "enum",
"name": "EnumType",
"symbols": [
"SYMBOL_1",
"SYMBOL_2"
],
"symbols": ["SYMBOL_1", "SYMBOL_2"],
"default": "SYMBOL_1"
}
},
{
"name": "enum_b",
"type": "EnumType"
}
},
{
"name": "enum_b",
"type": "EnumType"
}
]
]
}
}
]
}"#;
}
"#;
let schema_str = schema_str.replace(
"[NAMESPACE]",
if with_namespace {
Expand Down

0 comments on commit c55dee6

Please sign in to comment.