Spring 使用 RabbitMQ(简单使用)

发布于:2021-06-11 11:11:51

1、pom.xml jar包引用






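<!-- 注:原文的 XML 标签在转载时丢失,以下结构根据残留的 groupId / artifactId / version 还原;
     未标明 version 的依赖推测由父 POM 或 dependencyManagement 统一管理,请按实际工程确认。 -->
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-1.2-api</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-jcl</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-web</artifactId>
    </dependency>
</dependencies>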


View Code

2、resources 下面的 log4j2.xml






1
2
3
4
5
6
7
8
9
10
11

12

13
14
15
16
17
18

19

20


View Code

3、resources 下面的 spring/rabbitmq-context.xml






1
2 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
4 xmlns:context="http://www.springframework.org/schema/context"
5 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
6
7
8
9
10
11
12
13
14
15
20
21
23
24
25
26
27
28
29

30
31
32
33
34
35
36
37

38
39
40
41
42
43
44
45
49
50
51
52
53
54
55
56
57

58

59
60
61
62
63

64
65
66
67
68
69
70
71

72

73
74
75

76


View Code

4、resources 下面的 rabbitmq.properties






1 mq.host=127.0.0.1
2 mq.username=lpz
3 mq.password=lpz
4 mq.port=5672
5 mq.virtual-host=/
6 #交易记录队列key
7 mq.routing-key-transaction-record=transaction.record.queue.key
8 #交易K线队列key
9 mq.routing-key-transaction-kline=transaction.kline.queue.key
10 #交易相关交换机
11 mq.exchange-transaction=transaction.exchange

View Code

5、定义消息发送类MessageProducer






1 package com.tandaima.rabbitmq.spring.service;
2
3 import org.apache.log4j.Logger;
4 import org.springframework.amqp.core.AmqpTemplate;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.stereotype.Service;
7
8 @Service
9 public class MessageProducer {
10
11 private Logger logger= Logger.getLogger(MessageProducer.class);
12
13 @Autowired
14 private AmqpTemplate amqpTemplate;
15
16 /**
17 * 发送消息到队列
18 * @param exchange 通道名称
19 * @param queueKey 队列key
20 * @param content 内容
21 */
22 public void sendMessage(String exchange,String queueKey,Object content){
23 try {
24 amqpTemplate.convertAndSend(exchange,queueKey, content);
25 }catch (Exception e){
26 logger.error("RabbitMQ发送消息异常==>"+e.getMessage());
27 }
28 }
29 }

View Code

6、消息发送成功回调类ConfirmCallBackListener






1 package com.tandaima.rabbitmq.spring.service;
2 import org.apache.log4j.Logger;
3 import org.springframework.amqp.rabbit.connection.CorrelationData;
4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
5 import org.springframework.context.annotation.Configuration;
6
7 @Configuration
8 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback{
9 private Logger logger=Logger.getLogger(ConfirmCallBackListener.class);
10 /**
11 * CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。
12 * 通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能
13 *
14 * @param correlationData 回调的相关数据.
15 * @param ack true for ack, false for nack
16 * @param cause 专门给NACK准备的一个可选的原因,其他情况为null。
17 */
18 @Override
19 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
20 logger.info("exchange确认"+ack);
21 }
22 }

View Code

7、消息发送失败回调类ReturnCallBackListener






1 package com.tandaima.rabbitmq.spring.service;
2
3 import org.apache.log4j.Logger;
4 import org.springframework.amqp.core.Message;
5 import org.springframework.amqp.rabbit.core.RabbitTemplate;
6 import org.springframework.context.annotation.Configuration;
7
8 @Configuration
9 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback{
10 private Logger logger=Logger.getLogger(ReturnCallBackListener.class);
11 @Override
12 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
13 logger.error("失败确认:"+message+" | "+replyCode+" | "+replyText+" | "+exchange+" | "+routingKey);
14 }
15 }

View Code

8、定义TransactionKlineMessage






1 package com.tandaima.rabbitmq.spring.receive;
2
3 import com.alibaba.fastjson.JSON;
4 import com.rabbitmq.client.Channel;
5 import org.apache.log4j.Logger;
6 import org.springframework.amqp.core.Message;
7 import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
8 import org.springframework.context.annotation.Configuration;
9
10 @Configuration
11 public class TransactionKlineMessage implements ChannelAwareMessageListener {
12 private Logger logger=Logger.getLogger(TransactionKlineMessage.class);
13 @Override
14 public void onMessage(Message message, Channel channel) throws Exception {
15 try{
16 String str = new String(message.getBody());
17 logger.info("接收到交易K线信息==>:" + str);
18 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
19 }catch(Exception e){
20 logger.error("接收到交易K线信息异常回滚消息到队列中==>"+e.getMessage());
21 //消息回滚通道
22 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
23 }
24 }
25
26 @Override
27 public void onMessage(Message message) {
28 logger.info("消息==>"+JSON.toJSONString(message));
29 }
30 }

View Code

9、定义TransactionRecordMessage






1 package com.tandaima.rabbitmq.spring.receive;
2
3 import com.alibaba.fastjson.JSON;
4 import com.rabbitmq.client.Channel;
5 import org.apache.log4j.Logger;
6 import org.springframework.amqp.core.Message;
7 import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
8 import org.springframework.context.annotation.Configuration;
9
10 @Configuration
11 public class TransactionRecordMessage implements ChannelAwareMessageListener {
12 private Logger logger=Logger.getLogger(TransactionRecordMessage.class);
13 @Override
14 public void onMessage(Message message, Channel channel) throws Exception {
15 try{
16 String str = new String(message.getBody());
17 logger.info("接收到交易记录信息==>:" + str);
18 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
19 }catch(Exception e){
20 logger.error("接收到交易记录信息异常回滚消息到队列中==>"+e.getMessage());
21 //消息回滚通道
22 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
23 }
24 }
25
26 @Override
27 public void onMessage(Message message) {
28 logger.info("消息==>"+JSON.toJSONString(message));
29 }
30 }

View Code

10、测试类RabbitmqMessageTest





1 package com.tandaima.rabbitmq.spring;
2
3 import com.tandaima.rabbitmq.spring.service.MessageProducer;
4 import org.junit.Test;
5 import org.junit.runner.RunWith;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.test.context.ContextConfiguration;
8 import org.springframework.test.context.junit4.SpringRunner;
9
10 import java.util.Date;
11
12 @RunWith(SpringRunner.class)
13 @ContextConfiguration(locations = {"classpath:spring/rabbitmq-context.xml"})
14 public class RabbitmqMessageTest {
15 @Autowired
16 private MessageProducer messageProducer;
17
18 private String exchange="transaction.exchange";
19
20 @Test
21 public void senTransactionRecordMsg(){
22 for(int i=1;i<=5;i++){
23 String routingKeyTransactionRecord = "transaction.record.queue.key";
24 messageProducer.sendMessage(
25 exchange,
26 routingKeyTransactionRecord,
27 "我是内容"+ i);
28 }
29 }
30
31 /**
32 * 10w条数据 发送38秒496毫秒
33 */
34 @Test
35 public void senTransactionKlineMsg(){
36 Date begin=new Date(); //开始时间
37 for(int i=1;i<=10;i++){
38 String routingKeyTransactionKlin = "transaction.kline.queue.key";
39 messageProducer.sendMessage(
40 exchange,
41 routingKeyTransactionKlin,
42 "我是内容"+ i);
43 }
44 System.out.println(getString(begin,new Date()));
45 }
46 private String getString(Date begin,Date end){
47 long between = end.getTime() - begin.getTime();// 得到两者的毫秒数
48 long day = between / (24 * 60 * 60 * 1000);
49 long hour = (between / (60 * 60 * 1000) - day * 24);
50 long min = ((between / (60 * 1000)) - day * 24 * 60 - hour * 60);
51 long s = (between / 1000 - day * 24 * 60 * 60 - hour * 60 * 60 - min * 60);
52 long ms = (between - day * 24 * 60 * 60 * 1000 - hour * 60 * 60 * 1000 - min * 60 * 1000 - s * 1000);
53 return (day + "天" + hour + "小时" + min + "分" + s + "秒" + ms + "毫秒");
54 }
55 }

View Code

测试效果



我这里使用的是路由模式的交换机,两个队列共用同一个交换机。


?



转载于:https://www.cnblogs.com/longpizi/p/11056755.html






相关资源:rabbitmqspring集成示例demo

相关推荐

最新更新

猜你喜欢