Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: struct be converted to String for json_sr format #9678

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,34 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "JSON only supports MAP types with STRING keys"
}
},
{
"name": "struct must be converted to String",
"statements": [
"create stream TEST (FOO struct<id int, val struct<f0 string, f1 Map<String, String>, f2 Array<String>>>) with (kafka_topic='test_topic', value_format='json_sr', partitions=1);",
"CREATE STREAM INPUT (FOO STRING) WITH (kafka_topic='test_topic', value_format='JSON_SR');",
"CREATE STREAM OUTPUT as select * from INPUT;"
],
"inputs": [
{"topic": "test_topic", "value": {"FOO": {"id": 1, "val": {"f0": "k", "f1": {"a1":"1"}, "f2": ["x","y"]}}}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"FOO": {"ID":1, "VAL": {"F0":"k","F1": {"a1":"1"},"F2":["x", "y"]}}}}
]
}
// {
// "name": "struct must be converted to String",
// "statements": [
// "create stream TEST (FOO struct<id int, val struct<f0 string, f1 string, f2 string>>) with (kafka_topic='mixx', value_format='json_sr', partitions=1);",
// "CREATE STREAM INPUT (FOO STRING) WITH (kafka_topic='mixx', value_format='JSON_SR');",
// "CREATE STREAM OUTPUT as select * from INPUT;"
// ],
// "inputs": [
// {"topic": "mixx", "value": {"FOO": {"id": 1, "val": {"f0": "k", "f1": "kk", "f2": "jj"}}}}
// ],
// "outputs": [
// {"topic": "OUTPUT", "value": {"FOO": {"ID":1, "VAL": {"F0":"k","F1": "{a1=1}","F2":"[x, y]"}}}}
// ]
// }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.serde.connect;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.serde.SerdeUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -129,7 +131,10 @@ private static void validateType(
Schema.Type.FLOAT32,
Schema.Type.FLOAT64,
Schema.Type.BOOLEAN,
Schema.Type.STRING
Schema.Type.STRING,
Schema.Type.STRUCT,
Schema.Type.ARRAY,
Schema.Type.MAP
};

private static void validateSchema(
Expand Down Expand Up @@ -170,7 +175,7 @@ private static Object maybeConvertLogicalType(
if (connectSchema.name() == null) {
return connectValue;
}
switch (connectSchema.name()) {
switch (connectSchema.name()) {
case Date.LOGICAL_NAME:
return Date.fromLogical(connectSchema, (java.util.Date) connectValue);
case Time.LOGICAL_NAME:
Expand Down Expand Up @@ -231,7 +236,10 @@ private Object toKsqlValue(
case STRUCT:
return toKsqlStruct(schema, connectSchema, (Struct) convertedValue, pathStr);
case STRING:
// use String.valueOf to convert various int types and Boolean to string
if (convertedValue instanceof Struct) {
return structToString((Struct) convertedValue);
}
// use String.valueOf to convert various int types, and Boolean to string
return String.valueOf(convertedValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something wrong is gonna happen if we done here. The ConnectDataTranslator is a generic class for all Connect types (avro, protobuf, json). The struct->string conversion should be in JSON string, i.e. {k1=v1}. You can verify this with a plain JSON format or with JSON_SR in version 0.26 or lower. I actually not sure what Avro/Protobuf should do wit Struct->String.

  1. Could you check in older ksql versions if Struct->String is supported with Avro and Protobuf?
  2. Could you check in older ksql versions what Struct->String wit JSON_SR and plain JSON display?

Copy link
Contributor Author

@aliehsaeedii aliehsaeedii Nov 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something wrong is gonna happen if we done here.

The issue addresses exactly this line for change (please have a look at the issue explanation)

case BYTES:
if (convertedValue instanceof byte[]) {
Expand All @@ -244,6 +252,47 @@ private Object toKsqlValue(
}
}

private String structToString(final Struct input) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an API we can use to generate a JSON string? Something that handle cases with String characters (special chars, nulls)? Not sure if we can re-use the ObjectMapper to generate a JSON string.

final StringBuilder sb = new StringBuilder("{");
boolean first = true;
final List<Field> fields = input.schema().fields();

for (Field field : fields) {
final Object value = input.get(field);
if (value != null) {
if (first) {
first = false;
} else {
sb.append(",");
}
sb.append("\"").append(field.name()).append("\"").append(":");
if (value instanceof Struct) {
sb.append(structToString((Struct) value));
} else if (value instanceof Map || value instanceof ArrayList) {
sb.append(objectToString(value));
} else if (value instanceof String) {
sb.append("\"").append(value).append("\"");
} else {
sb.append(value);
}
}
}

return sb.append("}").toString();
}

private <T> String objectToString(final T input) {
final ObjectMapper mapper = new ObjectMapper();
final String json;
try {
json = mapper.writeValueAsString(input);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return json;
}


private List<?> toKsqlArray(
final Schema valueSchema,
final Schema connectValueSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -815,14 +816,9 @@ public void shouldThrowIfCanNotCoerceToString() {
final byte[] bytes = givenAvroSerialized(AN_ORDER, ORDER_AVRO_SCHEMA);

// When:
final Exception e = assertThrows(
SerializationException.class,
() -> deserializer.deserialize(SOME_TOPIC, bytes)
);

final Object result = deserializer.deserialize(SOME_TOPIC, bytes);
// Then:
assertThat(e.getCause(), (hasMessage(containsString(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spena wrote this code. Maybe he knows about this...

"Cannot deserialize type struct as type string"))));
assertThat(result, is(toStructString(AN_ORDER)));
}

@Test
Expand Down Expand Up @@ -1790,4 +1786,20 @@ private static List<?> buildEntriesForMapWithOptionalKey(

return ImmutableList.of(e1, e2);
}

private static String toStructString (Map<String, Object> map) {
StringBuilder sb = new StringBuilder("{");
boolean first = true;

for (Map.Entry<String, Object> entry : map.entrySet()) {
if (first) {
first = false;
} else {
sb.append(",");
}
sb.append(entry.getKey()).append("=").append(entry.getValue());
}

return sb.append("}").toString();
}
}