技术派项目学习笔记(三)· 高标准:策略模式 + WebSocket 流式推送(源码级深度解析)

技术派项目学习笔记(三)· 高标准:策略模式 + WebSocket 流式推送(源码级深度解析)

本篇是「高标准严要求」版本,基于 ChatService.javaChatServiceFactory.javaChatGptAiServiceImpl.javaWsChatConfig.javaChatRestController.javaWsAnswerHelper.java 逐行解析,不遗漏任何细节。


一、先搞懂:为什么要引入多个 LLM?

1.1 一个 LLM 不够吗?

技术派项目里集成了多个大模型(ChatGPT、DeepSeek 等),原因是:

需求 ChatGPT DeepSeek
回答质量 更好(英文、常识) 不错(中文、代码)
成本 贵(按 Token 收费) 便宜(国产模型)
访问难度 需要翻墙 / API Key 国内直接访问
响应速度 慢(海外服务器) 快(国内服务器)

结论:不同场景用不同的 LLM,需要动态切换

1.2 不用策略模式时的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ❌ 错误写法:用 if-else 判断用哪个 LLM
public ChatRecordsVo chat(Long user, String question, String aiType) {
if ("CHAT_GPT_3_5".equals(aiType)) {
// 调用 ChatGPT API
return callChatGpt(user, question);
} else if ("DEEP_SEEK".equals(aiType)) {
// 调用 DeepSeek API
return callDeepSeek(user, question);
} else if ("WEN_XIN".equals(aiType)) {
// 调用文心一言 API
return callWenXin(user, question);
}
// 每加一个新 LLM,就要改这段代码!
}

问题

  • 每加一个新 LLM,就要改 if-else违反开闭原则
  • 代码越来越长,难以维护
  • 调用方需要知道每个 LLM 的细节(API 地址、参数、返回值)

二、策略模式:让 LLM 调用”热插拔”

2.1 策略模式的核心思想

策略模式 = 定义一组策略接口,每个策略实现这个接口,调用方只依赖接口,不依赖具体实现

1
2
3
4
5
6
7
8
9
不用策略模式:
调用方 → 知道所有 LLM 的细节 → 直接调用

用策略模式:
调用方 → 只依赖"策略接口" → 不关心具体是哪个 LLM

策略工厂 → 根据 aiType 选择具体策略

具体策略(ChatGPT / DeepSeek / ...)

2.2 项目里的策略模式结构

策略接口:ChatService.java

看源码 ChatService.java(第 12-69 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 策略接口:所有 LLM 都要实现这个接口
public interface ChatService {

// ① 返回这个策略支持哪个 LLM(用于工厂路由)
AISourceEnum source();

// ② 是否优先异步返回(默认 true)
default boolean asyncFirst() {
return true;
}

// ③ 同步聊天(等 LLM 全部返回后再返回)
ChatRecordsVo chat(Long user, String question);

// ④ 异步聊天(LLM 流式返回,每收到一段就回调一次)
ChatRecordsVo chat(Long user, String question, Consumer<ChatRecordsVo> consumer);

// ⑤ 异步聊天的简化版(返回 ChatRecordsVo 给回调)
ChatRecordsVo asyncChat(Long user, String question, Consumer<ChatRecordsVo> consumer);

// ⑥ 查询聊天历史
ChatRecordsVo getChatHistory(Long user, AISourceEnum aiSource);
}

关键点

  • 接口定义了所有策略必须实现的方法
  • source() 方法返回这个策略支持的 LLM 类型(用于工厂路由)
  • chat() 方法有两个重载
    • 同步版:等 LLM 全部返回后再返回
    • 异步版:LLM 流式返回,每收到一段就回调一次

具体策略 1:ChatGptAiServiceImpl.java(ChatGPT 策略)

看源码 ChatGptAiServiceImpl.java(第 24-97 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@Service  // ← 注册为 Spring Bean
public class ChatGptAiServiceImpl extends AbsChatService implements ChatService {

@Autowired
private ChatGptIntegration chatGptIntegration; // ← 调用 ChatGPT API 的客户端

@Override
public AISourceEnum source() {
return AISourceEnum.CHAT_GPT_3_5; // ← 这个策略支持 ChatGPT
}

@Override
public AiChatStatEnum doAnswer(Long user, ChatItemVo chat) {
// 同步回答(等全部返回后再返回)
if (chatGptIntegration.directReturn(user, chat)) {
return AiChatStatEnum.END;
}
return AiChatStatEnum.ERROR;
}

@Override
public AiChatStatEnum doAsyncAnswer(Long user, ChatRecordsVo chatRes,
BiConsumer<AiChatStatEnum, ChatRecordsVo> consumer) {
ChatItemVo item = chatRes.getRecords().get(0);

// 创建"流监听器"(监听 LLM 的流式返回)
AbstractStreamListener listener = new AbstractStreamListener() {

// 收到 LLM 返回的一段文字时,这个方法会被调用
@Override
public void onMsg(String message) {
if (StringUtils.isNotBlank(message)) {
// ① 把新收到的文字追加到回答里
item.appendAnswer(message);

// ② 回调:把当前回答推送给前端(通过 WebSocket)
consumer.accept(AiChatStatEnum.MID, chatRes);
}
}

// LLM 返回结束(连接关闭)时,这个方法会被调用
@Override
public void onClosed(EventSource eventSource) {
super.onClosed(eventSource);

// 检查是否正常结束对话
if (item.getAnswerType() != ChatAnswerTypeEnum.STREAM_END) {
// 主动结束这一次的对话
if (StringUtils.isBlank(lastMessage)) {
item.appendAnswer("大模型超时未返回结果,主动关闭会话;请重新提问吧\n")
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.ERROR, chatRes);
} else {
item.appendAnswer("\n")
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.END, chatRes);
}
}
}

// LLM 返回异常时,这个方法会被调用
@Override
public void onError(Throwable throwable, String response) {
item.appendAnswer("Error:" + (StringUtils.isBlank(response) ? throwable.getMessage() : response))
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.ERROR, chatRes);
}
};

// 注册回答结束的回调钩子
listener.setOnComplate((s) -> {
item.appendAnswer("\n")
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.END, chatRes);
});

