fix: 增加文件上传缓冲区串行解析,调整解析状态检测频次

This commit is contained in:
wenjinbo 2025-09-18 12:32:01 +08:00
parent 2ae5f2f4ad
commit 676c76d1c2
6 changed files with 252 additions and 18 deletions

View File

@ -49,4 +49,9 @@ public class PdfTaskDto {
private Integer fileId;
/**
* 深度解析任务ID用于关联DeepAnalysisQueueService中的任务
*/
private String deepAnalysisTaskId;
}

View File

@ -223,13 +223,16 @@ public class DeepAnalysisQueueService {
request.setParentId(parentId);
request.setAnalysisStrategyType(analysisStrategyType);
// 调用PDF转换服务进行深度解析
pdfConversionService.handlePdfConversion(request, wrappedFile, fileId);
// 调用PDF转换服务进行深度解析传入深度解析任务ID用于关联
pdfConversionService.handlePdfConversion(request, wrappedFile, fileId, taskId);
// 更新任务状态为完成
redisTemplate.opsForHash().put(taskInfoKey, "status", "completed");
// 更新任务状态为PDF转换中等待OCR服务处理完成
redisTemplate.opsForHash().put(taskInfoKey, "status", "pdf_converting");
logger.info("深度解析任务处理完成任务ID: {}", taskId);
logger.info("深度解析任务已提交PDF转换等待OCR处理完成任务ID: {}", taskId);
// 注意此时不从processing集合中移除任务继续占用槽位
// 只有当PDF转换真正完成success/failed/timeout时才释放槽位
} catch (Exception e) {
logger.error("处理深度解析任务失败任务ID: {}, 错误: {}", taskId, e.getMessage(), e);
@ -250,22 +253,22 @@ public class DeepAnalysisQueueService {
logger.error("更新文件状态失败: {}", ex.getMessage(), ex);
}
} finally {
// 清理临时文件
try {
String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
String tempFilePath = (String) taskInfo.get("tempFilePath");
if (tempFilePath != null) {
tempFileStorageService.deleteTempFile(tempFilePath);
}
} catch (Exception e) {
logger.error("清理临时文件失败任务ID: {}, 错误: {}", taskId, e.getMessage());
} catch (Exception ex) {
logger.error("清理临时文件失败任务ID: {}, 错误: {}", taskId, ex.getMessage());
}
// 从处理中集合移除任务
// 异常情况下从处理中集合移除任务释放槽位
removeFromProcessing(taskId);
}
// 正常情况下不释放槽位等待PDF转换完成后由PdfConversionTaskService释放
}
/**

View File

@ -19,9 +19,11 @@ public interface PdfConversionService {
*
* @param request 上传请求
* @param file PDF文件
* @param fileId 文件ID
* @param deepAnalysisTaskId 深度解析任务ID可选用于关联深度解析任务
* @return 响应结果
*/
void handlePdfConversion(DocUploadReq request, MultipartFile file,Integer fileId);
void handlePdfConversion(DocUploadReq request, MultipartFile file, Integer fileId, String deepAnalysisTaskId);
/**
* 调用PDF转换服务

View File

@ -15,6 +15,7 @@ public final class DifyConstants {
public static final String PDF_TASK_REDIS_KEY = "pdf:conversion:tasks";
public static final String PDF_TASK_LIST_SUFFIX = ":list";
public static final int REDIS_EXPIRE_HOURS = 24;
public static final int PDF_TASK_EXPIRE_MINUTES = 10; // PDF转换任务10分钟超时
// ======== 文件类型常量 ========
public static final String FILE_TYPE_DOC = "doc";

View File

@ -50,7 +50,7 @@ public class PdfConversionServiceImpl implements PdfConversionService {
private String pdfConversionServiceUrl;
@Override
public void handlePdfConversion(DocUploadReq request, MultipartFile file,Integer fileId) {
public void handlePdfConversion(DocUploadReq request, MultipartFile file, Integer fileId, String deepAnalysisTaskId) {
try {
// 调用PDF转换服务
String taskId = callPdfConversionService(file);
@ -68,6 +68,7 @@ public class PdfConversionServiceImpl implements PdfConversionService {
.datasetId(request.getDatasetId())
.datasetName(datasetName)
.createTime(new Date().getTime())
.deepAnalysisTaskId(deepAnalysisTaskId) // 关联深度解析任务ID
.build();
// 存储到Redis
@ -133,8 +134,8 @@ public class PdfConversionServiceImpl implements PdfConversionService {
String hashKey = DifyConstants.PDF_TASK_REDIS_KEY + ":" + pdfTask.getTaskId();
redisTemplate.opsForHash().put(hashKey, "taskInfo", pdfTask);
// 设置过期时间
redisTemplate.expire(hashKey, DifyConstants.REDIS_EXPIRE_HOURS, TimeUnit.HOURS);
// 设置过期时间10分钟
redisTemplate.expire(hashKey, DifyConstants.PDF_TASK_EXPIRE_MINUTES, TimeUnit.MINUTES);
// 同时将任务ID加入到任务列表中便于定时任务扫描
redisTemplate.opsForList().rightPush(

View File

@ -6,6 +6,7 @@ import com.bjtds.brichat.entity.dataset.RetrievalModel;
import com.bjtds.brichat.entity.dataset.TDatasetFiles;
import com.bjtds.brichat.entity.dto.PdfTaskDto;
import com.bjtds.brichat.entity.dto.PdfTaskStatusResponse;
import com.bjtds.brichat.enums.IndexingStatusEnum;
import com.bjtds.brichat.service.DatasetFilesService;
import com.bjtds.brichat.service.dify.DifyDatasetApiService;
import org.slf4j.Logger;
@ -53,8 +54,8 @@ public class PdfConversionTaskService {
@Value("${ocr.service.url}")
private String pdfConversionServiceUrl;
@Value("${ocr.service.uploadPath}")
private String uploadPath;
// @Value("${ocr.service.uploadPath}")
// private String uploadPath;
@Value("${ocr.service.outputPath}")
private String outputPath;
@ -63,11 +64,18 @@ public class PdfConversionTaskService {
@Resource
private DatasetFilesService datasetFilesService;
// 深度解析任务相关常量
private static final String DEEP_ANALYSIS_TASK_INFO = "deep_analysis:task_info:";
private static final String DEEP_ANALYSIS_PROCESSING = "deep_analysis:processing";
// PDF任务超时时间毫秒
private static final long PDF_TASK_TIMEOUT_MS = 10 * 60 * 1000; // 10分钟
/**
* 定时任务每10秒检查一次PDF转换任务状态
* 定时任务3秒检查一次PDF转换任务状态
*/
@Scheduled(fixedRate = 3000) // 10秒执行一次
@Scheduled(fixedRate = 5000) // 5秒执行一次
public void checkPdfConversionTasks() {
try {
// 获取所有待处理的任务ID
@ -115,6 +123,13 @@ public class PdfConversionTaskService {
logger.debug("任务{}信息获取成功: fileId={}, name={}, datasetId={}",
taskId, taskInfo.getFileId(), taskInfo.getName(), taskInfo.getDatasetId());
// 检查任务是否超时
if (isTaskTimeout(taskInfo)) {
logger.warn("任务{}已超时超过10分钟开始处理超时逻辑", taskId);
handleTimeoutTask(taskInfo);
return;
}
// 调用状态查询接口
PdfTaskStatusResponse statusResponse = queryTaskStatus(taskId);
@ -138,6 +153,8 @@ public class PdfConversionTaskService {
case "failed":
case "failure":
logger.error("任务{}执行失败,状态: {}", taskId, statusResponse.getStatus());
// 更新关联的深度解析任务状态为失败
updateDeepAnalysisTaskStatus(taskInfo, "failed");
removeTaskFromQueue(taskId);
logger.info("=== 任务{}因执行失败被移除 ===", taskId);
break;
@ -219,6 +236,9 @@ public class PdfConversionTaskService {
}
// 更新关联的深度解析任务状态为完成
updateDeepAnalysisTaskStatus(taskInfo, "completed");
// 从队列中移除任务
removeTaskFromQueue(taskId);
logger.info("任务{}已完成并从队列中移除", taskId);
@ -449,4 +469,206 @@ public class PdfConversionTaskService {
return request;
}
/**
* 检查任务是否超时
*
* @param taskInfo PDF任务信息
* @return 是否超时
*/
private boolean isTaskTimeout(PdfTaskDto taskInfo) {
if (taskInfo.getCreateTime() == null) {
return false;
}
long currentTime = System.currentTimeMillis();
long taskCreateTime = taskInfo.getCreateTime();
long elapsedTime = currentTime - taskCreateTime;
logger.debug("任务{}运行时长: {}ms, 超时阈值: {}ms",
taskInfo.getTaskId(), elapsedTime, PDF_TASK_TIMEOUT_MS);
return elapsedTime > PDF_TASK_TIMEOUT_MS;
}
/**
* 处理超时任务
*
* @param taskInfo 超时的PDF任务信息
*/
private void handleTimeoutTask(PdfTaskDto taskInfo) {
String taskId = taskInfo.getTaskId();
try {
logger.error("=== PDF转换任务超时 ===");
logger.error("任务ID: {}", taskId);
logger.error("文件名: {}", taskInfo.getName());
logger.error("数据集ID: {}", taskInfo.getDatasetId());
logger.error("创建时间: {}", new java.util.Date(taskInfo.getCreateTime()));
logger.error("运行时长: {}分钟", (System.currentTimeMillis() - taskInfo.getCreateTime()) / (60 * 1000));
// 更新关联的深度解析任务状态为超时失败
updateDeepAnalysisTaskStatus(taskInfo, "failed");
// 更新文件状态为失败
if (taskInfo.getFileId() != null) {
try {
TDatasetFiles file = datasetFilesService.getFileById(taskInfo.getFileId());
if (file != null) {
file.setIndexingStatus(IndexingStatusEnum.FAILED.getCode());
datasetFilesService.updateFile(file);
logger.info("已将文件{}状态更新为FAILED", taskInfo.getFileId());
}
} catch (Exception e) {
logger.error("更新超时任务的文件状态失败: {}", e.getMessage(), e);
}
}
// 从队列中移除超时任务
removeTaskFromQueue(taskId);
logger.info("=== 超时任务{}已被移除 ===", taskId);
} catch (Exception e) {
logger.error("处理超时任务{}时发生错误: {}", taskId, e.getMessage(), e);
}
}
/**
* 定时任务每5分钟清理超时任务
* 作为兜底机制防止某些超时任务没有被及时处理
*/
@Scheduled(fixedRate = 300000) // 5分钟执行一次
public void cleanupTimeoutTasks() {
try {
logger.debug("开始执行超时任务清理");
// 获取所有待处理的任务ID
List<Object> taskIds = redisTemplate.opsForList().range(PDF_TASK_REDIS_KEY + ":list", 0, -1);
if (taskIds == null || taskIds.isEmpty()) {
logger.debug("没有待清理的PDF转换任务");
return;
}
int timeoutCount = 0;
for (Object taskIdObj : taskIds) {
String taskId = taskIdObj.toString();
try {
// 从Redis获取任务信息
String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId;
PdfTaskDto taskInfo = (PdfTaskDto) redisTemplate.opsForHash().get(hashKey, "taskInfo");
if (taskInfo != null && isTaskTimeout(taskInfo)) {
logger.warn("发现超时任务: {}, 开始清理", taskId);
handleTimeoutTask(taskInfo);
timeoutCount++;
}
} catch (Exception e) {
logger.error("清理超时任务{}时发生错误: {}", taskId, e.getMessage(), e);
}
}
if (timeoutCount > 0) {
logger.info("本次清理了{}个超时任务", timeoutCount);
} else {
logger.debug("本次清理未发现超时任务");
}
} catch (Exception e) {
logger.error("定时清理超时任务时发生错误: {}", e.getMessage(), e);
}
}
/**
* 更新深度解析任务状态
*
* @param pdfTaskInfo PDF任务信息
* @param status 新状态
*/
private void updateDeepAnalysisTaskStatus(PdfTaskDto pdfTaskInfo, String status) {
try {
String deepAnalysisTaskId = pdfTaskInfo.getDeepAnalysisTaskId();
if (deepAnalysisTaskId == null || deepAnalysisTaskId.trim().isEmpty()) {
logger.debug("PDF任务{}没有关联的深度解析任务,跳过状态更新", pdfTaskInfo.getTaskId());
return;
}
String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + deepAnalysisTaskId;
// 检查深度解析任务是否存在
if (!redisTemplate.hasKey(taskInfoKey)) {
logger.warn("深度解析任务{}不存在,无法更新状态", deepAnalysisTaskId);
return;
}
// 更新状态
redisTemplate.opsForHash().put(taskInfoKey, "status", status);
logger.info("已更新深度解析任务{}状态为: {}, 关联PDF任务: {}",
deepAnalysisTaskId, status, pdfTaskInfo.getTaskId());
// 当PDF转换完成成功失败或超时释放深度解析处理槽位
if ("completed".equals(status) || "failed".equals(status) || "timeout".equals(status)) {
releaseDeepAnalysisSlot(deepAnalysisTaskId);
// 如果是completed状态还需要清理临时文件
if ("completed".equals(status)) {
cleanupDeepAnalysisTempFile(taskInfoKey);
}
}
} catch (Exception e) {
logger.error("更新深度解析任务状态失败PDF任务ID: {}, 深度解析任务ID: {}, 错误: {}",
pdfTaskInfo.getTaskId(), pdfTaskInfo.getDeepAnalysisTaskId(), e.getMessage(), e);
}
}
/**
* 释放深度解析处理槽位
*
* @param deepAnalysisTaskId 深度解析任务ID
*/
private void releaseDeepAnalysisSlot(String deepAnalysisTaskId) {
try {
// 从processing集合中移除任务释放槽位
Long removedCount = redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, deepAnalysisTaskId);
if (removedCount != null && removedCount > 0) {
logger.info("已释放深度解析槽位任务ID: {}", deepAnalysisTaskId);
// 记录当前处理中的任务数量
Long processingCount = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING);
logger.info("当前深度解析处理中任务数: {}", processingCount != null ? processingCount : 0);
} else {
logger.debug("深度解析任务{}不在处理集合中,可能已被移除", deepAnalysisTaskId);
}
} catch (Exception e) {
logger.error("释放深度解析槽位失败任务ID: {}, 错误: {}", deepAnalysisTaskId, e.getMessage(), e);
}
}
/**
* 清理深度解析任务的临时文件
*
* @param taskInfoKey 任务信息键
*/
private void cleanupDeepAnalysisTempFile(String taskInfoKey) {
try {
Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
String tempFilePath = (String) taskInfo.get("tempFilePath");
if (tempFilePath != null && !tempFilePath.trim().isEmpty()) {
// 注意这里需要调用TempFileStorageService来删除临时文件
// 但由于没有直接注入该服务我们通过日志记录让DeepAnalysisQueueService的定时清理任务处理
logger.info("标记需要清理的临时文件: {}", tempFilePath);
// 将临时文件路径设置为空表示已处理
redisTemplate.opsForHash().put(taskInfoKey, "tempFilePath", "");
}
} catch (Exception e) {
logger.error("清理深度解析临时文件失败,任务键: {}, 错误: {}", taskInfoKey, e.getMessage(), e);
}
}
}