SpringBoot整合RabbitMQ,简易的队列发送短信实例

SpringBoot整合RabbitMQ,简易的队列发送短信实例

2023年7月4日发(作者:)

SpringBoot整合RabbitMQ,简易的队列发送短信实例

在这个界面里面我们可以做些什么?可以手动创建虚拟 host、创建用户、分配权限、创建交换机、创建队列等等,还可以查看队列消息、消费效率、推送效率等等。首先介绍一个简单的消息从推送到接收的流程,提供一个简单的图:

黄色的圈圈就是我们的消息推送服务,它将消息推送到中间方框里面,也就是 RabbitMQ 的服务器;然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终由右边的蓝色圈圈(消费者)获取其所监听队列中的消息。

RabbitMQ 简单编码(实例:发送短信)

⾸先创建 rabbitmq-provider,⾥⽤到的jar依赖: spring-boot-starter-amqp spring-boot-starter-web mq amqp-client -amqp mq-amqp-client 1.0.5 配置 ## 配置rabbitMQ 信息 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 开启发送确认 publisher-confirms: true # 开启发送失败退回 publisher-returns: true# 消息 rabbitmq 的⾃定义相关配置rabbit: msg: exchange: fanout: name: msg_fanout_exchange topic: name: msg_topic_exchange alternate: name: msg_alternate_exchange dead: name: msg_dead_exchange queue: sms: name: dead: name: upstream: name: am alternate: name: ate route: sms: upstream: am  消息 rabbitMq 属性配置 配置交换机,并绑定/** * @desc:消息 rabbitMq 属性配置 * @author:

