Skip to content

Commit

Permalink
Merge AvroConverters
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan committed Feb 7, 2025
1 parent aeb8b20 commit 9717151
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 208 deletions.
3 changes: 3 additions & 0 deletions hoptimator-avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ dependencies {
implementation project(':hoptimator-api')
implementation libs.avro
implementation libs.calcite.core

testImplementation libs.junit
testImplementation libs.assertj
}

publishing {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.linkedin.hoptimator.avro;

import java.util.AbstractMap;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.avro.Schema;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;

import java.util.AbstractMap;
import java.util.List;
import java.util.stream.Collectors;

/** Converts between Avro and Calcite's RelDataType */
public final class AvroConverter {
Expand All @@ -23,13 +23,11 @@ private AvroConverter() {

public static Schema avro(String namespace, String name, RelDataType dataType) {
if (dataType.isStruct()) {
List<Schema.Field> fields = dataType.getFieldList()
.stream()
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x),
null))
List<Schema.Field> fields = dataType.getFieldList().stream()
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
.collect(Collectors.toList());
return createAvroSchemaWithNullability(
Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), dataType.isNullable());
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
dataType.isNullable());
} else {
switch (dataType.getSqlTypeName()) {
case INTEGER:
Expand All @@ -51,10 +49,9 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
case ARRAY:
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
dataType.isNullable());
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
case MAP:
return createAvroSchemaWithNullability(Schema.createMap(avro(null, null, dataType.getValueType())),
dataType.isNullable());
case UNKNOWN:
case NULL:
return Schema.createUnion(Schema.create(Schema.Type.NULL));
Expand Down Expand Up @@ -82,55 +79,74 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean
}

public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
return rel(schema, typeFactory, false);
}

/** Converts Avro Schema to RelDataType.
* Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY"
* causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired.
*/
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) {
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
case RECORD:
return typeFactory.createStructType(schema.getFields()
.stream()
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream()
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
.collect(Collectors.toList())), nullable);
case INT:
return createRelType(typeFactory, SqlTypeName.INTEGER);
return createRelType(typeFactory, SqlTypeName.INTEGER, nullable);
case LONG:
return createRelType(typeFactory, SqlTypeName.BIGINT);
return createRelType(typeFactory, SqlTypeName.BIGINT, nullable);
case ENUM:
case FIXED:
case STRING:
return createRelType(typeFactory, SqlTypeName.VARCHAR);
return createRelType(typeFactory, SqlTypeName.VARCHAR, nullable);
case FIXED:
return createRelType(typeFactory, SqlTypeName.VARBINARY, schema.getFixedSize(), nullable);
case BYTES:
return createRelType(typeFactory, SqlTypeName.VARBINARY, nullable);
case FLOAT:
return createRelType(typeFactory, SqlTypeName.FLOAT);
return createRelType(typeFactory, SqlTypeName.FLOAT, nullable);
case DOUBLE:
return createRelType(typeFactory, SqlTypeName.DOUBLE);
return createRelType(typeFactory, SqlTypeName.DOUBLE, nullable);
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
return createRelType(typeFactory, SqlTypeName.BOOLEAN, nullable);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
return typeFactory.createTypeWithNullability(
typeFactory.createArrayType(rel(schema.getElementType(), typeFactory, true), -1), nullable);
case MAP:
return typeFactory.createTypeWithNullability(
typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory, nullable)), nullable);
case UNION:
boolean isNullable = schema.isNullable();
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
} else {
// TODO support more elaborate union types
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
return typeFactory.createTypeWithNullability(rel(innerType, typeFactory, true), true);
}
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getTypes().stream()
.filter(x -> x.getType() != Schema.Type.NULL)
.map(x -> new AbstractMap.SimpleEntry<>(x.getName(), rel(x, typeFactory, isNullable)))
.filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList())), isNullable);
default:
return typeFactory.createUnknownType();
return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
}
}

public static RelDataType rel(Schema schema) {
return rel(schema, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
}

private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
RelDataType rawType = typeFactory.createSqlType(typeName);
return typeFactory.createTypeWithNullability(rawType, false);
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
return createRelType(typeFactory, typeName, RelDataType.PRECISION_NOT_SPECIFIED, nullable);
}

private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName,
int precision, boolean nullable) {
RelDataType rawType = typeFactory.createSqlType(typeName, precision);
return typeFactory.createTypeWithNullability(rawType, nullable);
}

public static RelProtoDataType proto(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.hoptimator.catalog;
package com.linkedin.hoptimator.avro;

import org.apache.avro.Schema;
import org.apache.calcite.plan.RelOptUtil;
Expand Down

This file was deleted.

5 changes: 3 additions & 2 deletions hoptimator-operator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ plugins {
}

dependencies {
implementation project(':hoptimator-avro')
implementation project(':hoptimator-planner')
implementation project(':hoptimator-catalog') // <-- marked for deletion
implementation project(':hoptimator-util')
implementation project(':hoptimator-k8s')
implementation project(':hoptimator-models') // <-- marked for deletion

implementation libs.calcite.core
implementation libs.kubernetes.client
implementation libs.kubernetes.extended.client
implementation libs.slf4j.api
implementation libs.commons.cli
implementation libs.avro

testImplementation libs.junit
testImplementation libs.assertj
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.hoptimator.operator.subscription;

import com.linkedin.hoptimator.catalog.AvroConverter;
import com.linkedin.hoptimator.avro.AvroConverter;
import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.planner.Pipeline;

Expand Down

0 comments on commit 9717151

Please sign in to comment.