原创

Java-代码段-http接口调用自身服务中的其他http接口(mock)-并建立socket连接发送和接收报文实例

1. controller入口

    @ApiOperation("模拟平台端+现场机socket交互过程,需要Authorization")
    @PostMapping(path = "/testSocketBusiness")
    public ResponseBean<Map<Object, Object>> testSocketBusiness(@RequestBody SocketTestClient.SocketAllParams socketAllParams) {
        // 建立socket连接  "\n")
        Map<Object, Object> res = socketTestClient.doTestAll(socketAllParams.getFirstUrlRequest(), socketAllParams.getSocketMsgList(), socketAllParams.getAutoUseNowDataTime());
        return new ResponseBean<>(200, "请求结束", res);
    }

1.1 接口入参示例及说明


对应急排水的现场机回复模拟:

在日志中找到  雨水启动应急排水完成:mn:7899871(有mn在线的)
SELECT qn,mn,cn,resp_detail from your_table where mn = '7899871' ORDER BY req_time desc;

拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)

"QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
"ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
{
    "firstUrlRequest": {
        "url": "/bms/waterMonitorEquip/remoteControl",
        "method": "PUT",
        "urlParams": {
            "monitorId": 3300,
            "cn": "3020",
            "polId": "md0501",
            "infoId": "i42002"
        }
    },
    "socketMsgList": [
        "ST=91;CN=9011;PW=123456;MN=33445566;Flag=4;CP=&&QnRtn=1&&",
        "ST=32;CN=3020;PW=123456;MN=33445566;Flag=4;CP=&&DataTime=20250528111758;PolId=md0501;i42002-Info=2&&",
        "ST=91;CN=9012;PW=123456;MN=33445566;Flag=4;CP=&&ExeRtn=1&&"
    ],
    "autoUseNowDataTime": true,
    "【示例非必要不改动】反控提取类入参示例(前不要长度、不要QN(接口会生成统一QN),后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {
        "firstUrlRequest": {
            "url": "/bms/waterMonitorEquip/remoteControl",
            "method": "PUT",
            "urlParams": {
                "monitorId": 3319,
                "cn": "3020",
                "polId": "md0501",
                "infoId": "i42002"
            }
        },
        "socketMsgList": [
            "ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
            "ST=32;CN=3020;PW=123456;MN=7899871;Flag=4;CP=&&DataTime=20250528111758;PolId=md0501;i42002-Info=2&&",
            "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
        ],
        "autoUseNowDataTime": true
    },
    "【示例非必要不改动】现场机回复模拟示例,类似上面的反控(3条变2条,第1条必须有QN)": {
        "说明": [
            "对应急排水的现场机回复模拟:",
            "在日志中找到  雨水启动应急排水完成:mn:7899871(有mn在线的)",
            "SELECT qn,mn,cn,resp_detail from t_control_record where mn = '7899871' ORDER BY req_time desc;",
            "拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)",
            "QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
            "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
        ]
    },
    "【示例非必要不改动】监测数据入参示例(前不要长度、不要QN,后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {
        "socketMsgList": [
            "ST=32;CN=2011;PW=123456;MN=7899871;Flag=5;CP=&&DataTime=20250527105200;w00000-Rtd=8.8,w00000-Flag=N;w01018-Rtd=448,w01018-SampleTime=20250527105100,w01018-Flag=D&&"
        ],
        "autoUseNowDataTime": false
    },
    "查找测点对应的MN/PW": [
        "select distinct org.id as orgId,monitor.id as monitorId,monitor.name as monitorName,org.name as orgName,monitor.type as monitorType,concat (mi.item_code,mi.serial_number) AS \"支持的因子【关键】\",case monitor.type when 0 then '水' when 1 then '雨水' when 2 then '气' when 3 then '气无' else monitor.type :: VARCHAR END as \"测点类型\",mn.mn as mn,mn.pw as pw from bis_auto_mon_item mi INNER JOIN bis_auto_monitor_equipment me ON mi.main_id=me.id INNER JOIN bis_monitor_base monitor ON mi.monitor_id=monitor.id INNER join bis_main_org org on monitor.main_org_id=org.id INNER join bis_mn_info mn on monitor.id=mn.monitor_id INNER JOIN bis_scheme_factor_monitor sm ON sm.equipment_id = me.id INNER JOIN bis_scheme s ON s.id = sm.scheme_id AND s.status IN (0, 3) AND s.type = 2 where monitor.del_flag=1 and mn.del_flag=1 and org.del_flag=1 -- and org.name like '%DB迁移测试zgq%' and monitor.name like '%雨水测试测点zgq%'  -- and org.id = 15094 -- and monitor.id = 3313 -- and mn.mn = '456456' ORDER BY monitor.type;"
    ],
    "一次采集日志:": [
        "tail -200f /home/logs/yourproject/collect/connect.log",
        "tail -200f /home/logs/yourproject/collect/receive.log [重点]",
        "tail -200f /home/logs/yourproject/collect/send.log",
        "tail -200f /home/logs/yourproject/debug.log [重点]",
        "tail -200f /home/logs/yourproject/error.log"
    ]
}

