RabbitMQ实战代码-Java

Java下使用

Maven依赖

RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。

maven工程的pom文件中添加依赖

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>

获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtils {
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("psvmc");
factory.setPassword("psvmc");
factory.setHost("mq.psvmc.cn");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
return conn;
}
}

QUEUE

我这里暂且把当前的这种方式定义为队列模式

队列模式的特点

  • 先打开生产者发送消息消息不会丢失
  • 多个消费者不会收到同一个消息 由服务器去分配
  • 生产者把消息直接放在队列中 队列由生产者创建
  • 发布消息是交换机的名字填空字符串
  • RabbitMQ内置一个名称为空字符串的默认交换机,它根据Routing key将消息路由到与队列名与Routing key完全相等的队列中

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MyProducer {
public final static String QUEUE_NAME = "rabbitMQ.work1";

public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

int message = 1;
while (message < 1000) {
//发送消息到队列中
channel.basicPublish(
"",
QUEUE_NAME,
null,
("" + message).getBytes("UTF-8")
);
System.out.println("发送消息:" + message);
Thread.sleep(2000);
message += 1;
}

//关闭通道和连接
channel.close();
connection.close();
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.rabbitmq.client.*;

import java.io.IOException;

public class MyCustomer {
private final static String QUEUE_NAME = "rabbitMQ.work1";

public static void main(String[] args) throws Exception {

//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//每次从队列获取的数量
channel.basicQos(1);
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
//告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息:" + message);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
try {
Thread.sleep(3000);
channel.basicAck(deliveryTag, false);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数

注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

EXCHANGE

我这里暂且把当前的这种方式定义为路由模式

路由模式的特点

  • 先打开生产者发送消息,消息会丢失
  • 多个消费者会收到同一个消息 由服务器根据规则去分配
  • 需要队列和路由进行绑定
  • 队列可以多次和路由绑定 只要routingKey不同即可
  • 交换机类型:fanout(发布订阅模式),direct(精准匹配模式), topic(通配符模式), headers(头匹配模式)

fanout(发布订阅模式)

这种模式的特点

  • routingKey 为空字符串
  • 只要订阅后都能收到消息

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MyProducer {

public final static String EXCHANGE_NAME = "myexchange";

public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明一个交换机 发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

int message = 1;
while (message < 1000) {
//发送消息到队列中
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
("" + message).getBytes("UTF-8")
);
System.out.println("发送消息:" + message);
Thread.sleep(2000);
message += 1;
}

//关闭通道和连接
channel.close();
connection.close();
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.rabbitmq.client.*;
import java.io.IOException;

public class MyCustomer {

public final static String EXCHANGE_NAME = "myexchange";
private final static String QUEUE_NAME = "rabbitMQ.queue1";

public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//每次从队列获取的数量
channel.basicQos(1);
//声明定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
//告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息:" + message);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
try {
Thread.sleep(3000);
channel.basicAck(deliveryTag, false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

direct(精准匹配模式)

特点

  • 根据routingKey精准匹配消息

topic(通配符模式)

特点

  • 根据routingKey模糊匹配消息

  • routingKey为aa.bb形式

  • 可以用*#进行匹配

    a.*可以匹配 a.a、a.b 不能匹配a.b.c

    a.#既可以匹配 a.a、a.b 也能匹配a.b.c

headers(头匹配模式)

  • x-match = all :表示所有的键值对都匹配才能接受到消息
  • x-match = any :表示只要有键值对匹配就能接受到消息

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String,Object>();
//这里x-match有两种类型
//all:表示所有的键值对都匹配才能接受到消息
//any:表示只要有键值对匹配就能接受到消息
headers.put("x-match", "any");
headers.put("name", "jack");
headers.put("age" , 31);

//把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);

生产者

1
2
3
4
5
6
7
8
9
10
11
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);

//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("name", "jack");
headers.put("age", 30);
Builder builder = new Builder();
builder.headers(headers);

channel.basicPublish(EXCHANGE_NAME, "", builder.build(), message.getBytes());

上面的例子中name的值都为jack 匹配上了一个 就能收到消息

发送工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.xhkjedu.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class RabbitMQUtils {
private static Connection getConnection(String host, int port, String username, String password) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
//建立到代理服务器到连接
return factory.newConnection();
}


/**
* 发送消息
*
* @param host IP
* @param port 端口
* @param username 用户名
* @param password 密码
* @param queueName 队列名
* @param message 消息
* @throws Exception 异常
*/
public static void sendMsg(String host, int port, String username, String password, String queueName, String message) throws Exception {
//创建一个新的连接
Connection connection = RabbitMQUtils.getConnection(host, port, username, password);
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(queueName, false, false, false, null);
//发送消息到队列中
channel.basicPublish(
"",
queueName,
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println("发送消息:" + message);
//关闭通道和连接
channel.close();
connection.close();
}

public static void main(String[] args) throws Exception {
RabbitMQUtils.sendMsg("192.168.7.101", 6611, "rabbit", "MQ@yxkj", "task-rabbitmq_topic", "123");
}
}

注意

channel.queueDeclare(queueName, false, false, false, null);这是创建了一个队列,如果队列本来就存在,可以不用写这句代码。

Spring集成

这里的示例是用的QUEUE的方式

注意下面的这行配置

1
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>

一定要配置的是routing-key="myqueue" 不要配成queue="myqueue" 我就是在这里折腾了好久。

下面是具体的配置:

1) 添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>

</dependencies>

2) 在resources中添加文件rabbitmq.xml 内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="mq.psvmc.cn"/>
<property name="username" value="psvmc"/>
<property name="password" value="psvmc"/>
<property name="port" value="5672"/>
<property name="channelCacheSize" value="5"/>
</bean>

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义queue -->
<rabbit:queue name="myqueue" auto-declare="true" durable="false" auto-delete="false" exclusive="false"/>


<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>

<!-- 监听生产者发送的消息开始 -->
<!-- 消息接收者 -->
<bean id="messageReceiver" class="cn.psvmc.spring.MessageConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<!-- acknowledeg = "manual":意为表示该消费者的ack方式为手动 ;acknowledge="auto"表示自动-->
<!-- prefetch=1设置预取消息数目为1 -->
<rabbit:listener-container
prefetch="1"
connection-factory="connectionFactory"
auto-declare="true"
acknowledge="manual">
<rabbit:listener queue-names="myqueue" ref="messageReceiver" method="onMessage"/>
</rabbit:listener-container>
<!-- 监听生产者发送的消息结束 -->
</beans>

3) 消息的生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageProducer {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:rabbitmq.xml");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
int message = 1;
while (message < 100) {
//发送消息到队列中
template.convertAndSend("" + message);
System.out.println("+ 发送消息:" + message);
Thread.sleep(2000);
message += 1;
}
((ClassPathXmlApplicationContext) context).destroy();
}
}

4) 消息的消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

public class MessageConsumer implements ChannelAwareMessageListener {

/**
* 处理收到的rabbit消息的回调方法。
*
* @param message AMQP封装消息对象
* @param channel 信道对象,可以进行确认回复
* @throws Exception Any.
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("- 收到消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}