diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java b/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java index 338ded4..e05ea48 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java @@ -350,4 +350,107 @@ public class DeepAnalysisQueueService { logger.error("清理过期临时文件时发生错误: {}", e.getMessage(), e); } } + + /** + * 定时清理DEEP_ANALYSIS_PROCESSING队列中的失效任务 + * 每30分钟执行一次,检查处理中的任务是否还有效 + */ + @Scheduled(fixedRate = 1800000) // 30分钟 + public void cleanupInvalidProcessingTasks() { + try { + logger.info("开始清理DEEP_ANALYSIS_PROCESSING队列中的失效任务"); + + // 获取所有正在处理的任务ID + Set processingTaskIds = redisTemplate.opsForSet().members(DEEP_ANALYSIS_PROCESSING); + + if (processingTaskIds == null || processingTaskIds.isEmpty()) { + logger.debug("DEEP_ANALYSIS_PROCESSING队列为空,无需清理"); + return; + } + + int removedCount = 0; + long currentTime = System.currentTimeMillis(); + + for (Object taskIdObj : processingTaskIds) { + if (taskIdObj == null) { + continue; + } + + String taskId = taskIdObj.toString(); + try { + // 检查深度解析任务信息是否还存在 + String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; + Map taskInfo = redisTemplate.opsForHash().entries(taskInfoKey); + + boolean shouldRemove = false; + String reason = ""; + + if (taskInfo.isEmpty()) { + // 任务信息不存在,应该移除 + shouldRemove = true; + reason = "任务信息不存在"; + } else { + // 检查任务是否已经超时(提交后超过4小时) + Long submitTime = (Long) taskInfo.get("submitTime"); + if (submitTime != null) { + long elapsedTime = currentTime - submitTime; + long timeoutThreshold = 4 * 60 * 60 * 1000; // 4小时 + + if (elapsedTime > timeoutThreshold) { + shouldRemove = true; + reason = String.format("任务超时(运行%d分钟)", elapsedTime / (60 * 1000)); + } + } + + // 检查任务状态是否为已完成的状态(这些状态不应该还在processing队列中) + String status = (String) taskInfo.get("status"); + if ("completed".equals(status) || "failed".equals(status)) { + shouldRemove = true; + reason = "任务状态为" + status + ",不应在处理队列中"; + } + } + + if (shouldRemove) { + // 从processing集合中移除任务 + Long removed = redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, taskId); + if (removed != null && removed > 0) { + removedCount++; + logger.warn("已清理失效的处理中任务,任务ID: {},原因: {}", taskId, reason); + + // 如果任务信息还存在,更新状态为已清理 + if (!taskInfo.isEmpty()) { + redisTemplate.opsForHash().put(taskInfoKey, "status", "cleaned_timeout"); + redisTemplate.opsForHash().put(taskInfoKey, "cleanupReason", reason); + redisTemplate.opsForHash().put(taskInfoKey, "cleanupTime", currentTime); + + // 更新关联文件状态为失败 + try { + Integer fileId = (Integer) taskInfo.get("fileId"); + if (fileId != null) { + updateFileStatus(fileId, IndexingStatusEnum.FAILED.getCode()); + logger.info("已将清理任务{}关联的文件{}状态更新为FAILED", taskId, fileId); + } + } catch (Exception e) { + logger.error("更新清理任务关联文件状态失败: {}", e.getMessage(), e); + } + } + } + } + + } catch (Exception e) { + logger.error("检查处理中任务{}时发生错误: {}", taskId, e.getMessage(), e); + } + } + + if (removedCount > 0) { + Long remainingTasks = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING); + logger.info("清理完成,移除了{}个失效任务,剩余处理中任务数: {}", removedCount, remainingTasks); + } else { + logger.debug("未发现需要清理的失效任务"); + } + + } catch (Exception e) { + logger.error("清理DEEP_ANALYSIS_PROCESSING队列中的失效任务时发生错误: {}", e.getMessage(), e); + } + } } \ No newline at end of file