Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "chore(deps): upgrade common dependencies (#197)" #213

Merged
merged 1 commit into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ dependencies {

// Google libraries are managed by the Kestra Platform so they are aligned on all plugins
api platform('com.google.cloud:libraries-bom')
api 'com.google.api-client:google-api-client:2.7.2'
api 'com.google.auth:google-auth-library-oauth2-http:1.31.0'
api 'com.google.apis:google-api-services-drive:v3-rev20250122-2.0.0'
api 'com.google.apis:google-api-services-sheets:v4-rev20250106-2.0.0'
api 'com.google.apis:google-api-services-calendar:v3-rev20250115-2.0.0'
api 'com.google.api-client:google-api-client:2.7.1'
api 'com.google.auth:google-auth-library-oauth2-http:1.30.1'
api 'com.google.apis:google-api-services-drive:v3-rev20241206-2.0.0'
api 'com.google.apis:google-api-services-sheets:v4-rev20241203-2.0.0'
api 'com.google.apis:google-api-services-calendar:v3-rev20241101-2.0.0'

// Logs
compileOnly'org.slf4j:slf4j-api'
Expand All @@ -63,9 +63,8 @@ dependencies {
api('org.apache.parquet:parquet-hadoop:1.15.0')

// For ORC parsing
api('org.apache.orc:orc-core:2.1.0')
api('org.apache.orc:orc-mapreduce:2.1.0')
api('org.apache.hive:hive-storage-api:4.0.1')
api('org.apache.orc:orc-core:1.8.7')
api('org.apache.orc:orc-mapreduce:1.8.7')

// Additional libraries for Parquet, Avro, ORC parsers
api('org.apache.hadoop:hadoop-hdfs-client:3.4.1') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.csv.DuplicateHeaderMode;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -45,11 +44,11 @@ public List<List<Object>> parseCsv(InputStream inputStream, AbstractLoad.CsvOpti

InputStreamReader reader = new InputStreamReader(inputStream, charset);
CSVFormat format = getCsvFormat(csvOptions);
try (CSVParser parser = CSVParser.builder().setFormat(format).setReader(reader).get()) {
for (CSVRecord csvRecord : parser) {
try (CSVParser parser = new CSVParser(reader, format)) {
for (CSVRecord record : parser) {
List<Object> row = new ArrayList<>();

csvRecord.forEach(row::add);
record.forEach(row::add);
result.add(row);
}
return result;
Expand Down Expand Up @@ -94,7 +93,7 @@ public List<List<Object>> parseAvro(InputStream inputStream, boolean includeHead
}

while (reader.hasNext()) {
GenericRecord genericRecord = reader.next();
GenericRecord record = reader.next();
List<Object> row = new ArrayList<>();

if (includeHeaders && !isHeaderIncluded) {
Expand All @@ -110,7 +109,7 @@ public List<List<Object>> parseAvro(InputStream inputStream, boolean includeHead
}

schema.getFields()
.forEach(field -> row.add(genericRecord.get(field.name()).toString()));
.forEach(field -> row.add(record.get(field.name()).toString()));

result.add(row);
}
Expand All @@ -135,13 +134,13 @@ public List<List<Object>> parseParquet(InputStream inputStream, boolean includeH
try (ParquetReader<GenericRecord> reader = AvroParquetReader
.<GenericRecord>builder(inputFile).withConf(configuration).build()
) {
GenericRecord genericRecord;
while ((genericRecord = reader.read())!= null) {
GenericRecord record;
while ((record = reader.read())!= null) {
List<Object> row = new ArrayList<>();

if (includeHeaders && !isHeaderIncluded) {
List<Object> headers = new ArrayList<>(
genericRecord.getSchema()
List<Object> headers = new ArrayList<Object>(
record.getSchema()
.getFields()
.stream()
.map(Schema.Field::name)
Expand All @@ -152,8 +151,8 @@ public List<List<Object>> parseParquet(InputStream inputStream, boolean includeH
result.add(headers);
}

GenericRecord finalRecord = genericRecord;
genericRecord.getSchema()
GenericRecord finalRecord = record;
record.getSchema()
.getFields()
.forEach(field -> row.add(
finalRecord.get(field.name()).toString())
Expand Down Expand Up @@ -186,23 +185,23 @@ public List<List<Object>> parseORC(InputStream inputStream, boolean includeHeade
try (RecordReader rows = reader.rows()) {
if (includeHeaders) {
result.add(
new ArrayList<>(
new ArrayList<Object>(
schema.getFieldNames()
)
);
}

while (rows.nextBatch(rowBatch)) {
for (int row = 0; row < rowBatch.size; row++) {
List<Object> records = new ArrayList<>();
List<Object> record = new ArrayList<>();

for (ColumnVector vector : rowBatch.cols) {
records.add(
record.add(
getValue(vector, row)
);
}

result.add(records);
result.add(record);
}
}
}
Expand Down Expand Up @@ -238,20 +237,21 @@ private Object getValue(ColumnVector vector, int row) {

private CSVFormat getCsvFormat(AbstractLoad.CsvOptions csvOptions) throws IllegalVariableEvaluationException {
return CSVFormat.Builder.create()
.setDelimiter(
csvOptions.getFieldDelimiter() != null ?
this.runContext.render(csvOptions.getFieldDelimiter()).as(String.class).orElseThrow() :
CSVFormat.DEFAULT.getDelimiterString()
)
.setQuote(
csvOptions.getQuote() != null ?
this.runContext.render(csvOptions.getQuote()).as(String.class).orElseThrow().charAt(0) :
CSVFormat.DEFAULT.getQuoteCharacter()
)
.setRecordSeparator(CSVFormat.DEFAULT.getRecordSeparator())
.setIgnoreEmptyLines(true)
.setDuplicateHeaderMode(DuplicateHeaderMode.ALLOW_EMPTY)
.setSkipHeaderRecord(csvOptions.getSkipLeadingRows() != null && runContext.render(csvOptions.getSkipLeadingRows()).as(Long.class).orElseThrow() > 0).get();
.setDelimiter(
csvOptions.getFieldDelimiter() != null ?
this.runContext.render(csvOptions.getFieldDelimiter()).as(String.class).orElseThrow() :
CSVFormat.DEFAULT.getDelimiterString()
)
.setQuote(
csvOptions.getQuote() != null ?
this.runContext.render(csvOptions.getQuote()).as(String.class).orElseThrow().charAt(0) :
CSVFormat.DEFAULT.getQuoteCharacter()
)
.setRecordSeparator(CSVFormat.DEFAULT.getRecordSeparator())
.setIgnoreEmptyLines(true)
.setAllowDuplicateHeaderNames(false)
.setSkipHeaderRecord(csvOptions.getSkipLeadingRows() != null && runContext.render(csvOptions.getSkipLeadingRows()).as(Long.class).orElseThrow() > 0)
.build();
}

}