Skip to content

Commit 688f84f

Browse files
authored
add details to message reading logs (#1943)
* add details to schema conversion logs * lint * lint
1 parent b97157b commit 688f84f

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/BasicMessageContentReader.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ private void ensureExistence(Integer schemaVersion, Integer schemaId) {
5353
schemaExistenceEnsurer.ensureSchemaExists(topic, SchemaId.valueOf(schemaId));
5454
}
5555
} catch (SchemaExistenceEnsurer.SchemaNotLoaded ex) {
56-
throw new RetryableReceiverError("Requested schema not present yet...", ex);
56+
throw new RetryableReceiverError(
57+
String.format(
58+
"Requested schema not present yet for topic: %s, schemaVersion: %s, schemaId: %s",
59+
topic.getQualifiedName(), schemaVersion, schemaId),
60+
ex);
5761
}
5862
}
5963
}

hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,10 @@ private Optional<Message> getMessageFromReadQueue() {
148148
readQueue.poll();
149149
return Optional.of(message);
150150
} catch (RetryableReceiverError ex) {
151-
logger.warn("Cannot convert record to message... Operation will be delayed", ex);
151+
logger.warn(
152+
"Cannot convert record to message for subscription {}, operation will be delayed",
153+
subscription.getQualifiedName(),
154+
ex);
152155
return Optional.empty();
153156
}
154157
}

0 commit comments

Comments
 (0)