MENU

Spring Boot 整合 RabbitMQ

November 12, 2017 • Read: 491 • Spring

Spring Boot整合RabbitMQ是非常容易的,下面将通过一个最简单的例子实现消息的发送和接收;仅引用spring-boot-starter-amqp这个starter即可完成rabbitMQ的整合。

首先,创建一个maven项目,pom.xml加入依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

额外增加pring-boot-starter-web是为了方便调用生产消息。

增加配置文件application.yml,配置RabbitMQ相关信息:

spring:
  rabbitmq:
    host: 66.66.66.66
    port: 5672
    username: wangxuesong
    password: 123456

创建配置类RabbitConfig,用来配置Exchange(交换器)、Binding(绑定)、Queue(队列)等;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue firstQueue() {
        return new Queue("firstQueue");
    }
}

这里,使用最简单配置(使用默认Exchange),仅配置一个Queue。

接下来,创建生产者和消费者类;生产者代码非常简单,使用AmqpTemplate发送消息,convertAndSend(String routingKey, final Object object)方法使用了默认Exchange,第一个参数为routingKey,第二个参数为发送的消息对象。

 @Component
public class Producer {

    @Resource
    private AmqpTemplate rabbitTemplate;

    public void send() {
        this.rabbitTemplate.convertAndSend("firstQueue", "hello "
                + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
    }
} 

为了方便调用,创建一个Controller:

@RestController
public class Controller {

    @Resource
    private Producer producer;

    @RequestMapping
    public void send() {
        producer.send();
    }
}

创建Spring Boot程序入口:

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

暂不创建消费者类,直接启动程序,通过请求http://127.0.0.1:8080/send测试生产消息。

image

查看web管理,已经创建了队列firstQueue和15个消息。

下面是消费消息的代码:

@Component
@EnableRabbit
@RabbitListener(queues = "firstQueue")
public class Consumer {

    @RabbitHandler
    public void process(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(message);
    }
}

使用@RabbitListener(queues = "firstQueue")设置监听队列,@RabbitHandler设置消费方法,Thread.sleep(1000)模拟执行耗时。

再次启动程序,每个1秒打印一条记录消费一个消息。

Last Modified: July 21, 2019