logobsky
首页 留言 登录
rabbitmq 相关知识

import cn.hutool.core.lang.UUID;

import lombok.extern.slf4j.Slf4j;

import org.slf4j.MDC;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.time.Duration;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ThreadPoolExecutor;

import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER;

@Slf4j

public class RabbitMqHelper {

private final RabbitTemplate rabbitTemplate;

private final MessagePostProcessor processor = new BasicIdMessageProcessor();

private final ThreadPoolTaskExecutor executor;

public RabbitMqHelper(RabbitTemplate rabbitTemplate) {

this.rabbitTemplate = rabbitTemplate;

executor = new ThreadPoolTaskExecutor();

//配置核心线程数

executor.setCorePoolSize(10);

//配置最大线程数

executor.setMaxPoolSize(15);

//配置队列大小

executor.setQueueCapacity(99999);

//配置线程池中的线程的名称前缀

executor.setThreadNamePrefix("mq-async-send-handler");

// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务

// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//执行初始化

executor.initialize();

}

/**

  1. 根据exchange和routingKey发送消息

*/

public void send(String exchange, String routingKey, T t) {

log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey, t);

// 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理

String id = UUID.randomUUID().toString(true);

CorrelationData correlationData = new CorrelationData(id);

// 2.设置发送超时时间为500毫秒

rabbitTemplate.setReplyTimeout(500);

// 3.发送消息,同时设置消息id

rabbitTemplate.convertAndSend(exchange, routingKey, t, processor, correlationData);

}

/**

  1. 根据exchange和routingKey发送消息,并且可以设置延迟时间

*/

public void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {

// 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理

String id = UUID.randomUUID().toString(true);

CorrelationData correlationData = new CorrelationData(id);

// 2.设置发送超时时间为500毫秒

rabbitTemplate.setReplyTimeout(500);

// 3.发送消息,同时设置消息id

rabbitTemplate.convertAndSend(exchange, routingKey, t, new DelayedMessageProcessor(delay), correlationData);

}

/**

  1. 根据exchange和routingKey 异步发送消息,并指定一个延迟时间

*

  1. @param exchange 交换机
    1. @param routingKey 路由KEY
      1. @param t 数据
      2. @param 数据类型

*/

public void sendAsync(String exchange, String routingKey, T t, Long time) {

String requestId = MDC.get(REQUEST_ID_HEADER);

CompletableFuture.runAsync(() -> {

try {

MDC.put(REQUEST_ID_HEADER, requestId);

// 发送延迟消息

if (time != null && time > 0) {

sendDelayMessage(exchange, routingKey, t, Duration.ofMillis(time));

} else {

send(exchange, routingKey, t);

}

} catch (Exception e) {

log.error("推送消息异常,t:{},", t, e);

}

}, executor);

}

/**

  1. 根据exchange和routingKey 异步发送消息

*

  1. @param exchange 交换机
    1. @param routingKey 路由KEY
      1. @param t 数据
      2. @param 数据类型

*/

public void sendAsync(String exchange, String routingKey, T t) {

sendAsync(exchange, routingKey, t, null);

}

}

需要的类

接收者需要的注解

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "sign.point.queue",durable = "true")

,exchange =@Exchange(value = MqConstants.Exchange.LEARNING_EXCHANGE,type = ExchangeTypes.TOPIC)

,key = MqConstants.Key.SIGN_IN) ) @Queue(value = 是队列,需要修改。exchange是对应的交换机,key是对应的rountingkey

上一篇:mybatis-plus
下一篇:bitmap相关
验证码
评论留言 (0条)