KnowFlow 项目学习笔记(一):上传流水线深度解析

写在前面:本学习笔记基于 KnowFlow 项目源码逐行解析,深度讲解文件上传流水线的实现细节(分片上传、断点续传、MinIO 存储、Redis Bitmap 去重)。并指出源码里的 4 处严重问题(❗❗ 标记),给出修复方案。适合面试前深度学习,确保”傻子都能懂”。


一、什么是上传流水线?

傻子都能懂的解释

想象你要上传一个 100MB 的文件到服务器:

  • 传统方式:直接上传整个文件,如果网络断了,得重新上传(从头开始)
  • 分片上传:把文件切成 20 个 5MB 的小块,分别上传,哪个块失败了就重传哪个块(不用重新上传整个文件)

KnowFlow 的上传流水线

  1. 前端:把文件切成 5MB 的小块(分片),逐个上传
  2. 后端:每个分片上传到 MinIO(对象存储),并在 Redis 里记录”这个分片已经上传了”
  3. 合并:所有分片都上传完后,后端把分片合并成完整文件
  4. 断点续传:如果网络断了,前端可以问后端”哪些分片已经上传了”,只上传没上传的分片

二、源码解析:UploadService.java

文件位置PaiSmart-zuzhi/src/main/java/com/yizhaoqi/smartpai/service/UploadService.java

2.1 分片上传:uploadChunk() 方法

