Skip to content

Commit

Permalink
feat: Add parameter binding to the Query tasks and Trigger triggers #376
Browse files Browse the repository at this point in the history
 (#493)

add parameters binding
keep legacy se of Statement for Query
use PreparedStatement when using parameters
  • Loading branch information
mgabelle authored Jan 27, 2025
1 parent 7045b66 commit aee7888
Show file tree
Hide file tree
Showing 28 changed files with 186 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();

return query.run(runContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static io.kestra.core.models.tasks.common.FetchType.FETCH;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -68,6 +69,26 @@ void select() throws Exception {
assertThat(runOutput.getRow().get("Ipv6"), is("2a02:aa08:e000:3100:0:0:0:2"));
}

@Test
void selectWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Query task = Query.builder()
.url(Property.of(getUrl()))
.username(null)
.password(null)
.fetchType(Property.of(FetchType.FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.parameters(Property.of(Map.of("num", 123)))
.sql(Property.of("select * from clickhouse_types where Int8 = :num"))
.build();

AbstractJdbcQuery.Output runOutput = task.run(runContext);
assertThat(runOutput.getRow(), notNullValue());

assertThat(runOutput.getRow().get("Int8"), is(123));
}

@Test
void update() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();

return query.run(runContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,21 @@ public class Trigger extends AbstractJdbcTrigger {
@Override
protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
var query = Query.builder()
.id(this.id)
.type(Query.class.getName())
.url(this.getUrl())
.username(this.getUsername())
.password(this.getPassword())
.timeZoneId(this.getTimeZoneId())
.sql(this.getSql())
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.build();
.id(this.id)
.type(Query.class.getName())
.url(this.getUrl())
.username(this.getUsername())
.password(this.getPassword())
.timeZoneId(this.getTimeZoneId())
.sql(this.getSql())
.fetch(this.isFetch())
.store(this.isStore())
.fetchOne(this.isFetchOne())
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* - https://duckdb.org/docs/sql/data_types/overview
*/
@KestraTest
public class DuckDbTest {
class DuckDbTest {
@Inject
private RunContextFactory runContextFactory;

Expand Down Expand Up @@ -153,7 +153,7 @@ void select() throws Exception {
assertThat(runOutput.getRow().get("t_boolean"), is(true));
assertThat(runOutput.getRow().get("t_date"), is(LocalDate.parse("1992-09-20")));
assertThat(runOutput.getRow().get("t_double"), is(12345.12345D));
assertThat(runOutput.getRow().get("t_decimal"), is(BigDecimal.valueOf(12345123L, 3)));;
assertThat(runOutput.getRow().get("t_decimal"), is(BigDecimal.valueOf(12345123L, 3)));
assertThat(runOutput.getRow().get("t_hugeint"), is("9223372036854775807"));
assertThat(runOutput.getRow().get("t_integer"), is(2147483647));
assertThat(runOutput.getRow().get("t_interval"), is("28 days"));
Expand Down Expand Up @@ -192,7 +192,7 @@ void selectFromExistingFileInUrl() throws Exception {
assertThat(runOutput.getRow().get("t_boolean"), is(true));
assertThat(runOutput.getRow().get("t_date"), is(LocalDate.parse("1992-09-20")));
assertThat(runOutput.getRow().get("t_double"), is(12345.12345D));
assertThat(runOutput.getRow().get("t_decimal"), is(BigDecimal.valueOf(12345123L, 3)));;
assertThat(runOutput.getRow().get("t_decimal"), is(BigDecimal.valueOf(12345123L, 3)));
assertThat(runOutput.getRow().get("t_hugeint"), is("9223372036854775807"));
assertThat(runOutput.getRow().get("t_integer"), is(2147483647));
assertThat(runOutput.getRow().get("t_interval"), is("28 days"));
Expand All @@ -209,6 +209,26 @@ void selectFromExistingFileInUrl() throws Exception {
assertThat(runOutput.getRow().get("t_enum"), is("happy"));
}

@Test
void selectFromExistingFileInUrlWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

URL resource = DuckDbTest.class.getClassLoader().getResource("db/duck.db");

Query task = Query.builder()
.fetchType(Property.of(FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.url(Property.of("jdbc:duckdb:"+ Objects.requireNonNull(resource).getPath()))
.parameters(Property.of(Map.of("num", 2147483647)))
.sql(Property.of("SELECT * FROM duck_types WHERE t_integer = :num"))
.build();

AbstractJdbcQuery.Output runOutput = task.run(runContext);
assertThat(runOutput.getRow(), notNullValue());

assertThat(runOutput.getRow().get("t_integer"), is(2147483647));
}


@Test
void selectFromExistingFileInParameter() throws Exception {
Expand All @@ -232,7 +252,7 @@ void selectFromExistingFileInParameter() throws Exception {
assertThat(runOutput.getRow().get("t_boolean"), is(true));
assertThat(runOutput.getRow().get("t_date"), is(LocalDate.parse("1992-09-20")));
assertThat(runOutput.getRow().get("t_double"), is(12345.12345D));
assertThat(runOutput.getRow().get("t_decimal"), is(BigDecimal.valueOf(12345123L, 3)));;
assertThat(runOutput.getRow().get("t_decimal"), is(BigDecimal.valueOf(12345123L, 3)));
assertThat(runOutput.getRow().get("t_hugeint"), is("9223372036854775807"));
assertThat(runOutput.getRow().get("t_integer"), is(2147483647));
assertThat(runOutput.getRow().get("t_interval"), is("28 days"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ void update() throws Exception {
assertThat(runOutput.getRow().get("d"), is("D"));
}

@Test
void selectWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Query task = Query.builder()
.url(Property.of(getUrl()))
.username(Property.of(getUsername()))
.password(Property.of(getPassword()))
.fetchType(Property.of(FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.sql(Property.of("select * from mysql_types where concert_id=:concert_id"))
.parameters(Property.of(Map.of("concert_id", "1")))
.build();

AbstractJdbcQuery.Output runOutput = task.run(runContext);
assertThat(runOutput.getRow(), notNullValue());

assertThat(runOutput.getRow().get("concert_id"), is("1"));
}

@Override
protected String getUrl() {
return "jdbc:mysql://127.0.0.1:64790/kestra";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.Map;

import static io.kestra.core.models.tasks.common.FetchType.FETCH_ONE;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -69,6 +70,26 @@ void select() throws Exception {
assertThat(runOutput.getRow().get("T_TIMESTAMP_LOCAL"), is(LocalDateTime.parse("1998-01-23T12:00:00")));
}

@Test
void selectWithParameters() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Query task = Query.builder()
.url(Property.of(getUrl()))
.username(Property.of(getUsername()))
.password(Property.of(getPassword()))
.fetchType(Property.of(FETCH_ONE))
.timeZoneId(Property.of("Europe/Paris"))
.parameters(Property.of(Map.of("char", "aa")))
.sql(Property.of("select * from oracle_types where T_CHAR = :char"))
.build();

AbstractJdbcQuery.Output runOutput = task.run(runContext);
assertThat(runOutput.getRow(), notNullValue());

assertThat(runOutput.getRow().get("T_CHAR"), is("aa"));
}

@Test
void update() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
code = """
id: pinot_query
namespace: company.team
tasks:
- id: query
type: o.kestra.plugin.jdbc.pinot.Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.additionalVars(this.additionalVars)
.warehouse(this.getWarehouse())
.database(this.getDatabase())
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchType(Property.of(this.renderFetchType(runContext)))
.fetchSize(this.getFetchSize())
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchOne(this.isFetchOne())
.fetchType(Property.of(this.renderFetchType(runContext)))
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchOne(this.isFetchOne())
.fetchType(Property.of(this.renderFetchType(runContext)))
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();

return query.run(runContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchOne(this.isFetchOne())
.fetchType(Property.of(this.renderFetchType(runContext)))
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Except
.fetchOne(this.isFetchOne())
.fetchType(Property.of(this.renderFetchType(runContext)))
.additionalVars(this.additionalVars)
.parameters(this.getParameters())
.build();
return query.run(runContext);
}
Expand Down
Loading

0 comments on commit aee7888

Please sign in to comment.