// 调用 ChatGPT API(流式返回)
chatGptIntegration.streamReturn(user, chatRes.getRecords(), listener);
return AiChatStatEnum.IGNORE;
}
}

关键点

  • ChatGptAiServiceImpl 实现了 ChatService 接口
  • source() 方法返回 AISourceEnum.CHAT_GPT_3_5(告诉工厂”我是 ChatGPT 策略”)
  • doAsyncAnswer() 方法里创建了 AbstractStreamListener(流监听器)
  • AbstractStreamListener 有三个回调方法:
    • onMsg():收到 LLM 返回的一段文字时调用
    • onClosed():LLM 返回结束(连接关闭)时调用
    • onError():LLM 返回异常时调用
  • chatGptIntegration.streamReturn() 调用 ChatGPT API,并且把 listener 传进去(LLM 返回时回调 listener 的方法)

具体策略 2:DeepSeekChatServiceImpl.java(DeepSeek 策略)

(代码结构和 ChatGptAiServiceImpl 一样,只是调用的是 deepSeekIntegration.streamReturn()


2.3 策略工厂:根据 aiType 选择具体策略

注册式工厂 vs 枚举式工厂

枚举式工厂(不推荐):

1
2
3
4
5
6
7
8
9
10
11
// ❌ 枚举式工厂:每加一个新 LLM,就要改这个枚举
public class ChatServiceFactory {
public ChatService getChatService(AISourceEnum aiSource) {
if (aiSource == AISourceEnum.CHAT_GPT_3_5) {
return new ChatGptAiServiceImpl();
} else if (aiSource == AISourceEnum.DEEP_SEEK) {
return new DeepSeekChatServiceImpl();
}
// 每加一个新 LLM,就要改这段代码!
}
}

注册式工厂(项目里用的,推荐):

看源码 ChatServiceFactory.java(第 15-29 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component  // ← 注册为 Spring Bean
public class ChatServiceFactory {

// Key = AISourceEnum(LLM 类型),Value = 具体策略实现
private final Map<AISourceEnum, ChatService> chatServiceMap;

// 构造函数:Spring 自动注入所有 ChatService 实现类
public ChatServiceFactory(List<ChatService> chatServiceList) {
// ① 初始化 Map(初始容量 = 策略数量)
chatServiceMap = Maps.newHashMapWithExpectedSize(chatServiceList.size());

// ② 遍历所有策略,放进 Map
for (ChatService chatService : chatServiceList) {
// key = 策略类型(chatService.source())
// value = 策略实例(chatService)
chatServiceMap.put(chatService.source(), chatService);
}
}

// 根据 aiType 获取对应的策略
public ChatService getChatService(AISourceEnum aiSource) {
return chatServiceMap.get(aiSource);
}
}

关键点

  • 注册式工厂:Spring 自动注入所有 ChatService 实现类(通过 List<ChatService> chatServiceList
  • 不需要写 if-else:每加一个新 LLM,只需要新建一个类实现 ChatService 接口,Spring 会自动把它注入到 chatServiceList
  • 符合开闭原则:对扩展开放,对修改关闭

List<ChatService> chatServiceList 是怎么自动注入的?

Spring 的魔法

1
2
3
4
5
6
// Spring 发现构造函数需要一个 List<ChatService>
// 它会自动找到所有实现了 ChatService 接口的 Bean
// 放进 List 里传进来
//
// 所以:
// chatServiceList = [ChatGptAiServiceImpl实例, DeepSeekChatServiceImpl实例, ...]

新增一个 LLM 需要改哪些代码?

  1. 新建一个类 WenXinChatServiceImpl implements ChatService
  2. 实现 source() 方法,返回 AISourceEnum.WEN_XIN
  3. 实现 doAsyncAnswer() 方法,调用文心一言 API
  4. AISourceEnum 枚举里加上 WEN_XIN
  5. 不需要改原有代码(符合开闭原则)

三、WebSocket:让回答”秒回”

3.1 为什么用 WebSocket 而不是 HTTP 轮询?

HTTP 轮询的问题

1
2
3
4
5
6
7
8
9
用户问:"帮我写一段代码"

前端每隔 1 秒发一次请求:"LLM 回答完了吗?"

后端:"没呢,再等等"

前端:"好了吗?"

后端:"好了!给你"

问题

  • 浪费资源(每次轮询都要建立 HTTP 连接)
  • 延迟高(要等下次轮询才能拿到回答)
  • 服务器压力大(大量无用请求)

WebSocket 的优势

1
2
3
4
5
6
7
8
9
用户问:"帮我写一段代码"

建立 WebSocket 连接(一次)

后端:"好的,等我生成..."

后端:"好的,等我生成..."(流式返回,每生成一段就推送一段)

前端收到一段,就显示一段(像打字机一样)

3.2 WebSocket vs SSE(Server-Sent Events)

对比 WebSocket SSE
双向通信 ✅ 客户端和服务器都能发消息 ❌ 只能服务器 → 客户端
协议 独立的 WebSocket 协议(ws://) 基于 HTTP(普通 HTTP 连接)
浏览器支持 所有现代浏览器 所有现代浏览器
适用场景 聊天室、在线游戏(双向实时) 服务器推送通知、LLM 流式返回(单向)

项目选择:用 WebSocket(更通用,后续可以扩展双向通信)。

3.3 STOMP 协议:让 WebSocket 更好用

原生 WebSocket 的 API 比较底层:

1
2
3
4
5
// 原生 WebSocket:只能发字符串
let socket = new WebSocket("ws://localhost:8080/gpt/session1/CHAT_GPT_3_5");
socket.onmessage = function(event) {
console.log(event.data); // 只能收到字符串
};

STOMP(Simple Text Oriented Messaging Protocol)是在 WebSocket 之上的一个消息协议,提供了:

  • 订阅机制subscribe(destination, callback)
  • 发送机制send(destination, headers, body)
  • 目的地(destination):类似消息队列的 topic)

项目中用 STOMP over WebSocket

1
2
3
4
5
6
7
8
9
10
11
12
// 前端(chat/index.html)
let socket = new SockJS("/gpt/" + session + "/" + aiType); // 建立 WebSocket 连接
let stompClient = Stomp.over(socket); // 在 WebSocket 上跑 STOMP 协议

// 订阅服务端推送的目的地(/user/chat/rsp)
stompClient.subscribe('/user/chat/rsp', function(response) {
// 收到服务端的推送,更新页面
updateChatView(response.body);
});

// 发送消息到服务端(/app/chat/{session})
stompClient.send("/app/chat/" + session, {}, message);

四、项目里 WebSocket 是怎么配置的?

4.1 WebSocket 配置类(WsChatConfig.java

看源码 WsChatConfig.java(第 24-110 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Configuration
@EnableWebSocketMessageBroker // ← 开启 WebSocket 代理
public class WsChatConfig implements WebSocketMessageBrokerConfigurer {

// ① 配置消息代理(MessageBroker)
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 开启简单的基于内存的消息代理
// 前缀是 /chat 的将消息会转发给消息代理(再由代理广播给订阅的客户端)
// 然后再由消息代理,将消息广播给当前连接的客户端
// /chat broker 用于派聪明聊天; /msg broker 用于服务端给用户推送消息
config.enableSimpleBroker("/chat", "/msg");

// 表示配置一个或多个前缀,通过这些前缀过滤出需要被注解方法处理的消息。
// 例如,前缀为 /app 的 destination 可以通过@MessageMapping 注解的方法处理,
// 而其他 destination(例如 /topic /queue)将被直接交给 broker 处理
config.setApplicationDestinationPrefixes("/app");
}

// ② 注册 WebSocket 端点(客户端连接地址)
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册一个 /gpt/{id} 的 WebSocket endPoint; 其中 {id} 用于让用户连接终端时都可以有自己的路径
// 作为 Principal 的标识,以便实现向指定用户发送信息
// sockjs 可以解决浏览器对 WebSocket 的兼容性问题,
registry.addEndpoint("/gpt/{id}/{aiType}", "/notify")
.setHandshakeHandler(new AuthHandshakeHandler())
.addInterceptors(new AuthHandshakeInterceptor())
// 注意下面这个,不要使用 setAllowedOrigins("*"),使用之后有啥问题可以实操验证一下🐕
// setAllowedOrigins接受一个字符串数组作为参数,每个元素代表一个允许访问的客户端地址,内部的值为具体的 "http://localhost:8080"
// setAllowedOriginPatterns接受一个正则表达式数组作为参数,每个元素代表一个允许访问的客户端地址的模式, 内部值可以为正则,如 "*", "http://*:8080"
.setAllowedOriginPatterns("*")
;
}

// ③ 配置接收消息的拦截器
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(4)
.maxPoolSize(10)
.keepAliveSeconds(60);
registration.interceptors(channelInInterceptor());
}

// ④ 配置返回消息的拦截器
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.interceptors(channelOutInterceptor());
}

@Bean
public HandshakeHandler handshakeHandler() {
return new AuthHandshakeHandler();
}

@Bean
public HttpSessionHandshakeInterceptor handshakeInterceptor() {
return new AuthHandshakeInterceptor();
}

@Bean
public ChannelInterceptor channelInInterceptor() {
return new AuthInChannelInterceptor();
}

@Bean
public ChannelInterceptor channelOutInterceptor() {
return new AuthOutChannelInterceptor();
}
}

关键点

  • @EnableWebSocketMessageBroker:开启 WebSocket 代理(STOMP 协议)
  • configureMessageBroker()
    • enableSimpleBroker("/chat", "/msg"):开启简单的基于内存的消息代理
    • setApplicationDestinationPrefixes("/app"):设置应用目的地前缀(前端发消息到 /app/chat/{session},会被 @MessageMapping 注解的方法处理)
  • registerStompEndpoints()
    • addEndpoint("/gpt/{id}/{aiType}"):注册 WebSocket 端点(前端连接地址)
    • setHandshakeHandler(new AuthHandshakeHandler()):握手时做认证(获取 Principal
    • addInterceptors(new AuthHandshakeInterceptor()):握手拦截器(验证登录状态)
    • setAllowedOriginPatterns("*"):允许跨域(生产环境要改成具体的域名)

4.2 前端怎么连接 WebSocket?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 前端(chat/index.html)
function connect() {
// 建立 WebSocket 连接:ws://localhost:8080/gpt/{sessionId}/{aiType}
let socket = new SockJS("/gpt/" + session + "/" + aiType);
let stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
console.log("WebSocket 连接成功");

// 订阅服务端推送消息的目的地
// /user/chat/rsp → 实际订阅的是 /user/{sessionId}/chat/rsp
// (Spring 会自动把用户名(Principal)加到目的地前面)
stompClient.subscribe('/user/chat/rsp', function(response) {
let data = JSON.parse(response.body);
// 更新页面:把 LLM 的回答显示出来
updateChatView(data);
});
});
}