1.2 接口出参示例

{
    "code": 200,
    "msg": "请求结束",
    "data": {
        "请求结束,当期是监测数据上传模式,返回qn列表:": [
            "20250529132932726"
        ]
    },
    "timestamp": "2025-05-29 13:29:36",
    "traceId": "cb9557eaed1543aa8749bf2a545816ef"
}

2. mock test方式调用自身controller下的http接口并建立socket连接发送和接收报文实例

SocketTestClient.java

package cn.jiangjiesheng.code.service.common;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.jiangjiesheng.code.core.utils.ControllerInvoker;
import cn.jiangjiesheng.code.core.utils.HttpServletUtil;
import cn.jiangjiesheng.code.core.utils.StringUtils;
import cn.jiangjiesheng.code.exception.GnException;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 模拟现场机,当前服务本身就相当于是现场机,发送socket连接到一次采集服务【正常业务的场景就不能通过connectToServer()创建连接】
 */
@Component
@Slf4j
public class SocketTestClient {

    // 配置参数:socket服务器IP/端口
    @Value("${ecp-collector.url}")
    private String socketIP;
    private static final int SOCKET_PORT = 16010;

    // 复用socket
    private static volatile Socket socket;
    private static final Object lock = new Object();

    // 建立连接使用,固定内容,核心是PW=123456;MN=7899871
    // "上传" + cleanDataDay + "天之前的数据,不做处理" (15天前数据不处理)
    // private static final String BUILD_MN_CONNECT_FIRST_TIME = "QN=20250523094003516;LOOK_AT_ME=just used for connected socket;ST=32;CN=2011;PW=%s;MN=%s;Flag=5;CP=&&DataTime=%s;w00000-Rtd=0.00;&&4540";
    private static final String BUILD_MN_CONNECT_FIRST_TIME = "QN=20250523094003516;LOOK_AT_ME=just used for connected socket;ST=32;CN=2011;PW=%s;MN=%s;Flag=5;CP=&&DataTime=20250101000000;w00000-Rtd=0.00;&&4540";

    // 普通红色:"\u001B[31m",普通绿色:"\u001B[32m"
    // 高亮红色,sh脚本中使用 "\033[38;5;196m";
    private static final String RED_HIGH_LIGHT = "\u001B[38;5;196m";
    private static final String RESET = "\u001B[0m";
    private static final String LOG_TAG = "【云平台-现场机-模拟socket报文交互】:";

    // 发送最大间隔(毫秒)
    private static final long MAX_SEND_INTERVAL = 500;
    private volatile boolean responseReceived = false;
    private boolean isFirstSocketMsgFlag = true;
    private boolean hasBuildConnectMsg = false;

    //待发送的报文的数量
    private AtomicInteger socketMsgListTotalCount = null;
    //正式报文已发送的数量
    private AtomicInteger socketMsgListSentCount = null;
    private CountDownLatch latch;

    @Autowired
    private ControllerInvoker controllerInvoker;

