Skip to content

Commit 594cb8b

Browse files
oleksii.diagilievkazemaksOG
oleksii.diagiliev
authored andcommitted
[SPARK-51023][CORE] log remote address on RPC exception
### What changes were proposed in this pull request? Add the remote address to the RPC exception log lines. It's already logged for `TransportRequestHandler.processStreamRequest()`, but not for other types of requests. ### Why are the changes needed? To simplify troubleshooting. We hit this in our production deployments for two cases: * when the executor and driver are running different Spark versions * when vulnerability scanner (internal security tool) sends malformed messages ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By verifying the logs of `RpcIntegrationSuite` - ``` 25/01/28 15:06:07.439 shuffle-server-3-15 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5952848813688034638 from /127.0.0.1:61697 java.lang.RuntimeException: Thrown: the at org.apache.spark.network.RpcIntegrationSuite$1.receive(RpcIntegrationSuite.java:73) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:167) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111) ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#49718 from fe2s/SPARK-51023-log_remote_address_on_RPC_exception. Authored-by: oleksii.diagiliev <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent cf583bb commit 594cb8b

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ public void onFailure(Throwable e) {
176176
}
177177
});
178178
} catch (Exception e) {
179-
logger.error("Error while invoking RpcHandler#receive() on RPC id {}", e,
180-
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId));
179+
logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
180+
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
181+
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
181182
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
182183
} finally {
183184
req.body().release();
@@ -262,8 +263,9 @@ public String getID() {
262263
respond(new RpcResponse(req.requestId,
263264
new NioManagedBuffer(blockPushNonFatalFailure.getResponse())));
264265
} else {
265-
logger.error("Error while invoking RpcHandler#receive() on RPC id {}", e,
266-
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId));
266+
logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
267+
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
268+
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
267269
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
268270
}
269271
// We choose to totally fail the channel, rather than trying to recover as we do in other
@@ -279,7 +281,8 @@ private void processOneWayMessage(OneWayMessage req) {
279281
try {
280282
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
281283
} catch (Exception e) {
282-
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
284+
logger.error("Error while invoking RpcHandler#receive() for one-way message from {}.", e,
285+
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
283286
} finally {
284287
req.body().release();
285288
}
@@ -304,9 +307,10 @@ public void onFailure(Throwable e) {
304307
});
305308
} catch (Exception e) {
306309
logger.error("Error while invoking receiveMergeBlockMetaReq() for appId {} shuffleId {} "
307-
+ "reduceId {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId),
310+
+ "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId),
308311
MDC.of(LogKeys.SHUFFLE_ID$.MODULE$, req.shuffleId),
309-
MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId));
312+
MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId),
313+
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
310314
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
311315
}
312316
}

0 commit comments

Comments
 (0)