@@ -19,7 +19,8 @@ use std::any::Any;
19
19
use std:: fmt:: { Debug , Formatter } ;
20
20
use std:: sync:: Arc ;
21
21
22
- use arrow:: array:: RecordBatch ;
22
+ use arrow:: array:: { Array , RecordBatch } ;
23
+ use arrow:: compute:: { filter, is_not_null} ;
23
24
use arrow:: {
24
25
array:: {
25
26
ArrayRef , Float32Array , Float64Array , Int16Array , Int32Array , Int64Array ,
@@ -104,6 +105,12 @@ impl ApproxPercentileCont {
104
105
None
105
106
} ;
106
107
108
+ if args. ignore_nulls {
109
+ return not_impl_err ! (
110
+ "IGNORE NULLS clause not yet supported for APPROX_PERCENTILE_CONT"
111
+ ) ;
112
+ }
113
+
107
114
let accumulator: ApproxPercentileAccumulator = match args. input_type {
108
115
t @ ( DataType :: UInt8
109
116
| DataType :: UInt16
@@ -393,8 +400,12 @@ impl Accumulator for ApproxPercentileAccumulator {
393
400
}
394
401
395
402
fn update_batch ( & mut self , values : & [ ArrayRef ] ) -> datafusion_common:: Result < ( ) > {
396
- let values = & values[ 0 ] ;
397
- let sorted_values = & arrow:: compute:: sort ( values, None ) ?;
403
+ // respect nulls by default
404
+ let mut values = values[ 0 ] ;
405
+ if let Some ( nulls) = values. nulls ( ) {
406
+ values = filter ( & values, & is_not_null ( values) ?) ?;
407
+ }
408
+ let sorted_values = & arrow:: compute:: sort ( & values, None ) ?;
398
409
let sorted_values = ApproxPercentileAccumulator :: convert_to_float ( sorted_values) ?;
399
410
self . digest = self . digest . merge_sorted_f64 ( & sorted_values) ;
400
411
Ok ( ( ) )
0 commit comments