Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: Transition from single record to stream #4620

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Athishpranav2003
Copy link
Contributor

Which issue(s) this PR fixes:
Fixes #4100

What this PR does / why we need it:
Transition filter_parser from using filter_with_time to filter_stream logic
Docs Changes:
N/A
Release Note:
N/A

@Athishpranav2003
Copy link
Contributor Author

@daipom i have raised this as a draft. This needs other changes like changing the tests around this, exception handling and etc.

For now lets have discussion on the base logic and in meantime i will add them once we fix it

@daipom
Copy link
Contributor

daipom commented Aug 29, 2024

Thanks! I will see this this weekend!

@daipom daipom self-requested a review August 29, 2024 08:28
@Athishpranav2003 Athishpranav2003 changed the title Transition filter_parser from single record to stream filter_parser: Transition from single record to stream Aug 29, 2024
@Athishpranav2003
Copy link
Contributor Author

@daipom thank you. The tests needs to be revamped for filter parser since the tests involve few instance variables. Do check it in freetime and let me know

@Athishpranav2003
Copy link
Contributor Author

@daipom any opinion on this logic? Should we go ahead with this?

@daipom
Copy link
Contributor

daipom commented Sep 6, 2024

@Athishpranav2003 Sorry for my late response.
I got COVID-19 last weekend and only got a little work done this week 😢
I will see this now, please wait a few more days 😢

Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Athishpranav2003 Sorry for my late response.
Basically, it looks good to me! Thanks for this fix!

We would like to address the tests while considering the following points.

  • The behavior of @emit_invalid_record_to_error.
    • The reason why the old code raises errors would be that the following base class logic handled the errors.
      • begin
        filtered_time, filtered_record = filter_with_time(tag, time, record)
        new_es.add(filtered_time, filtered_record) if filtered_time && filtered_record
        rescue => e
        router.emit_error_event(tag, time, record, e)
        end
    • In the new code, we would need to handle it on the plugin side.
  • Code readability.
    • It appears that adding some inner methods would improve code readability.
    • For example ...
      def filter_stream(tag, es)
        new_es = Fluent::MultiEventStream.new
        es.each do |time, record|
          begin
            filter_one_record(tag, time, record) do |filtered_time, filtered_record|
              new_es.add(filtered_time, filtered_record)
            end
          rescue => e
            router.emit_error_event(tag, time, record, e)
          end
        end
        new_es
      end
      
      def filter_one_record(tag, time, record)
        ...
        yield result_time, result_record
        ...
      end

lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
@daipom
Copy link
Contributor

daipom commented Sep 9, 2024

It appears that adding some inner methods would improve code readability.
For example ...

I guess we need to think a little better about handling error events...

@Athishpranav2003
Copy link
Contributor Author

@daipom yah
I wanted to make sure we are in same page. I will refactor the code and change the error part after that

@daipom
Copy link
Contributor

daipom commented Sep 9, 2024

@daipom yah I wanted to make sure we are in same page. I will refactor the code and change the error part after that

Thanks! This would be the right direction!

@Athishpranav2003
Copy link
Contributor Author

@daipom you can check this now

@Athishpranav2003
Copy link
Contributor Author

@daipom I guess the CI test fail is not related to this change

@daipom
Copy link
Contributor

daipom commented Sep 10, 2024

Thanks!! I will see this!

I guess the CI test fail is not related to this change

Yes, They are not related.

Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thanks!
Looks good to me for normal cases.

I'm concerned about some abnormal cases.
The behavior of some of rescue cases appears to have changed.
I would like to have it checked.

I would be very happy if you could add some automated tests for the rescue cases if possible, as there seems to be a lack of those tests.

lib/fluent/plugin/filter_parser.rb Show resolved Hide resolved
lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
test/plugin/test_filter_parser.rb Outdated Show resolved Hide resolved
@Athishpranav2003
Copy link
Contributor Author

@daipom

def test_call_emit_error_event_when_parser_error
d = create_driver(CONFIG_INVALID_TIME_VALUE)
flexmock(d.instance.router).should_receive(:emit_error_event).
with(String, Integer, Hash, ParserError).once
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
end
end

could you please point which emit_event is this call part of

@Athishpranav2003
Copy link
Contributor Author

After doing some verification

