原创

Java-代码段-http接口调用自身服务中的其他http接口(mock)-并建立socket连接发送和接收报文实例

1. controller入口

    /**
     * Simulates the platform / field-device socket exchange in a single HTTP call.
     * The request body is unpacked and handed to {@code socketTestClient.doTestAll};
     * whatever map it returns is wrapped in a {@code ResponseBean} with code 200.
     *
     * @param socketAllParams optional first HTTP request, the socket messages to send,
     *                        and the flag controlling DataTime refresh
     * @return response bean carrying the map produced by {@code doTestAll}
     */
    @ApiOperation("模拟平台端+现场机socket交互过程,需要Authorization")
    @PostMapping(path = "/testSocketBusiness")
    public ResponseBean<HashMap<Object, Object>> testSocketBusiness(@RequestBody SocketTestClient.SocketAllParams socketAllParams) {
        SocketTestClient.FirstUrlRequest firstRequest = socketAllParams.getFirstUrlRequest();
        Boolean refreshDataTime = socketAllParams.getAutoUseNowDataTime();

        // Opens (or reuses) the socket connection and runs the whole exchange.
        HashMap<Object, Object> outcome =
                socketTestClient.doTestAll(firstRequest, socketAllParams.getSocketMsgList(), refreshDataTime);

        return new ResponseBean<>(200, "请求结束", outcome);
    }

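1.1 接口入参示例及说明(注意:示例中第一个键故意写成 firstUrlRequestxxxxx,与字段名 firstUrlRequest 不一致,因此不会触发反控接口调用,走的是监测数据上传模式;需要反控时把键名改回 firstUrlRequest 即可)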

{
    "firstUrlRequestxxxxx": {
        "url": "/your http api/remoteControl",
        "method": "PUT",
        "urlParams": {
            "monitorId": 3319,
            "cn": "3020",
            "polId": "md0501",
            "infoId": "i42002"
        }
    },
    "socketMsgList": [
        "ST=32;CN=3020;PW=123456;MN=887799;Flag=5;CP=&&DataTime=20250529105000;PolId=md0501;DT=201;VaseNo=4;i33022-Info=0;i33028-Info=1&&"
    ],
    "autoUseNowDataTime": true,
    "【示例非必要不改动】反控入参示例(前不要长度、不要QN,后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {
        "firstUrlRequest": {
            "url": "/your http api/remoteControl",
            "method": "PUT",
            "urlParams": {
                "monitorId": 3319,
                "cn": "3020",
                "polId": "md0501",
                "infoId": "i42002"
            }
        },
        "socketMsgList": [
            "ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&",
            "ST=32;CN=3020;PW=123456;MN=7899871;Flag=4;CP=&&DataTime=20250528111758;PolId=md0501;i42002-Info=2&&",
            "ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"
        ],
        "autoUseNowDataTime": true
    },
    "【示例非必要不改动】监测数据入参示例(前不要长度、不要QN,后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {
        "socketMsgList": [
            "ST=32;CN=2011;PW=123456;MN=7899871;Flag=5;CP=&&DataTime=20250527105200;w01018-Rtd=8.8,w00000-Flag=N;w01018-Rtd=444.7,w01018-SampleTime=20250527105100,w01018-Flag=D&&"
        ],
        "autoUseNowDataTime": false
    },
    "一次采集日志:": [
        "tail -200f /home/logs/yourProject/collect/connect.log",
        "tail -200f /home/logs/yourProject/collect/receive.log [重点]",
        "tail -200f /home/logs/yourProject/collect/send.log",
        "tail -200f /home/logs/yourProject/debug.log [重点]",
        "tail -200f /home/logs/yourProject/error.log"
    ]
}

1.2 接口出参示例

{
    "code": 200,
    "msg": "请求结束",
    "data": {
        "请求结束,当期是监测数据上传模式,返回qn列表:": [
            "20250529132932726"
        ]
    },
    "timestamp": "2025-05-29 13:29:36",
    "traceId": "cb9557eaed1543aa8749bf2a545816ef"
}

2. mock test方式调用自身controller下的http接口并建立socket连接发送和接收报文实例

SocketTestClient.java

package cn.jiangjiesheng.code.service.common;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import cn.jiangjiesheng.code.core.utils.ControllerInvoker;
import cn.jiangjiesheng.code.core.utils.HttpServletUtil;
import cn.jiangjiesheng.code.core.utils.StringUtils;
import cn.jiangjiesheng.code.exception.GnException;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

import java.io.*;
import java.net.Socket;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Component
@Slf4j
public class SocketTestClient {

    // 配置参数:socket服务器IP/端口
    @Value("${ecp-collector.url}")
    private String socketIP;
    private static final int SOCKET_PORT = 16010;

    // 复用socket
    private static volatile Socket socket;
    private static final Object lock = new Object();

    // 建立连接使用,核心是PW=123456;MN=7899871
    private static final String BUILD_MN_CONNECT_FIRST_TIME = "QN=20250523094003516;ST=32;CN=2011;PW=%s;MN=%s;Flag=5;CP=&&DataTime=20250525093100;w00000-Rtd=0.00;&&4540"; // 固定内容
    // 发送间隔(毫秒)
    private static final long SEND_INTERVAL = 500;

    @Autowired
    private ControllerInvoker controllerInvoker;

    /**
     * 模拟云平台和现场机交互的接口
     * (整个交互的多条报文在一个接口入参中写完,就不用通过socket工具来测试了,
     * 一次采集一套流程好像还有10秒内的限制,通过接口调用就没这些问题了。)
     *
     * @param urlObj             模拟第一次业务触发
     * @param socketMsgList      其他交互报文
     * @param autoUseNowDataTime 是否自动更新DataTime,默认是
     * @return 返回qn
     */
    public HashMap<Object, Object> doTestAll(FirstUrlRequest urlObj, List<String> socketMsgList, Boolean autoUseNowDataTime) {
        //反控的qn
        String qn = null;
        boolean isReverseControlMode = false;
        //记录下qn,返回便于查验
        List<String> qnList = Lists.newArrayList();

        try {
            //单例模式复用socket
            connectToServer();

            if (CollectionUtils.isNotEmpty(socketMsgList)) {
                // 获取输出流(用于发送数据)
                OutputStream out = socket.getOutputStream();
                // 字符集
                String CHARSET = "UTF-8";
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET));

                //触发第1个接口请求
                if (urlObj != null && StringUtils.isNotBlank(urlObj.getUrl())) {

                    //发送建立连接使用
                    String msgOne = socketMsgList.get(0);
                    // 正则匹配 PW 和 MN 的值
                    String pw = extractValue(msgOne, "PW=([^;]+);");
                    String mn = extractValue(msgOne, "MN=([^;]+);");
                    log.info("doTestAll,PW = {}", mn);
                    log.info("doTestAll,MN = {}", mn);
                    sendMessage(writer, handleFinalMsg(String.format(BUILD_MN_CONNECT_FIRST_TIME, pw, mn)));

                    //还要有首选
                    String authorization = getRequest().getHeader("Authorization");
                    if (StringUtils.isBlank(authorization)) {
                        throw new GnException("请添加Authorization头");
                    }
                    String result = null;
                    try {
                        try {
                            //这里要等待建立好mn连接,必须,否则会报"链接不可用"
                            Thread.sleep(SEND_INTERVAL);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new GnException("发送线程被中断");
                        }
                        // 直接post get 方式调用会阻塞
                        //  result = HttpConnectionsUtils.requestWithBody(urlObj.getUrl(), urlObj.getMethod(), null, JSONUtil.toJsonStr(urlObj.getUrlParams()), authorization);
                        result = controllerInvoker.invokeController(urlObj.getUrl(), urlObj.getMethod(), JSONUtil.toJsonStr(urlObj.getUrlParams()), authorization);
                    } catch (Exception e) {
                        throw new GnException("当前反控接口调用失败:" + e.getMessage());
                    }
                    qn = JSONUtil.parseObj(result).getStr("data", "");
                    if (StringUtils.isBlank(qn)) {
                        throw new GnException("当前反控接口调用失败");
                    }
                }

                //替换qn 和 DataTime
                String datetime = null;
                if (autoUseNowDataTime == null) {
                    autoUseNowDataTime = true;
                }
                if (autoUseNowDataTime) {
                    datetime = DateUtil.format(new Date(), "yyyyMMddHHmmss");
                }

                //是否是反控模式
                isReverseControlMode = qn != null;

                //执行其他报文请求
                for (String msg : socketMsgList) {
                    if (isReverseControlMode) {
                        addQnList(qnList, qn);
                    } else {
                        //监测数据上传,这里重新生成qn
                        qn = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");
                        addQnList(qnList, qn);
                    }
                    if (msg.startsWith("QN=")) {
                        msg = msg.replaceFirst("QN=[^;]+;", String.format("QN=%s;", qn));
                    } else {
                        msg = String.format("QN=%s;", qn) + msg;
                    }

                    if (datetime != null && msg.contains("DataTime=")) {
                        msg = msg.replaceFirst("DataTime=[^;]+;", String.format("DataTime=%s;", datetime));
                    }
                    msg = handleFinalMsg(msg);
                    try {
                        Thread.sleep(SEND_INTERVAL);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.info("发送线程被中断");
                        break;
                    }
                    sendMessage(writer, msg);
                }
                // writer.close(); socket也会关闭
            }
            HashMap<Object, Object> qnMap = Maps.newHashMap();
            String key = String.format("请求结束,%s,返回qn列表:", isReverseControlMode ? "当前是反控模式(1个qn)" : "当期是监测数据上传模式");
            qnMap.put(key, qnList);
            return qnMap;
        } catch (IOException e) {
            log.info("连接或通信异常: " + e.getMessage());
            throw new GnException("连接或通信异常: " + e.getMessage());
        } catch (Exception e) {
            throw new GnException("当前反控接口调用失败:" + e.getMessage());
        } finally {
            //disconnect();
        }
    }

    private static void addQnList(List<String> qnList, String qn) {
        if (!qnList.contains(qn)) {
            qnList.add(qn);
        }
    }

    /**
     * 处理成最终的报文格式
     *
     * @param msg
     * @return
     */
    private static String handleFinalMsg(String msg) {
        //判断是否需要组装成完整的报文
        if (msg.startsWith("QN")) {
            // 找到最后一个 "&&" 的位置
            int index = msg.lastIndexOf("&&");
            if (index > 0 && !msg.endsWith("&&")) {
                //先截断
                msg = msg.substring(0, index + 2); // 保留 "&&
            }
            int length = msg.length();
            String msgStart = String.format("##%04d", length);
            msg = msgStart + msg;
        }
        //这个应该紧跟上面的逻辑,拼接的内容好像不重要
        if (msg.endsWith("&&")) {
            msg += "4540";
        }
        return msg;
    }

/**
     * 获取当前请求的request对象
     *
     * @author xuyuxiang
     * @date 2020/3/30 15:10
     */
    public static HttpServletRequest getRequest() {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (requestAttributes == null) {
            throw new GnException("请求的参数异常");
        } else {
            return requestAttributes.getRequest();
        }
    }

    // 提取方法
    private static String extractValue(String input, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }

    // 封装发送方法:添加 \r\n 并刷新
    private void sendMessage(BufferedWriter writer, String message) {
        try {
            // 添加回车换行 ,应该只要 \n
            writer.write(message + "\r\n");
            writer.flush();
            log.info("doTestAll,已发送报文 {}", message);
        } catch (IOException e) {
            log.info("doTestAll,发送报文失败 {}", message);
        }
    }

    // 启动一个线程接收服务器响应(可选)
    private void startServerResponseThread(Socket socket) {
        new Thread(() -> {
            try (InputStream in = socket.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {

                String responseLine;
                while ((responseLine = reader.readLine()) != null) {
                    log.info(" 收到来自服务器的报文: " + responseLine);
                }
            } catch (IOException e) {
                log.info(" 接收服务器报文失败: " + e.getMessage());
            }
        }).start();
    }

    /**
     * 单例模式复用socket
     */
    public void connectToServer() {
        if (socket == null || socket.isClosed()) {//|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。
            synchronized (lock) {
                if (socket == null || socket.isClosed()) {//|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。
                    try {
                        socket = new Socket(socketIP, SOCKET_PORT);
                        // 可以在这里配置 socket 参数,如设置超时时间

                        log.info("成功连接到socket服务器:{}:{}", socketIP, SOCKET_PORT);

                        SocketMessageReceiver receiver = new SocketMessageReceiver(socket, new MessageHandler() {
                            @Override
                            public void onMessageReceived(String message) {
                                log.info("收到socket回复消息:{}", message);
                            }

                            @Override
                            public void onConnectionClosed() {
                                log.info("socket断开连接");
                            }

                            @Override
                            public void onError(Exception e) {
                                log.info("socket连接发生错误", e);
                            }
                        });
                        receiver.start();
                    } catch (IOException e) {
                        throw new GnException("socket连接异常:" + e.getMessage());
                    }
                } else {
                    log.info("socket已连接:{}:{}", socketIP, SOCKET_PORT);
                }
            }
        } else {
            log.info("socket已连接:{}:{}", socketIP, SOCKET_PORT);
        }
    }

    public void disconnect() {
        try {
            if (socket != null && !socket.isClosed()) {
                socket.close();
                socket = null;
            }
        } catch (IOException e) {
            throw new GnException("socket关闭异常:" + e.getMessage());
        }
    }

    /**
     * 模拟第一次业务触发
     */
    @Data
    public static class FirstUrlRequest {
        private String url;
        private String method;
        private Object urlParams;
    }

    @Data
    public static class SocketAllParams {
        //模拟第一次业务触发
        private FirstUrlRequest firstUrlRequest;
        //其他交互报文
        private List<String> socketMsgList;
        //是否自动更新DataTime,默认是
        private Boolean autoUseNowDataTime;
    }

    interface MessageHandler {
        void onMessageReceived(String message);

        void onConnectionClosed();

        void onError(Exception e);
    }

    static class SocketMessageReceiver {
        private final Socket socket;
        private final MessageHandler handler;
        private volatile boolean running = true;

        public SocketMessageReceiver(Socket socket, MessageHandler handler) {
            this.socket = socket;
            this.handler = handler;
        }

        public void start() {
            new Thread(this::runLoop).start();
        }

        public void stop() {
            running = false;
            try {
                if (!socket.isClosed()) {
                    socket.close();
                }
            } catch (IOException e) {
                handler.onError(e);
            }
        }

        private void runLoop() {
            try (InputStream in = socket.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {

                String line;
                while (running && (line = reader.readLine()) != null) {
                    handler.onMessageReceived(line);
                }

                handler.onConnectionClosed();

            } catch (IOException e) {
                handler.onError(e);
            }
        }
    }
}

3. ControllerInvoker mock调用http接口代码

ControllerInvoker.java

package cn.jiangjiesheng.code.core.utils;

import cn.jiangjiesheng.code.exception.GnException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.HandlerAdapter;
import org.springframework.web.servlet.HandlerExecutionChain;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import org.springframework.mock.web.MockHttpServletRequest;

import java.nio.charset.StandardCharsets;

/**
 * Invokes one of this application's own Spring MVC controller methods in-process, using mock
 * servlet request/response objects instead of a real HTTP round trip.
 *
 * <p>The mock classes come from {@code org.springframework.mock.web}; the original author
 * lists the Maven artifact {@code org.springframework.boot:spring-boot-starter-test} as the
 * dependency that provides them.
 *
 * <p>Only the handler of the resolved {@link HandlerExecutionChain} is executed; this class
 * does not call the chain's interceptors.
 */
@Service
public class ControllerInvoker {

    @Autowired
    private RequestMappingHandlerMapping handlerMapping;

    // More than one HandlerAdapter bean exists (the original author counted three),
    // so the one to inject is selected by name.
    @Autowired
    @Qualifier("requestMappingHandlerAdapter")
    private HandlerAdapter handlerAdapter;

    /**
     * Dispatches a JSON request to the controller method mapped to {@code uri} and returns the
     * response body. Per the original author's note, calling the same service over real HTTP
     * from inside a request blocks, which is why the call is made in-process.
     *
     * @param uri           request path without {@code server.servlet.context-path},
     *                      e.g. {@code /your http api/remoteControl}
     * @param method        HTTP method name, case-insensitive
     * @param jsonBody      JSON request body; {@code null} sends no body
     * @param authorization value for the {@code Authorization} header; {@code null} omits the header
     * @return the response body decoded as UTF-8
     * @throws GnException if {@code uri} or {@code method} is blank, or no handler is mapped to the request
     * @throws Exception   whatever the handler mapping or the controller invocation throws
     */
    public String invokeController(String uri, String method,  String jsonBody, String authorization) throws Exception {
        if (uri == null || uri.trim().isEmpty()) {
            throw new GnException("url不能为空");
        }
        if (method == null || method.trim().isEmpty()) {
            throw new GnException("method不能为空");
        }

        MockHttpServletRequest request = new MockHttpServletRequest();
        request.setRequestURI(uri);
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
        request.setMethod(method.trim().toUpperCase(java.util.Locale.ROOT));
        if (authorization != null) {
            request.addHeader("Authorization", authorization);
        }
        request.setContentType(MediaType.APPLICATION_JSON_VALUE);
        if (jsonBody != null) {
            request.setContent(jsonBody.getBytes(StandardCharsets.UTF_8));
        }

        HandlerExecutionChain chain = handlerMapping.getHandler(request);
        if (chain == null) {
            throw new GnException("没找到对应url: " + uri);
        }

        // Run the controller method; the body it writes ends up in the mock response.
        MockHttpServletResponse response = new MockHttpServletResponse();
        handlerAdapter.handle(request, response, chain.getHandler());
        return new String(response.getContentAsByteArray(), StandardCharsets.UTF_8);
    }
}
正文到此结束
本文目录