From 676c76d1c2b8279482ff0ee91b45bd6aa24f5c36 Mon Sep 17 00:00:00 2001 From: wenjinbo <599483010@qq.com> Date: Thu, 18 Sep 2025 12:32:01 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=A2=9E=E5=8A=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E7=BC=93=E5=86=B2=E5=8C=BA=E4=B8=B2=E8=A1=8C?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=EF=BC=8C=E8=B0=83=E6=95=B4=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=A3=80=E6=B5=8B=E9=A2=91=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bjtds/brichat/entity/dto/PdfTaskDto.java | 5 + .../service/DeepAnalysisQueueService.java | 23 +- .../service/dify/PdfConversionService.java | 4 +- .../service/dify/constants/DifyConstants.java | 1 + .../dify/impl/PdfConversionServiceImpl.java | 7 +- .../task/PdfConversionTaskService.java | 230 +++++++++++++++++- 6 files changed, 252 insertions(+), 18 deletions(-) diff --git a/chat-server/src/main/java/com/bjtds/brichat/entity/dto/PdfTaskDto.java b/chat-server/src/main/java/com/bjtds/brichat/entity/dto/PdfTaskDto.java index 634ae2d..cfdd378 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/entity/dto/PdfTaskDto.java +++ b/chat-server/src/main/java/com/bjtds/brichat/entity/dto/PdfTaskDto.java @@ -49,4 +49,9 @@ public class PdfTaskDto { private Integer fileId; + /** + * 深度解析任务ID(用于关联DeepAnalysisQueueService中的任务) + */ + private String deepAnalysisTaskId; + } \ No newline at end of file diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java b/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java index cc70916..a94ed15 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java @@ -223,13 +223,16 @@ public class DeepAnalysisQueueService { request.setParentId(parentId); request.setAnalysisStrategyType(analysisStrategyType); - // 调用PDF转换服务进行深度解析 - 
pdfConversionService.handlePdfConversion(request, wrappedFile, fileId); + // 调用PDF转换服务进行深度解析,传入深度解析任务ID用于关联 + pdfConversionService.handlePdfConversion(request, wrappedFile, fileId, taskId); - // 更新任务状态为完成 - redisTemplate.opsForHash().put(taskInfoKey, "status", "completed"); + // 更新任务状态为PDF转换中(等待OCR服务处理完成) + redisTemplate.opsForHash().put(taskInfoKey, "status", "pdf_converting"); - logger.info("深度解析任务处理完成,任务ID: {}", taskId); + logger.info("深度解析任务已提交PDF转换,等待OCR处理完成,任务ID: {}", taskId); + + // 注意:此时不从processing集合中移除任务,继续占用槽位 + // 只有当PDF转换真正完成(success/failed/timeout)时才释放槽位 } catch (Exception e) { logger.error("处理深度解析任务失败,任务ID: {}, 错误: {}", taskId, e.getMessage(), e); @@ -250,22 +253,22 @@ public class DeepAnalysisQueueService { logger.error("更新文件状态失败: {}", ex.getMessage(), ex); } - } finally { // 清理临时文件 try { - String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; Map taskInfo = redisTemplate.opsForHash().entries(taskInfoKey); String tempFilePath = (String) taskInfo.get("tempFilePath"); if (tempFilePath != null) { tempFileStorageService.deleteTempFile(tempFilePath); } - } catch (Exception e) { - logger.error("清理临时文件失败,任务ID: {}, 错误: {}", taskId, e.getMessage()); + } catch (Exception ex) { + logger.error("清理临时文件失败,任务ID: {}, 错误: {}", taskId, ex.getMessage()); } - // 从处理中集合移除任务 + // 异常情况下从处理中集合移除任务,释放槽位 removeFromProcessing(taskId); } + + // 正常情况下不释放槽位,等待PDF转换完成后由PdfConversionTaskService释放 } /** diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/dify/PdfConversionService.java b/chat-server/src/main/java/com/bjtds/brichat/service/dify/PdfConversionService.java index a96f036..e2be8f9 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/dify/PdfConversionService.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/dify/PdfConversionService.java @@ -19,9 +19,11 @@ public interface PdfConversionService { * * @param request 上传请求 * @param file PDF文件 + * @param fileId 文件ID + * @param deepAnalysisTaskId 深度解析任务ID(可选,用于关联深度解析任务) * @return 响应结果 */ - 
void handlePdfConversion(DocUploadReq request, MultipartFile file,Integer fileId); + void handlePdfConversion(DocUploadReq request, MultipartFile file, Integer fileId, String deepAnalysisTaskId); /** * 调用PDF转换服务 diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/dify/constants/DifyConstants.java b/chat-server/src/main/java/com/bjtds/brichat/service/dify/constants/DifyConstants.java index 674c802..63d6435 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/dify/constants/DifyConstants.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/dify/constants/DifyConstants.java @@ -15,6 +15,7 @@ public final class DifyConstants { public static final String PDF_TASK_REDIS_KEY = "pdf:conversion:tasks"; public static final String PDF_TASK_LIST_SUFFIX = ":list"; public static final int REDIS_EXPIRE_HOURS = 24; + public static final int PDF_TASK_EXPIRE_MINUTES = 10; // PDF转换任务10分钟超时 // ======== 文件类型常量 ======== public static final String FILE_TYPE_DOC = "doc"; diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/dify/impl/PdfConversionServiceImpl.java b/chat-server/src/main/java/com/bjtds/brichat/service/dify/impl/PdfConversionServiceImpl.java index 45db989..1665166 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/dify/impl/PdfConversionServiceImpl.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/dify/impl/PdfConversionServiceImpl.java @@ -50,7 +50,7 @@ public class PdfConversionServiceImpl implements PdfConversionService { private String pdfConversionServiceUrl; @Override - public void handlePdfConversion(DocUploadReq request, MultipartFile file,Integer fileId) { + public void handlePdfConversion(DocUploadReq request, MultipartFile file, Integer fileId, String deepAnalysisTaskId) { try { // 调用PDF转换服务 String taskId = callPdfConversionService(file); @@ -68,6 +68,7 @@ public class PdfConversionServiceImpl implements PdfConversionService { .datasetId(request.getDatasetId()) .datasetName(datasetName) 
.createTime(new Date().getTime()) + .deepAnalysisTaskId(deepAnalysisTaskId) // 关联深度解析任务ID .build(); // 存储到Redis @@ -133,8 +134,8 @@ public class PdfConversionServiceImpl implements PdfConversionService { String hashKey = DifyConstants.PDF_TASK_REDIS_KEY + ":" + pdfTask.getTaskId(); redisTemplate.opsForHash().put(hashKey, "taskInfo", pdfTask); - // 设置过期时间 - redisTemplate.expire(hashKey, DifyConstants.REDIS_EXPIRE_HOURS, TimeUnit.HOURS); + // 设置过期时间(10分钟) + redisTemplate.expire(hashKey, DifyConstants.PDF_TASK_EXPIRE_MINUTES, TimeUnit.MINUTES); // 同时将任务ID加入到任务列表中,便于定时任务扫描 redisTemplate.opsForList().rightPush( diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java b/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java index 11bead9..8cb62d4 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java @@ -6,6 +6,7 @@ import com.bjtds.brichat.entity.dataset.RetrievalModel; import com.bjtds.brichat.entity.dataset.TDatasetFiles; import com.bjtds.brichat.entity.dto.PdfTaskDto; import com.bjtds.brichat.entity.dto.PdfTaskStatusResponse; +import com.bjtds.brichat.enums.IndexingStatusEnum; import com.bjtds.brichat.service.DatasetFilesService; import com.bjtds.brichat.service.dify.DifyDatasetApiService; import org.slf4j.Logger; @@ -53,8 +54,8 @@ public class PdfConversionTaskService { @Value("${ocr.service.url}") private String pdfConversionServiceUrl; - @Value("${ocr.service.uploadPath}") - private String uploadPath; +// @Value("${ocr.service.uploadPath}") +// private String uploadPath; @Value("${ocr.service.outputPath}") private String outputPath; @@ -63,11 +64,18 @@ public class PdfConversionTaskService { @Resource private DatasetFilesService datasetFilesService; + + // 深度解析任务相关常量 + private static final String DEEP_ANALYSIS_TASK_INFO = "deep_analysis:task_info:"; + 
private static final String DEEP_ANALYSIS_PROCESSING = "deep_analysis:processing"; + + // PDF任务超时时间(毫秒) + private static final long PDF_TASK_TIMEOUT_MS = 10 * 60 * 1000; // 10分钟 /** - * 定时任务:每10秒检查一次PDF转换任务状态 + * 定时任务:每5秒检查一次PDF转换任务状态 */ - @Scheduled(fixedRate = 3000) // 10秒执行一次 + @Scheduled(fixedRate = 5000) // 5秒执行一次 public void checkPdfConversionTasks() { try { // 获取所有待处理的任务ID @@ -115,6 +123,13 @@ logger.debug("任务{}信息获取成功: fileId={}, name={}, datasetId={}", taskId, taskInfo.getFileId(), taskInfo.getName(), taskInfo.getDatasetId()); + // 检查任务是否超时 + if (isTaskTimeout(taskInfo)) { + logger.warn("任务{}已超时(超过10分钟),开始处理超时逻辑", taskId); + handleTimeoutTask(taskInfo); + return; + } + // 调用状态查询接口 PdfTaskStatusResponse statusResponse = queryTaskStatus(taskId); @@ -138,6 +153,8 @@ case "failed": case "failure": logger.error("任务{}执行失败,状态: {}", taskId, statusResponse.getStatus()); + // 更新关联的深度解析任务状态为失败 + updateDeepAnalysisTaskStatus(taskInfo, "failed"); removeTaskFromQueue(taskId); logger.info("=== 任务{}因执行失败被移除 ===", taskId); break; @@ -219,6 +236,9 @@ } + // 更新关联的深度解析任务状态为完成 + updateDeepAnalysisTaskStatus(taskInfo, "completed"); + // 从队列中移除任务 removeTaskFromQueue(taskId); logger.info("任务{}已完成并从队列中移除", taskId); @@ -449,4 +469,206 @@ return request; } + + /** + * 检查任务是否超时 + * + * @param taskInfo PDF任务信息 + * @return 是否超时 + */ + private boolean isTaskTimeout(PdfTaskDto taskInfo) { + if (taskInfo.getCreateTime() == null) { + return false; + } + + long currentTime = System.currentTimeMillis(); + long taskCreateTime = taskInfo.getCreateTime(); + long elapsedTime = currentTime - taskCreateTime; + + logger.debug("任务{}运行时长: {}ms, 超时阈值: {}ms", + taskInfo.getTaskId(), elapsedTime, PDF_TASK_TIMEOUT_MS); + + return elapsedTime > PDF_TASK_TIMEOUT_MS; + } + + /** + * 处理超时任务 + * + * @param taskInfo 超时的PDF任务信息 + */ + private void 
handleTimeoutTask(PdfTaskDto taskInfo) { + String taskId = taskInfo.getTaskId(); + + try { + logger.error("=== PDF转换任务超时 ==="); + logger.error("任务ID: {}", taskId); + logger.error("文件名: {}", taskInfo.getName()); + logger.error("数据集ID: {}", taskInfo.getDatasetId()); + logger.error("创建时间: {}", new java.util.Date(taskInfo.getCreateTime())); + logger.error("运行时长: {}分钟", (System.currentTimeMillis() - taskInfo.getCreateTime()) / (60 * 1000)); + + // 更新关联的深度解析任务状态为超时失败 + updateDeepAnalysisTaskStatus(taskInfo, "failed"); + + + // 更新文件状态为失败 + if (taskInfo.getFileId() != null) { + try { + TDatasetFiles file = datasetFilesService.getFileById(taskInfo.getFileId()); + if (file != null) { + file.setIndexingStatus(IndexingStatusEnum.FAILED.getCode()); + datasetFilesService.updateFile(file); + logger.info("已将文件{}状态更新为FAILED", taskInfo.getFileId()); + } + } catch (Exception e) { + logger.error("更新超时任务的文件状态失败: {}", e.getMessage(), e); + } + } + + // 从队列中移除超时任务 + removeTaskFromQueue(taskId); + logger.info("=== 超时任务{}已被移除 ===", taskId); + + } catch (Exception e) { + logger.error("处理超时任务{}时发生错误: {}", taskId, e.getMessage(), e); + } + } + + /** + * 定时任务:每5分钟清理超时任务 + * 作为兜底机制,防止某些超时任务没有被及时处理 + */ + @Scheduled(fixedRate = 300000) // 5分钟执行一次 + public void cleanupTimeoutTasks() { + try { + logger.debug("开始执行超时任务清理"); + + // 获取所有待处理的任务ID + List taskIds = redisTemplate.opsForList().range(PDF_TASK_REDIS_KEY + ":list", 0, -1); + + if (taskIds == null || taskIds.isEmpty()) { + logger.debug("没有待清理的PDF转换任务"); + return; + } + + int timeoutCount = 0; + for (Object taskIdObj : taskIds) { + String taskId = taskIdObj.toString(); + try { + // 从Redis获取任务信息 + String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId; + PdfTaskDto taskInfo = (PdfTaskDto) redisTemplate.opsForHash().get(hashKey, "taskInfo"); + + if (taskInfo != null && isTaskTimeout(taskInfo)) { + logger.warn("发现超时任务: {}, 开始清理", taskId); + handleTimeoutTask(taskInfo); + timeoutCount++; + } + } catch (Exception e) { + logger.error("清理超时任务{}时发生错误: 
{}", taskId, e.getMessage(), e); + } + } + + if (timeoutCount > 0) { + logger.info("本次清理了{}个超时任务", timeoutCount); + } else { + logger.debug("本次清理未发现超时任务"); + } + + } catch (Exception e) { + logger.error("定时清理超时任务时发生错误: {}", e.getMessage(), e); + } + } + + /** + * 更新深度解析任务状态 + * + * @param pdfTaskInfo PDF任务信息 + * @param status 新状态 + */ + private void updateDeepAnalysisTaskStatus(PdfTaskDto pdfTaskInfo, String status) { + try { + String deepAnalysisTaskId = pdfTaskInfo.getDeepAnalysisTaskId(); + if (deepAnalysisTaskId == null || deepAnalysisTaskId.trim().isEmpty()) { + logger.debug("PDF任务{}没有关联的深度解析任务,跳过状态更新", pdfTaskInfo.getTaskId()); + return; + } + + String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + deepAnalysisTaskId; + + // 检查深度解析任务是否存在 + if (!redisTemplate.hasKey(taskInfoKey)) { + logger.warn("深度解析任务{}不存在,无法更新状态", deepAnalysisTaskId); + return; + } + + // 更新状态 + redisTemplate.opsForHash().put(taskInfoKey, "status", status); + logger.info("已更新深度解析任务{}状态为: {}, 关联PDF任务: {}", + deepAnalysisTaskId, status, pdfTaskInfo.getTaskId()); + + // 当PDF转换完成(成功、失败或超时)时,释放深度解析处理槽位 + if ("completed".equals(status) || "failed".equals(status) || "timeout".equals(status)) { + releaseDeepAnalysisSlot(deepAnalysisTaskId); + + // 如果是completed状态,还需要清理临时文件 + if ("completed".equals(status)) { + cleanupDeepAnalysisTempFile(taskInfoKey); + } + } + + } catch (Exception e) { + logger.error("更新深度解析任务状态失败,PDF任务ID: {}, 深度解析任务ID: {}, 错误: {}", + pdfTaskInfo.getTaskId(), pdfTaskInfo.getDeepAnalysisTaskId(), e.getMessage(), e); + } + } + + /** + * 释放深度解析处理槽位 + * + * @param deepAnalysisTaskId 深度解析任务ID + */ + private void releaseDeepAnalysisSlot(String deepAnalysisTaskId) { + try { + // 从processing集合中移除任务,释放槽位 + Long removedCount = redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, deepAnalysisTaskId); + + if (removedCount != null && removedCount > 0) { + logger.info("已释放深度解析槽位,任务ID: {}", deepAnalysisTaskId); + + // 记录当前处理中的任务数量 + Long processingCount = 
redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING); + logger.info("当前深度解析处理中任务数: {}", processingCount != null ? processingCount : 0); + } else { + logger.debug("深度解析任务{}不在处理集合中,可能已被移除", deepAnalysisTaskId); + } + + } catch (Exception e) { + logger.error("释放深度解析槽位失败,任务ID: {}, 错误: {}", deepAnalysisTaskId, e.getMessage(), e); + } + } + + /** + * 清理深度解析任务的临时文件 + * + * @param taskInfoKey 任务信息键 + */ + private void cleanupDeepAnalysisTempFile(String taskInfoKey) { + try { + Map taskInfo = redisTemplate.opsForHash().entries(taskInfoKey); + String tempFilePath = (String) taskInfo.get("tempFilePath"); + + if (tempFilePath != null && !tempFilePath.trim().isEmpty()) { + // 注意:这里需要调用TempFileStorageService来删除临时文件 + // 但由于没有直接注入该服务,我们通过日志记录,让DeepAnalysisQueueService的定时清理任务处理 + logger.info("标记需要清理的临时文件: {}", tempFilePath); + + // 将临时文件路径设置为空,表示已处理 + redisTemplate.opsForHash().put(taskInfoKey, "tempFilePath", ""); + } + + } catch (Exception e) { + logger.error("清理深度解析临时文件失败,任务键: {}, 错误: {}", taskInfoKey, e.getMessage(), e); + } + } } \ No newline at end of file