前言:昨天我们对 RocketMQ 事务消息的概念有了初步的了解,现在通过代码来实际体验一下 RocketMQ 的事务消息。
消费端配置:
/**
 * Spring component that creates a RocketMQ push consumer after dependency
 * injection, subscribes it to the configured topic and starts it.
 */
@Component
public class MqConsumer {

    private DefaultMQPushConsumer consumer;

    /** Name server address, injected from {@code mq.nameserver.addr}. */
    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    /** Topic to subscribe to, injected from {@code mq.topicname}. */
    @Value("${mq.topicname}")
    private String topicName;

    /**
     * Builds, configures and starts the push consumer.
     *
     * @throws MQClientException propagated from the subscribe/start calls
     */
    @PostConstruct
    public void init() throws MQClientException {
        // The listener only prints a marker line and acknowledges the batch;
        // the delivered messages themselves are not inspected.
        MessageListenerConcurrently listener = (messages, consumeContext) -> {
            System.out.println("消费了");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };

        this.consumer = new DefaultMQPushConsumer("TopicTest");
        this.consumer.setNamesrvAddr(this.nameAddr);
        // "*" subscribes to every tag of the topic.
        this.consumer.subscribe(this.topicName, "*");
        this.consumer.registerMessageListener(listener);
        this.consumer.start();
    }
}
生产者配置:
/**
 * Holds the RocketMQ producers used by this demo: a plain producer and a
 * transactional producer, both pointed at the configured name server.
 *
 * NOTE(review): {@code @Value} and {@code @PostConstruct} only take effect on a
 * Spring-managed bean, and no stereotype annotation is visible on this class;
 * confirm it is registered as a bean elsewhere.
 */
public class MqProducer {

    private DefaultMQProducer producer;

    private TransactionMQProducer transactionMQProducer;

    /** Name server address, injected from {@code mq.nameserver.addr}. */
    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    /** Topic name, injected from {@code mq.topicname}; not used inside this block. */
    @Value("${mq.topicname}")
    private String topicName;

    /**
     * Creates and starts both producers.
     *
     * @throws MQClientException if either producer fails to start
     */
    @PostConstruct
    public void init() throws MQClientException {
        // Initialise the plain MQ producer.
        producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr(nameAddr);
        producer.start();

        transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
        transactionMQProducer.setNamesrvAddr(nameAddr);
        // Register the listener BEFORE start(): once the producer is started the
        // broker may issue transaction check-backs for this group, and those
        // cannot be answered while no listener is set.
        transactionMQProducer.setTransactionListener(new TransactionListener() {
            // Runs the local transaction after the prepare (half) message has
            // been sent; the returned state is reported back to the broker.
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            // Second phase: invoked when the broker checks back on a transaction
            // whose final state it has not received.
            // NOTE(review): this demo always answers ROLLBACK here while
            // executeLocalTransaction always commits; a real implementation
            // would presumably look up the actual local transaction outcome.
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });
        transactionMQProducer.start();
    }
}