Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend the PulsarConnectionFactory to using temporary producers #169

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class PulsarConnectionFactory

// see resetDefaultValues for final fields
private final transient Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final transient Map<String, Producer<byte[]>> tempProducers = new ConcurrentHashMap<>();
private final transient Set<PulsarConnection> connections =
Collections.synchronizedSet(new HashSet<>());
private final transient List<Consumer<?>> consumers = new CopyOnWriteArrayList<>();
Expand All @@ -125,6 +126,8 @@ public class PulsarConnectionFactory
private transient PulsarClient pulsarClient;
private transient PulsarAdmin pulsarAdmin;
private transient Map<String, Object> producerConfiguration;
private transient boolean useTempProducer = false;
private transient String tempTopicNamePrefix = "jms-temp-queue";
private transient ConsumerConfiguration defaultConsumerConfiguration;
private transient String systemNamespace = "public/default";
private transient String defaultClientId = null;
Expand Down Expand Up @@ -276,6 +279,17 @@ private synchronized void ensureInitialized(String connectUsername, String conne
Map<String, Object> configurationCopy = Utils.deepCopyMap(this.configuration);
try {

Map<String, Object> tempProducerConfiguration =
(Map<String, Object>) configurationCopy.remove("tempProducerConfig");
if (tempProducerConfiguration != null) {

this.useTempProducer =
Boolean.parseBoolean(
tempProducerConfiguration.getOrDefault("useTempProducer", false).toString());
this.tempTopicNamePrefix =
getAndRemoveString("tempTopicNamePrefix", "jms-temp-queue", tempProducerConfiguration);
}

Map<String, Object> producerConfiguration =
(Map<String, Object>) configurationCopy.remove("producerConfig");
if (producerConfiguration != null) {
Expand Down Expand Up @@ -650,6 +664,14 @@ public synchronized PulsarAdmin getPulsarAdmin() throws jakarta.jms.IllegalState
return pulsarAdmin;
}

public synchronized String getTempTopicNamePrefix() {
return tempTopicNamePrefix;
}

public synchronized boolean isUseTempProducer() {
return useTempProducer;
}

public synchronized String getSystemNamespace() {
return systemNamespace;
}
Expand Down Expand Up @@ -1019,6 +1041,15 @@ public void close() {
}
}

for (Producer<?> producer : tempProducers.values()) {
try {
producer.close();
} catch (PulsarClientException ignore) {
// ignore
Utils.handleException(ignore);
}
}

if (this.pulsarAdmin != null) {
this.pulsarAdmin.close();
}
Expand Down Expand Up @@ -1064,67 +1095,127 @@ Producer<byte[]> getProducerForDestination(Destination defaultDestination, boole
boolean enableJMSPriority = isEnableJMSPriority();
boolean producerJMSPriorityUseLinearMapping =
enableJMSPriority && isPriorityUseLinearMapping();
return producers.computeIfAbsent(
key,
d -> {
try {
return Utils.invoke(
() -> {
Map<String, Object> producerConfiguration = getProducerConfiguration();
ProducerBuilder<byte[]> producerBuilder =
pulsarClient
.newProducer()
.topic(applySystemNamespace(fullQualifiedTopicName))
.loadConf(producerConfiguration);
if (producerConfiguration.containsKey("batcherBuilder")) {
producerBuilder.batcherBuilder(
(BatcherBuilder) producerConfiguration.get("batcherBuilder"));
}
Map<String, String> properties = new HashMap<>();
if (transactions) {
properties.put("jms.transactions", "enabled");
} else {
properties.put("jms.transactions", "disabled");
}
if (enableJMSPriority) {
properties.put("jms.priority", "enabled");
properties.put(
"jms.priorityMapping",
producerJMSPriorityUseLinearMapping ? "linear" : "non-linear");
producerBuilder.messageRouter(
new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {

int key = PulsarMessage.readJMSPriority(msg);
return Utils.mapPriorityToPartition(
key,
metadata.numPartitions(),
producerJMSPriorityUseLinearMapping);
}
});
} else if (transactions && transactionsStickyPartitions) {
producerBuilder.messageRouter(
new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
long key = Long.parseLong(msg.getProperty("JMSTX"));
return signSafeMod(key, metadata.numPartitions());
}
});
}
producerBuilder.properties(properties);
return producerBuilder.create();
});
} catch (JMSException err) {
throw new RuntimeException(err);
}
});

