package io.streamnative.examples.transaction;
import com.beust.jcommander.JCommander;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * Example of a synchronous consume-transform-produce flow using Pulsar transactions.
 *
 * <p>The program publishes a few messages to an input topic, then for each of them opens a
 * transaction in which it produces one message to each of two output topics and acknowledges
 * the input message. Finally it reads the output topics and prints what it received.
 *
 * <p>Failed transactions are aborted and logged but not retried, so after a failure fewer
 * messages than expected may be committed to the output topics and the final blocking reads
 * may wait indefinitely.
 */
public class TransactionSyncExample {

    private static final Logger log = LoggerFactory.getLogger(TransactionSyncExample.class);

    private static final String INPUT_TOPIC = "persistent://public/default/input-topic";
    private static final String OUTPUT_TOPIC_ONE = "persistent://public/default/output-topic-1";
    private static final String OUTPUT_TOPIC_TWO = "persistent://public/default/output-topic-2";
    private static final String SUBSCRIPTION_NAME = "your-subscription-name";

    /** Number of messages pushed through the pipeline. */
    private static final int MESSAGE_COUNT = 2;

    /**
     * Runs the example.
     *
     * @param args command-line arguments parsed by JCommander into a {@code JCommanderPulsar}
     *             (service URL and OAuth2 issuer / credentials / audience settings)
     * @throws Exception if the client cannot be built, a topic cannot be produced to or
     *                   consumed from, or the thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {
            jCommander.usage();
            return;
        }

        // try-with-resources closes consumers, producers and finally the client (reverse
        // declaration order), even when the example fails part-way through.
        try (PulsarClient client = PulsarClient.builder()
                     // Transactions must be enabled on the client.
                     .enableTransaction(true)
                     .serviceUrl(jct.serviceUrl)
                     .authentication(AuthenticationFactoryOAuth2.clientCredentials(
                             new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
                     .build();
             Producer<String> inputProducer = newProducer(client, INPUT_TOPIC);
             Producer<String> outputProducerOne = newProducer(client, OUTPUT_TOPIC_ONE);
             Producer<String> outputProducerTwo = newProducer(client, OUTPUT_TOPIC_TWO);
             Consumer<String> inputConsumer = newConsumer(client, INPUT_TOPIC);
             Consumer<String> outputConsumerOne = newConsumer(client, OUTPUT_TOPIC_ONE);
             Consumer<String> outputConsumerTwo = newConsumer(client, OUTPUT_TOPIC_TWO)) {

            // Produce messages to the input topic (outside of any transaction).
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                inputProducer.send("Hello Pulsar! count : " + i);
            }

            // Consume each input message and produce to both output topics transactionally.
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                processInTransaction(client, inputConsumer, outputProducerOne, outputProducerTwo, i);
            }

            // Consume messages from the output topics and print them.
            printMessages(outputConsumerOne, MESSAGE_COUNT);
            printMessages(outputConsumerTwo, MESSAGE_COUNT);
        }
    }

    /**
     * Creates a string producer for {@code topic} with the send timeout disabled.
     *
     * <p>A fresh builder is used per producer so that no mutable builder state is shared
     * between producers. The send timeout is set to 0 because the producers are used with
     * transactions (see the Pulsar transaction documentation).
     */
    private static Producer<String> newProducer(PulsarClient client, String topic)
            throws PulsarClientException {
        ProducerBuilder<String> builder = client.newProducer(Schema.STRING);
        return builder.topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
    }

    /** Subscribes a string consumer to {@code topic} using the example subscription name. */
    private static Consumer<String> newConsumer(PulsarClient client, String topic)
            throws PulsarClientException {
        return client.newConsumer(Schema.STRING)
                .subscriptionName(SUBSCRIPTION_NAME)
                .topic(topic)
                .subscribe();
    }

    /**
     * Receives one input message and, inside a single transaction, produces one message to
     * each output topic and acknowledges the input message, then waits for the commit.
     *
     * <p>On failure the transaction (if one was created) is aborted. Unless the failure is a
     * transaction conflict, the input message is negatively acknowledged so that it can be
     * redelivered.
     *
     * @param index sequence number embedded in the produced payloads
     * @throws PulsarClientException if receiving the input message fails
     * @throws InterruptedException  if interrupted while waiting on a transaction operation
     */
    private static void processInTransaction(PulsarClient client,
                                             Consumer<String> inputConsumer,
                                             Producer<String> outputProducerOne,
                                             Producer<String> outputProducerTwo,
                                             int index)
            throws PulsarClientException, InterruptedException {
        Message<String> message = inputConsumer.receive();
        Transaction txn = null;
        try {
            txn = client.newTransaction()
                    .withTransactionTimeout(10, TimeUnit.SECONDS)
                    .build()
                    .get();

            // process the message here...

            // Produce to the output topics as part of the transaction.
            outputProducerOne.newMessage(txn)
                    .value("Hello Pulsar! outputTopicOne count : " + index)
                    .send();
            outputProducerTwo.newMessage(txn)
                    .value("Hello Pulsar! outputTopicTwo count : " + index)
                    .send();

            // Acknowledge the input message as part of the same transaction.
            inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();

            // commit() is asynchronous; wait for it so that a failed commit is not missed.
            txn.commit().get();
        } catch (ExecutionException | PulsarClientException e) {
            // send() reports failures as PulsarClientException directly, while the
            // future-based calls wrap them in ExecutionException; handle both the same way.
            log.error("Transaction failed for message {}", message.getMessageId(), e);

            if (!isTransactionConflict(e)) {
                // Without a redelivery request or negative acknowledgement the message
                // will not be received again.
                inputConsumer.negativeAcknowledge(message);
            }
            // If a transaction has been created, it should be aborted.
            if (txn != null) {
                abortAndLogFailure(txn);
            }
        }
    }

    /**
     * Returns whether {@code error}, or its cause when it is an {@link ExecutionException},
     * is a {@link PulsarClientException.TransactionConflictException}.
     */
    private static boolean isTransactionConflict(Throwable error) {
        Throwable effective = error;
        if (error instanceof ExecutionException && error.getCause() != null) {
            effective = error.getCause();
        }
        return effective instanceof PulsarClientException.TransactionConflictException;
    }

    /**
     * Aborts {@code txn} and waits for the abort to finish. An abort failure is logged
     * rather than rethrown so that it does not mask the error that triggered the abort.
     */
    private static void abortAndLogFailure(Transaction txn) throws InterruptedException {
        try {
            txn.abort().get();
        } catch (ExecutionException e) {
            log.warn("Failed to abort transaction {}", txn, e);
        }
    }

    /**
     * Receives {@code count} messages from {@code consumer}, prints each value and
     * acknowledges it so that it is not delivered again to this subscription.
     */
    private static void printMessages(Consumer<String> consumer, int count)
            throws PulsarClientException {
        for (int i = 0; i < count; i++) {
            Message<String> message = consumer.receive();
            System.out.println("Receive transaction message: " + message.getValue());
            consumer.acknowledge(message);
        }
    }
}