Java-延时任务处理-睡眠时间后调用api-避免调用频繁失败
后面换成其它可以去掉的锁
@Value(value = "${apiInvokePeriodMins:8}")
@Getter
@Setter
private Integer apiInvokePeriodMins;
private final static Object lock = new Object();
private final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
int deley = getApiInvokePeriodMins() * 60;秒 -> 分钟
int deleyStart = 0;//这个延时是增加的
for (int i = 0, len = publicSentimentDictionaryData.size(); i < len; i++) {
try {
synchronized (lock) { //这个锁应该放在调用的api的位置
log.info("{} deley:{}", TAG, deley);
//log.info("{} deleyStart:{}", TAG, deleyStart);
if (StringUtils.isBlank(huatibianma)) {
//跳过,就不延时了
//deleyStart += 0;
continue;
}
//任务处理,调用api..
boolean isLastTask = i >= (len - 1);
long insertTotal = doSyncAndSave(dictionaryData.get("token").toString(), huatibianma, huatimingcheng, cursor, size, 0,isLastTask);
//记录结果(map)
result.put(huatimingcheng, String.format("新增%d条记录", insertTotal));
//if (i < (len - 1)) { //这个应该放在 doSyncAndSave 中间
// log.info("{}等待执行下一次调用", TAG);
//等于的话,就是最后一个了,不要等待了
// Thread.sleep(deley * 1000);
//} else {
// log.info("{}任务已全部完成", TAG);
//}
if (isLastTask) {
log.info("{}任务已全部完成,不再等待", TAG);
}
}
//失败的30分钟后再调用
} catch (Exception e) {
log.error("{}处理话题编号:{},话题名称:{},出现异常:{}", TAG, huatibianma, huatimingcheng, e);
}
}
//循环结束后打印结果
System.out.println(JSON.toJSONString(result));
//存在问题的继续发起调用
if (!TOO_MANY_REQUEST_KEYWORD_LIST.isEmpty()) {
log.warn("{}存在调用api失败的情况,将在15分钟后重试", TAG);
executor.schedule(() -> {
for (Map<String, Object> map : TOO_MANY_REQUEST_KEYWORD_LIST) {
log.warn("{}存在调用api失败的情况,正在开始重试:{}", TAG, JSON.toJSONString(map));
syncPublicSentiment(map.get("subjectCode").toString(), map.get("cursor").toString(), map.get("size").toString());
}
}, 15, TimeUnit.MINUTES);
}
对于api是递归调用的,要众和判断热舞
boolean isLastTask = i >= (len - 1);
long insertTotal = doSyncAndSave(dictionaryData.get("token").toString(), huatibianma, huatimingcheng, cursor, size, 0,isLastTask);
if (isLastTask) {
log.info("{}任务已全部完成,不再等待", TAG);
}
private long doSyncAndSave(String token, String subjectCode, String subjectName, String cursor, String size, long insertTotal, boolean isLastTask) {
try {
log.info("{}开始同步当前页:token:{},subjectCode:{}, subjectName:{}, cursor:{},size:{},insertTotal:{}", TAG, token, subjectCode, subjectName, cursor, size, insertTotal);
JSONObject dataByApi = getDataByApi(token, subjectCode, cursor, size);
if (dataByApi == null || isTooManyRequest(dataByApi)) {
//直接终止当前的话题编号,确保cursor可以连续
doDelay();
return insertTotal;
}
JSONArray jsonArray = dataByApi.getJSONArray("data");
if (jsonArray == null || jsonArray.size() == 0) {
doDelay();
return insertTotal;
}
// 如果 getDataByApi 调用失败,也应该延时
List<Map<String, Object>> apiDataList = (List<Map<String, Object>>) JSONArray.parse(JSON.toJSONString(jsonArray));
int apiDataReturnCount = apiDataList.size();
if (apiDataReturnCount < Integer.valueOf(size) || StringUtils.isBlank(nextCursor)) {
log.info("{}没有下一页的api数据了,当前入参cursor:{},size:{},出参cursor:{},size:{}", TAG, cursor, size, nextCursor, apiDataList.size());
if (!isLastTask) {
doDelay();
} else {
log.info("{}任务已全部完成,不再等待", TAG);
}
return insertTotal;
} else {
doDelay();
return doSyncAndSave(token, subjectCode, subjectName, nextCursor, size, insertTotal, isLastTask);
}
} catch (Exception e) {
log.error("{}处理数据报错,token:{},subjectCode:{}, subjectName:[}, cursor:{},size:{},异常:{}", TAG, token, subjectCode, subjectName, cursor, size, e);
}
return insertTotal;
}
private void doDelay() throws InterruptedException {
//等于的话,就是最后一个了,不要等待了
log.info("{}等待执行下一次调用", TAG);
Thread.sleep(getDaleyTimeValue() * 1000);
}
private long getDaleyTimeValue(){
return getApiInvokePeriodMins() * 60; // * 60
}
正文到此结束