def filter_stream(tag, es)
new_es = MultiEventStream.new
if @has_filter_with_time
es.each do |time, record|
begin
filtered_time, filtered_record = filter_with_time(tag, time, record)
new_es.add(filtered_time, filtered_record) if filtered_time && filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
else
es.each do |time, record|
begin
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
end
new_es
end

The emit error event is in line 90/99. This implies that earlier we raise the exception in filter one record and capture in the filter stream. Now since we have implemented stream directly so the raise not needed

Failure: test_call_emit_error_event_when_parser_error(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"data"=>"{\"time\":[], \"f1\":\"v1\"}"}, #<Fluent::Plugin::Parser::ParserError: value must be a string or a number: [](Array)>)
  Defined expectations:
    should_receive(:emit_error_event).with("String", Integer, Hash, Fluent::Plugin::Parser::ParserError).once.
  <false> is not true.
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/test_unit_integration.rb:57:in `make_assertion'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/test_unit_integration.rb:65:in `check'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core_class_methods.rb:104:in `check'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/expectation_director.rb:41:in `call'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:150:in `block in method_missing'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:289:in `flexmock_wrap'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:146:in `method_missing'
(eval at /home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/partial_mock.rb:429):3:in `emit_error_event'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:90:in `rescue in block in filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:86:in `block in filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/event.rb:110:in `each'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:85:in `filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/filter.rb:49:in `block (2 levels) in instance_hook_after_started'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:43:in `feed_to_plugin'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:87:in `feed'
/home/aggressive_racer1/projects/fluentd/test/plugin/test_filter_parser.rb:664:in `block in test_call_emit_error_event_when_parser_error'
     661:     flexmock(d.instance.router).should_receive(:emit_error_event).
     662:       with("String", Integer, Hash, ParserError).once
     663:     d.run do
  => 664:       d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
     665:     end
     666:   end
     667: 
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:204:in `block in run_actual'
/usr/share/ruby/timeout.rb:186:in `block in timeout'
/usr/share/ruby/timeout.rb:41:in `handle_timeout'
/usr/share/ruby/timeout.rb:195:in `timeout'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:203:in `run_actual'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:96:in `run'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base_owner.rb:130:in `run'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:39:in `run'
/home/aggressive_racer1/projects/fluentd/test/plugin/test_filter_parser.rb:663:in `test_call_emit_error_event_when_parser_error'

I used the following jugad way to find the function call

@daipom
Copy link
Contributor

daipom commented Sep 12, 2024

Now since we have implemented stream directly so the raise not needed

Yes, we don't need raise in the new implementation.
However, router.emit_error_event should be called as before.

@daipom
Copy link
Contributor

daipom commented Sep 12, 2024

def test_call_emit_error_event_when_parser_error
d = create_driver(CONFIG_INVALID_TIME_VALUE)
flexmock(d.instance.router).should_receive(:emit_error_event).
with(String, Integer, Hash, ParserError).once
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
end
end

could you please point which emit_event is this call part of

In the current implementation:

rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end

In the new implementation:

rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, e)
end

This change is OK because emit_error_event is called similarly after all.

@Athishpranav2003
Copy link
Contributor Author

Athishpranav2003 commented Sep 12, 2024

@daipom
i am not sure what is this issue

Failure: test_filter_key_not_exist(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"foo"=>"bar"}, #<ArgumentError: data does not exist>)
  Defined expectations:
    should_receive(:emit_error_event).with(String, Integer, Hash, #<ArgumentError: data does not exist>).once.
  <false> is not true.

For now i have pushed the alternative code. Now the functionality looks visibly same as before. Could you please let me know which part is still ambigious and needs tests?

@daipom
Copy link
Contributor

daipom commented Sep 12, 2024

@daipom i am not sure what is this issue

Failure: test_filter_key_not_exist(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"foo"=>"bar"}, #<ArgumentError: data does not exist>)
  Defined expectations:
    should_receive(:emit_error_event).with(String, Integer, Hash, #<ArgumentError: data does not exist>).once.
  <false> is not true.

I see. This test code specifies the content of expected arguments too strictly.
It appears that this requires all values of the instance match, but the stacktrace data can be different.
We should soften the conditions.

diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb
index ec36c836..4201cec8 100644
--- a/test/plugin/test_filter_parser.rb
+++ b/test/plugin/test_filter_parser.rb
@@ -650,7 +650,7 @@ class ParserFilterTest < Test::Unit::TestCase
   def test_filter_key_not_exist
     d = create_driver(CONFIG_NOT_IGNORE)
     flexmock(d.instance.router).should_receive(:emit_error_event).
-      with(String, Integer, Hash, ArgumentError.new("data does not exist")).once
+      with(String, Integer, Hash, ArgumentError).once
     assert_nothing_raised {
       d.run do
         d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'})

Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!
I think it could be simpler, so I would suggest the following code change.

lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
lib/fluent/plugin/filter_parser.rb Outdated Show resolved Hide resolved
test/plugin/test_filter_parser.rb Show resolved Hide resolved
@Athishpranav2003
Copy link
Contributor Author

Got it @daipom
Will make the changes in sometime

@daipom
Copy link
Contributor

daipom commented Sep 12, 2024

Thanks!

@Athishpranav2003
Copy link
Contributor Author

@daipom i have addressed the comments. Additionally i felt there was a redundant rescue in the prev code so i have removed that as well. Is it fine now?

Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!! The code change looks good to me!
I'd like to confirm the behavior a little more.
We would need more tests of anomaly scenarios.
I will add some tests later.

@daipom
Copy link
Contributor

daipom commented Sep 16, 2024

I made another PR to add some test cases for abnormal cases.

Let's rebase after it is merged.

I confirmed the test results on my local for this PR.
Basically, there is no problem, but a few tests fail.
It is because this PR removes the following rescue, and it changes the error message a little bit.

rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end

With this, I believe we should stop removing this rescue, even if this is redundant.
It is preferable to keep the error message as unchanged as possible because it could cause a disadvantage to existing users.

The result of #4638 tests applied to this PR:

Failure: test_parser_error[invalid format with default](ParserFilterTest::abnormal cases)
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:546:in `run_and_assert'
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:691:in `test_parser_error'
     688:         </parse>
     689:       EOC
     690: 
  => 691:       run_and_assert(driver, **data.except(:additional_config))
     692:     end
     693: 
     694:     data(
<[[],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "value must be a string or a number: [](Array)"]]]> expected but was
<[[],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "parse failed value must be a string or a number: [](Array)"]]]>

diff:
  [[],
   [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
   [[Fluent::Plugin::Parser::ParserError,
?    "parse failed value must be a string or a number: [](Array)"]]]
======================================================================================================
F
======================================================================================================
Failure: test_parser_error[mixed valid and invalid with default](ParserFilterTest::abnormal cases)
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:546:in `run_and_assert'
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:691:in `test_parser_error'
     688:         </parse>
     689:       EOC
     690: 
  => 691:       run_and_assert(driver, **data.except(:additional_config))
     692:     end
     693: 
     694:     data(
<[[{"f1"=>"v1"}],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "value must be a string or a number: [](Array)"]]]> expected but was
<[[{"f1"=>"v1"}],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "parse failed value must be a string or a number: [](Array)"]]]>

diff:
  [[{"f1"=>"v1"}],
   [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
   [[Fluent::Plugin::Parser::ParserError,
?    "parse failed value must be a string or a number: [](Array)"]]]

@Athishpranav2003
Copy link
Contributor Author

@daipom thanks for this help. I had difficulty in figuring out the edgecases. Thanks for doing it along with this to make it move ahead

@Athishpranav2003
Copy link
Contributor Author

@daipom i have reverted the change with redundant rescue. Can you point out the change in the error messages if its still present in this pr?

Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I have confirmed this change passes tests on #4638.
I believe we are now ready to merge! (after #4638)
Thanks so much for this fix!

@daipom
Copy link
Contributor

daipom commented Sep 18, 2024

@ashie @kenhys
Which version should we release this on?

This solves the limitation of #4474, which is released on v1.17.0.
It's ambiguous whether it's a bug fix or an enhancement.

I think it is possible to release this on v1.17.
On the other hand, since this is a large fix for filter_parser, 1.18 may be safer, given the risk of regression.

@daipom daipom added this to the v1.18.0 milestone Sep 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

JSON parser plugin can sometimes output records as strings rather than Hash
2 participants