Java-延时队列-基于redis-StringRedisTemplate-2小时最多处理3次
特别注意:
这里不是“一次 push、失败后自动多次重试”,而是每次 push 都只会延时执行一次;同一条消息在一段时间内(代码中为 2 小时)允许 push 的次数会受到限制。
- 补充说明
以下延迟队列的实现,可能会因为依赖(导包)不正确导致 zAdd 方法没有被真正实现,进而无限递归调用,造成堆栈溢出(java.lang.StackOverflowError)。
如果只是简单的延时任务,可以直接使用下面的 ScheduledExecutorService,缺点是没有重试的能力。
参考:http://www.mobiletrain.org/about/BBS/147534.html 、 https://blog.csdn.net/qq_39994174/article/details/130969095 。注意:一般情况下优先使用这种方式,但看起来应用重启后会丢失尚未执行的任务数据。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);executor.schedule(() -> { // 执行任务
}, 1, TimeUnit.SECONDS); // 延迟1秒钟后执行任务
package cn.jiangjiesheng.proj.domain.jingbo.controller;
import cn.jiangjiesheng.proj.domain.jingbo.dto.DelayQueueVo;
import cn.jiangjiesheng.proj.domain.jingbo.service.OneEntOneWayService;
import io.swagger.v3.oas.annotations.Operation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Test endpoint that pushes a message onto the delay queue.
 */
@RestController
@Slf4j
@RequestMapping("/test")
public class OneEntOneWayController {

    @Autowired
    private MessageProducer messageProducer;

