diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java index ea31f4af..bc9471fb 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java @@ -20,12 +20,13 @@ public final class AvroConverter { private AvroConverter() { } - + public static Schema avro(String namespace, String name, RelDataType dataType) { if (dataType.isStruct()) { List fields = dataType.getFieldList().stream() .filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields - .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null)) + .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), + describe(x), null)) .collect(Collectors.toList()); return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields); } else { @@ -57,20 +58,29 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro } private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) { + return withNullability(Schema.create(rawType), nullable); + } + + private static Schema withNullability(Schema schema, boolean nullable) { if (nullable) { - return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType)); + return Schema.createUnion(Schema.create(Schema.Type.NULL), schema); } else { - return Schema.create(rawType); + return schema; } } - + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { + return rel(schema, typeFactory, false); + } + + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean skipNestedRows) { RelDataType unknown = typeFactory.createUnknownType(); switch (schema.getType()) { case RECORD: return typeFactory.createStructType(schema.getFields().stream() - .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory))) + .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, skipNestedRows))) .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL) + .filter(x -> !skipNestedRows || x.getValue().getSqlTypeName() != SqlTypeName.ROW) .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) .collect(Collectors.toList())); case INT: @@ -88,7 +98,7 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { case UNION: if (schema.isNullable() && schema.getTypes().size() == 2) { Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); - return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true); + return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, skipNestedRows), true); } else { // TODO support more elaborate union types return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true); @@ -98,8 +108,12 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { } } + public static RelDataType rel(Schema schema, boolean skipNestedRows) { + return rel(schema, DataType.DEFAULT_TYPE_FACTORY, skipNestedRows); + } + public static RelDataType rel(Schema schema) { - return rel(schema, DataType.DEFAULT_TYPE_FACTORY); + return rel(schema, false); } private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { @@ -107,8 +121,8 @@ private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeF return typeFactory.createTypeWithNullability(rawType, nullable); } - public static RelProtoDataType proto(Schema schema) { - return RelDataTypeImpl.proto(rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT))); + public static RelProtoDataType proto(Schema schema, boolean skipNestedRows) { + return RelDataTypeImpl.proto(rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT), skipNestedRows)); } private static String describe(RelDataTypeField dataType) { diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index 2a16543a..64977959 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -3,21 +3,14 @@ import org.apache.calcite.sql.SqlWriter; //import org.apache.calcite.sql.SqlWriterConfig; import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlRowTypeNameSpec; import org.apache.calcite.sql.SqlBasicTypeNameSpec; -import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlRowTypeNameSpec; -import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; -import org.apache.calcite.sql.fun.SqlRowOperator; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.pretty.SqlPrettyWriter; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.util.SqlShuttle; +import org.apache.calcite.sql.pretty.SqlPrettyWriter; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -136,30 +129,9 @@ public QueryImplementor(RelNode relNode) { public void implement(SqlWriter w) { RelToSqlConverter converter = new RelToSqlConverter(w.getDialect()); SqlImplementor.Result result = converter.visitRoot(relNode); - SqlSelect select = result.asSelect(); - if (select.getSelectList() != null) { - select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); - } - w.literal(select.toSqlString(w.getDialect()).getSql()); + w.literal(result.asSelect().toSqlString(w.getDialect()).getSql()); } - - // A `ROW(...)` operator which will unparse as just `(...)`. - private final SqlRowOperator SILENT_COLUMN_LIST = new SqlRowOperator(""); // empty string name - - // a shuttle that replaces `Row(...)` with just `(...)` - private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() { - @Override - public SqlNode visit(SqlCall call) { - List operands = call.getOperandList().stream().map(x -> x.accept(this)).collect(Collectors.toList()); - if (call.getKind() == SqlKind.ROW || call.getKind() == SqlKind.COLUMN_LIST - || call.getOperator() instanceof SqlRowOperator) { - return SILENT_COLUMN_LIST.createCall(call.getParserPosition(), operands); - } else { - return call.getOperator().createCall(call.getParserPosition(), operands); - } - } - }; - } + } /** * Implements a CREATE TABLE...WITH... DDL statement. diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java index 40a2614d..763bc1ea 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java @@ -28,10 +28,10 @@ public void implementsFlinkCreateTableDDL() { // Output isn't necessarily deterministic, but should be something like: // CREATE TABLE IF NOT EXISTS "DATABASE"."TABLE1" ("idValue1" VARCHAR) WITH // ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='topic1') - assertTrue(out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH ")); - assertTrue(out.contains("'connector'='kafka'")); - assertTrue(out.contains("'properties.bootstrap.servers'='localhost:9092'")); - assertTrue(out.contains("'topic'='topic1'")); - assertFalse(out.contains("Row")); + assertTrue(out, out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH ")); + assertTrue(out, out.contains("'connector'='kafka'")); + assertTrue(out, out.contains("'properties.bootstrap.servers'='localhost:9092'")); + assertTrue(out, out.contains("'topic'='topic1'")); + assertFalse(out, out.contains("Row")); } }