2023年7月4日发(作者:)
SpringBoot整合RabbitMQ,简易的队列发送短信实例
在这个界⾯⾥⾯我们可以做些什么?可以⼿动创建虚拟host,创建⽤户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。⾸先先介绍⼀个简单的⼀个消息推送到接收的流程,提供⼀个简单的图:
黄⾊的圈圈就是我们的消息推送服务,将消息推送到 中间⽅框⾥⾯也就是 rabbitMq的服务器,然后经过服务器⾥⾯的交换机、队列等各种关系(后⾯会详细讲)将数据处理⼊列后,最终右边的蓝⾊圈圈消费者获取对应监听的消息。
rabbitMq简单编码 (实例:发送短信)
⾸先创建 rabbitmq-provider,⾥⽤到的jar依赖:
* @date: 2020/6/24 15:40 * @version: 3.0.0 * @since: 3.0.0 */@RefreshScope@Componentpublic class RabbitMqMsgProperties { // 扇形交换机名称 @Value("${}") private String fanoutExchangeName; // 备份换机名称 @Value("${}") private String alternateExchangeName; // TOPIC换机名称 @Value("${}") private String topicExchangeName; // 消息死信交换机名称 @Value("${}") private String deadExchangeName; // 备份队列名称 @Value("${}") private String alternateQueueName; // 短信消息队列名称 @Value("${}") private String smsQueueName; // 短信消息死信队列名称 @Value("${}") private String smsDeadQueueName; // 邮件消息队列名称 @Value("${}") private String emailQueueName; // 邮件消息死信队列名称 @Value("${}") private String emailDeadQueueName; // 上⾏消息队列名称 @Value("${}") private String upstreamQueueName; // 短信消息路由键 @Value("${}") private String smsRouteKey; // 邮件消息路由键 @Value("${}") private String emailRouteKey; // 上⾏消息路由键 @Value("${am}") private String upstreamRouteKey; // 微信消息队列名称 @Value("${}") private String wxQueueName; // 微信消息路由键 @Value("${}") private String wxRouteKey; // 微信消息死信队列名称 @Value("${}") private String wxDeadQueueName; public String getFanoutExchangeName() { return fanoutExchangeName; } public void setFanoutExchangeName(String fanoutExchangeName) { ExchangeName = fanoutExchangeName; } public String getSmsQueueName() { return smsQueueName; } public void setSmsQueueName(String smsQueueName) { ueName = smsQueueName; } public String getEmailQueueName() { return emailQueueName; } public void setEmailQueueName(String emailQueueName) { ueueName = emailQueueName; } public String getUpstreamQueueName() { return upstreamQueueName; } public void setUpstreamQueueName(String upstreamQueueName) { amQueueName = upstreamQueueName; } public String getTopicExchangeName() { return topicExchangeName; } public void setTopicExchangeName(String topicExchangeName) { xchangeName = topicExchangeName; } public String getSmsRouteKey() { return smsRouteKey; } public void setSmsRouteKey(String smsRouteKey) { teKey = smsRouteKey; } public String getEmailRouteKey() { return emailRouteKey; } public void setEmailRouteKey(String emailRouteKey) { outeKey = emailRouteKey; } public String getUpstreamRouteKey() { return upstreamRouteKey; } public void setUpstreamRouteKey(String upstreamRouteKey) { amRouteKey = upstreamRouteKey; } public String getAlternateExchangeName() { return alternateExchangeName; } public void setAlternateExchangeName(String alternateExchangeName) { ateExchangeName = alternateExchangeName; } public String getAlternateQueueName() { return alternateQueueName; } public void setAlternateQueueName(String alternateQueueName) { ateQueueName = alternateQueueName; } public String getSmsDeadQueueName() { return smsDeadQueueName; } public void setSmsDeadQueueName(String smsDeadQueueName) { dQueueName = smsDeadQueueName; } public String getEmailDeadQueueName() { return emailDeadQueueName; } public void setEmailDeadQueueName(String emailDeadQueueName) { eadQueueName = emailDeadQueueName; } public String getDeadExchangeName() { return deadExchangeName; } public void setDeadExchangeName(String deadExchangeName) { changeName = deadExchangeName; } public String getWxQueueName() { return wxQueueName; } public void setWxQueueName(String wxQueueName) { eName = wxQueueName; } public String getWxRouteKey() { return wxRouteKey; } public void setWxRouteKey(String wxRouteKey) { eKey = wxRouteKey; } public String getWxDeadQueueName() { return wxDeadQueueName; } public void setWxDeadQueueName(String wxDeadQueueName) { QueueName = wxDeadQueueName; }}package ;import .*;import gConnectionFactory;import tionFactory;import Template;import ier;import urableBeanFactory;import AutoConfiguration;import ionalOnBean;import ionalOnMissingBean;import ;import uration;import ;import ce;import p;import ;/** * @desc:消息 rabbitmq 配置类 * @author: * @date: 2020/6/24 15:07 * @version: 3.0.0 * @since: 3.0.0 */@Configuration@ConditionalOnBean(value = )public class RabbitMqMsgConfig { @Resource private RabbitMqMsgProperties rabbitMqMsgProperties; /** * 定义备份交换机 * @return 备份交换机 */ @Bean public FanoutExchange alternateExchange(){ return new FanoutExchange(ernateExchangeName()); } /** * 定义扇形交换机 * @return */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(outExchangeName()); } /** * 定义TOPIC交换机 * @return */ @Bean public TopicExchange topicExchange(){ Map
* @date: 2020/7/20 14:33 * @version: 3.0.0 * @since: 3.0.0 */@Api(tags = "消息controller")@RestController@RefreshScope@RequestMapping("/msg/api/sms")public class SmsController {
@Resource private RabbitTemplate rabbitTemplate; @Resource private RabbitMqMsgProperties rabbitMqMsgProperties; @ApiOperation(value = "短信发送接⼝") @PostMapping(value = "/send") public Boolean send(@RequestBody @Validated({}) MsgSendDTO msgSendDTO) throws JsonProcessingException {
// msgSendDTO 对象中是要发送给mq中的信息 此处省略处理 Message message = MessageBuilder .withBody((msgSendDTO).getBytes()) .setContentType(T_TYPE_TEXT_PLAIN) .build(); // 全局唯⼀ CorrelationData correlationData = new CorrelationData(new SnowFlakeIdGenerator().newId() +""); tAndSend(icExchangeName(), RouteKey(),message,correlationData); return true; }} Consumer接收mq对应队列信息pom包⽂件
Consumer接收类@Slf4j@Componentpublic class MsgSmsConsumer { @Resource private SmsProcessServiceImpl smsProcessServiceImpl; /** * 短信消息处理 * * @param msgContent 消息内容 * @param message 消息 */ @RabbitListener(queues = "") @RabbitHandler public void smsProcess(String msgContent, Message message, Channel channel) throws IOException { // 转换消息 MsgSendDTO msgSendDTO = ct(msgContent, ); boolean ack = true; BusinessException bizException = null; try { if (!y(msgSendDTO)) { ("收到[{}]消息[{}]", msgCategory, (msgSendDTO)); boolean sendRet = d(msgSendDTO); //处理之后业务 if (!sendRet) { throw new BusinessException(_SEND_FAILED, _SEND_()); } } } } catch (BusinessException e) { (sage()); ack = false; bizException = e; } if (!ack) { ("[{}]消息消费发⽣异常,错误信息:[{}]", msgCategory, sage(), bizException); ack(sageProperties().getDeliveryTag(), false, false); } else { ck(sageProperties().getDeliveryTag(), false); } } /** * sms死信消息处理 * * @param msgContent 消息内容 * @param message 消息 */ @RabbitListener(queues = "") @RabbitHandler public void smsDeadProcess(String msgContent, Message message, Channel channel) throws IOException { ("收到[sms]死信消息:[{}]", msgContent); ck(sageProperties().getDeliveryTag(), false); }} 详情介绍可参考他⼈链接:
发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1688420861a135808.html
评论列表(0条)