    /**
     * Enqueues the message described by the request body.
     *
     * <p>The delay is always the producer's configured default; the {@code dely}
     * field of the request body is not read here.
     *
     * @param delayQueueVo source tag, business payload and reason of the message
     * @return whatever {@link MessageProducer#pushMessage} returns
     */
    @Operation(summary = "测试延时队列")
    @PostMapping("/testDelayQueue")
    public Boolean testDelayQueue(@RequestBody DelayQueueVo delayQueueVo) {
        final String source = delayQueueVo.getMsgFrom();
        final String payload = delayQueueVo.getBizMessage();
        final int delaySeconds = messageProducer.getDelyQueueDelySecond();
        final String reason = delayQueueVo.getReason();
        return messageProducer.pushMessage(source, payload, delaySeconds, reason);
    }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;
import lombok.Data;
/**
 * Request payload describing one message to put on the delay queue.
 *
 * <p>Getters, setters, equals/hashCode and toString are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class DelayQueueVo {
// Tag identifying the business source of the message; the consumer switches on it.
private String msgFrom;
// Business payload handed to the consumer.
private String bizMessage;
// Requested delay. NOTE(review): the test controller in this article ignores this field
// and uses the configured default delay instead - confirm whether that is intended.
private int dely;
// Free-text reason for enqueueing; the producer only writes it to the log.
private String reason;
}
package cn.jiangjiesheng.eduinp.service.delyqueue;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Externalised settings of the delay queue, injected from Spring properties.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Component
@Data
public class MessageDelyQueueConfig {
// Property "delyQueue.delySecond", falling back to 20 when unset.
// The producer passes this value as the delay in seconds.
@Value(value = "${delyQueue.delySecond:20}")
private Integer delyQueueDelySecond;
// Property "delyQueue.handleMaxTimes", falling back to 5 when unset.
// Upper bound on how often the same message may be enqueued within the
// service's retry-count window (MessageDelyQueueService.RETRY_TIMES_CYCLE).
@Value(value = "${delyQueue.handleMaxTimes:5}")
private Integer delyQueueHandleMaxTimes;
}
/**
 * Boots the Spring application and then starts the delay-queue consumer loop.
 *
 * <p>Failures in either step are printed and swallowed; the consumer is only
 * started when the application context came up without an exception.
 */
public static void main(String[] args) {
    // Step 1: start Spring. Nothing else can run if this fails.
    try {
        SpringApplication.run(ProjApplication.class, args);
    } catch (Exception startupFailure) {
        startupFailure.printStackTrace();
        return;
    }

    // Step 2: look up the consumer bean by name and kick off its polling loop.
    try {
        ((MessageConsumer) ContextUtil.getBean("messageConsumer")).consumer();
    } catch (Exception consumerFailure) {
        consumerFailure.printStackTrace();
    }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;
import cn.hutool.crypto.digest.MD5;
import com.google.common.base.Preconditions;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 延时队列
* https://blog.51cto.com/u_15339304/5820536
* https://blog.csdn.net/qq_32447301/article/details/106558559
*/
@Slf4j
@Service
public class MessageDelyQueueService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private MessageDelyQueueConfig messageDelyQueueConfig;
private final static String TAG_MSG_FROM_FREFIX = "_MsgFrom_";
private final static String SPLIT_1 = "#1#";
private final static String SPLIT_2 = "#2#";
private final static String SPLIT_3 = "#3#";
/**
* 延迟队列名称,可以根据不通的业务处理设置不同的队列
*/
private static final String DELY_QUEUE_NAME = "delyQueue:project:proj:queueMame:commonQueue";
/**
* 锁key
*/
public static final String LOCK_KEY = "delyQueue:project:proj:lockKey:OneEntOneWayMsg";
//记录一定时间内的重试次数
public static final String REPEAT_KEY = "delyQueue:project:proj:repeatCount:";
//重启次数周期
public static final Integer RETRY_TIMES_CYCLE = 2 * 60 * 60; //120分钟(2小时)
private static MD5 md5Obj = MD5.create();
/**
* 发送数据
*
* @param bizMessage 消息
* @param dely 延迟多久(秒)
*/
public Boolean pushMessage(String msgFrom, String bizMessage, int dely) {
Preconditions.checkArgument(StringUtils.isNotBlank(bizMessage), "bizMessage不能为空");
if (dely < 0) {
dely = 0;
}
String message = addMsgFrom(bizMessage, msgFrom);
boolean canPushQueue = checkHasRetryCountAndSet(message);
if (!canPushQueue) {
log.error("当前消息{}分钟内,已处理{}次数,暂时不再加入队列", RETRY_TIMES_CYCLE / 60, messageDelyQueueConfig.getDelyQueueHandleMaxTimes());
return false;
}
long score = System.currentTimeMillis() + dely * 1000;
// String msg = JSONObject.toJSONString(message);
//TODO 注意: 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息,所以 如果有dely * 1000,那就是 dely * 1000后才能pull出消息
//TODO 这个可能会有异常,包怎么改都不行 java.lang.StackOverflowError: null
// at org.springframework.data.redis.connection.DefaultedRedisConnection.zAdd(DefaultedRedisConnection.java:938)
// 一般的延时任务就使用 ScheduledExecutorService
Boolean add = redisTemplate.opsForZSet().add(DELY_QUEUE_NAME, message, score);
// ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
// executor.schedule(() -> {
// // 执行任务
// }, 1, TimeUnit.SECONDS); // 延迟1秒钟后执行任务
return add;
}
private boolean checkHasRetryCountAndSet(String message) {
String msg4RepeatCheck = getMsg4RepeatCheck(message);
String msgFrom = getMsgFrom(message);
String md5 = md5Obj.digestHex(msg4RepeatCheck.replaceAll(" ", ""), "UTF-8");
ValueOperations<String, String> stringStringValueOperations = redisTemplate.opsForValue();
String md5Key = REPEAT_KEY + md5;
String hasRetryCountStr = stringStringValueOperations.get(md5Key);
Integer hasRetryCount = StringUtils.isBlank(hasRetryCountStr) ? 0 : Integer.valueOf(hasRetryCountStr);
if (!msgFrom.endsWith(MessageDelyQueueConstants._NO_MAX_TIMES) && hasRetryCount >= messageDelyQueueConfig.getDelyQueueHandleMaxTimes()) {
log.info("当前设置第{}次重试,已达到最大次数,delyQueueHandleMaxTimes:{}", hasRetryCount + 1, messageDelyQueueConfig.getDelyQueueHandleMaxTimes());
return false;
}
log.info("当前设置第{}次重试", hasRetryCount + 1);
if (hasRetryCount == 0) {
hasRetryCount = 1;
stringStringValueOperations.set(md5Key, hasRetryCount.toString(), RETRY_TIMES_CYCLE, TimeUnit.SECONDS);
} else {
//修改数据,但是不修改剩余有效期
++hasRetryCount;
Long expire = redisTemplate.getExpire(md5Key, TimeUnit.SECONDS);
if (expire != null && expire > 0) {
stringStringValueOperations.set(md5Key, hasRetryCount.toString(), expire, TimeUnit.SECONDS);
}
}
return true;
}
/**
* 拉取最新需要
* 被消费的消息
* rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
*
* @return
*/
public List<String> pull() {
List<String> msgList = new ArrayList<>();
try {
//TODO 注意: 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息,所以 如果有dely * 1000,那就是 dely * 1000后才能pull出消息
Set<String> strings = redisTemplate.opsForZSet().rangeByScore(DELY_QUEUE_NAME, 0, System.currentTimeMillis());
if (strings == null) {
return null;
}
// msgList = strings.stream().map(msg -> {
// String message = null;
// try {
// message = JSONObject.parseObject(msg, String.class);
// } catch (Exception e) {
// e.printStackTrace();
// }
// return message;
msgList = new ArrayList<>(strings);
} catch (Exception e) {
log.error(e.toString());
}
return msgList;
}
/**
* 移除消息
*
* @param message
*/
@SneakyThrows
public Boolean remove(String message) {
Long remove = redisTemplate.opsForZSet().remove(DELY_QUEUE_NAME, message);// JSONObject.toJSONString(message)
return remove > 0;
}
/**
* 获取锁,这是使用的锁的方式比较简单 ,reids
* 实现分布式锁比较复杂,这里不介绍
*
* @return
*/
public Boolean getLock() {
boolean lock = false;
//获得锁
lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, DELY_QUEUE_NAME + "is locking !", 30, TimeUnit.SECONDS);
return lock;
}
/**
* 释放锁
*/
public void releaseLock() {
redisTemplate.delete(LOCK_KEY);
}
private String addMsgFrom(String bizMessage, String msgFrom) {
if (StringUtils.isBlank(bizMessage)) {
return bizMessage;
}
if (bizMessage.startsWith(TAG_MSG_FROM_FREFIX)) {
return bizMessage;
} else {
//String unique = UUID.randomUUID().toString();
long unique = System.currentTimeMillis();
return TAG_MSG_FROM_FREFIX + msgFrom + SPLIT_1 + unique + SPLIT_2 + bizMessage;
}
}
public String getMsgFrom(String message) {
if (message.startsWith(TAG_MSG_FROM_FREFIX)) {
int index = message.indexOf(SPLIT_1);
String tag = message.substring(0, index);
return tag.replace(TAG_MSG_FROM_FREFIX, "");
} else {
return null;
}
}
public String getBizMessage(String message) {
if (message.startsWith(TAG_MSG_FROM_FREFIX)) {
int index = message.indexOf(SPLIT_2) + SPLIT_2.length();
return message.substring(index);
} else {
return message;
}
}
private String getMsg4RepeatCheck(String message) {
if (message.startsWith(TAG_MSG_FROM_FREFIX) && message.contains(SPLIT_1) && message.contains(SPLIT_2)) {
String unique = message.substring(message.indexOf(SPLIT_1), message.indexOf(SPLIT_2) + SPLIT_2.length());
return message.replace(unique, SPLIT_3);
}
return message;
}
// public static void main(String[] args) {
// String add = addMsgFrom("message", "exam");
// String msgFrom = getMsgFrom(add);
// String bizMessage = getBizMessage(add);
//
// String msg4RepeatCheck = getMsg4RepeatCheck(add);
// System.out.println(msgFrom);
// System.out.println(bizMessage);
// System.out.println(msg4RepeatCheck);
// System.out.println(add);
// }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;
import cn.hutool.core.date.DateUtil;
import cn.jiangjiesheng.eduinp.service.org.OrgService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.*;
@Slf4j
@Service
public class MessageConsumer {
public static ExecutorService executorService = new ThreadPoolExecutor(20, 50,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2048), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired
private MessageDelyQueueService delyQueueService;
@Autowired
private OrgService orgService;
private boolean stopFlag = false;
public void setStopFlag(boolean stopFlag) {
this.stopFlag = stopFlag;
}
/**
* 放在启动类中调用
*/
@SneakyThrows
public void consumer() {
executorService.execute(() -> {
while (!stopFlag) {
List<String> messageList = delyQueueService.pull();
if (messageList.size() == 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
handleMessage(messageList);
}
});
}
/**
* 消息处理
*
* @param messageList
*/
public void handleMessage(List<String> messageList) {
for (String message : messageList) {
//实测好像第二次重试的就没有进来
log.info("consumer message:{},date:{}", message, DateUtil.now());
try {
String msgFrom = this.delyQueueService.getMsgFrom(message);
String bizMessage = this.delyQueueService.getBizMessage(message);
//todo ... 处理业务消息,先解析bizMessage得到入参,再去调用push消息的封装方法(不会出现死循环)
switch (msgFrom) {
case MessageDelyQueueConstants.MSG_FROM_SYNC_XGF_INFO_2_ENTERPRISE_STAFF_INFO:
String[] bizMessageSplit = bizMessage.split(",");
String entId = bizMessageSplit[0];
String inpAdminUserId = bizMessageSplit[1];
String entName = bizMessageSplit[2];
orgService.syncXgfInfo2EnterpriseStaffInfo(entId,inpAdminUserId,entName);
break;
default:
break;
}
} catch (Exception e) {
//TODO 最好已记录下最大延迟次数
log.error("consumer message:出现异常:{},异常:{}", message, e);
}
//TODO 注意:因为下次重试的msg还是一样的内容,那这里就把消息更清除 了,所以每次的消息也要保证唯一
this.delyQueueService.remove(message);
}
}
}
package cn.jiangjiesheng.eduinp.service.delyqueue;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Entry point for business code that wants to put a message on the delay queue.
 */
@Slf4j
@Service
public class MessageProducer {

