原创

Java-延时队列-基于redis-StringRedisTemplate-2小时最多处理3次

特别注意:
这里不是"一次 push、失败后自动多次重试",而是每次 push 都只会延时执行一次;同一条消息在一个周期(2 小时)内允许入队的次数是有上限的(上限由 delyQueue.handleMaxTimes 配置,下面代码中的默认值为 5)。

package cn.jiangjiesheng.proj.domain.jingbo.controller;

import cn.jiangjiesheng.proj.domain.jingbo.dto.DelayQueueVo;
import cn.jiangjiesheng.proj.domain.jingbo.service.OneEntOneWayService;
import io.swagger.v3.oas.annotations.Operation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller exposing an endpoint that pushes a message onto the delay queue.
 */
@RestController
@Slf4j
@RequestMapping("/test")
public class OneEntOneWayController {

    @Autowired
    private MessageProducer messageProducer;

    /**
     * Enqueues the posted message through {@code MessageProducer}.
     *
     * <p>The delay passed on is always the producer's configured default
     * ({@code getDelyQueueDelySecond()}); the {@code dely} field of the request body is not read here.
     *
     * @param delayQueueVo request body carrying the source tag, the business payload and the reason
     * @return the value returned by {@code MessageProducer.pushMessage}
     */
    @Operation(summary = "测试延时队列")
    @PostMapping("/testDelayQueue")
    public Boolean testDelayQueue(@RequestBody DelayQueueVo delayQueueVo) {
        final String source = delayQueueVo.getMsgFrom();
        final String payload = delayQueueVo.getBizMessage();
        final int delaySeconds = messageProducer.getDelyQueueDelySecond();
        final String reason = delayQueueVo.getReason();
        return messageProducer.pushMessage(source, payload, delaySeconds, reason);
    }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;

import lombok.Data;

/**
 * Request payload for the delay-queue test endpoint.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class DelayQueueVo {
    // Source tag of the message; the consumer switches on it to pick a handler.
    private String msgFrom;
    // Business payload that is queued and later handed to the handler.
    private String bizMessage;
    // Delay value. NOTE(review): the test controller shown in this file does not read it
    // (it uses the configured default instead) - confirm whether that is intended.
    private int dely;
    // Free-text reason; the producer only writes it to the log.
    private String reason;

}
package cn.jiangjiesheng.eduinp.service.delyqueue;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Externalised settings of the delay queue, injected from Spring properties.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Component
@Data
public class MessageDelyQueueConfig {

    // Property delyQueue.delySecond; falls back to 20 when the property is not set.
    // Handed to pushMessage as the delay in seconds.
    @Value(value = "${delyQueue.delySecond:20}")
    private Integer delyQueueDelySecond;

    // Property delyQueue.handleMaxTimes; falls back to 5 when the property is not set.
    // Compared against the per-message counter before a message is allowed into the queue.
    @Value(value = "${delyQueue.handleMaxTimes:5}")
    private Integer delyQueueHandleMaxTimes;

}
 public static void main(String[] args) {
        // Boot the Spring context, then start the delay-queue polling task.
        // A failure in either step is printed and otherwise ignored.
        try {
            SpringApplication.run(ProjApplication.class, args);
            MessageConsumer delayQueueConsumer = (MessageConsumer) ContextUtil.getBean("messageConsumer");
            delayQueueConsumer.consumer();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
package cn.jiangjiesheng.eduinp.service.delyqueue;

import cn.hutool.crypto.digest.MD5;
import com.google.common.base.Preconditions;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;


/**
 * 延时队列
 * https://blog.51cto.com/u_15339304/5820536
 * https://blog.csdn.net/qq_32447301/article/details/106558559
 */
@Slf4j
@Service
public class MessageDelyQueueService {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private MessageDelyQueueConfig messageDelyQueueConfig;

    private final static String TAG_MSG_FROM_FREFIX = "_MsgFrom_";
    private final static String SPLIT_1 = "#1#";
    private final static String SPLIT_2 = "#2#";
    private final static String SPLIT_3 = "#3#";