if (isTempTopic(fullQualifiedTopicName))
return getProducer(
tempProducers,
key,
transactions,
enableJMSPriority,
fullQualifiedTopicName,
transactionsStickyPartitions,
producerJMSPriorityUseLinearMapping);
else
return getProducer(
producers,
key,
transactions,
enableJMSPriority,
fullQualifiedTopicName,
transactionsStickyPartitions,
producerJMSPriorityUseLinearMapping);
} catch (RuntimeException err) {
throw (JMSException) err.getCause();
}
}

private boolean isTempTopic(String fullQualifiedTopicName) {
return isUseTempProducer() && fullQualifiedTopicName.contains(getTempTopicNamePrefix());
}

private Producer<byte[]> getProducer(
Map<String, Producer<byte[]>> producerCache,
String key,
boolean transactions,
boolean enableJMSPriority,
String fullQualifiedTopicName,
boolean transactionsStickyPartitions,
boolean producerJMSPriorityUseLinearMapping) {

return producerCache.computeIfAbsent(
key,
d -> {
try {
return Utils.invoke(
() ->
createProducer(
transactions,
fullQualifiedTopicName,
enableJMSPriority,
producerJMSPriorityUseLinearMapping,
transactionsStickyPartitions));
} catch (JMSException err) {
throw new RuntimeException(err);
}
});
}

void closeTempProducer(Producer<byte[]> producer) {
if (isUseTempProducer() && producer != null) {
var topicName = producer.getTopic();

if (tempProducers.containsKey(topicName)) {
var tempProducer = tempProducers.remove(topicName);
tempProducer.closeAsync();

log.debug(
"Temporary producer {} with destination {} is closed",
producer.getProducerName(),
topicName);
}
}
}

private Producer<byte[]> createProducer(
boolean transactions,
String fullQualifiedTopicName,
boolean enableJMSPriority,
boolean producerJMSPriorityUseLinearMapping,
boolean transactionsStickyPartitions)
throws PulsarClientException {
Map<String, Object> producerConfiguration = getProducerConfiguration();
ProducerBuilder<byte[]> producerBuilder =
pulsarClient
.newProducer()
.topic(applySystemNamespace(fullQualifiedTopicName))
.loadConf(producerConfiguration);
if (producerConfiguration.containsKey("batcherBuilder")) {
producerBuilder.batcherBuilder((BatcherBuilder) producerConfiguration.get("batcherBuilder"));
}
Map<String, String> properties = new HashMap<>();
if (transactions) {
properties.put("jms.transactions", "enabled");
} else {
properties.put("jms.transactions", "disabled");
}
if (enableJMSPriority) {
properties.put("jms.priority", "enabled");
properties.put(
"jms.priorityMapping", producerJMSPriorityUseLinearMapping ? "linear" : "non-linear");
producerBuilder.messageRouter(
new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {

int key = PulsarMessage.readJMSPriority(msg);
return Utils.mapPriorityToPartition(
key, metadata.numPartitions(), producerJMSPriorityUseLinearMapping);
}
});
} else if (transactions && transactionsStickyPartitions) {
producerBuilder.messageRouter(
new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
long key = Long.parseLong(msg.getProperty("JMSTX"));
return signSafeMod(key, metadata.numPartitions());
}
});
}
producerBuilder.properties(properties);
return producerBuilder.create();
}