    @Autowired
    MessageDelyQueueService delyQueueService;

    @Autowired
    private MessageDelyQueueConfig messageDelyQueueConfig;

    /**
     * Logs the request and forwards it to the queue service.
     *
     * @param msgFrom    source tag of the message
     * @param bizMessage business payload
     * @param dely       delay in seconds
     * @param reason     why the message is enqueued; only written to the log
     * @return the result reported by the queue service
     */
    public Boolean pushMessage(String msgFrom, String bizMessage, int dely, String reason) {
        final String timestamp = DateUtil.now();
        log.info("加入延时队列入参:msgFrom:{},bizMessage:{},时间:{},dely:{},加入原因:{}", msgFrom, bizMessage, timestamp, dely, reason);
        final Boolean accepted = delyQueueService.pushMessage(msgFrom, bizMessage, dely);
        return accepted;
    }

    /**
     * @return the configured default delay in seconds
     */
    public int getDelyQueueDelySecond() {
        final Integer configured = messageDelyQueueConfig.getDelyQueueDelySecond();
        return configured.intValue();
    }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;
/**
 * Source tags understood by the delay-queue consumer.
 */
public class MessageDelyQueueConstants {

    /** Suffix marking a source tag that is exempt from the per-window enqueue limit. */
    public static final String _NO_MAX_TIMES = "_NoMaxTimes";

    /** Source tag for syncing XGF info into the enterprise staff info; exempt from the limit. */
    public static final String MSG_FROM_SYNC_XGF_INFO_2_ENTERPRISE_STAFF_INFO =
            "syncXgfInfo2EnterpriseStaffInfo" + _NO_MAX_TIMES;
}
正文到此结束
- 本文标签: Spring Boot Spring
- 本文链接: https://code.jiangjiesheng.cn/article/38
- 版权声明: 本文由小江同学原创发布,转载请先联系本站长,谢谢。