博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ高级原理详解
阅读量:3922 次
发布时间:2019-05-23

本文共 8533 字,大约阅读时间需要 28 分钟。

事务消息

事务消息与消费者关系不大;

代码实现

TransactionListener transactionListener = new TransactionListenerImpl();        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");//        producer.setNamesrvAddr("127.0.0.1:9876");        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
(2000), new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {
"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace(); } } for (int i = 0; i < 100000; i++) {
Thread.sleep(1000); } producer.shutdown();

使用专有的TransactionMQProducer事务发送者;其他差不多

大体做了什么:定义了一个字符串数组,总共发送10次,每个字符串发送两次;
得到的结果:最先得到两条TagA,然后收到TagC的消息;
原因在producer.setTransactionListener(transactionListener);中
我们本地新建了类TransactionListenerImpl继承自TransactionListener;
并实现方法executeLocalTransaction和checkLocalTransaction;

具体原因:

保证本地事务和mq保持事务一致性,要吗一起成功,要吗一起失败;

举个例子:现在有订单系统(生产者),支付系统(第三方系统),下游服务(消费者)

1、生产者发送 half半消息 至RocketMQ中;
生产者需要确定mq的状态,发送half的半消息,下游服务(消费者)不可见。只是检查mq当前的状态。避免本地执行了事务确发现mq已经挂了。这时mq和本地事务就保持不一致了。
2、RocketMQ回复half消息 至 生产者

3、生产者 执行本地事务

订单系统需要检查本地数据库是否可以执行
4、生产者 返回本地事务状态
把当前数据库的状态发给mq
5、RocketMQ 未确定状态事务进行状态回查
6、生产者 检查本地事务状态
检查第三方系统状态是否正常,支付系统(比如说对接银行了)对账是否可以正常进行。
7、生产者 返回本地事务检查状态。

1、怎么保证half消费不给下游看到。

所谓的half消息,就是普通的消息,放在不同的位置。

如果订单系统,下游服务自己出问题了怎么办?

所以事务消息只保证了分布式事务消息的一半。 性能及吞吐量都会下降。

@Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags(); if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE; }else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE; }else{
return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags(); if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE; }else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE; }else{
return LocalTransactionState.UNKNOW; } }

COMMIT_MESSAGE:事务提交

ROLLBACK_MESSAGE:事务回滚
UNKNOW:
在发送消息的时候就会执行监听器的executeLocalTransaction方法;在这里提交了TagA,而回滚了TagB;其他的都是做UNKNOW操作。所以第一次只收到了两个TagA;过了一段时间才执行checkLocalTransaction;所以后面又收到TagC

ACL权限控制

修改配置$ROCKETMQ_HOME/conf/plain_acl.yml文件
配置ip白名单,用户名及密码,这个配置是热加载的。随时配置随时生效;
再修改broker.conf中修改“aclEnable=true” 鉴权规则就生效了;
在生产中用的比较少。知道有这么个东西就好,不详细了解。
springBoot整合
引入相关依赖,添加启动类,添加配置文件;
配置文件:
rocketmq.name-server=192.168.232.128:9876
rocketmq.producer.group=springBootGroup //加了这个可以直接点出我们需要的配置,大部分都是生产者的依赖;
代码实现
使用springboot后,代码实现会变的非常的简单:(这里就举两个典型的案例,其他的太多了)
普通的单向发送代码

@Resource    private RocketMQTemplate rocketMQTemplate;    public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg); }

所有的发送者都只需要使用rocketMQTemplate印出来即可,不用记其他的东西,并且像单向发送只要放进目标名和消息就完成了;

发送事务消息:(根据上面那个事务消息来做的)

public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
String[] tags = new String[] {
"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) {
//尝试在Header中加入一些自定义的属性。 Message
message = MessageBuilder.withPayload(msg) .setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i) //发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。 .setHeader(RocketMQHeaders.TAGS,tags[i % tags.length]) //MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。 .setHeader("MyProp","MyProp_"+i) .build(); String destination =topic+":"+tags[i % tags.length]; //这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。 SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination); System.out.printf("%s%n", sendResult); Thread.sleep(10); } }

事务消息的监听器

实现RocketMQLocalTransactionListener接口,封装起来了;也同样实现executeLocalTransaction(执行本地事务)和checkLocalTransaction(检查本地事务)方法;

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap
localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID); String destination = arg.toString(); localTrans.put(transId,msg); //这个msg的实现类是GenericMessage,里面实现了toString方法 //在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。 //而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀 System.out.println("executeLocalTransaction msg = "+msg); //转成RocketMQ的Message对象 org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg); String tags = message.getTags(); if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT; }else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK; }else{
return RocketMQLocalTransactionState.UNKNOWN; } } //延迟检查的时间间隔要有点奇怪。 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString(); Message originalMessage = localTrans.get(transId); //这里能够获取到自定义的transaction_id属性 System.out.println("checkLocalTransaction msg = "+originalMessage); //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString(); String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString(); if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT; }else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK; }else{
return RocketMQLocalTransactionState.UNKNOWN; } }}

需要注意的点:

1、使用springboot发送消息后,消息的格式不再是我们mq的msg的格式了而是org.springframework.messaging的格式;
2、(rocketMQTemplate.convertAndSend(topic,msg);)topic不再是我们之前mq中单纯的topic,目标名的概念;而是融合了tag和topic连接在了一起;String destination =topic+":"+tags[i % tags.length];以冒号连接在一起了;
3、key属性也放在了Header中,Header是头消息;
4、事务消息中每一个监听器需要指定一个rocketMQTemplate,而我们的rocketMQTemplate就只有一个,但是springboot给我们提供了一种方式用于添加新的rocketMQTemplate;
@RocketMQTransactionListener(rocketMQTemplateBeanName = “rocketMQTemplate”)

添加新的rocketMQTemplate 扩展一个即可;

@ExtRocketMQTemplateConfiguration()public class ExtRocketMQTemplate extends RocketMQTemplate {
}

消费者

这个MyConsumerGroup就是上面配置文件配置的
TestTopic就是目标名

@Component@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)public class SpringConsumer implements RocketMQListener
{
@Override public void onMessage(String message) {
System.out.println("Received message : "+ message); //业务逻辑处理 }}

消费者中所有的属性都在RocketMQMessageListener注解中,我们都可以在注解中指定即可;这些属性很重要,但是随时都在变。对消息样例能理解,需要什么版本再去了解。

转载地址:http://gsqrn.baihongyu.com/

你可能感兴趣的文章
剑指offer 有序二维数组的元素查找 字符串的空格的替换 go 语言实现
查看>>
剑指offer 二进制中1的个数,数值的整数次方 go语言实现
查看>>
剑指offer 数组奇偶交换位置 获取链表的倒数第k个元素 go语言实现
查看>>
go语言机器学习第一章读书笔记
查看>>
go语言机器学习-矩阵-概率伦-统计学
查看>>
go语言机器学习第三章评估和验证
查看>>
go语言机器学习-回归模型
查看>>
go语言的并发原理——MPG模型
查看>>
go语言机器学习分类之逻辑回归与k-NN
查看>>
go语言机器学习--集群技术
查看>>
go语言学习时间序列和异常检测
查看>>
go语言神经网络和深度学习
查看>>
GO语言goWeb学习笔记之Http协议与goWeb简洁
查看>>
GoWeb的数据库操作
查看>>
Linux实操--实用指令Day3
查看>>
Mysql 事务处理
查看>>
Linux实操--实用指令Day4
查看>>
Linux实操--实用指令Day3
查看>>
spring+springboot认识
查看>>
Leetcode 136. 只出现一次的数字
查看>>