项目结构如下:
首先一个父目录:SpringBoot-RocketMQ
然后下面分别是三个子目录:
生产者:springboot-dubbo-provider
接口:springboot-dubbo-interface
消费者:springboot-dubbo-consumer
(由于复用的之前dubbo项目,目录名称没有改过来)
项目源码我挂在github上面了,直接拉取master分支即可:https://github.com/shengwanping/SpringBoot-RocketMQ
1、SpringBoot-RocketMQ中pom.xml加入:
org.springframework.boot spring-boot-dependencies 2.3.12.RELEASE pom import org.apache.rocketmq rocketmq-spring-boot-starter 2.2.0 pom import
2、springboot-dubbo-provider中pom.xml加入:
org.example springboot-dubbo-interface 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.2.0
3、springboot-dubbo-consumer中pom.xml加入:
org.example springboot-dubbo-interface 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.2.0
4、springboot-dubbo-interface 只是一个接口工具包,pom.xml中不需要额外配置
1、生产者application.yml配置如下
server:port: 8010rocketmq:name-server: localhost:9876 # 连接Rocketmq Name Server服务注册中心producer: # 生产者group: producer-one # 生产者组.随意取名
2、消费者application.yml配置如下
server:port: 8011rocketmq:name-server: localhost:9876 # 连接Rocketmq Name Server服务注册中心producer: # 消费者者group: consumer-one # 消费者组.随意取名
1、接口工具包下面只有一个接口:
public interface DemoService {void sendHello();
}
2、生产者包下有两个类:
这里用启动类直接调用了接口
package org.dubbo.provider;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.dubbo.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** 模拟 消息生产者*/
@Service("demoService")
public class DemoServiceImpl implements DemoService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void sendHello() {// 向Proder发送消息 topic 发送的消息rocketMQTemplate.convertAndSend("topic_001", "Hello RocketMQ");}
}
package org.dubbo.provider;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;@SpringBootApplication
public class ProviderApplication {public static void main(String[] args) {// 获取上下文ConfigurableApplicationContext context = SpringApplication.run(ProviderApplication.class, args);// 模拟Controller调用这个接口(启动后直接调用sendHello()方法,发送消息)DemoServiceImpl demoService = (DemoServiceImpl) context.getBean("demoService");demoService.sendHello();}
}
3、消费者包下有两个类:
package org.dubbo.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}
package org.dubbo.consumer;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 消息 消费者*/
@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerMode implements RocketMQListener { // 继承RocketMQListener接口@Overridepublic void onMessage(String s) {System.out.println("收到的消息是:"+s);}
}
完成了如上配置,生产者和消费者的代码就完成了,然后需要启动rocketmq的NameServer(路由注册中心)和BrokerServer
如何启动NameServer和Broker请参考下面这篇文章:
RocketMQ下载,RocketMQ可视化控制台下载
启动 NameServer和Broker 之后我们启动生产者和消费者 这时如果在消费者下面打印“收到的消息是:Hello RocketMQ”,则说明成功了。
如果有安装启动RocketMQ可视化管理平台,我们还能在上面清晰看到生产者发送的消息
本人也是刚刚学习RocketMQ,有什么不足的欢迎大家留言交流!