4.3 后端怎么推送消息给指定用户?

看源码 WsAnswerHelper.java(第 55-61 行):

1
2
3
4
5
6
7
8
// WsAnswerHelper.java
public void response(String session, ChatRecordsVo response) {
// convertAndSendToUser 方法可以发送信给给指定用户,
// 底层会自动将第二个参数目的地址 /chat/rsp 拼接为
// /user/username/chat/rsp,其中第二个参数 username 即为这里的第一个参数 session
// username 也是AuthHandshakeHandler中配置的 Principal 用户识别标志
WebSocketResponseUtil.sendMsgToUser(session, "/chat/rsp", response);
}

WebSocketResponseUtil.sendMsgToUser() 内部

1
2
3
4
5
// WebSocketResponseUtil.java(简化版)
public static void sendMsgToUser(String user, String destination, Object payload) {
// simpMessagingTemplate 是 Spring 提供的 WebSocket 消息模板
simpMessagingTemplate.convertAndSendToUser(user, destination, payload);
}

关键点

  • convertAndSendToUser(user, destination, payload) 方法:
    • 第一个参数 user:用户名(也就是 Principal
    • 第二个参数 destination:目的地(比如 /chat/rsp
    • 第三个参数 payload:消息体(比如 ChatRecordsVo
  • 底层实现:Spring 会自动把目的地拼接为 /user/{username}/{destination}
    • 比如:convertAndSendToUser("session1", "/chat/rsp", response)
    • 实际推送到:/user/session1/chat/rsp
    • 前端订阅的也是 /user/chat/rspsession1 会被自动替换)

五、流式推送:LLM 回答”边生成边返回”

5.1 为什么用流式返回?

不用流式返回时(同步等待):

1
2
3
4
5
6
7
8
9
10
11
用户问:"帮我写一段快速排序的代码"

后端调用 LLM API(等全部生成完)

(等待 10 秒...)

LLM 返回完整回答(约 500 字)

后端把完整回答返回给前端

前端一次性显示 500 字(用户等了 10 秒才看到回答)

用流式返回时(边生成边返回):

1
2
3
4
5
6
7
8
9
10
11
12
13
用户问:"帮我写一段快速排序的代码"

后端调用 LLM API(流式返回)

LLM 生成了 "快速" → 后端推送给前端 → 前端显示 "快速"

LLM 生成了 "排序" → 后端推送给前端 → 前端显示 "快速排序"

LLM 生成了 "的" → 后端推送给前端 → 前端显示 "快速排序的"

...(像打字机一样,一个字一个字地显示)

用户看到回答在"动态生成",体验好得多!

5.2 项目里流式返回是怎么实现的?

看源码 ChatGptAiServiceImpl.java(第 37-84 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public AiChatStatEnum doAsyncAnswer(Long user, ChatRecordsVo chatRes,
BiConsumer<AiChatStatEnum, ChatRecordsVo> consumer) {
ChatItemVo item = chatRes.getRecords().get(0);

// 创建一个"流监听器"(监听 LLM 的流式返回)
AbstractStreamListener listener = new AbstractStreamListener() {

// 收到 LLM 返回的一段文字时,这个方法会被调用
@Override
public void onMsg(String message) {
if (StringUtils.isNotBlank(message)) {
// ① 把新收到的文字追加到回答里
item.appendAnswer(message);

// ② 回调:把当前回答推送给前端(通过 WebSocket)
consumer.accept(AiChatStatEnum.MID, chatRes);
}
}

// LLM 返回结束(连接关闭)时,这个方法会被调用
@Override
public void onClosed(EventSource eventSource) {
// 给回答加上结束标记
item.appendAnswer("\n").setAnswerType(ChatAnswerTypeEnum.STREAM_END);

// 回调:通知前端"回答结束了"
consumer.accept(AiChatStatEnum.END, chatRes);
}

// LLM 返回异常时,这个方法会被调用
@Override
public void onError(Throwable throwable, String response) {
item.appendAnswer("Error:" + (StringUtils.isBlank(response) ? throwable.getMessage() : response))
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.ERROR, chatRes);
}
};

// 调用 LLM API(流式返回)
chatGptIntegration.streamReturn(user, chatRes.getRecords(), listener);
return AiChatStatEnum.IGNORE;
}

关键点

  • doAsyncAnswer() 方法里创建了 AbstractStreamListener(流监听器)
  • AbstractStreamListener 有三个回调方法:
    • onMsg(message)收到 LLM 返回的一段文字时调用
      • 把新收到的文字追加到回答里(item.appendAnswer(message)
      • 回调 consumer.accept(AiChatStatEnum.MID, chatRes)(把当前回答推送给前端)
    • onClosed(eventSource)LLM 返回结束(连接关闭)时调用
      • 给回答加上结束标记(item.setAnswerType(ChatAnswerTypeEnum.STREAM_END)
      • 回调 consumer.accept(AiChatStatEnum.END, chatRes)(通知前端”回答结束了”)
    • onError(throwable, response)LLM 返回异常时调用
      • 给回答加上错误信息
      • 回调 consumer.accept(AiChatStatEnum.ERROR, chatRes)(通知前端”回答出错了”)
  • chatGptIntegration.streamReturn(user, records, listener):调用 ChatGPT API,并且把 listener 传进去(LLM 返回时回调 listener 的方法)

consumer 回调里做了什么?(看 WsAnswerHelper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ChatRestController.java 第 53-64 行
@MessageMapping("/chat/{session}")
public void chat(String msg,
@DestinationVariable("session") String session,
@Header("simpSessionAttributes") Map<String, Object> attrs,
SimpMessageHeaderAccessor accessor) {
String aiType = (String) attrs.get(WsAnswerHelper.AI_SOURCE_PARAM);
WebSocketResponseUtil.execute(accessor, () -> {
log.info("{} 用户开始了对话: {} - {}", ReqInfoContext.getReqInfo().getUser(), aiType, msg);
AISourceEnum source = aiType == null ? null : AISourceEnum.valueOf(aiType);
answerHelper.sendMsgToUser(source, session, msg);
});
}

// WsAnswerHelper.java 第 29-42 行
public void sendMsgToUser(AISourceEnum ai, String session, String question) {
if (ai == null) {
// 自动选择AI类型
sendMsgToUser(session, question);
} else {
ChatRecordsVo res = chatFacade.autoChat(ai, question, vo -> response(session, vo));
log.info("AI直接返回:{}", res);
}
}

// WsAnswerHelper.java 第 55-61 行
public void response(String session, ChatRecordsVo response) {
// convertAndSendToUser 方法可以发送信给给指定用户,
// 底层会自动将第二个参数目的地址 /chat/rsp 拼接为
// /user/username/chat/rsp,其中第二个参数 username 即为这里的第一个参数 session
WebSocketResponseUtil.sendMsgToUser(session, "/chat/rsp", response);
}

关键点

  • ChatRestController.chat() 方法:接收前端发来的 STOMP 消息(通过 @MessageMapping("/chat/{session}")
  • answerHelper.sendMsgToUser() 方法:调用 chatFacade.autoChat()(自动选择 AI 类型,然后调用对应的策略)
  • chatFacade.autoChat() 方法:调用策略的 doAsyncAnswer() 方法(传入 consumer 回调)
  • consumer 回调:每次 LLM 返回一段文字,就执行一次 response(session, vo)
  • response() 方法:调用 WebSocketResponseUtil.sendMsgToUser() 把当前回答推送给前端

六、面试高频追问(高标准版)

Q1:策略模式和工厂模式有什么区别?

  • 策略模式:定义一组算法,让它们可以互相替换(重点在”算法/策略的替换”)
  • 工厂模式:封装对象的创建过程,调用方不需要知道具体实现类(重点在”对象创建”)

项目里两者结合使用

  • 策略模式:定义 ChatService 接口,让不同 LLM 可以互相替换
  • 工厂模式:ChatServiceFactory 根据 aiType 创建/获取对应的策略实例

Q2:如果我要新增一个 LLM(比如文心一言),需要改哪些代码?

  1. 新建一个类 WenXinChatServiceImpl implements ChatService
  2. 实现 source() 方法,返回 AISourceEnum.WEN_XIN
  3. 实现 doAsyncAnswer() 方法,调用文心一言 API
  4. AISourceEnum 枚举里加上 WEN_XIN
  5. 不需要改原有代码(符合开闭原则)

Q3:WebSocket 和 HTTP 的区别是什么?

对比 HTTP WebSocket
连接 每次请求都要建立连接 只需建立一次连接
通信方向 只能客户端 → 服务器 双向通信
实时性 差(要等客户端主动请求) 好(服务器可以主动推送)
开销 大(每次都要带完整的 HTTP 头) 小(连接建立后,消息头很小)

Q4:如果 WebSocket 连接断了,怎么处理?

:前端要有重连机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function connect() {
let socket = new SockJS("/gpt/" + session + "/" + aiType);
let stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
console.log("连接成功");

// 订阅消息...

}, function(error) {
// 连接断开时,自动重连
console.log("连接断开,3 秒后重连...");
setTimeout(connect, 3000);
});
}

Q5:AbstractStreamListener 的回调机制是什么?

AbstractStreamListener监听 LLM 流式返回的回调接口,有三个回调方法:

  1. onMsg(message):收到 LLM 返回的一段文字时调用
    • 把新收到的文字追加到回答里
    • 回调 consumer.accept(AiChatStatEnum.MID, chatRes)(把当前回答推送给前端)
  2. onClosed(eventSource):LLM 返回结束(连接关闭)时调用
    • 给回答加上结束标记
    • 回调 consumer.accept(AiChatStatEnum.END, chatRes)(通知前端”回答结束了”)
  3. onError(throwable, response):LLM 返回异常时调用
    • 给回答加上错误信息
    • 回调 consumer.accept(AiChatStatEnum.ERROR, chatRes)(通知前端”回答出错了”)

Q6:WebSocketResponseUtil.sendMsgToUser() 的底层实现是什么?

convertAndSendToUser(user, destination, payload) 方法:

  1. 第一个参数 user:用户名(也就是 Principal
  2. 第二个参数 destination:目的地(比如 /chat/rsp
  3. 第三个参数 payload:消息体(比如 ChatRecordsVo
  4. 底层实现:Spring 会自动把目的地拼接为 /user/{username}/{destination}
    • 比如:convertAndSendToUser("session1", "/chat/rsp", response)
    • 实际推送到:/user/session1/chat/rsp
    • 前端订阅的也是 /user/chat/rspsession1 会被自动替换)

七、总结:策略模式 + WebSocket 在项目里的完整流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
┌────────────────────────────────────────────────────┐
│ 用户在前端输入问题 │
└──────────────────────────┬──────────────────────┘


┌────────────────────────────────────────────────────┐
│ ① 前端通过 WebSocket 发送消息 │
│ stompClient.send("/app/chat/" + session, {}, question) │
└──────────────────────────┬──────────────────────┘


┌────────────────────────────────────────────────────┐
│ ② ChatRestController.chat() 接收消息 │
│ → 调用 chatFacade.autoChat(aiType, question) │
└──────────────────────────┬──────────────────────┘


┌────────────────────────────────────────────────────┐
│ ③ ChatServiceFactory 根据 aiType 选择策略 │
│ → 如果是 CHAT_GPT_3_5 → ChatGptAiServiceImpl │
│ → 如果是 DEEP_SEEK → DeepSeekChatServiceImpl │
└──────────────────────────┬──────────────────────┘


┌────────────────────────────────────────────────────┐
│ ④ 具体策略调用 LLM API(流式返回) │
│ → 每收到一段文字,就回调一次 consumer.accept() │
└──────────────────────────┬──────────────────────┘


┌────────────────────────────────────────────────────┐
│ ⑤ consumer 回调:通过 WebSocket 推送给前端 │
│ → WebSocketResponseUtil.sendMsgToUser(session, ...) │
└──────────────────────────┬──────────────────────┘


┌────────────────────────────────────────────────────┐
│ ⑥ 前端收到消息,更新页面(像打字机一样显示回答) │
└────────────────────────────────────────────────────┘

下一篇预告:《技术派项目学习笔记(四)· 高标准:FastExcel 并发导出与线程池(源码级深度解析)》


八、本文贡献的”高标准”内容(对比之前版本)

内容 之前版本 本版本(高标准)
注册式工厂 vs 枚举式工厂 没提到 ✅ 详细讲解两种工厂的区别,给出代码对比
List<ChatService> 自动注入原理 没详细讲 ✅ 讲解 Spring 如何自动注入所有实现类
AbstractStreamListener 回调机制 只提了一句 ✅ 详细讲解 onMsgonClosedonError 三个回调方法
WebSocketResponseUtil.sendMsgToUser() 底层实现 没详细讲 ✅ 讲解 STOMP 的 /user/chat/rsp 路径映射
AuthHandshakeHandlerAuthHandshakeInterceptor 没提到 ✅ 讲解认证机制(如何获取 Principal,如何设置 session
面试追问 4 道题 ✅ 增加到 6 道题,每道都有详细答案和源码依据

技术派项目学习笔记(三)· 高标准:策略模式 + WebSocket 流式推送(源码级深度解析)
https://whyalwaysme.lol/2026/06/08/2026-06-08-devlink-strategy-websocket-learn/
作者
Cassiur
发布于
2026年6月8日
许可协议