synchronized boolean isUsePulsarAdmin() {
return usePulsarAdmin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,7 @@ private void sendMessage(Destination defaultDestination, Message message) throws
pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session);
} finally {
session.unblockTransactionOperations();
session.getFactory().closeTempProducer(producer);
}
if (message != pulsarMessage) {
applyBackMessageProperties(message, pulsarMessage);
Expand All @@ -1269,77 +1270,82 @@ private void sendMessage(
}
session.executeCriticalOperation(
() -> {
Producer<byte[]> producer = session.getProducerForDestination(defaultDestination);
message.setJMSDestination(defaultDestination);
PulsarMessage pulsarMessage = prepareMessageForSend(message);
CompletionListener endActivityCompletionListener =
new CompletionListener() {
@Override
public void onCompletion(Message message) {
try {
completionListener.onCompletion(message);
} finally {
session.unblockTransactionOperations();
}
}

@Override
public void onException(Message message, Exception exception) {
try {
completionListener.onException(message, exception);
} finally {
session.unblockTransactionOperations();
}
}
};
CompletionListener finalCompletionListener = endActivityCompletionListener;
if (pulsarMessage != message) {
finalCompletionListener =
Producer<byte[]> producer = null;
try {
producer = session.getProducerForDestination(defaultDestination);
message.setJMSDestination(defaultDestination);
PulsarMessage pulsarMessage = prepareMessageForSend(message);
CompletionListener endActivityCompletionListener =
new CompletionListener() {
@Override
public void onCompletion(Message completedMessage) {
// we have to pass the original message to the called
applyBackMessageProperties(message, pulsarMessage);
endActivityCompletionListener.onCompletion(message);
public void onCompletion(Message message) {
try {
completionListener.onCompletion(message);
} finally {
session.unblockTransactionOperations();
}
}

@Override
public void onException(Message completedMessage, Exception e) {
// we have to pass the original message to the called
applyBackMessageProperties(message, pulsarMessage);
endActivityCompletionListener.onException(message, e);
public void onException(Message message, Exception exception) {
try {
completionListener.onException(message, exception);
} finally {
session.unblockTransactionOperations();
}
}
};
}
CompletionListener finalCompletionListener = endActivityCompletionListener;
if (pulsarMessage != message) {
finalCompletionListener =
new CompletionListener() {
@Override
public void onCompletion(Message completedMessage) {
// we have to pass the original message to the called
applyBackMessageProperties(message, pulsarMessage);
endActivityCompletionListener.onCompletion(message);
}

session.blockTransactionOperations();
TypedMessageBuilder<byte[]> typedMessageBuilder;
if (session.getTransacted()) {
Transaction transaction = session.getTransaction();
if (transaction != null) {
typedMessageBuilder = producer.newMessage(transaction);
@Override
public void onException(Message completedMessage, Exception e) {
// we have to pass the original message to the called
applyBackMessageProperties(message, pulsarMessage);
endActivityCompletionListener.onException(message, e);
}
};
}

session.blockTransactionOperations();
TypedMessageBuilder<byte[]> typedMessageBuilder;
if (session.getTransacted()) {
Transaction transaction = session.getTransaction();
if (transaction != null) {
typedMessageBuilder = producer.newMessage(transaction);
} else {
// emulated transactions
typedMessageBuilder = producer.newMessage();
if (defaultDeliveryDelay > 0) {
typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
}
if (uncommittedMessages == null) {
uncommittedMessages = new ArrayList<>();
}
uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage));
session.registerProducerWithTransaction(this);
finalCompletionListener.onCompletion(pulsarMessage);
return null;
}
} else {
// emulated transactions
typedMessageBuilder = producer.newMessage();
if (defaultDeliveryDelay > 0) {
typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
}
if (uncommittedMessages == null) {
uncommittedMessages = new ArrayList<>();
}
uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage));
session.registerProducerWithTransaction(this);
finalCompletionListener.onCompletion(pulsarMessage);
return null;
}
} else {
typedMessageBuilder = producer.newMessage();
}
if (defaultDeliveryDelay > 0) {
typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
if (defaultDeliveryDelay > 0) {
typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
}
pulsarMessage.sendAsync(
typedMessageBuilder, finalCompletionListener, session, this, disableMessageTimestamp);
} finally {
session.getFactory().closeTempProducer(producer);
}
pulsarMessage.sendAsync(
typedMessageBuilder, finalCompletionListener, session, this, disableMessageTimestamp);
return null;
});
}
Expand Down
Loading