源码(第 68-236 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void uploadChunk(String fileMd5, int chunkIndex, long totalSize, String fileName, 
MultipartFile file, String orgTag, boolean isPublic, String userId) throws IOException {
// 获取文件类型信息
String fileType = getFileType(fileName);
String contentType = file.getContentType();

logger.info("[uploadChunk] 开始处理分片上传请求 => fileMd5: {}, chunkIndex: {}, totalSize: {}, fileName: {}, fileType: {}, contentType: {}, fileSize: {}, orgTag: {}, isPublic: {}, userId: {}",
fileMd5, chunkIndex, totalSize, fileName, fileType, contentType, file.getSize(), orgTag, isPublic, userId);

try {
// 检查 file_upload 表中是否存在该 file_md5
boolean fileExists = fileUploadRepository.findByFileMd5AndUserId(fileMd5, userId).isPresent();
logger.debug("检查文件记录是否存在 => fileMd5: {}, fileName: {}, fileType: {}, exists: {}", fileMd5, fileName, fileType, fileExists);

if (!fileExists) {
logger.info("创建新的文件记录 => fileMd5: {}, fileName: {}, fileType: {}, totalSize: {}, userId: {}, orgTag: {}, isPublic: {}",
fileMd5, fileName, fileType, totalSize, userId, orgTag, isPublic);
// 插入 file_upload 表
FileUpload fileUpload = new FileUpload();
fileUpload.setFileMd5(fileMd5);
fileUpload.setFileName(fileName);
fileUpload.setTotalSize(totalSize);
fileUpload.setStatus(0); // 0 表示上传中
fileUpload.setUserId(userId);
fileUpload.setOrgTag(orgTag);
fileUpload.setPublic(isPublic);
try {
fileUploadRepository.save(fileUpload);
logger.info("文件记录创建成功 => fileMd5: {}, fileName: {}, fileType: {}", fileMd5, fileName, fileType);
} catch (Exception e) {
logger.error("创建文件记录失败 => fileMd5: {}, fileName: {}, fileType: {}, 错误: {}", fileMd5, fileName, fileType, e.getMessage(), e);
throw new RuntimeException("创建文件记录失败: " + e.getMessage(), e);
}
}

// 检查分片是否已经上传
boolean chunkUploaded = isChunkUploaded(fileMd5, chunkIndex, userId);
// ...
} catch (Exception e) {
logger.error("分片上传过程中发生错误 => fileMd5: {}, fileName: {}, fileType: {}, chunkIndex: {}, 错误类型: {}, 错误信息: {}",
fileMd5, fileName, fileType, chunkIndex, e.getClass().getName(), e.getMessage(), e);
throw e;
}
}

逐行解析(傻子都能懂版)

  1. 第 69-75 行:方法参数

    • fileMd5:整个文件的 MD5 值(用来唯一标识一个文件,比如你上传了 简历.pdf,MD5 是 abc123,下次再上传同一个文件,MD5 还是 abc123,就可以秒传)
    • chunkIndex:这是第几个分片(比如第 0 个、第 1 个…)
    • totalSize:文件总大小(用来计算要切成多少片)
    • fileName:文件名(比如 简历.pdf
    • file:分片文件(Spring 的 MultipartFile 对象)
    • orgTag:组织标签(用来做权限控制,比如”技术部”的人只能看技术部的文件)
    • isPublic:是否公开(true = 所有人都能看,false = 只有自己能看)
    • userId:谁上传的
  2. 第 78-101 行:检查文件记录是否存在

    • 去数据库查:SELECT * FROM file_upload WHERE file_md5 = ? AND user_id = ?
    • 如果不存在,插入一条新记录:INSERT INTO file_upload (file_md5, file_name, total_size, status, user_id, org_tag, is_public) VALUES (...)
    • status = 0 表示”上传中”
  3. 第 103-104 行:检查这个分片是否已经上传了

    • 调用 isChunkUploaded() 方法(后面会讲)

为什么要在 Redis 里记录分片状态?

  • 如果每次都去数据库查”这个分片是否已经上传”,性能很差(数据库磁盘 I/O)
  • Redis 是内存数据库,查询速度极快(毫秒级)
  • 用 Redis 的 Bitmap(位图)数据结构:每个分片用一个 bit 表示(0 = 未上传,1 = 已上传),非常节省空间

2.2 Redis Bitmap 记录分片状态

源码(第 344-361 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean isChunkUploaded(String fileMd5, int chunkIndex, String userId) {
logger.debug("检查分片是否已上传 => fileMd5: {}, chunkIndex: {}, userId: {}", fileMd5, chunkIndex, userId);
try {
if (chunkIndex < 0) {
logger.error("无效的分片索引 => fileMd5: {}, chunkIndex: {}", fileMd5, chunkIndex);
throw new IllegalArgumentException("chunkIndex must be non-negative");
}
String redisKey = "upload:" + userId + ":" + fileMd5;
boolean isUploaded = redisTemplate.opsForValue().getBit(redisKey, chunkIndex);
logger.debug("分片上传状态 => fileMd5: {}, chunkIndex: {}, userId: {}, isUploaded: {}",
fileMd5, chunkIndex, userId, isUploaded);
return isUploaded;
} catch (Exception e) {
logger.error("检查分片上传状态失败 => fileMd5: {}, chunkIndex: {}, userId: {}, 错误: {}",
fileMd5, chunkIndex, userId, e.getMessage(), e);
return false;
}
}

傻子都能懂的解释

什么是 Bitmap?

  • Bitmap 就是”位图”,用一串 0 和 1 表示状态
  • 比如一个文件有 10 个分片,Redis 里存的就是:0000000000(10 个 0)
  • 如果第 3 个分片上传成功了,就变成:0001000000(第 3 位是 1)
  • 如果第 5 个分片也上传成功了,就变成:0001001000

为什么用 Bitmap?

  • 节省空间:每个分片只占用 1 bit(1/8 字节),1000 个分片才占用 125 字节
  • 查询快GETBIT 命令是 O(1) 时间复杂度(瞬间返回)
  • 批量查询快:可以一次性取出所有 bit(后面 getUploadedChunks() 方法会讲)

代码解析

  1. 第 351 行String redisKey = "upload:" + userId + ":" + fileMd5;

    • Redis key 格式:upload:用户ID:文件MD5
    • 比如:upload:1001:abc123
    • 这样每个用户、每个文件都有独立的 Bitmap
  2. 第 352 行boolean isUploaded = redisTemplate.opsForValue().getBit(redisKey, chunkIndex);

    • 调用 Redis 的 GETBIT 命令:GETBIT upload:1001:abc123 3
    • 返回第 3 位的值(0 或 1)

2.3 标记分片为已上传

源码(第 370-385 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void markChunkUploaded(String fileMd5, int chunkIndex, String userId) {
logger.debug("标记分片为已上传 => fileMd5: {}, chunkIndex: {}, userId: {}", fileMd5, chunkIndex, userId);
try {
if (chunkIndex < 0) {
logger.error("无效的分片索引 => fileMd5: {}, chunkIndex: {}", fileMd5, chunkIndex);
throw new IllegalArgumentException("chunkIndex must be non-negative");
}
String redisKey = "upload:" + userId + ":" + fileMd5;
redisTemplate.opsForValue().setBit(redisKey, chunkIndex, true);
logger.debug("分片已标记为已上传 => fileMd5: {}, chunkIndex: {}, userId: {}", fileMd5, chunkIndex, userId);
} catch (Exception e) {
logger.error("标记分片为已上传失败 => fileMd5: {}, chunkIndex: {}, userId: {}, 错误: {}",
fileMd5, chunkIndex, userId, e.getMessage(), e);
throw new RuntimeException("Failed to mark chunk as uploaded", e);
}
}

代码解析

  1. 第 378 行redisTemplate.opsForValue().setBit(redisKey, chunkIndex, true);
    • 调用 Redis 的 SETBIT 命令:SETBIT upload:1001:abc123 3 1
    • 把第 3 位设为 1(表示已上传)

2.4 上传分片到 MinIO

源码(第 162-202 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
if (!chunkUploaded) {
// 计算分片的 MD5 值
logger.debug("计算分片MD5 => fileMd5: {}, fileName: {}, chunkIndex: {}", fileMd5, fileName, chunkIndex);
byte[] fileBytes = file.getBytes();
chunkMd5 = DigestUtils.md5Hex(fileBytes);
logger.debug("分片MD5计算完成 => fileMd5: {}, fileName: {}, chunkIndex: {}, chunkMd5: {}",
fileMd5, fileName, chunkIndex, chunkMd5);

// 构建分片的存储路径
storagePath = "chunks/" + fileMd5 + "/" + chunkIndex;
logger.debug("构建分片存储路径 => fileName: {}, path: {}", fileName, storagePath);

try {
// 存储到 MinIO
logger.info("开始上传分片到MinIO => fileMd5: {}, fileName: {}, fileType: {}, chunkIndex: {}, bucket: uploads, path: {}, size: {}, contentType: {}",
fileMd5, fileName, fileType, chunkIndex, storagePath, file.getSize(), contentType);

PutObjectArgs putObjectArgs = PutObjectArgs.builder()
.bucket("uploads")
.object(storagePath)
.stream(file.getInputStream(), file.getSize(), -1)
.contentType(file.getContentType())
.build();

minioClient.putObject(putObjectArgs);
logger.info("分片上传到MinIO成功 => fileMd5: {}, fileName: {}, fileType: {}, chunkIndex: {}", fileMd5, fileName, fileType, chunkIndex);
} catch (Exception e) {
logger.error("分片上传到MinIO失败 => fileMd5: {}, fileName: {}, fileType: {}, chunkIndex: {}, 错误类型: {}, 错误信息: {}",
fileMd5, fileName, fileType, chunkIndex, e.getClass().getName(), e.getMessage(), e);
throw new RuntimeException("上传分片到MinIO失败: " + e.getMessage(), e);
}

// 标记分片已上传
try {
logger.debug("标记分片为已上传 => fileMd5: {}, fileName: {}, chunkIndex: {}", fileMd5, fileName, chunkIndex);
markChunkUploaded(fileMd5, chunkIndex, userId);
logger.debug("分片标记完成 => fileMd5: {}, fileName: {}, chunkIndex: {}", fileMd5, fileName, chunkIndex);
} catch (Exception e) {
logger.error("标记分片已上传失败 => fileMd5: {}, fileName: {}, chunkIndex: {}, 错误: {}",
fileMd5, fileName, chunkIndex, e.getMessage(), e);
// 这里不抛出异常,因为分片已经上传成功,即使标记失败也不影响后续操作
}
}

傻子都能懂的解释

什么是 MinIO?

  • MinIO 是一个开源的对象存储服务(类似阿里云 OSS、腾讯云 COS)
  • 用来存储文件(比如 PDF、图片、视频)
  • 好处:可以部署在本地(不用花钱买云服务)

代码解析

  1. 第 164-169 行:计算分片的 MD5 值

    • DigestUtils.md5Hex(fileBytes):计算分片文件的 MD5(用来校验文件完整性)
    • 比如分片文件内容是 hello,MD5 是 abc123
    • 上传到 MinIO 后,再计算一次 MD5,如果还是 abc123,说明文件没损坏
  2. 第 171-172 行:构建存储路径

    • storagePath = "chunks/" + fileMd5 + "/" + chunkIndex;
    • 比如:chunks/abc123/0(第 0 个分片)、chunks/abc123/1(第 1 个分片)
  3. 第 180-187 行:上传到 MinIO

    • PutObjectArgs.builder():构建上传参数
    • .bucket("uploads"):上传到 uploads 桶(类似文件夹)
    • .object(storagePath):对象名(存储路径)
    • .stream(file.getInputStream(), file.getSize(), -1):文件流
    • minioClient.putObject(putObjectArgs):执行上传
  4. 第 206-213 行:标记分片为已上传

    • 调用 markChunkUploaded() 方法(前面讲过了)
    • 注意:这里如果标记失败,不会抛出异常(因为分片已经上传成功了,只是 Redis 没标记,下次还会重新上传这个分片,浪费带宽但不影响功能)

2.5 合并分片:mergeChunks() 方法

源码(第 541-676 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
public String mergeChunks(String fileMd5, String fileName, String userId) {
String fileType = getFileType(fileName);
logger.info("开始合并文件分片 => fileMd5: {}, fileName: {}, fileType: {}, userId: {}", fileMd5, fileName, fileType, userId);
try {
// 查询所有分片信息
logger.debug("查询分片信息 => fileMd5: {}, fileName: {}", fileMd5, fileName);
List<ChunkInfo> chunks = chunkInfoRepository.findByFileMd5OrderByChunkIndexAsc(fileMd5);
logger.info("查询到分片信息 => fileMd5: {}, fileName: {}, fileType: {}, 分片数量: {}", fileMd5, fileName, fileType, chunks.size());

// 检查分片数量是否与预期一致
int expectedChunks = getTotalChunks(fileMd5, userId);
if (chunks.size() != expectedChunks) {
logger.error("分片数量不匹配 => fileMd5: {}, fileName: {}, fileType: {}, 期望: {}, 实际: {}",
fileMd5, fileName, fileType, expectedChunks, chunks.size());
throw new RuntimeException(String.format(
"分片数量不匹配,期望: %d, 实际: %d", expectedChunks, chunks.size()));
}

List<String> partPaths = chunks.stream()
.map(ChunkInfo::getStoragePath)
.collect(Collectors.toList());
logger.debug("分片路径列表 => fileMd5: {}, fileName: {}, 路径数量: {}", fileMd5, fileName, partPaths.size());

// 检查每个分片是否存在
logger.info("开始检查每个分片是否存在 => fileMd5: {}, fileName: {}, fileType: {}", fileMd5, fileName, fileType);
for (int i = 0; i < partPaths.size(); i++) {
String path = partPaths.get(i);
try {
StatObjectResponse stat = minioClient.statObject(
StatObjectArgs.builder()
.bucket("uploads")
.object(path)
.build()
);
logger.debug("分片存在 => fileName: {}, index: {}, path: {}, size: {}", fileName, i, path, stat.size());
} catch (Exception e) {
logger.error("分片不存在或无法访问 => fileName: {}, index: {}, path: {}, 错误: {}",
fileName, i, path, e.getMessage(), e);
throw new RuntimeException("分片 " + i + " 不存在或无法访问: " + e.getMessage(), e);
}
}
logger.info("分片检查完成,所有分片都存在 => fileMd5: {}, fileName: {}, fileType: {}", fileMd5, fileName, fileType);

String mergedPath = "merged/" + fileName;
logger.info("开始合并分片 => fileMd5: {}, fileName: {}, fileType: {}, 合并后路径: {}", fileMd5, fileName, fileType, mergedPath);

try {
// 合并分片
List<ComposeSource> sources = partPaths.stream()
.map(path -> ComposeSource.builder().bucket("uploads").object(path).build())
.collect(Collectors.toList());

logger.debug("构建合并请求 => fileMd5: {}, fileName: {}, targetPath: {}, sourcePaths: {}",
fileMd5, fileName, mergedPath, partPaths);

minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket("uploads")
.object(mergedPath)
.sources(sources)
.build()
);
logger.info("分片合并成功 => fileMd5: {}, fileName: {}, fileType: {}, mergedPath: {}", fileMd5, fileName, fileType, mergedPath);

// 检查合并后的文件
StatObjectResponse stat = minioClient.statObject(
StatObjectArgs.builder()
.bucket("uploads")
.object(mergedPath)
.build()
);
logger.info("合并文件信息 => fileMd5: {}, fileName: {}, fileType: {}, path: {}, size: {}", fileMd5, fileName, fileType, mergedPath, stat.size());

// 清理分片文件
logger.info("开始清理分片文件 => fileMd5: {}, fileName: {}, 分片数量: {}", fileMd5, fileName, partPaths.size());
for (String path : partPaths) {
try {
minioClient.removeObject(
RemoveObjectArgs.builder()
.bucket("uploads")
.object(path)
.build()
);
logger.debug("分片文件已删除 => fileName: {}, path: {}", fileName, path);
} catch (Exception e) {
// 记录错误但不中断流程
logger.warn("删除分片文件失败,将继续处理 => fileName: {}, path: {}, 错误: {}", fileName, path, e.getMessage());
}
}
logger.info("分片文件清理完成 => fileMd5: {}, fileName: {}, fileType: {}", fileMd5, fileName, fileType);

// 删除 Redis 中的分片状态记录
logger.info("删除Redis中的分片状态记录 => fileMd5: {}, fileName: {}, userId: {}", fileMd5, fileName, userId);
deleteFileMark(fileMd5, userId);
logger.info("分片状态记录已删除 => fileMd5: {}, fileName: {}, userId: {}", fileMd5, fileName, userId);

// 更新文件状态
logger.info("更新文件状态为已完成 => fileMd5: {}, fileName: {}, fileType: {}, userId: {}", fileMd5, fileName, fileType, userId);
FileUpload fileUpload = fileUploadRepository.findByFileMd5AndUserId(fileMd5, userId)
.orElseThrow(() -> {
logger.error("更新文件状态失败,文件记录不存在 => fileMd5: {}, fileName: {}", fileMd5, fileName);
return new RuntimeException("文件记录不存在: " + fileMd5);
});
fileUpload.setStatus(1); // 已完成
fileUpload.setMergedAt(LocalDateTime.now());
fileUploadRepository.save(fileUpload);
logger.info("文件状态已更新为已完成 => fileMd5: {}, fileName: {}, fileType: {}", fileMd5, fileName, fileType);

// 生成预签名 URL(有效期为 1 小时)
logger.info("开始生成预签名URL => fileMd5: {}, fileName: {}, path: {}", fileMd5, fileName, mergedPath);
String presignedUrl = minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
.bucket("uploads")
.object(mergedPath)
.expiry(1, TimeUnit.HOURS) // 设置有效期为 1 小时
.build()
);
logger.info("预签名URL已生成 => fileMd5: {}, fileName: {}, fileType: {}, URL: {}", fileMd5, fileName, fileType, presignedUrl);

// 文件上传成功,增加计数器
fileUploadCounter.increment();
logger.info("文件上传计数器已递增 => fileMd5: {}, fileName: {}", fileMd5, fileName);

return presignedUrl;
} catch (Exception e) {
logger.error("合并文件失败 => fileMd5: {}, fileName: {}, fileType: {}, 错误类型: {}, 错误信息: {}",
fileMd5, fileName, fileType, e.getClass().getName(), e.getMessage(), e);
throw new RuntimeException("合并文件失败: " + e.getMessage(), e);
}
} catch (Exception e) {
logger.error("文件合并过程中发生错误 => fileMd5: {}, fileName: {}, fileType: {}, 错误类型: {}, 错误信息: {}",
fileMd5, fileName, fileType, e.getClass().getName(), e.getMessage(), e);
throw new RuntimeException("文件合并失败: " + e.getMessage(), e);
}
}

傻子都能懂的解释

合并分片的过程

  1. 从数据库查询所有分片信息(SELECT * FROM chunk_info WHERE file_md5 = ? ORDER BY chunk_index ASC
  2. 检查分片数量是否完整(比如文件有 10 个分片,数据库里必须有 10 条记录)
  3. 检查每个分片文件是否存在于 MinIO(调用 statObject() 方法)
  4. 调用 MinIO 的 composeObject() 方法,把所有分片合并成一个文件
  5. 删除分片文件(节省存储空间)
  6. 删除 Redis 里的分片状态记录(节省内存)
  7. 更新数据库里的文件状态(status = 1 表示”已完成”)
  8. 生成预签名 URL(用来下载文件)

什么是预签名 URL?

  • 预签名 URL 是一个临时下载链接(比如有效期 1 小时)
  • 用户点击这个链接,可以直接下载文件(不需要登录)
  • 好处:不用把文件存在服务器本地,直接从 MinIO 下载(节省服务器带宽)

三、❗❗ 源码里的问题

问题 1:分片上传没有加锁,导致并发问题!

看代码(第 78-101 行)

1
2
3
4
5
boolean fileExists = fileUploadRepository.findByFileMd5AndUserId(fileMd5, userId).isPresent();
if (!fileExists) {
// 插入 file_upload 表
fileUploadRepository.save(fileUpload);
}

场景重现

  1. 线程 A 检查文件记录是否存在:fileExists = false
  2. 线程 B 检查文件记录是否存在:fileExists = false(因为线程 A 还没插入)
  3. 线程 A 插入文件记录:INSERT INTO file_upload ...
  4. 线程 B 插入文件记录:INSERT INTO file_upload ...
  5. 结果:数据库里有两条相同的文件记录(MD5 一样)!

修复方案:加分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void uploadChunk(String fileMd5, int chunkIndex, long totalSize, String fileName, 
MultipartFile file, String orgTag, boolean isPublic, String userId) throws IOException {
String lockKey = "lock:file_upload:" + fileMd5 + ":" + userId;
RLock lock = redisson.getLock(lockKey);
try {
lock.lock(5, TimeUnit.SECONDS); // 获取分布式锁,超时 5 秒

// 检查 file_upload 表中是否存在该 file_md5
boolean fileExists = fileUploadRepository.findByFileMd5AndUserId(fileMd5, userId).isPresent();
if (!fileExists) {
// 插入 file_upload 表
FileUpload fileUpload = new FileUpload();
// ... 设置属性
fileUploadRepository.save(fileUpload);
}

// 后续逻辑...
} finally {
lock.unlock(); // 释放锁
}
}

问题 2:Redis Bitmap 的 key 没有设置过期时间!

看代码(第 377-378 行)

1
2
String redisKey = "upload:" + userId + ":" + fileMd5;
redisTemplate.opsForValue().setBit(redisKey, chunkIndex, true);

问题

  • Redis key 没有设置过期时间(TTL)
  • 如果文件上传完成后,没有调用 deleteFileMark() 方法,这个 key 会一直存在 Redis 里(占用内存)

场景重现

  • 用户上传了 1000 个文件,每个文件的 Bitmap 占用 125 字节(1000 个分片)
  • 如果上传完成后没有清理,1000 个 key 占用 125KB(不多)
  • 但如果有 100 万个用户,每个用户上传 1000 个文件,就会占用 125GB(爆炸)!

修复方案:设置过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
public void markChunkUploaded(String fileMd5, int chunkIndex, String userId) {
try {
String redisKey = "upload:" + userId + ":" + fileMd5;
redisTemplate.opsForValue().setBit(redisKey, chunkIndex, true);

// 设置过期时间为 7 天(如果 7 天内没有完成上传,自动清理)
redisTemplate.expire(redisKey, 7, TimeUnit.DAYS);
} catch (Exception e) {
logger.error("标记分片为已上传失败 => fileMd5: {}, chunkIndex: {}, userId: {}, 错误: {}",
fileMd5, chunkIndex, userId, e.getMessage(), e);
throw new RuntimeException("Failed to mark chunk as uploaded", e);
}
}

问题 3:合并分片时没有事务,导致数据不一致!

看代码(第 614-647 行)

1
2
3
4
5
6
7
8
9
10
11
// 清理分片文件
for (String path : partPaths) {
minioClient.removeObject(...);
}

// 删除 Redis 中的分片状态记录
deleteFileMark(fileMd5, userId);

// 更新文件状态
fileUpload.setStatus(1);
fileUploadRepository.save(fileUpload);

问题

  • 如果”清理分片文件”成功,但”更新文件状态”失败,会怎么样?
  • 数据库里 status 还是 0(上传中),但分片文件已经被删除了
  • 用户再想上传这个文件,会认为”分片已经上传了”(因为 Redis 里有记录),但实际上 MinIO 里已经没有分片文件了

修复方案:加事务

1
2
3
4
5
6
7
@Transactional(rollbackFor = Exception.class)
public String mergeChunks(String fileMd5, String fileName, String userId) {
// 合并分片逻辑...

// 如果抛出异常,数据库操作会回滚
// 但 MinIO 的文件删除不能回滚(需要考虑补偿机制)
}

更好的方案:先更新数据库,再删除文件(即使删除文件失败,数据库状态是对的)


问题 4:没有限制文件大小,容易被攻击!

看代码(第 68-69 行)

1
2
public void uploadChunk(String fileMd5, int chunkIndex, long totalSize, String fileName, 
MultipartFile file, String orgTag, boolean isPublic, String userId) throws IOException {

问题

  • totalSize 是前端传过来的,没有校验
  • 如果前端传一个 totalSize = 999999999999999(超大文件),后端会接受
  • 如果有人恶意上传超大文件,会占满磁盘空间(DDoS 攻击)

修复方案:限制文件大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void uploadChunk(String fileMd5, int chunkIndex, long totalSize, String fileName, 
MultipartFile file, String orgTag, boolean isPublic, String userId) throws IOException {
// 限制文件大小:最大 100MB
long maxFileSize = 100 * 1024 * 1024; // 100MB
if (totalSize > maxFileSize) {
throw new RuntimeException("文件大小不能超过 100MB");
}

// 限制分片大小:每个分片最大 10MB
if (file.getSize() > 10 * 1024 * 1024) {
throw new RuntimeException("分片大小不能超过 10MB");
}

// 后续逻辑...
}

四、面试八股文

4.1 什么是分片上传?为什么要用分片上传?

  • 分片上传是把大文件切成小块,分别上传
  • 好处:
    1. 断点续传:如果网络断了,只需要重传失败的分片(不用重新上传整个文件)
    2. 并行上传:多个分片可以同时上传(提高上传速度)
    3. 失败重试:某个分片上传失败,只需要重传这个分片

4.2 什么是 Redis Bitmap?为什么要用 Bitmap 记录分片状态?

  • Bitmap 是 Redis 的一种数据结构,用一串 0 和 1 表示状态
  • 好处:
    1. 节省空间:每个分片只占用 1 bit(1/8 字节)
    2. 查询快GETBIT 命令是 O(1) 时间复杂度
    3. 批量查询快:可以一次性取出所有 bit

4.3 什么是 MinIO?为什么要用 MinIO?

  • MinIO 是一个开源的对象存储服务(类似阿里云 OSS)
  • 好处:
    1. 免费:可以部署在本地(不用花钱买云服务)
    2. 高性能:支持分布式部署(可以横向扩展)
    3. 兼容 S3 协议:可以用 AWS S3 的 SDK 访问

4.4 什么是预签名 URL?为什么要用预签名 URL?

  • 预签名 URL 是一个临时下载链接(比如有效期 1 小时)
  • 好处:
    1. 安全性:不需要把文件存在服务器本地(减少服务器被攻击的风险)
    2. 节省带宽:用户直接从 MinIO 下载(不用经过服务器)
    3. 权限控制:可以设置有效期(过期后链接失效)

五、总结

KnowFlow 的上传流水线

  1. 分片上传:把文件切成 5MB 的小块,逐个上传
  2. Redis Bitmap 记录分片状态:每个分片用一个 bit 表示(0 = 未上传,1 = 已上传)
  3. MinIO 存储分片:每个分片上传到 MinIO(对象存储)
  4. 合并分片:所有分片都上传完后,调用 MinIO 的 composeObject() 方法合并
  5. 预签名 URL:生成临时下载链接(有效期 1 小时)

源码里的问题(❗❗)

  1. 分片上传没有加锁,导致并发问题
  2. Redis Bitmap 的 key 没有设置过期时间
  3. 合并分片时没有事务,导致数据不一致
  4. 没有限制文件大小,容易被攻击

修复方案

  1. 加分布式锁(Redisson)
  2. 设置过期时间(redisTemplate.expire()
  3. 加事务(@Transactional
  4. 限制文件大小(后端校验)

下一篇预告:《KnowFlow 项目学习笔记(二):混合检索深度解析》


参考资料


最后更新:2026-06-08 21:45:00


KnowFlow 项目学习笔记(一):上传流水线深度解析
https://whyalwaysme.lol/2026/06/08/2026-06-08-knowflow-upload-pipeline-learn/
作者
Cassiur
发布于
2026年6月8日
许可协议