    /**
     * 模拟云平台和现场机交互的接口
     * (整个交互的多条报文在一个接口入参中写完,就不用通过socket工具来测试了,
     * 一次采集一套流程好像还有10秒内的限制,通过接口调用就没这些问题了。)
     *
     * @param urlObj             模拟第一次业务触发
     * @param socketMsgList      其他交互报文
     * @param autoUseNowDataTime 是否自动更新DataTime,默认是
     * @return 返回qn
     */
    public Map<Object, Object> doTestAll(FirstUrlRequest urlObj, List<String> socketMsgList, Boolean autoUseNowDataTime) {
        //反控的qn
        String qn = null;
        boolean isReverseControlMode = false;
        isFirstSocketMsgFlag = true;
        hasBuildConnectMsg = false;
        socketMsgListSentCount = new AtomicInteger(0);
        socketMsgListTotalCount = new AtomicInteger(socketMsgList.size());
        //记录下qn,返回便于查验
        List<String> qnList = Lists.newArrayList();

        try {
            //单例模式复用socket
            connectToServer();

            if (CollectionUtils.isNotEmpty(socketMsgList)) {

                // 获取输出流(用于发送数据)
                OutputStream out = socket.getOutputStream();
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));

                // 调用业务上的反控接口
                qn = mockInvokeReverseCtrl(urlObj, socketMsgList, writer, qn);
                // 模拟现场机上传报文
                isReverseControlMode = mockOnSiteMachineSendSocketMsg(socketMsgList, autoUseNowDataTime, qn, qnList, writer);
                // writer.close(); socket也会关闭
            }
            HashMap<Object, Object> qnMap = Maps.newHashMap();
            String key = String.format("请求结束,%s,返回qn列表:", isReverseControlMode ? "当前是反控模式(1个qn)" : "当期是监测数据上传模式");
            qnMap.put(key, qnList);
            return qnMap;
        } catch (IOException e) {
            showLog("连接或通信异常: " + e.getMessage());
            throw new GnException("连接或通信异常: " + e.getMessage());
        } catch (Exception e) {
            throw new GnException("调用失败:" + e.getMessage());
        } finally {
            //disconnect();
        }
    }

    /**
     * 模拟调用业务反控接口
     * @param urlObj
     * @param socketMsgList
     * @param writer
     * @param qn
     * @return
     */
    private String mockInvokeReverseCtrl(FirstUrlRequest urlObj, List<String> socketMsgList, BufferedWriter writer, String qn) {
        //触发第1个接口请求
        if (urlObj != null && StringUtils.isNotBlank(urlObj.getUrl())) {

            //发送建立连接使用
            String msgOne = socketMsgList.get(0);
            // 正则匹配 PW 和 MN 的值
            String pw = extractValue(msgOne, "PW=([^;]+);");
            String mn = extractValue(msgOne, "MN=([^;]+);");

            showLog("取其中1条中的MN = {},PW = {}", mn, pw);

            //这里补充1条
            socketMsgListTotalCount.incrementAndGet();
            hasBuildConnectMsg = true;

            //只接收15天的数据,这里存个10
            //String datetime = DateUtil.format(DateUtil.offsetDay(new Date(),-10), "yyyyMMdd000000");
            //datetime = "20250101000000";
            sendMessage(writer, handleFinalMsg(String.format(BUILD_MN_CONNECT_FIRST_TIME, pw, mn)));

            //还要有首选
            String authorization = HttpServletUtil.getRequest().getHeader("Authorization");
            if (StringUtils.isBlank(authorization)) {
                throw new GnException("请添加Authorization头");
            }
            String result = null;
            try {
                result = waitAndSendMessage(() ->
                                controllerInvoker.invokeController(urlObj.getUrl(),
                                        urlObj.getMethod(),
                                        JSONUtil.toJsonStr(urlObj.getUrlParams()),
                                        authorization)
                        // 会阻塞
                        // HttpConnectionsUtils.requestWithBody(urlObj.getUrl(), urlObj.getMethod(),
                        // null, JSONUtil.toJsonStr(urlObj.getUrlParams()), authorization);
                );
            } catch (Exception e) {
                throw new GnException("反控接口调用失败:" + e.getMessage());
            }
            JSONObject jsonObject = JSONUtil.parseObj(result);
            qn = jsonObject.getStr("data", "");
            if (StringUtils.isBlank(qn)) {
                //这里返回的json,并不是抛出代码异常
                throw new GnException("反控接口调用失败(qn返回空:" + jsonObject.getStr("msg", "") + ")");
            }
        }
        return qn;
    }

    /**
     * 模拟现场机上传报文
     * @param socketMsgList
     * @param autoUseNowDataTime
     * @param qn
     * @param qnList
     * @param writer
     * @return
     */
    private boolean mockOnSiteMachineSendSocketMsg(List<String> socketMsgList, Boolean autoUseNowDataTime, String qn, List<String> qnList, BufferedWriter writer) {
        boolean isReverseControlMode;
        //替换qn 和 DataTime
        String datetime = null;
        if (autoUseNowDataTime == null) {
            autoUseNowDataTime = true;
        }
        if (autoUseNowDataTime) {
            datetime = DateUtil.format(new Date(), "yyyyMMddHHmmss");
        }

        //是否是反控模式
        isReverseControlMode = qn != null;

        //执行其他报文请求
        String firstMsgQn = null;
        for (int i = 0; i < socketMsgList.size(); i++) {
            String msg = socketMsgList.get(i);
            if (isReverseControlMode) {
                //主要为3020提取类
                addQnList(qnList, qn);
                if (msg.startsWith("QN=")) {
                    msg = msg.replaceFirst("QN=[^;]+;", String.format("QN=%s;", qn));
                } else {
                    msg = String.format("QN=%s;", qn) + msg;
                }
            } else {
                if (i == 0 && msg.startsWith("QN=")) {
                    //主要为4099设置类4098切换,这里主要为了给响应的,
                    //已经有了qn的就使用原来的qn,为了测试方便快速入参(10秒内),只要现场机模拟上传的第1条的qn就行(9011)
                    //例如://对应急排水的现场机回复模拟:
                    //在日志中找到  雨水启动应急排水完成:mn:7899871(有mn在线的)
                    //SELECT qn,mn,cn,resp_detail from t_control_record where mn = '7899871' ORDER BY req_time desc;
                    //拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)
                    // "QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
                    // "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
                    firstMsgQn = extractValue(msg, "QN=([^;]+);");
                    qn = firstMsgQn;
                    addQnList(qnList, qn);
                } else {
                    if (firstMsgQn == null) {
                        //监测数据上传,这里重新生成qn
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            showLog("发送线程被中断");
                        }
                        qn = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");
                        addQnList(qnList, qn);
                    } else {
                        //主要为4099设置类4098切换,这里主要为了给响应的,
                        //已经有了qn的就使用原来的qn,为了测试方便快速入参(10秒内),只要现场机模拟上传的第1条的qn就行(9011)
                        //例如://对应急排水的现场机回复模拟:
                        //在日志中找到  雨水启动应急排水完成:mn:7899871(有mn在线的)
                        //SELECT qn,mn,cn,resp_detail from t_control_record where mn = '7899871' ORDER BY req_time desc;
                        //拿到qn,接口报文列表填:(qn放在第1条9011上就行,注意提前改好PW、MN)
                        // "QN=20250530114643967;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
                        // "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
                        qn = firstMsgQn;
                    }
                }
                if (msg.startsWith("QN=")) {
                    msg = msg.replaceFirst("QN=[^;]+;", String.format("QN=%s;", qn));
                } else {
                    msg = String.format("QN=%s;", qn) + msg;
                }
            }
            if (datetime != null && msg.contains("DataTime=")) {
                msg = msg.replaceFirst("DataTime=[^;]+;", String.format("DataTime=%s;", datetime));
            }
            msg = handleFinalMsg(msg);
            String finalMsg = msg;
            waitAndSendMessage(() -> {
                sendMessage(writer, finalMsg);
                return null;
            });
            socketMsgListSentCount.incrementAndGet();
        }
        return isReverseControlMode;
    }

    private static void addQnList(List<String> qnList, String qn) {
        if (!qnList.contains(qn)) {
            qnList.add(qn);
        }
    }

    /**
     * 处理成最终的报文格式
     *
     * @param msg
     * @return
     */
    private static String handleFinalMsg(String msg) {
        //判断是否需要组装成完整的报文
        if (msg.startsWith("QN")) {
            // 找到最后一个 "&&" 的位置
            int index = msg.lastIndexOf("&&");
            if (index > 0 && !msg.endsWith("&&")) {
                //先截断
                msg = msg.substring(0, index + 2); // 保留 "&&
            }
            int length = msg.length();
            String msgStart = String.format("##%04d", length);
            msg = msgStart + msg;
        }
        //这个应该紧跟上面的逻辑,拼接的内容好像不重要
        if (msg.endsWith("&&")) {
            msg += "4540";
        }
        return msg;
    }

    // 提取方法
    private static String extractValue(String input, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }

    /**
     * 给最大等待时间后调用
     *
     * @param callable
     * @param <T>
     */
    private <T> T waitAndSendMessage(Callable<T> callable) {
        try {
            if (isFirstSocketMsgFlag) {
                return callable.call();
            }
            latch = new CountDownLatch(1);
            boolean received = latch.await(MAX_SEND_INTERVAL, TimeUnit.MILLISECONDS);

            //这里是靠latch.countDown();唤醒的
            if (received || responseReceived) {
                // 提前收到响应,
                // 万一socket服务端没有处理好链接数据。
                // 这里如果有必要就记录下 sendMessage 到接收到responseReceived的时间差,做个500毫秒以下的延时。尽量不要太大
                //  try {
                //     Thread.sleep(MAX_SEND_INTERVAL);
                //  } catch (InterruptedException e) {
                //    Thread.currentThread().interrupt();
                //    showLog("发送线程被中断");
                // }
                showLog("提前收到响应(或建立mn链接),立即正式执行http请求或发送下一条报文");
            } else {
                // 超时未收到响应
                showLog("未收到响应(或建立mn链接),等待{}ms后正式执行http请求或发送下一条报文", MAX_SEND_INTERVAL);
            }

            return callable.call();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("等待响应时被中断", e);
        } catch (Exception e) {
            throw new GnException("Callable调用异常" + e.getMessage());
        } finally {
            latch = null;
            responseReceived = false;
        }
        return null;
    }

    // 封装发送方法:添加 \r\n 并刷新
    private void sendMessage(BufferedWriter writer, String message) {
        try {
            // 添加回车换行 ,应该只要 \n
            writer.write(message + "\r\n");
            writer.flush();
            if (!hasBuildConnectMsg || !isFirstSocketMsgFlag) {
                //异步回复太快,日志顺序有点问题
                showLog(String.format("第%d条报文发送完成 {}", socketMsgListSentCount.get() + 1), message);
            }
            isFirstSocketMsgFlag = false;
        } catch (IOException e) {
            showLog("报文发送失败 {}", message, e);
        }
    }

    // 启动一个线程接收服务器响应(可选)
    private void startServerResponseThread(Socket socket) {
        new Thread(() -> {
            try (InputStream in = socket.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {

                String responseLine;
                while ((responseLine = reader.readLine()) != null) {
                    showLog("收到来自服务器的报文: " + responseLine);
                }
            } catch (IOException e) {
                showLog("接收服务器报文失败: " + e.getMessage());
            }
        }).start();
    }

    /**
     * 单例模式复用socket
     */
    public void connectToServer() {
        if (socket != null && !socket.isClosed()) {
            // 这里为了提升性能
            //|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。
            showLog("使用已有的socket连接:{}:{}", socketIP, SOCKET_PORT);
            return;
        }

        synchronized (lock) {
            if (socket != null && !socket.isClosed()) {
                //|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。
                showLog("使用已有的socket连接:{}:{}", socketIP, SOCKET_PORT);
                return;
            }
            try {
                // 关闭旧 socket(如果有)
                if (socket != null) {
                    socket.close();
                }
                socket = new Socket(socketIP, SOCKET_PORT);
                // 可以在这里配置 socket 参数,如设置超时时间
                showLog("成功连接到socket服务器:{}:{}", socketIP, SOCKET_PORT);

                SocketMessageReceiver receiver = new SocketMessageReceiver(socket, new MessageHandler() {
                    @Override
                    public void onMessageReceived(String message) {
                        // 从下面的日志来看,发送和回复好像没有关联 实际参考意义不大,而且3条报文,只返回1条(也不是同一个qn的原因)
                        //2025-05-30 10:45:29.863  INFO 24908 --- [nio-8801-exec-1] c.e.ecp.service.common.SocketTestClient  : [4ca705c78b4f4452b885f91d8d1db12d]
                        //第1条报文发送完成 ##0077QN=20250530104529280;ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&4540

                        //2025-05-30 10:45:29.293  INFO 24908 --- [     Thread-212] c.e.ecp.service.common.SocketTestClient  : []
                        //收到socket回复消息:##0096QN=20250530104529280;ST=32;CN=3020;PW=123456;MN=7899871;Flag=5;CP=&&PolId=md0501;InfoId=i42002&&3e80,即将执行下一条报文

                        //2025-05-30 10:45:29.863  INFO 24908 --- [nio-8801-exec-1] c.e.ecp.service.common.SocketTestClient  : [4ca705c78b4f4452b885f91d8d1db12d]
                        //未收到响应(或建立mn链接),等待500ms后正式执行http请求或发送下一条报文

                        if (!message.contains("QN=20250523094003516")) {
                            String nextMsgTip = socketMsgListSentCount.get() <= socketMsgListTotalCount.get() ? "即将执行下一条报文" : "报文全部发送完毕";
                            showLog("收到socket回复消息:{},{}", message, nextMsgTip);
                        }
                        if (latch != null) {
                            responseReceived = true;
                            // 唤醒等待线程
                            latch.countDown();
                        }
                    }

                    @Override
                    public void onConnectionClosed() {
                        showLog("socket断开连接");
                        //断开实际上看 running 这里主动调用断开才行,以后如果有其他的场景必要,可以在socket连接断开的时候再连接。
                        //new Thread(() -> connectToServer()).start();
                    }

                    @Override
                    public void onError(Exception e) {
                        showLog("socket连接发生错误", e);
                    }
                });
                receiver.start();
            } catch (IOException e) {
                showLog("socket连接异常", e);
                // 回退状态
                socket = null;
                throw new GnException("socket连接异常:" + e.getMessage());
            }
        }
    }


    public void disconnect() {
        try {
            if (socket != null && !socket.isClosed()) {
                socket.close();
                socket = null;
            }
        } catch (IOException e) {
            throw new GnException("socket关闭异常:" + e.getMessage());
        }
    }

    /**
     * 打印高亮红色日志
     *
     * @param format
     * @param arguments
     */
    private synchronized void showLog(String format, Object... arguments) {
        // 默认: log.info("\n" + format + "\n", arguments);
        log.info("\n" + RED_HIGH_LIGHT + LOG_TAG + format + RESET + "\n", arguments);
    }

    /**
     * 模拟第一次业务触发
     */
    @Data
    public static class FirstUrlRequest {
        private String url;
        private String method;
        private Object urlParams;
    }

    @Data
    public static class SocketAllParams {
        //模拟第一次业务触发
        private FirstUrlRequest firstUrlRequest;
        //其他交互报文
        private List<String> socketMsgList;
        //是否自动更新DataTime,默认是
        private Boolean autoUseNowDataTime;
    }

    interface MessageHandler {
        void onMessageReceived(String message);

        void onConnectionClosed();

        void onError(Exception e);
    }

    static class SocketMessageReceiver {
        private final Socket socket;
        private final MessageHandler handler;
        private volatile boolean running = true;

        public SocketMessageReceiver(Socket socket, MessageHandler handler) {
            this.socket = socket;
            this.handler = handler;
        }

        public void start() {
            new Thread(this::runLoop).start();
        }

        public void stop() {
            running = false;
            try {
                if (!socket.isClosed()) {
                    socket.close();
                }
            } catch (IOException e) {
                handler.onError(e);
            }
        }

        private void runLoop() {
            try (InputStream in = socket.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {

                String line;
                while (running && (line = reader.readLine()) != null) {
                    handler.onMessageReceived(line);
                }

                handler.onConnectionClosed();

            } catch (IOException e) {
                handler.onError(e);
            }
        }
    }
}

3. ControllerInvoker mock调用http接口代码

ControllerInvoker.java

package cn.jiangjiesheng.code.core.utils;

import cn.jiangjiesheng.code.exception.GnException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.HandlerAdapter;
import org.springframework.web.servlet.HandlerExecutionChain;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import org.springframework.mock.web.MockHttpServletRequest;

import java.nio.charset.StandardCharsets;

/**
 * http调用Controller http代码接口
 * 依赖
 * <dependency>
 *  <groupId>org.springframework.boot</groupId>
 *   <artifactId>spring-boot-starter-test</artifactId>
 *  <!--  <version>2.0.6.RELEASE</version>-->
 * </dependency>
 */
@Service
public class ControllerInvoker {

    @Autowired
    private RequestMappingHandlerMapping handlerMapping;

    @Autowired
    //有3个
    @Qualifier("requestMappingHandlerAdapter")
    private HandlerAdapter handlerAdapter;

    /**
     * http调用Controller http代码接口,http不能直接调用自身服务的http接口,会阻塞
     * @param uri 不要server.servlet.context-path,示例:/your http api/remoteControl
     * @param method
     * @param jsonBody
     * @param authorization
     * @return
     * @throws Exception
     */
    public String invokeController(String uri, String method,  String jsonBody, String authorization) throws Exception {
        MockHttpServletRequest request = new MockHttpServletRequest();

        request.setRequestURI(uri);
        request.setMethod(method.toUpperCase());
        request.addHeader("Authorization", authorization);
        request.setContentType(MediaType.APPLICATION_JSON_VALUE);
        request.setContent(jsonBody.getBytes(StandardCharsets.UTF_8));
        HandlerExecutionChain chain = handlerMapping.getHandler(request);
        if (chain == null) {
            throw new GnException("没找到对应url: " + uri);
        }
        // 执行 Controller 方法
        MockHttpServletResponse response = new MockHttpServletResponse();
        handlerAdapter.handle(request, response, chain.getHandler());
        return new String(response.getContentAsByteArray(), StandardCharsets.UTF_8);
    }
}
正文到此结束
本文目录