    /**
     * 延迟队列名称,可以根据不同的业务处理设置不同的队列
     */
    private static final String DELY_QUEUE_NAME = "delyQueue:project:proj:queueMame:commonQueue";

    /**
     * 锁key
     */
    public static final String LOCK_KEY = "delyQueue:project:proj:lockKey:OneEntOneWayMsg";

    //记录一定时间内的重试次数
    public static final String REPEAT_KEY = "delyQueue:project:proj:repeatCount:";
    //重试次数周期
    public static final Integer RETRY_TIMES_CYCLE = 2 * 60 * 60; //120分钟(2小时)

    private static MD5 md5Obj = MD5.create();


    /**
     * Adds a message to the delay queue, a Redis sorted set whose score is the due time in
     * epoch milliseconds.
     *
     * <p>The payload is first wrapped with its source tag and a uniqueness marker. The wrapped
     * message is then checked against the per-message counter; when the limit for the current
     * window has been reached the message is rejected and nothing is queued.
     *
     * @param msgFrom    source tag of the message
     * @param bizMessage business payload; must not be blank
     * @param dely       delay in seconds; negative values are treated as 0
     * @return {@code false} when the message was rejected by the counter check, otherwise the
     *         result of the sorted-set add
     * @throws IllegalArgumentException if {@code bizMessage} is blank
     */
    public Boolean pushMessage(String msgFrom, String bizMessage, int dely) {

        Preconditions.checkArgument(StringUtils.isNotBlank(bizMessage), "bizMessage不能为空");
        int delaySeconds = Math.max(dely, 0);

        String message = addMsgFrom(bizMessage, msgFrom);

        if (!checkHasRetryCountAndSet(message)) {
            log.error("当前消息{}分钟内,已处理{}次数,暂时不再加入队列", RETRY_TIMES_CYCLE / 60, messageDelyQueueConfig.getDelyQueueHandleMaxTimes());
            return false;
        }

        // Multiply as long: "dely * 1000" in int arithmetic overflows for delays above ~24 days
        // and would produce a due time in the past.
        // pull() reads the score range [0, now], so the message becomes visible once this
        // timestamp has passed.
        long score = System.currentTimeMillis() + delaySeconds * 1000L;

        // Note kept from the original author: this call was seen to fail with
        // "java.lang.StackOverflowError at DefaultedRedisConnection.zAdd" and changing the
        // dependency did not help; for simple in-process delays a ScheduledExecutorService
        // was suggested instead.
        return redisTemplate.opsForZSet().add(DELY_QUEUE_NAME, message, score);
    }

    /**
     * Checks whether the message may still be queued in the current window and, if so,
     * bumps its counter.
     *
     * <p>The counter key is {@code REPEAT_KEY} plus the MD5 of the message with its uniqueness
     * marker neutralised and spaces removed, so repeated pushes of the same payload share one
     * counter. Sources whose tag ends with {@code _NO_MAX_TIMES} are counted but never rejected.
     *
     * @param message wrapped message as produced by {@code addMsgFrom}
     * @return {@code true} if the message may be queued (the counter has been incremented),
     *         {@code false} if the configured maximum has been reached
     */
    private boolean checkHasRetryCountAndSet(String message) {

        String msgFrom = getMsgFrom(message);
        // Literal replace; the original used the regex-based replaceAll for a plain space.
        String md5 = md5Obj.digestHex(getMsg4RepeatCheck(message).replace(" ", ""), "UTF-8");
        String md5Key = REPEAT_KEY + md5;

        ValueOperations<String, String> valueOps = redisTemplate.opsForValue();
        String storedCount = valueOps.get(md5Key);
        int handledCount = StringUtils.isBlank(storedCount) ? 0 : Integer.parseInt(storedCount);

        Integer maxTimes = messageDelyQueueConfig.getDelyQueueHandleMaxTimes();
        // getMsgFrom returns null for a message without the source prefix; treat that as "limited".
        boolean unlimited = msgFrom != null && msgFrom.endsWith(MessageDelyQueueConstants._NO_MAX_TIMES);
        if (!unlimited && handledCount >= maxTimes) {
            log.info("当前设置第{}次重试,已达到最大次数,delyQueueHandleMaxTimes:{}", handledCount + 1, maxTimes);
            return false;
        }
        log.info("当前设置第{}次重试", handledCount + 1);

        // Increment in Redis instead of get + set: concurrent pushes no longer overwrite each
        // other's update, and the remaining lifetime of the key is left as it is, so the bump
        // is not lost when the TTL is under one second.
        Long updatedCount = valueOps.increment(md5Key, 1L);
        boolean firstInWindow = updatedCount != null && updatedCount == 1L;
        if (firstInWindow || hasNoExpiry(md5Key)) {
            // Start the window on the first push; also repair a counter that was left without
            // an expiry so that it cannot block the message forever.
            redisTemplate.expire(md5Key, RETRY_TIMES_CYCLE, TimeUnit.SECONDS);
        }
        return true;
    }