* @date: 2020/6/24 15:40 * @version: 3.0.0 * @since: 3.0.0 */@RefreshScope@Componentpublic class RabbitMqMsgProperties { // 扇形交换机名称 @Value("${}") private String fanoutExchangeName; // 备份换机名称 @Value("${}") private String alternateExchangeName; // TOPIC换机名称 @Value("${}") private String topicExchangeName; // 消息死信交换机名称 @Value("${}") private String deadExchangeName; // 备份队列名称 @Value("${}") private String alternateQueueName; // 短信消息队列名称 @Value("${}") private String smsQueueName; // 短信消息死信队列名称 @Value("${}") private String smsDeadQueueName; // 邮件消息队列名称 @Value("${}") private String emailQueueName; // 邮件消息死信队列名称 @Value("${}") private String emailDeadQueueName; // 上⾏消息队列名称 @Value("${}") private String upstreamQueueName; // 短信消息路由键 @Value("${}") private String smsRouteKey; // 邮件消息路由键 @Value("${}") private String emailRouteKey; // 上⾏消息路由键 @Value("${am}") private String upstreamRouteKey; // 微信消息队列名称 @Value("${}") private String wxQueueName; // 微信消息路由键 @Value("${}") private String wxRouteKey; // 微信消息死信队列名称 @Value("${}") private String wxDeadQueueName; public String getFanoutExchangeName() { return fanoutExchangeName; } public void setFanoutExchangeName(String fanoutExchangeName) { ExchangeName = fanoutExchangeName; } public String getSmsQueueName() { return smsQueueName; } public void setSmsQueueName(String smsQueueName) { ueName = smsQueueName; } public String getEmailQueueName() { return emailQueueName; } public void setEmailQueueName(String emailQueueName) { ueueName = emailQueueName; } public String getUpstreamQueueName() { return upstreamQueueName; } public void setUpstreamQueueName(String upstreamQueueName) { amQueueName = upstreamQueueName; } public String getTopicExchangeName() { return topicExchangeName; } public void setTopicExchangeName(String topicExchangeName) { xchangeName = topicExchangeName; } public String getSmsRouteKey() { return smsRouteKey; } public void setSmsRouteKey(String smsRouteKey) { teKey = smsRouteKey; } public String getEmailRouteKey() { return 
emailRouteKey; } public void setEmailRouteKey(String emailRouteKey) { outeKey = emailRouteKey; } public String getUpstreamRouteKey() { return upstreamRouteKey; } public void setUpstreamRouteKey(String upstreamRouteKey) { amRouteKey = upstreamRouteKey; } public String getAlternateExchangeName() { return alternateExchangeName; } public void setAlternateExchangeName(String alternateExchangeName) { ateExchangeName = alternateExchangeName; } public String getAlternateQueueName() { return alternateQueueName; } public void setAlternateQueueName(String alternateQueueName) { ateQueueName = alternateQueueName; } public String getSmsDeadQueueName() { return smsDeadQueueName; } public void setSmsDeadQueueName(String smsDeadQueueName) { dQueueName = smsDeadQueueName; } public String getEmailDeadQueueName() { return emailDeadQueueName; } public void setEmailDeadQueueName(String emailDeadQueueName) { eadQueueName = emailDeadQueueName; } public String getDeadExchangeName() { return deadExchangeName; } public void setDeadExchangeName(String deadExchangeName) { changeName = deadExchangeName; } public String getWxQueueName() { return wxQueueName; } public void setWxQueueName(String wxQueueName) { eName = wxQueueName; } public String getWxRouteKey() { return wxRouteKey; } public void setWxRouteKey(String wxRouteKey) { eKey = wxRouteKey; } public String getWxDeadQueueName() { return wxDeadQueueName; } public void setWxDeadQueueName(String wxDeadQueueName) { QueueName = wxDeadQueueName; }}package ;import .*;import gConnectionFactory;import tionFactory;import Template;import ier;import urableBeanFactory;import AutoConfiguration;import ionalOnBean;import ionalOnMissingBean;import ;import uration;import ;import ce;import p;import ;/** * @desc:消息 rabbitmq 配置类 * @author: * @date: 2020/6/24 15:07 * @version: 3.0.0 * @since: 3.0.0 */@Configuration@ConditionalOnBean(value = )public class RabbitMqMsgConfig { @Resource private RabbitMqMsgProperties rabbitMqMsgProperties; /** * 定义备份交换机 * @return 
备份交换机 */ @Bean public FanoutExchange alternateExchange(){ return new FanoutExchange(ernateExchangeName()); } /** * 定义扇形交换机 * @return */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(outExchangeName()); } /** * 定义TOPIC交换机 * @return */ @Bean public TopicExchange topicExchange(){ Map arguments = new HashMap<>(); return new TopicExchange(icExchangeName(),true,false,arguments); } /** * 死信交换机 * @return */ @Bean public DirectExchange deadExchange(){ return new DirectExchange(dExchangeName(),true,false); } /** * 备份队列 * @return 队列 */ @Bean public Queue alternateQueue(){ return new Queue(ernateQueueName()); } /** * 短信队列 * @return */ @Bean public Queue smsQueue(){ Map args = new HashMap<>(2); // x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机 // x-dead-letter-routing-key 死信路由key,默认使⽤原有路由key ("x-dead-letter-exchange", dExchangeName()); return e(QueueName()) .withArguments(args) .build(); } /** * 微信队列 * @return */ @Bean public Queue wxQueue(){ Map args = new HashMap<>(2); // x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机 // x-dead-letter-routing-key 死信路由key,默认使⽤原有路由key ("x-dead-letter-exchange", dExchangeName()); return e(ueueName()) .withArguments(args) .build(); } /** * 邮件队列 * @return */ @Bean public Queue emailQueue(){ Map args = new HashMap<>(2); // x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机 // x-dead-letter-routing-key 死信路由key,默认使⽤原有路由key ("x-dead-letter-exchange", dExchangeName()); return e(ilQueueName()) .withArguments(args) .build(); } /** * sms 死信队列 * @return */ @Bean public Queue smsDeadQueue(){ return new Queue(DeadQueueName()); } /** * email 死信队列 * @return */ @Bean public Queue emailDeadQueue(){ return new Queue(ilDeadQueueName()); } /** * wx 死信队列 * @return */ @Bean public Queue wxDeadQueue(){ return new Queue(eadQueueName()); } /** * 服务商上⾏队列 * @return */ @Bean public Queue upstreamQueue(){ return new Queue(treamQueueName()); } /** * 绑定sms死信列到死信交换机 * @param queue 死信队列 * @return */ @Bean public Binding bindSmsDead(@Qualifier("smsDeadQueue") Queue queue, 
@Qualifier("deadExchange") DirectExchange directExchange){ return (queue).to(directExchange).with(RouteKey()); } /** * 绑定email死信列到死信交换机 * @param queue 死信队列 * @return */ @Bean public Binding bindEmailDead(@Qualifier("emailDeadQueue") Queue queue, @Qualifier("deadExchange") DirectExchange directExchange){ return (queue).to(directExchange).with(ilRouteKey()); } /** * 绑定wx死信列到死信交换机 * @param queue 死信队列 * @return */ @Bean public Binding bindWxDead(@Qualifier("wxDeadQueue") Queue queue, @Qualifier("deadExchange") DirectExchange directExchange){ return (queue).to(directExchange).with(outeKey()); } /** * 绑定备份列到备份交换机 * @param queue 备份队列 * @return */ @Bean public Binding bindAlternate(@Qualifier("alternateQueue") Queue queue, @Qualifier("alternateExchange") FanoutExchange fanoutExchange){ return (queue).to(fanoutExchange); } /** * 绑定短信队列到TOPIC交换机 * @param queue * @param topicExchange * @return */ @Bean public Binding bindSms(@Qualifier("smsQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){ return (queue).to(topicExchange).with(RouteKey()); } /** * 绑定邮件队列 * @param queue * @param topicExchange * @return */ @Bean public Binding bindEmail(@Qualifier("emailQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){ return (queue).to(topicExchange).with(ilRouteKey()); } /** * 绑定微信队列 * @param queue * @param topicExchange * @return */ @Bean public Binding bindWx(@Qualifier("wxQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){ return (queue).to(topicExchange).with(outeKey()); } /** * 绑定上⾏消息队列 * @param queue * @param topicExchange * @return */ @Bean public Binding bindUpstream(@Qualifier("upstreamQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){ return (queue).to(topicExchange).with(treamRouteKey()); } @Bean @ConditionalOnMissingBean() @ConditionalOnBean() @Scope(_PROTOTYPE) public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory 
connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); datory(true); firmCallback(new RabbitMqMsgConfirmCallback()); urnCallback(new RabbitMqMsgReturnCallback()); return template; } @Bean @ConditionalOnBean() @Scope(_PROTOTYPE) public RabbitTemplate aliRabbitTemplate(@Qualifier("aliRabbitConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); datory(true); firmCallback(new RabbitMqMsgConfirmCallback()); urnCallback(new RabbitMqMsgReturnCallback()); return template; }}    controller 类 简易代码/** * @desc:短信控制类 * @author:

* @date: 2020/7/20 14:33 * @version: 3.0.0 * @since: 3.0.0 */@Api(tags = "消息controller")@RestController@RefreshScope@RequestMapping("/msg/api/sms")public class SmsController {

@Resource private RabbitTemplate rabbitTemplate; @Resource private RabbitMqMsgProperties rabbitMqMsgProperties; @ApiOperation(value = "短信发送接⼝") @PostMapping(value = "/send") public Boolean send(@RequestBody @Validated({}) MsgSendDTO msgSendDTO) throws JsonProcessingException {

// msgSendDTO 对象中是要发送给mq中的信息 此处省略处理 Message message = MessageBuilder .withBody((msgSendDTO).getBytes()) .setContentType(T_TYPE_TEXT_PLAIN) .build(); // 全局唯⼀ CorrelationData correlationData = new CorrelationData(new SnowFlakeIdGenerator().newId() +""); tAndSend(icExchangeName(), RouteKey(),message,correlationData); return true; }}  Consumer接收mq对应队列信息pom包⽂件 spring-boot-starter-amqp spring-boot-starter-web mq amqp-client -amqp mq-amqp-client 1.0.5  配置⽂件信息 # rabbitMq 的相关配置 rabbitmq: host: 192.168.118.160 port: 5672 username: admin password: admin listener: simple: acknowledge-mode: manual concurrency: 1 # 并发线程 default-requeue-rejected: false  ⼯具类JsonUtils/** * JSON处理辅助功能 */public final class JsonUtils { /** * MAP对象类型 */ private static final MapType MAP_TYPE; /** * MAP对象类型 */ private static final CollectionType LIST_TYPE; /** * 默认JSON对象映射器 */ private static ObjectMapper defaultMapper; // 静态变量初始化 static { MAP_TYPE = tInstance().constructMapType(, , ); LIST_TYPE = tInstance().constructCollectionType(, MAP_TYPE); defaultMapper = new ObjectMapper(); ializationInclusion(_NULL); e(_ON_UNKNOWN_PROPERTIES); (_MISSING_VALUES); (_SINGLE_QUOTES); (_UNQUOTED_FIELD_NAMES); JavaTimeModule javaTimeModule = new JavaTimeModule(); ializer(, new LocalDateTimeSerializer()); erializer(, new LocalDateTimeDeserializer()); erModule(javaTimeModule); } /** * 构造⽅法(静态类禁⽌创建) */ private JsonUtils() { } /** * 对象输出JSON⽂本 * * @param out 输出 * @param object 对象 */ public static void toOutput(OutputStream out, Object object) { if (object == null) { return; } try { alue(out, object); } catch (IOException e) { throw new RuntimeException(e); } } /** * 对象转换为JSON⽂本 * * @param object 对象 * @return String JSON⽂本 */ public static String toText(Object object) { if (object == null) { return null; } try { return alueAsString(object); } catch (JsonProcessingException e) { tackTrace(); throw new RuntimeException(e); } } /** * JSON⽂本转换为对象 * * @param 类型 * @param jsonText JSON⽂本 * @param cls 类型 * @return T 数据对象 */ public 
static T toObject(String jsonText, Class cls) { if (jsonText == null || y()) { return null; } try { return lue(jsonText, cls); } catch (IOException e) { throw new RuntimeException(e); } } /** * JSON⽂本转换为对象 * * @param jsonText JSON⽂本 * @return Map */ public static Map toMap(String jsonText) { if (jsonText == null || y()) { return null; } try { return lue(jsonText, MAP_TYPE); } catch (IOException e) { throw new RuntimeException(e); } } /** * JSON⽂本转换为对象 * * @param 类型 * @param jsonText JSON⽂本 * @param cls 类型 * @return Map */ public static Map toMap(String jsonText, Class cls) { if (jsonText == null || y()) { return null; } try { return lue(jsonText, tInstance().constructMapType(, , cls)); } catch (IOException e) { throw new RuntimeException(e); } } /** * JSON⽂本转换为列表 * * @param jsonText JSON⽂本 * @return List */ public static List> toList(String jsonText) { if (jsonText == null || y()) { return null; } try { return lue(jsonText, LIST_TYPE); } catch (IOException e) { throw new RuntimeException(e); } } /** * JSON⽂本转换为列表 * * @param 类型 * @param jsonText JSON⽂本 * @param cls 类型 * @return List 数据列表 */ public static List toList(String jsonText, Class cls) { if (jsonText == null || y()) { return null; } try { return lue(jsonText, tInstance().constructCollectionType(, cls)); } catch (IOException e) { throw new RuntimeException(e); } }}  

Consumer接收类@Slf4j@Componentpublic class MsgSmsConsumer { @Resource private SmsProcessServiceImpl smsProcessServiceImpl; /** * 短信消息处理 * * @param msgContent 消息内容 * @param message 消息 */ @RabbitListener(queues = "") @RabbitHandler public void smsProcess(String msgContent, Message message, Channel channel) throws IOException { // 转换消息 MsgSendDTO msgSendDTO = ct(msgContent, ); boolean ack = true; BusinessException bizException = null; try { if (!y(msgSendDTO)) { ("收到[{}]消息[{}]", msgCategory, (msgSendDTO)); boolean sendRet = d(msgSendDTO); //处理之后业务 if (!sendRet) { throw new BusinessException(_SEND_FAILED, _SEND_()); } } } } catch (BusinessException e) { (sage()); ack = false; bizException = e; } if (!ack) { ("[{}]消息消费发⽣异常,错误信息:[{}]", msgCategory, sage(), bizException); ack(sageProperties().getDeliveryTag(), false, false); } else { ck(sageProperties().getDeliveryTag(), false); } } /** * sms死信消息处理 * * @param msgContent 消息内容 * @param message 消息 */ @RabbitListener(queues = "") @RabbitHandler public void smsDeadProcess(String msgContent, Message message, Channel channel) throws IOException { ("收到[sms]死信消息:[{}]", msgContent); ck(sageProperties().getDeliveryTag(), false); }}   详情介绍可参考他⼈链接:

发布者:admin,转载请注明出处:http://www.yc00.com/xiaochengxu/1688420861a135808.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信