Java-代码段-http接口调用自身服务中的其他http接口(mock)-并建立socket连接发送和接收报文实例
1. controller入口
@ApiOperation("模拟平台端+现场机socket交互过程,需要Authorization")
@PostMapping(path = "/testSocketBusiness")
public ResponseBean<Map<Object, Object>> testSocketBusiness(@RequestBody SocketTestClient.SocketAllParams socketAllParams) {
// 建立socket连接 "\n")
Map<Object, Object> res = socketTestClient.doTestAll(socketAllParams.getFirstUrlRequest(), socketAllParams.getSocketMsgList(), socketAllParams.getAutoUseNowDataTime());
return new ResponseBean<>(200, "请求结束", res);
}
1.1 接口入参示例及说明
对应急排水的现场机回复模拟:
在日志中找到 雨水启动应急排水完成:mn:7899871(有mn在线的)
SELECT qn,mn,cn,resp_detail from your_table where mn = '7899871' ORDER BY req_time desc;
拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)
"QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
"ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
{
"firstUrlRequest": {
"url": "/bms/waterMonitorEquip/remoteControl",
"method": "PUT",
"urlParams": {
"monitorId": 3300,
"cn": "3020",
"polId": "md0501",
"infoId": "i42002"
}
},
"socketMsgList": [
"ST=91;CN=9011;PW=123456;MN=33445566;Flag=4;CP=&&QnRtn=1&&",
"ST=32;CN=3020;PW=123456;MN=33445566;Flag=4;CP=&&DataTime=20250528111758;PolId=md0501;i42002-Info=2&&",
"ST=91;CN=9012;PW=123456;MN=33445566;Flag=4;CP=&&ExeRtn=1&&"
],
"autoUseNowDataTime": true,
"【示例非必要不改动】反控提取类入参示例(前不要长度、不要QN(接口会生成统一QN),后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {
"firstUrlRequest": {
"url": "/bms/waterMonitorEquip/remoteControl",
"method": "PUT",
"urlParams": {
"monitorId": 3319,
"cn": "3020",
"polId": "md0501",
"infoId": "i42002"
}
},
"socketMsgList": [
"ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
"ST=32;CN=3020;PW=123456;MN=7899871;Flag=4;CP=&&DataTime=20250528111758;PolId=md0501;i42002-Info=2&&",
"ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
],
"autoUseNowDataTime": true
},
"【示例非必要不改动】现场机回复模拟示例,类似上面的反控(3条变2条,第1条必须有QN)": {
"说明": [
"对应急排水的现场机回复模拟:",
"在日志中找到 雨水启动应急排水完成:mn:7899871(有mn在线的)",
"SELECT qn,mn,cn,resp_detail from t_control_record where mn = '7899871' ORDER BY req_time desc;",
"拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)",
"QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
"ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
]
},
"【示例非必要不改动】监测数据入参示例(前不要长度、不要QN,后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {
"socketMsgList": [
"ST=32;CN=2011;PW=123456;MN=7899871;Flag=5;CP=&&DataTime=20250527105200;w00000-Rtd=8.8,w00000-Flag=N;w01018-Rtd=448,w01018-SampleTime=20250527105100,w01018-Flag=D&&"
],
"autoUseNowDataTime": false
},
"查找测点对应的MN/PW": [
"select distinct org.id as orgId,monitor.id as monitorId,monitor.name as monitorName,org.name as orgName,monitor.type as monitorType,concat (mi.item_code,mi.serial_number) AS \"支持的因子【关键】\",case monitor.type when 0 then '水' when 1 then '雨水' when 2 then '气' when 3 then '气无' else monitor.type :: VARCHAR END as \"测点类型\",mn.mn as mn,mn.pw as pw from bis_auto_mon_item mi INNER JOIN bis_auto_monitor_equipment me ON mi.main_id=me.id INNER JOIN bis_monitor_base monitor ON mi.monitor_id=monitor.id INNER join bis_main_org org on monitor.main_org_id=org.id INNER join bis_mn_info mn on monitor.id=mn.monitor_id INNER JOIN bis_scheme_factor_monitor sm ON sm.equipment_id = me.id INNER JOIN bis_scheme s ON s.id = sm.scheme_id AND s.status IN (0, 3) AND s.type = 2 where monitor.del_flag=1 and mn.del_flag=1 and org.del_flag=1 -- and org.name like '%DB迁移测试zgq%' and monitor.name like '%雨水测试测点zgq%' -- and org.id = 15094 -- and monitor.id = 3313 -- and mn.mn = '456456' ORDER BY monitor.type;"
],
"一次采集日志:": [
"tail -200f /home/logs/yourproject/collect/connect.log",
"tail -200f /home/logs/yourproject/collect/receive.log [重点]",
"tail -200f /home/logs/yourproject/collect/send.log",
"tail -200f /home/logs/yourproject/debug.log [重点]",
"tail -200f /home/logs/yourproject/error.log"
]
}
1.2 接口出参示例
{
"code": 200,
"msg": "请求结束",
"data": {
"请求结束,当期是监测数据上传模式,返回qn列表:": [
"20250529132932726"
]
},
"timestamp": "2025-05-29 13:29:36",
"traceId": "cb9557eaed1543aa8749bf2a545816ef"
}
2. mock test方式调用自身controller下的http接口并建立socket连接发送和接收报文实例
SocketTestClient.java
package cn.jiangjiesheng.code.service.common;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.jiangjiesheng.code.core.utils.ControllerInvoker;
import cn.jiangjiesheng.code.core.utils.HttpServletUtil;
import cn.jiangjiesheng.code.core.utils.StringUtils;
import cn.jiangjiesheng.code.exception.GnException;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 模拟现场机,当前服务本身就相当于是现场机,发送socket连接到一次采集服务【正常业务的场景就不能通过connectToServer()创建连接】
*/
@Component
@Slf4j
public class SocketTestClient {
// 配置参数:socket服务器IP/端口
@Value("${ecp-collector.url}")
private String socketIP;
private static final int SOCKET_PORT = 16010;
// 复用socket
private static volatile Socket socket;
private static final Object lock = new Object();
// 建立连接使用,固定内容,核心是PW=123456;MN=7899871
// "上传" + cleanDataDay + "天之前的数据,不做处理" (15天前数据不处理)
// private static final String BUILD_MN_CONNECT_FIRST_TIME = "QN=20250523094003516;LOOK_AT_ME=just used for connected socket;ST=32;CN=2011;PW=%s;MN=%s;Flag=5;CP=&&DataTime=%s;w00000-Rtd=0.00;&&4540";
private static final String BUILD_MN_CONNECT_FIRST_TIME = "QN=20250523094003516;LOOK_AT_ME=just used for connected socket;ST=32;CN=2011;PW=%s;MN=%s;Flag=5;CP=&&DataTime=20250101000000;w00000-Rtd=0.00;&&4540";
// 普通红色:"\u001B[31m",普通绿色:"\u001B[32m"
// 高亮红色,sh脚本中使用 "\033[38;5;196m";
private static final String RED_HIGH_LIGHT = "\u001B[38;5;196m";
private static final String RESET = "\u001B[0m";
private static final String LOG_TAG = "【云平台-现场机-模拟socket报文交互】:";
// 发送最大间隔(毫秒)
private static final long MAX_SEND_INTERVAL = 500;
private volatile boolean responseReceived = false;
private boolean isFirstSocketMsgFlag = true;
private boolean hasBuildConnectMsg = false;
//待发送的报文的数量
private AtomicInteger socketMsgListTotalCount = null;
//正式报文已发送的数量
private AtomicInteger socketMsgListSentCount = null;
private CountDownLatch latch;
@Autowired
private ControllerInvoker controllerInvoker;
/**
* 模拟云平台和现场机交互的接口
* (整个交互的多条报文在一个接口入参中写完,就不用通过socket工具来测试了,
* 一次采集一套流程好像还有10秒内的限制,通过接口调用就没这些问题了。)
*
* @param urlObj 模拟第一次业务触发
* @param socketMsgList 其他交互报文
* @param autoUseNowDataTime 是否自动更新DataTime,默认是
* @return 返回qn
*/
public Map<Object, Object> doTestAll(FirstUrlRequest urlObj, List<String> socketMsgList, Boolean autoUseNowDataTime) {
//反控的qn
String qn = null;
boolean isReverseControlMode = false;
isFirstSocketMsgFlag = true;
hasBuildConnectMsg = false;
socketMsgListSentCount = new AtomicInteger(0);
socketMsgListTotalCount = new AtomicInteger(socketMsgList.size());
//记录下qn,返回便于查验
List<String> qnList = Lists.newArrayList();
try {
//单例模式复用socket
connectToServer();
if (CollectionUtils.isNotEmpty(socketMsgList)) {
// 获取输出流(用于发送数据)
OutputStream out = socket.getOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
// 调用业务上的反控接口
qn = mockInvokeReverseCtrl(urlObj, socketMsgList, writer, qn);
// 模拟现场机上传报文
isReverseControlMode = mockOnSiteMachineSendSocketMsg(socketMsgList, autoUseNowDataTime, qn, qnList, writer);
// writer.close(); socket也会关闭
}
HashMap<Object, Object> qnMap = Maps.newHashMap();
String key = String.format("请求结束,%s,返回qn列表:", isReverseControlMode ? "当前是反控模式(1个qn)" : "当期是监测数据上传模式");
qnMap.put(key, qnList);
return qnMap;
} catch (IOException e) {
showLog("连接或通信异常: " + e.getMessage());
throw new GnException("连接或通信异常: " + e.getMessage());
} catch (Exception e) {
throw new GnException("调用失败:" + e.getMessage());
} finally {
//disconnect();
}
}
/**
* 模拟调用业务反控接口
* @param urlObj
* @param socketMsgList
* @param writer
* @param qn
* @return
*/
private String mockInvokeReverseCtrl(FirstUrlRequest urlObj, List<String> socketMsgList, BufferedWriter writer, String qn) {
//触发第1个接口请求
if (urlObj != null && StringUtils.isNotBlank(urlObj.getUrl())) {
//发送建立连接使用
String msgOne = socketMsgList.get(0);
// 正则匹配 PW 和 MN 的值
String pw = extractValue(msgOne, "PW=([^;]+);");
String mn = extractValue(msgOne, "MN=([^;]+);");
showLog("取其中1条中的MN = {},PW = {}", mn, pw);
//这里补充1条
socketMsgListTotalCount.incrementAndGet();
hasBuildConnectMsg = true;
//只接收15天的数据,这里存个10
//String datetime = DateUtil.format(DateUtil.offsetDay(new Date(),-10), "yyyyMMdd000000");
//datetime = "20250101000000";
sendMessage(writer, handleFinalMsg(String.format(BUILD_MN_CONNECT_FIRST_TIME, pw, mn)));
//还要有首选
String authorization = HttpServletUtil.getRequest().getHeader("Authorization");
if (StringUtils.isBlank(authorization)) {
throw new GnException("请添加Authorization头");
}
String result = null;
try {
result = waitAndSendMessage(() ->
controllerInvoker.invokeController(urlObj.getUrl(),
urlObj.getMethod(),
JSONUtil.toJsonStr(urlObj.getUrlParams()),
authorization)
// 会阻塞
// HttpConnectionsUtils.requestWithBody(urlObj.getUrl(), urlObj.getMethod(),
// null, JSONUtil.toJsonStr(urlObj.getUrlParams()), authorization);
);
} catch (Exception e) {
throw new GnException("反控接口调用失败:" + e.getMessage());
}
JSONObject jsonObject = JSONUtil.parseObj(result);
qn = jsonObject.getStr("data", "");
if (StringUtils.isBlank(qn)) {
//这里返回的json,并不是抛出代码异常
throw new GnException("反控接口调用失败(qn返回空:" + jsonObject.getStr("msg", "") + ")");
}
}
return qn;
}
/**
* 模拟现场机上传报文
* @param socketMsgList
* @param autoUseNowDataTime
* @param qn
* @param qnList
* @param writer
* @return
*/
private boolean mockOnSiteMachineSendSocketMsg(List<String> socketMsgList, Boolean autoUseNowDataTime, String qn, List<String> qnList, BufferedWriter writer) {
boolean isReverseControlMode;
//替换qn 和 DataTime
String datetime = null;
if (autoUseNowDataTime == null) {
autoUseNowDataTime = true;
}
if (autoUseNowDataTime) {
datetime = DateUtil.format(new Date(), "yyyyMMddHHmmss");
}
//是否是反控模式
isReverseControlMode = qn != null;
//执行其他报文请求
String firstMsgQn = null;
for (int i = 0; i < socketMsgList.size(); i++) {
String msg = socketMsgList.get(i);
if (isReverseControlMode) {
//主要为3020提取类
addQnList(qnList, qn);
if (msg.startsWith("QN=")) {
msg = msg.replaceFirst("QN=[^;]+;", String.format("QN=%s;", qn));
} else {
msg = String.format("QN=%s;", qn) + msg;
}
} else {
if (i == 0 && msg.startsWith("QN=")) {
//主要为4099设置类4098切换,这里主要为了给响应的,
//已经有了qn的就使用原来的qn,为了测试方便快速入参(10秒内),只要现场机模拟上传的第1条的qn就行(9011)
//例如://对应急排水的现场机回复模拟:
//在日志中找到 雨水启动应急排水完成:mn:7899871(有mn在线的)
//SELECT qn,mn,cn,resp_detail from t_control_record where mn = '7899871' ORDER BY req_time desc;
//拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)
// "QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
// "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
firstMsgQn = extractValue(msg, "QN=([^;]+);");
qn = firstMsgQn;
addQnList(qnList, qn);
} else {
if (firstMsgQn == null) {
//监测数据上传,这里重新生成qn
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
showLog("发送线程被中断");
}
qn = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");
addQnList(qnList, qn);
} else {
//主要为4099设置类4098切换,这里主要为了给响应的,
//已经有了qn的就使用原来的qn,为了测试方便快速入参(10秒内),只要现场机模拟上传的第1条的qn就行(9011)
//例如://对应急排水的现场机回复模拟:
//在日志中找到 雨水启动应急排水完成:mn:7899871(有mn在线的)
//SELECT qn,mn,cn,resp_detail from t_control_record where mn = '7899871' ORDER BY req_time desc;
//拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)
// "QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
// "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
qn = firstMsgQn;
}
}
if (msg.startsWith("QN=")) {
msg = msg.replaceFirst("QN=[^;]+;", String.format("QN=%s;", qn));
} else {
msg = String.format("QN=%s;", qn) + msg;
}
}
if (datetime != null && msg.contains("DataTime=")) {
msg = msg.replaceFirst("DataTime=[^;]+;", String.format("DataTime=%s;", datetime));
}
msg = handleFinalMsg(msg);
String finalMsg = msg;
waitAndSendMessage(() -> {
sendMessage(writer, finalMsg);
return null;
});
socketMsgListSentCount.incrementAndGet();
}
return isReverseControlMode;
}
private static void addQnList(List<String> qnList, String qn) {
if (!qnList.contains(qn)) {
qnList.add(qn);
}
}
/**
* 处理成最终的报文格式
*
* @param msg
* @return
*/
private static String handleFinalMsg(String msg) {
//判断是否需要组装成完整的报文
if (msg.startsWith("QN")) {
// 找到最后一个 "&&" 的位置
int index = msg.lastIndexOf("&&");
if (index > 0 && !msg.endsWith("&&")) {
//先截断
msg = msg.substring(0, index + 2); // 保留 "&&
}
int length = msg.length();
String msgStart = String.format("##%04d", length);
msg = msgStart + msg;
}
//这个应该紧跟上面的逻辑,拼接的内容好像不重要
if (msg.endsWith("&&")) {
msg += "4540";
}
return msg;
}
// 提取方法
private static String extractValue(String input, String regex) {
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(input);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
/**
* 给最大等待时间后调用
*
* @param callable
* @param <T>
*/
private <T> T waitAndSendMessage(Callable<T> callable) {
try {
if (isFirstSocketMsgFlag) {
return callable.call();
}
latch = new CountDownLatch(1);
boolean received = latch.await(MAX_SEND_INTERVAL, TimeUnit.MILLISECONDS);
//这里是靠latch.countDown();唤醒的
if (received || responseReceived) {
// 提前收到响应,
// 万一socket服务端没有处理好链接数据。
// 这里如果有必要就记录下 sendMessage 到接收到responseReceived的时间差,做个500毫秒以下的延时。尽量不要太大
// try {
// Thread.sleep(MAX_SEND_INTERVAL);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// showLog("发送线程被中断");
// }
showLog("提前收到响应(或建立mn链接),立即正式执行http请求或发送下一条报文");
} else {
// 超时未收到响应
showLog("未收到响应(或建立mn链接),等待{}ms后正式执行http请求或发送下一条报文", MAX_SEND_INTERVAL);
}
return callable.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("等待响应时被中断", e);
} catch (Exception e) {
throw new GnException("Callable调用异常" + e.getMessage());
} finally {
latch = null;
responseReceived = false;
}
return null;
}
// 封装发送方法:添加 \r\n 并刷新
private void sendMessage(BufferedWriter writer, String message) {
try {
// 添加回车换行 ,应该只要 \n
writer.write(message + "\r\n");
writer.flush();
if (!hasBuildConnectMsg || !isFirstSocketMsgFlag) {
//异步回复太快,日志顺序有点问题
showLog(String.format("第%d条报文发送完成 {}", socketMsgListSentCount.get() + 1), message);
}
isFirstSocketMsgFlag = false;
} catch (IOException e) {
showLog("报文发送失败 {}", message, e);
}
}
// 启动一个线程接收服务器响应(可选)
private void startServerResponseThread(Socket socket) {
new Thread(() -> {
try (InputStream in = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String responseLine;
while ((responseLine = reader.readLine()) != null) {
showLog("收到来自服务器的报文: " + responseLine);
}
} catch (IOException e) {
showLog("接收服务器报文失败: " + e.getMessage());
}
}).start();
}
/**
* 单例模式复用socket
*/
public void connectToServer() {
if (socket != null && !socket.isClosed()) {
// 这里为了提升性能
//|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。
showLog("使用已有的socket连接:{}:{}", socketIP, SOCKET_PORT);
return;
}
synchronized (lock) {
if (socket != null && !socket.isClosed()) {
//|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。
showLog("使用已有的socket连接:{}:{}", socketIP, SOCKET_PORT);
return;
}
try {
// 关闭旧 socket(如果有)
if (socket != null) {
socket.close();
}
socket = new Socket(socketIP, SOCKET_PORT);
// 可以在这里配置 socket 参数,如设置超时时间
showLog("成功连接到socket服务器:{}:{}", socketIP, SOCKET_PORT);
SocketMessageReceiver receiver = new SocketMessageReceiver(socket, new MessageHandler() {
@Override
public void onMessageReceived(String message) {
// 从下面的日志来看,发送和回复好像没有关联 实际参考意义不大,而且3条报文,只返回1条(也不是同一个qn的原因)
//2025-05-30 10:45:29.863 INFO 24908 --- [nio-8801-exec-1] c.e.ecp.service.common.SocketTestClient : [4ca705c78b4f4452b885f91d8d1db12d]
//第1条报文发送完成 ##0077QN=20250530104529280;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&4540
//2025-05-30 10:45:29.293 INFO 24908 --- [ Thread-212] c.e.ecp.service.common.SocketTestClient : []
//收到socket回复消息:##0096QN=20250530104529280;ST=32;CN=3020;PW=123456;MN=7899871;Flag=5;CP=&&PolId=md0501;InfoId=i42002&&3e80,即将执行下一条报文
//2025-05-30 10:45:29.863 INFO 24908 --- [nio-8801-exec-1] c.e.ecp.service.common.SocketTestClient : [4ca705c78b4f4452b885f91d8d1db12d]
//未收到响应(或建立mn链接),等待500ms后正式执行http请求或发送下一条报文
if (!message.contains("QN=20250523094003516")) {
String nextMsgTip = socketMsgListSentCount.get() <= socketMsgListTotalCount.get() ? "即将执行下一条报文" : "报文全部发送完毕";
showLog("收到socket回复消息:{},{}", message, nextMsgTip);
}
if (latch != null) {
responseReceived = true;
// 唤醒等待线程
latch.countDown();
}
}
@Override
public void onConnectionClosed() {
showLog("socket断开连接");
//断开实际上看 running 这里主动调用断开才行,以后如果有其他的场景必要,可以在socket连接断开的时候再连接。
//new Thread(() -> connectToServer()).start();
}
@Override
public void onError(Exception e) {
showLog("socket连接发生错误", e);
}
});
receiver.start();
} catch (IOException e) {
showLog("socket连接异常", e);
// 回退状态
socket = null;
throw new GnException("socket连接异常:" + e.getMessage());
}
}
}
public void disconnect() {
try {
if (socket != null && !socket.isClosed()) {
socket.close();
socket = null;
}
} catch (IOException e) {
throw new GnException("socket关闭异常:" + e.getMessage());
}
}
/**
* 打印高亮红色日志
*
* @param format
* @param arguments
*/
private synchronized void showLog(String format, Object... arguments) {
// 默认: log.info("\n" + format + "\n", arguments);
log.info("\n" + RED_HIGH_LIGHT + LOG_TAG + format + RESET + "\n", arguments);
}
/**
* 模拟第一次业务触发
*/
@Data
public static class FirstUrlRequest {
private String url;
private String method;
private Object urlParams;
}
@Data
public static class SocketAllParams {
//模拟第一次业务触发
private FirstUrlRequest firstUrlRequest;
//其他交互报文
private List<String> socketMsgList;
//是否自动更新DataTime,默认是
private Boolean autoUseNowDataTime;
}
interface MessageHandler {
void onMessageReceived(String message);
void onConnectionClosed();
void onError(Exception e);
}
static class SocketMessageReceiver {
private final Socket socket;
private final MessageHandler handler;
private volatile boolean running = true;
public SocketMessageReceiver(Socket socket, MessageHandler handler) {
this.socket = socket;
this.handler = handler;
}
public void start() {
new Thread(this::runLoop).start();
}
public void stop() {
running = false;
try {
if (!socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
handler.onError(e);
}
}
private void runLoop() {
try (InputStream in = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while (running && (line = reader.readLine()) != null) {
handler.onMessageReceived(line);
}
handler.onConnectionClosed();
} catch (IOException e) {
handler.onError(e);
}
}
}
}
3. ControllerInvoker mock调用http接口代码
ControllerInvoker.java
package cn.jiangjiesheng.code.core.utils;
import cn.jiangjiesheng.code.exception.GnException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.HandlerAdapter;
import org.springframework.web.servlet.HandlerExecutionChain;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import org.springframework.mock.web.MockHttpServletRequest;
import java.nio.charset.StandardCharsets;
/**
* http调用Controller http代码接口
* 依赖
* <dependency>
* <groupId>org.springframework.boot</groupId>
* <artifactId>spring-boot-starter-test</artifactId>
* <!-- <version>2.0.6.RELEASE</version>-->
* </dependency>
*/
@Service
public class ControllerInvoker {
@Autowired
private RequestMappingHandlerMapping handlerMapping;
@Autowired
//有3个
@Qualifier("requestMappingHandlerAdapter")
private HandlerAdapter handlerAdapter;
/**
* http调用Controller http代码接口,http不能直接调用自身服务的http接口,会阻塞
* @param uri 不要server.servlet.context-path,示例:/your http api/remoteControl
* @param method
* @param jsonBody
* @param authorization
* @return
* @throws Exception
*/
public String invokeController(String uri, String method, String jsonBody, String authorization) throws Exception {
MockHttpServletRequest request = new MockHttpServletRequest();
request.setRequestURI(uri);
request.setMethod(method.toUpperCase());
request.addHeader("Authorization", authorization);
request.setContentType(MediaType.APPLICATION_JSON_VALUE);
request.setContent(jsonBody.getBytes(StandardCharsets.UTF_8));
HandlerExecutionChain chain = handlerMapping.getHandler(request);
if (chain == null) {
throw new GnException("没找到对应url: " + uri);
}
// 执行 Controller 方法
MockHttpServletResponse response = new MockHttpServletResponse();
handlerAdapter.handle(request, response, chain.getHandler());
return new String(response.getContentAsByteArray(), StandardCharsets.UTF_8);
}
}
正文到此结束
- 本文标签: Java Spring Boot socket
- 本文链接: https://code.jiangjiesheng.cn/article/367
- 版权声明: 本文由小江同学原创发布,转载请先联系本站长,谢谢。