    /** Returns true when Redis reports a negative TTL for the key, i.e. no expiry is set. */
    private boolean hasNoExpiry(String key) {
        Long ttlSeconds = redisTemplate.getExpire(key, TimeUnit.SECONDS);
        return ttlSeconds != null && ttlSeconds < 0;
    }


    /**
     * Fetches the messages that are due: every member of the queue whose score lies between
     * 0 and the current epoch milliseconds. Because a message is scored with
     * "push time + delay", it only shows up here once its delay has elapsed.
     *
     * <p>Messages are not removed by this call; use {@code remove} for that.
     *
     * @return the due messages; an empty list (never {@code null}) when nothing is due or when
     *         the Redis call fails
     */
    public List<String> pull() {
        try {
            Set<String> dueMessages = redisTemplate.opsForZSet().rangeByScore(DELY_QUEUE_NAME, 0, System.currentTimeMillis());
            // The original returned null here, which the consumer loop dereferenced.
            return dueMessages == null ? new ArrayList<>() : new ArrayList<>(dueMessages);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            log.error("pull from delay queue {} failed", DELY_QUEUE_NAME, e);
            return new ArrayList<>();
        }
    }

    /**
     * Removes a message from the delay queue.
     *
     * @param message the exact member value that was queued
     * @return {@code true} if at least one member was removed; {@code false} if nothing was
     *         removed or Redis returned no count
     */
    public Boolean remove(String message) {
        Long removedCount = redisTemplate.opsForZSet().remove(DELY_QUEUE_NAME, message);
        // Guard against a null count instead of unboxing it blindly.
        return removedCount != null && removedCount > 0;
    }

    /**
     * Tries to take the queue lock: sets {@code LOCK_KEY} only if it is absent, with a
     * 30 second expiry. This is a deliberately simple lock; the stored value is a fixed
     * string, so it does not identify the holder.
     *
     * @return {@code true} if the key was set by this call, {@code false} otherwise
     */
    public Boolean getLock() {
        Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(LOCK_KEY, DELY_QUEUE_NAME + "is locking !", 30, TimeUnit.SECONDS);
        // setIfAbsent returns a nullable Boolean; unboxing it directly could throw.
        return Boolean.TRUE.equals(acquired);
    }

    /**
     * Releases the queue lock by deleting {@code LOCK_KEY}.
     * The delete is unconditional: it does not check who set the key.
     */
    public void releaseLock() {
        redisTemplate.delete(LOCK_KEY);
    }

    /**
     * Wraps a business payload into the queue format
     * {@code _MsgFrom_<msgFrom>#1#<unique>#2#<bizMessage>}.
     *
     * <p>A blank payload, or one that already starts with the source prefix, is returned
     * unchanged.
     *
     * @param bizMessage business payload
     * @param msgFrom    source tag stored in front of the payload
     * @return the wrapped message
     */
    private String addMsgFrom(String bizMessage, String msgFrom) {
        if (StringUtils.isBlank(bizMessage) || bizMessage.startsWith(TAG_MSG_FROM_FREFIX)) {
            return bizMessage;
        }
        // The queue is a sorted set and the consumer removes entries by value, so every entry
        // must be distinct. A millisecond timestamp alone collides when the same payload is
        // pushed twice within one millisecond (or from two nodes), hence the added UUID.
        String unique = System.currentTimeMillis() + "-" + java.util.UUID.randomUUID();
        return TAG_MSG_FROM_FREFIX + msgFrom + SPLIT_1 + unique + SPLIT_2 + bizMessage;
    }

