Optionally skip nested Rows
ryannedolan committed Mar 30, 2024
1 parent e1ad4b5 · commit ebbabd0
Showing 3 changed files with 33 additions and 47 deletions.
@@ -20,12 +20,13 @@ public final class AvroConverter {
 
   private AvroConverter() {
   }
 
   public static Schema avro(String namespace, String name, RelDataType dataType) {
     if (dataType.isStruct()) {
       List<Schema.Field> fields = dataType.getFieldList().stream()
           .filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields
-          .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
+          .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()),
+              describe(x), null))
           .collect(Collectors.toList());
       return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields);
     } else {
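
Note: for context, a minimal sketch of how this RelDataType-to-Avro direction can be exercised. The type factory comes from the Calcite classes already used in this file; the record and field names are made up for illustration, and AvroConverter's own import is omitted because its package is not shown in this diff.

import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

class AvroSchemaSketch {
  static Schema example() {
    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RelDataType rowType = typeFactory.builder()
        .add("id", SqlTypeName.VARCHAR)
        .add("__source", SqlTypeName.VARCHAR)  // hypothetical hidden field: dropped by the "__" filter above
        .build();
    // The generated record contains only "id"; "__source" is never written out.
    return AvroConverter.avro("com.example", "MyRecord", rowType);
  }
}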
@@ -57,20 +58,29 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro
   }
 
   private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
+    return withNullability(Schema.create(rawType), nullable);
+  }
+
+  private static Schema withNullability(Schema schema, boolean nullable) {
     if (nullable) {
-      return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType));
+      return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
     } else {
-      return Schema.create(rawType);
+      return schema;
     }
   }
 
   public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
+    return rel(schema, typeFactory, false);
+  }
+
+  public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean skipNestedRows) {
     RelDataType unknown = typeFactory.createUnknownType();
     switch (schema.getType()) {
       case RECORD:
         return typeFactory.createStructType(schema.getFields().stream()
-            .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
+            .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, skipNestedRows)))
            .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
+            .filter(x -> !skipNestedRows || x.getValue().getSqlTypeName() != SqlTypeName.ROW)
             .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
             .collect(Collectors.toList()));
       case INT:
@@ -88,7 +98,7 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
       case UNION:
         if (schema.isNullable() && schema.getTypes().size() == 2) {
           Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
-          return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
+          return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, skipNestedRows), true);
         } else {
           // TODO support more elaborate union types
           return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
@@ -98,17 +108,21 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
     }
   }
 
+  public static RelDataType rel(Schema schema, boolean skipNestedRows) {
+    return rel(schema, DataType.DEFAULT_TYPE_FACTORY, skipNestedRows);
+  }
+
   public static RelDataType rel(Schema schema) {
-    return rel(schema, DataType.DEFAULT_TYPE_FACTORY);
+    return rel(schema, false);
   }
 
   private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
     RelDataType rawType = typeFactory.createSqlType(typeName);
     return typeFactory.createTypeWithNullability(rawType, nullable);
   }
 
-  public static RelProtoDataType proto(Schema schema) {
-    return RelDataTypeImpl.proto(rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)));
+  public static RelProtoDataType proto(Schema schema, boolean skipNestedRows) {
+    return RelDataTypeImpl.proto(rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT), skipNestedRows));
   }
 
   private static String describe(RelDataTypeField dataType) {
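
Note: taken together, the new overloads make nested-row skipping opt-in, defaulting to off. A minimal sketch of the option in use, assuming a hypothetical Avro schema built with Avro's SchemaBuilder (record and field names are illustrative; AvroConverter's import is again omitted):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.calcite.rel.type.RelDataType;

class SkipNestedRowsSketch {
  static void example() {
    Schema address = SchemaBuilder.record("Address").fields()
        .requiredString("street")
        .endRecord();
    Schema person = SchemaBuilder.record("Person").fields()
        .requiredString("name")
        .optionalString("nickname")                 // ["null","string"] union -> nullable field via the UNION branch
        .name("address").type(address).noDefault()  // nested record -> nested ROW by default
        .endRecord();

    RelDataType full = AvroConverter.rel(person);        // keeps the nested "address" struct
    RelDataType flat = AvroConverter.rel(person, true);  // skipNestedRows=true: "address" is filtered out
    System.out.println(full);
    System.out.println(flat);
  }
}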
@@ -3,21 +3,14 @@
 import org.apache.calcite.sql.SqlWriter;
 //import org.apache.calcite.sql.SqlWriterConfig;
 import org.apache.calcite.sql.SqlDataTypeSpec;
-import org.apache.calcite.sql.SqlRowTypeNameSpec;
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlRowTypeNameSpec;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.dialect.AnsiSqlDialect;
-import org.apache.calcite.sql.fun.SqlRowOperator;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.pretty.SqlPrettyWriter;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -136,30 +129,9 @@ public QueryImplementor(RelNode relNode) {
     public void implement(SqlWriter w) {
       RelToSqlConverter converter = new RelToSqlConverter(w.getDialect());
       SqlImplementor.Result result = converter.visitRoot(relNode);
-      SqlSelect select = result.asSelect();
-      if (select.getSelectList() != null) {
-        select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
-      }
-      w.literal(select.toSqlString(w.getDialect()).getSql());
+      w.literal(result.asSelect().toSqlString(w.getDialect()).getSql());
     }
-
-    // A `ROW(...)` operator which will unparse as just `(...)`.
-    private final SqlRowOperator SILENT_COLUMN_LIST = new SqlRowOperator(""); // empty string name
-
-    // a shuttle that replaces `Row(...)` with just `(...)`
-    private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() {
-      @Override
-      public SqlNode visit(SqlCall call) {
-        List<SqlNode> operands = call.getOperandList().stream().map(x -> x.accept(this)).collect(Collectors.toList());
-        if (call.getKind() == SqlKind.ROW || call.getKind() == SqlKind.COLUMN_LIST
-            || call.getOperator() instanceof SqlRowOperator) {
-          return SILENT_COLUMN_LIST.createCall(call.getParserPosition(), operands);
-        } else {
-          return call.getOperator().createCall(call.getParserPosition(), operands);
-        }
-      }
-    };
-  }
+  }
 
   /**
    * Implements a CREATE TABLE...WITH... DDL statement.
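
Note: with the ROW(...) rewrite gone, the query is unparsed directly from Calcite's rel2sql result. A minimal standalone sketch of that path, for illustration only (the helper name is made up):

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.SqlDialect;

class UnparseSketch {
  // Converts a relational expression to a SQL string for the given dialect,
  // mirroring the simplified implement() body above.
  static String toSql(RelNode relNode, SqlDialect dialect) {
    RelToSqlConverter converter = new RelToSqlConverter(dialect);
    SqlImplementor.Result result = converter.visitRoot(relNode);
    return result.asSelect().toSqlString(dialect).getSql();
  }
}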
@@ -28,10 +28,10 @@ public void implementsFlinkCreateTableDDL() {
     // Output isn't necessarily deterministic, but should be something like:
     // CREATE TABLE IF NOT EXISTS "DATABASE"."TABLE1" ("idValue1" VARCHAR) WITH
     // ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='topic1')
-    assertTrue(out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
-    assertTrue(out.contains("'connector'='kafka'"));
-    assertTrue(out.contains("'properties.bootstrap.servers'='localhost:9092'"));
-    assertTrue(out.contains("'topic'='topic1'"));
-    assertFalse(out.contains("Row"));
+    assertTrue(out, out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
+    assertTrue(out, out.contains("'connector'='kafka'"));
+    assertTrue(out, out.contains("'properties.bootstrap.servers'='localhost:9092'"));
+    assertTrue(out, out.contains("'topic'='topic1'"));
+    assertFalse(out, out.contains("Row"));
   }
 }
