Implement MdcExecutor to manage MDC context for query execution #15072

Open
wants to merge 10 commits into
base: master

gortiz
Contributor

@gortiz gortiz commented Feb 16, 2025

This simplification of #14994 focuses on adding MDC support on the query side. MDC (Mapped Diagnostic Context) is a common feature in logging frameworks that is usually built on top of thread-local storage. The idea is that logs should sometimes include contextual information that the rest of the code does not need. A very clear example in Pinot is the request id. It may be useful to include that id in all the logs related to a query, but we don't want to add a request id attribute/parameter everywhere just in case some part of the code needs it to create a log.

One important part of using MDC is making sure that the variables are cleaned up once the thread has finished the request. Therefore I had to add some try-finally blocks in the code. I strongly recommend reviewing this PR with the option to ignore whitespace changes enabled.
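
The cleanup requirement can be illustrated with a minimal sketch. It is not the code in this PR and uses no Pinot or log4j2 classes: `FAKE_MDC` is a hypothetical stand-in for a logging framework's MDC, backed by a `ThreadLocal` map, and the key name `qid` is borrowed from the log examples in this description.

```java
import java.util.HashMap;
import java.util.Map;

class MdcCleanupSketch {
  // Hypothetical stand-in for a logging framework's MDC: one map per thread.
  static final ThreadLocal<Map<String, String>> FAKE_MDC = ThreadLocal.withInitial(HashMap::new);

  // Runs the task with the query id visible in the current thread's context
  // and removes it afterwards, even if the task throws.
  static void runWithQueryId(String queryId, Runnable task) {
    FAKE_MDC.get().put("qid", queryId);
    try {
      task.run();
    } finally {
      // Without this, a pooled thread would carry the id into its next task.
      FAKE_MDC.get().remove("qid");
    }
  }

  public static void main(String[] args) {
    runWithQueryId("884449079000000004",
        () -> System.out.println("inside task, qid=" + FAKE_MDC.get().get("qid")));
    System.out.println("after task, qid=" + FAKE_MDC.get().get("qid"));
  }
}
```

The `finally` block is what matters here: threads in a pool are reused, so a value left behind by one query would show up in the logs of whichever query runs on that thread next.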

This PR modifies the log4j2.xml file used in quickstarts to actually use these new MDC values. Actual deployments will likely use their own log4j2.xml files, which will need to be modified to log request IDs and stage IDs. Please remember that this is optional. Logs are still as helpful as before, even if MDC properties are unused. Specifically, logs that already included the request ID or stage ID haven't been changed.

It is important to know that in MSE we use more than one request id. Specifically, leaf stages change the request id used in the SSE part of the query. This is done in ServerPlanRequestUtils and it is a controversial feature we may decide to change in the future. At the logging level, it is not useful to have different request ids for the same query. This is why the request id set in MDC is shared between the MSE and SSE parts. This will be easier to understand with some examples:

Max rows limit in hash join. Here, we can see two new values between brackets: qid and stg. The first one is the query id (similar to request id) and the second is the stage id. The worker id is also included in MDC, but the proposed log4j2.xml is not using it.

2025/02/16 13:45:23.864 ERROR [OpChainSchedulerService] [query-runner-on-45611-9-thread-6] [qid=884449079000000004,stg=2] (OpChain{884449079000000004_0_2}): Completed erroneously null {245=Cannot process join, reached number of rows limit: 1048576. Consider increasing the limit for the maximum number of rows in a join either via the query option 'maxRowsInJoin' or the 'max_rows_in_join' hint in the 'joinOptions'. Alternatively, if partial results are acceptable, the join overflow mode can be set to 'BREAK' either via the query option 'joinOverflowMode' or the 'join_overflow_mode' hint in the 'joinOptions'.

A log in SSE code when called by MSE. Here we can see two different request ids in the same log. The one called qid in square brackets (next to stg) is the one used in MSE, while the other (included in the normal message and called requestId) is the fake SSE id created in ServerPlanRequestUtils.

2025/02/16 13:43:54.724 INFO [ServerQueryLogger] [query-runner-on-45611-9-thread-2] [qid=884449079000000003,stg=2] Processed requestId=3584961748589019648,table=userAttributes_OFFLINE,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=2/1/1/-1/0/0/0/1/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=55,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=-1,broker=unknown,numDocsScanned=10,scanInFilter=0,scanPostFilter=40,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=4098950/4098950/0/0

Same log, when called from SSE. Here qid and requestId are the same in both places, and stg is not set.

2025/02/16 13:48:52.902 INFO [ServerQueryLogger] [pqr-0] [qid=884449079000000001] Processed requestId=884449079000000001,table=userAttributes_OFFLINE,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=2/1/1/-1/0/0/0/1/0,schedulerWaitMs=7,reqDeserMs=1,totalExecMs=3,resSerMs=3,totalTimeMs=14,minConsumingFreshnessMs=-1,broker=Broker_192.168.1.42_8000,numDocsScanned=10,scanInFilter=0,scanPostFilter=40,sched=FCFS,threadCpuTimeNs(total/thread/sysActivity/resSer)=947216/407270/539946/0

@codecov-commenter

codecov-commenter commented Feb 16, 2025

Codecov Report

Attention: Patch coverage is 63.15789% with 56 lines in your changes missing coverage. Please review.

Project coverage is 63.42%. Comparing base (59551e4) to head (fdbfeff).
Report is 1754 commits behind head on master.

Files with missing lines Patch % Lines
...e/pinot/spi/executor/DecoratorExecutorService.java 12.12% 29 Missing ⚠️
...ava/org/apache/pinot/spi/executor/MdcExecutor.java 42.10% 10 Missing and 1 partial ⚠️
...che/pinot/core/query/scheduler/QueryScheduler.java 45.45% 6 Missing ⚠️
...va/org/apache/pinot/query/runtime/QueryRunner.java 85.36% 6 Missing ⚠️
...va/org/apache/pinot/spi/trace/LoggerConstants.java 76.47% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15072      +/-   ##
============================================
+ Coverage     61.75%   63.42%   +1.67%     
- Complexity      207     1484    +1277     
============================================
  Files          2436     2751     +315     
  Lines        133233   154546   +21313     
  Branches      20636    23818    +3182     
============================================
+ Hits          82274    98028   +15754     
- Misses        44911    49124    +4213     
- Partials       6048     7394    +1346     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.40% <63.15%> (+1.69%) ⬆️
java-21 63.32% <63.15%> (+1.69%) ⬆️
skip-bytebuffers-false 63.42% <63.15%> (+1.68%) ⬆️
skip-bytebuffers-true 63.29% <63.15%> (+35.56%) ⬆️
temurin 63.42% <63.15%> (+1.67%) ⬆️
unittests 63.42% <63.15%> (+1.67%) ⬆️
unittests1 56.04% <63.15%> (+9.15%) ⬆️
unittests2 33.96% <0.00%> (+6.23%) ⬆️



@Override
protected void registerOnMDC() {
queryRequest.registerOnMdc();
Collaborator

Why go through queryRequest to register on MDC? Couldn't the call be made directly on LoggerConstants.QUERY_ID_KEY instead?

Contributor Author

To keep MDC management clean and centralized.

_executorService = executorService;
}

protected abstract boolean alreadyRegistered();
Collaborator

At least with the current implementation, this function does not have to be abstract, does it? Both implementations are the same.

* TODO: Convert this class and its usages into an Executor instead of an ExecutorService
*
*/
public abstract class MdcExecutor implements ExecutorService {
Collaborator

In the current scope, this class is only used for query execution threads, so it can be less generic. It could be named MdcQueryExecutor. Could the class also take in a context, then? I'm asking because the following code pattern seems overkill in the current scope.

MdcExecutor mdcExecutor = new MdcExecutor(executorService) {
      @Override
      protected boolean alreadyRegistered() {
        return LoggerConstants.QUERY_ID_KEY.isRegistered();
      }

      @Override
      protected void registerOnMDC() {
        executionContext.registerOnMDC();
      }

      @Override
      protected void unregisterFromMDC() {
        executionContext.unregisterFromMDC();
      }
    };

Contributor Author

@gortiz gortiz Feb 21, 2025

The first point makes sense. The fact that it only affects queries is implicit given that it is in the query runtime package, but we can change the implementation to receive a listener that customizes what we want to add to MDC.

But I don't understand the second part. There are two implementations in this PR. One uses OpChainExecutionContext and the other uses ServerQueryRequest.

Collaborator

@vrajat vrajat left a comment

I get the code structure now. I hadn't grokked the Decorator executor and how every task is decorated.

A follow-up (out-of-band) discussion is the effort required to allow multiple decorators using this pattern, e.g. OOM protection.

@@ -40,6 +40,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.spi.executor.MdcExecutor;
Collaborator

This needs to be re-arranged, it's currently causing the spotless plugin check to fail.

@@ -140,8 +142,25 @@ public synchronized void shutDown() {
@Override
public InstanceResponseBlock execute(ServerQueryRequest queryRequest, ExecutorService executorService,
@Nullable ResultsBlockStreamer streamer) {
MdcExecutor mdcExecutor = new MdcExecutor(executorService) {
Collaborator

Do we really need to allocate an instance for every query request? Can't we use a common ThreadPoolExecutor implementation with an overridden afterExecute that clears the MDC context instead?

Collaborator

Ah, I just realized what you're aiming to do with the decorator pattern here - all the other alternatives would probably involve significantly more changes to make sure that the right executor is being created and used.

Contributor Author

Yes, it is easier and safer to do it this way. This allocation is super cheap, given that we just decorate an actual executor and the decorator has almost no state.
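
For readers following the thread, here is a rough sketch of the decorator idea under discussion. It is not the PR's `MdcExecutor`: the PR's class implements `ExecutorService`, while this sketch implements the narrower `Executor` for brevity. `QueryIdExecutor` and `FAKE_MDC` (a `ThreadLocal` map standing in for the logging framework's MDC) are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MdcDecoratorSketch {
  // Hypothetical stand-in for a logging framework's MDC: one map per thread.
  static final ThreadLocal<Map<String, String>> FAKE_MDC = ThreadLocal.withInitial(HashMap::new);

  // Wraps an existing executor; its only state is the delegate and the query id.
  static final class QueryIdExecutor implements Executor {
    private final Executor _delegate;
    private final String _queryId;

    QueryIdExecutor(Executor delegate, String queryId) {
      _delegate = delegate;
      _queryId = queryId;
    }

    // Returns a task that registers the query id on whichever thread runs it
    // and unregisters it when the task ends, normally or exceptionally.
    Runnable decorate(Runnable task) {
      return () -> {
        FAKE_MDC.get().put("qid", _queryId);
        try {
          task.run();
        } finally {
          FAKE_MDC.get().remove("qid");
        }
      };
    }

    @Override
    public void execute(Runnable command) {
      _delegate.execute(decorate(command));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService sharedPool = Executors.newFixedThreadPool(2);
    for (String queryId : new String[] {"q1", "q2"}) {
      // One cheap decorator per query, all sharing the same pool.
      Executor perQuery = new QueryIdExecutor(sharedPool, queryId);
      perQuery.execute(() -> System.out.println("running with qid=" + FAKE_MDC.get().get("qid")));
    }
    sharedPool.shutdown();
    sharedPool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

Because the wrapper holds only two references, creating one per query leaves the shared thread pool untouched, and code that already accepts an executor needs no changes.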

See
https://logging.apache.org/log4j/2.x/manual/pattern-layout.html#converter-not-empty
and https://logging.apache.org/log4j/2.x/manual/pattern-layout.html#converter-thread-context-map
-->
Collaborator

What's pinot.component.type?

Contributor Author

It is another contextual variable I'm using in the PR where I'm improving errors. There I'm using it to say whether the log comes from a server, controller or broker, which is very useful in Quickstarts, where all logs are merged. But it shouldn't be used here. I'm removing it.

registerOnMdcIfNotSet(value, false);
}

public boolean registerOnMdcIfNotSet(String value, boolean override) {
Collaborator

In what situations would we want to override the MDC value? Currently we're not overriding in any case, right? And we're simply logging if we see some MDC value already registered unexpectedly (potentially from missed cleanup earlier)? Shouldn't we override it in that case instead of using the older stale value?

Collaborator

Even if the older value is the same as the value we want to register (in case we already set it at some previous point in the thread's execution) it doesn't hurt to override the value right?

Contributor Author

It does hurt: if we don't override the value, we can still see where the original came from. Anyway, we can discuss that in another PR.

I've just modified the code to have:

  1. String registerInMdc(String value), which always overrides
  2. String registerInMdc(String value, boolean override), which overrides depending on the parameter
  3. String registerInMdcIfNotExist(String value), which never overrides

All three methods always return the older value (or null).
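
The semantics of the three methods can be sketched as follows. Only the method names and the "returns the older value" contract come from the comment above; the class `MdcKeySketch`, its constructor, and the `ThreadLocal` map `FAKE_MDC` standing in for the real MDC are assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

class MdcKeySketch {
  // Hypothetical stand-in for a logging framework's MDC: one map per thread.
  static final ThreadLocal<Map<String, String>> FAKE_MDC = ThreadLocal.withInitial(HashMap::new);

  private final String _key;

  MdcKeySketch(String key) {
    _key = key;
  }

  // Always overrides; returns the previous value, or null if there was none.
  String registerInMdc(String value) {
    return registerInMdc(value, true);
  }

  // Overrides an existing value only when override is true;
  // returns the previous value, or null if there was none.
  String registerInMdc(String value, boolean override) {
    Map<String, String> context = FAKE_MDC.get();
    String previous = context.get(_key);
    if (previous == null || override) {
      context.put(_key, value);
    }
    return previous;
  }

  // Never overrides; returns the previous value, or null if there was none.
  String registerInMdcIfNotExist(String value) {
    return registerInMdc(value, false);
  }

  public static void main(String[] args) {
    MdcKeySketch queryId = new MdcKeySketch("qid");
    System.out.println(queryId.registerInMdcIfNotExist("first"));   // prints null
    System.out.println(queryId.registerInMdcIfNotExist("second"));  // prints first
    System.out.println(queryId.registerInMdc("third"));             // prints first
    System.out.println(FAKE_MDC.get().get("qid"));                  // prints third
    FAKE_MDC.get().remove("qid");
  }
}
```

Returning the previous value lets the caller detect a stale entry, for example one left by a missed cleanup, and decide whether to log it, whichever override policy is used.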

Comment on lines 44 to 48
protected abstract <T> Callable<T> decorate(Callable<T> task);

protected abstract Runnable decorate(Runnable task);

protected <T> Collection<? extends Callable<T>> decorateTasks(Collection<? extends Callable<T>> tasks) {
Collaborator

nit: some Javadocs could be useful here

Contributor Author

Added

_executorService.execute(decorate(command));
}

public static abstract class BeforeAfter extends DecoratorExecutorService {
Collaborator

This is unused? Is it just meant to be an example extensible implementation?

Contributor Author

Yes, it is unused, but it can help future implementations and is a good example for other custom implementations as well.
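
As an illustration of what a before/after style decorator could look like, here is a minimal sketch. Only the class name `BeforeAfter` appears in the diff; the hook names `before()` and `after()`, the class `BeforeAfterExecutor`, and the use of `Executor` rather than `ExecutorService` are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class BeforeAfterSketch {
  // Hypothetical base class: subclasses only say what happens around each task.
  abstract static class BeforeAfterExecutor implements Executor {
    private final Executor _delegate;

    BeforeAfterExecutor(Executor delegate) {
      _delegate = delegate;
    }

    protected abstract void before();

    protected abstract void after();

    @Override
    public void execute(Runnable command) {
      _delegate.execute(() -> {
        before();
        try {
          command.run();
        } finally {
          // Runs whether the task completes normally or throws.
          after();
        }
      });
    }
  }

  // Example subclass that records the order of events on a same-thread executor.
  static List<String> trace(Runnable task) {
    List<String> events = new ArrayList<>();
    Executor executor = new BeforeAfterExecutor(Runnable::run) {
      @Override
      protected void before() {
        events.add("before");
      }

      @Override
      protected void after() {
        events.add("after");
      }
    };
    executor.execute(() -> {
      events.add("task");
      task.run();
    });
    return events;
  }

  public static void main(String[] args) {
    System.out.println(trace(() -> { }));  // prints [before, task, after]
  }
}
```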

@gortiz
Contributor Author

gortiz commented Feb 25, 2025

I've changed some interfaces and improved the Javadocs as requested. I hope it will be easier to read now.