    /**
     * Extracts the source tag from a wrapped message: the text between the leading source
     * prefix and the first {@code SPLIT_1} separator.
     *
     * @param message wrapped message
     * @return the source tag, or {@code null} if the message is {@code null}, does not start
     *         with the prefix, or has no {@code SPLIT_1} separator
     */
    public String getMsgFrom(String message) {
        if (message == null || !message.startsWith(TAG_MSG_FROM_FREFIX)) {
            return null;
        }
        int tagStart = TAG_MSG_FROM_FREFIX.length();
        int tagEnd = message.indexOf(SPLIT_1, tagStart);
        if (tagEnd < 0) {
            // The original called substring(0, -1) here and threw.
            return null;
        }
        // Cut off only the leading prefix; the original's replace() also deleted any later
        // occurrence of the prefix text inside the tag.
        return message.substring(tagStart, tagEnd);
    }

    /**
     * Extracts the business payload from a wrapped message: everything after the
     * {@code SPLIT_2} separator that follows the uniqueness marker.
     *
     * @param message wrapped message
     * @return the payload; the message itself when it is {@code null}, does not start with the
     *         source prefix, or has no {@code SPLIT_2} separator
     */
    public String getBizMessage(String message) {
        if (message == null || !message.startsWith(TAG_MSG_FROM_FREFIX)) {
            return message;
        }
        // Look for SPLIT_2 behind SPLIT_1 so separator-like text in the source tag is skipped.
        int markerStart = message.indexOf(SPLIT_1);
        int searchFrom = markerStart < 0 ? 0 : markerStart + SPLIT_1.length();
        int payloadSeparator = message.indexOf(SPLIT_2, searchFrom);
        if (payloadSeparator < 0) {
            // The original computed (-1 + length) and returned the message minus its first characters.
            return message;
        }
        return message.substring(payloadSeparator + SPLIT_2.length());
    }


    /**
     * Produces the form of a wrapped message used for counting repeats: the span from
     * {@code SPLIT_1} through {@code SPLIT_2} (which holds the uniqueness marker) is replaced
     * by {@code SPLIT_3}, so two pushes of the same payload from the same source compare equal.
     *
     * @param message wrapped message
     * @return the normalised message, or the message unchanged if it does not have the
     *         prefix / {@code SPLIT_1} / {@code SPLIT_2} layout
     */
    private String getMsg4RepeatCheck(String message) {
        if (!message.startsWith(TAG_MSG_FROM_FREFIX)) {
            return message;
        }
        int markerStart = message.indexOf(SPLIT_1);
        if (markerStart < 0) {
            return message;
        }
        // Search SPLIT_2 after SPLIT_1; the original threw when SPLIT_2 came first.
        int markerEnd = message.indexOf(SPLIT_2, markerStart + SPLIT_1.length());
        if (markerEnd < 0) {
            return message;
        }
        // Splice by position so only this one span is replaced, not equal text in the payload.
        return message.substring(0, markerStart) + SPLIT_3 + message.substring(markerEnd + SPLIT_2.length());
    }

//    public static void main(String[] args) {
//        String add = addMsgFrom("message", "exam");
//        String msgFrom = getMsgFrom(add);
//        String bizMessage = getBizMessage(add);
//
//        String msg4RepeatCheck = getMsg4RepeatCheck(add);
//        System.out.println(msgFrom);
//        System.out.println(bizMessage);
//        System.out.println(msg4RepeatCheck);
//        System.out.println(add);
//    }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;


import cn.hutool.core.date.DateUtil;
import cn.jiangjiesheng.eduinp.service.org.OrgService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.*;

@Slf4j
@Service
public class MessageConsumer {
    public static ExecutorService executorService = new ThreadPoolExecutor(20, 50,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(2048), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private MessageDelyQueueService delyQueueService;

    @Autowired
    private OrgService orgService;

    private boolean stopFlag = false;

    public void setStopFlag(boolean stopFlag) {
        this.stopFlag = stopFlag;
    }

    /**
     * 放在启动类中调用
     */
    @SneakyThrows
    public void consumer() {
        executorService.execute(() -> {
            while (!stopFlag) {
                List<String> messageList = delyQueueService.pull();
                if (messageList.size() == 0) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                handleMessage(messageList);
            }
        });
    }

    /**
     * 消息处理
     *
     * @param messageList
     */
    public void handleMessage(List<String> messageList) {

        for (String message : messageList) {
            //实测好像第二次重试的就没有进来
            log.info("consumer message:{},date:{}", message, DateUtil.now());

            try {
                String msgFrom = this.delyQueueService.getMsgFrom(message);
                String bizMessage = this.delyQueueService.getBizMessage(message);
                //todo ... 处理业务消息,先解析bizMessage得到入参,再去调用push消息的封装方法(不会出现死循环)

                switch (msgFrom) {
                    case MessageDelyQueueConstants.MSG_FROM_SYNC_XGF_INFO_2_ENTERPRISE_STAFF_INFO:
                       String[] bizMessageSplit = bizMessage.split(",");
                       String entId = bizMessageSplit[0];
                       String inpAdminUserId = bizMessageSplit[1];
                       String entName = bizMessageSplit[2];
                       orgService.syncXgfInfo2EnterpriseStaffInfo(entId,inpAdminUserId,entName);
                        break;
                    default:
                        break;
                }



            } catch (Exception e) {
                //TODO 最好已记录下最大延迟次数
                log.error("consumer message:出现异常:{},异常:{}", message, e);
            }
            //TODO 注意:因为下次重试的msg还是一样的内容,那这里就把消息更清除 了,所以每次的消息也要保证唯一
            this.delyQueueService.remove(message);
        }
    }

}
package cn.jiangjiesheng.eduinp.service.delyqueue;

import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Producer-side entry point of the delay queue: logs each request and delegates to
 * {@code MessageDelyQueueService}.
 */
@Slf4j
@Service
public class MessageProducer {

    @Autowired
    MessageDelyQueueService delyQueueService;
    @Autowired
    private MessageDelyQueueConfig messageDelyQueueConfig;

    /**
     * Logs the request and pushes the message onto the delay queue.
     *
     * @param msgFrom    source tag of the message
     * @param bizMessage business payload
     * @param dely       delay in seconds
     * @param reason     why the message is queued; only written to the log
     * @return the result of {@code MessageDelyQueueService.pushMessage}
     */
    public Boolean pushMessage(String msgFrom, String bizMessage, int dely, String reason) {
        final String requestedAt = DateUtil.now();
        log.info("加入延时队列入参:msgFrom:{},bizMessage:{},时间:{},dely:{},加入原因:{}", msgFrom, bizMessage, requestedAt, dely, reason);
        final Boolean queued = delyQueueService.pushMessage(msgFrom, bizMessage, dely);
        return queued;
    }

    /**
     * @return the configured default delay in seconds
     */
    public int getDelyQueueDelySecond() {
        final Integer configuredSeconds = messageDelyQueueConfig.getDelyQueueDelySecond();
        return configuredSeconds;
    }
}
package cn.jiangjiesheng.eduinp.service.delyqueue;

/**
 * Constants shared by the delay-queue producer, service and consumer.
 */
public final class MessageDelyQueueConstants {

    /**
     * Suffix of a source tag. The queue service does not apply the per-window maximum to
     * messages whose source tag ends with this suffix.
     */
    public final static String _NO_MAX_TIMES = "_NoMaxTimes";

    /**
     * Source tag the consumer maps to {@code OrgService.syncXgfInfo2EnterpriseStaffInfo}.
     * Built from the suffix constant so the two cannot drift apart; it is still a compile-time
     * constant and therefore usable as a {@code case} label.
     */
    public final static String MSG_FROM_SYNC_XGF_INFO_2_ENTERPRISE_STAFF_INFO = "syncXgfInfo2EnterpriseStaffInfo" + _NO_MAX_TIMES;

    private MessageDelyQueueConstants() {
        // constants holder, not meant to be instantiated
    }
}
正文到此结束
本文目录