package com.bjtds.brichat.service; import com.bjtds.brichat.entity.dataset.DocUploadReq; import com.bjtds.brichat.entity.dataset.TDatasetFiles; import com.bjtds.brichat.entity.dto.PdfTaskDto; import com.bjtds.brichat.enums.IndexingStatusEnum; import com.bjtds.brichat.service.dify.PdfConversionService; import com.bjtds.brichat.service.dify.constants.DifyConstants; import com.bjtds.brichat.util.MultipartFileWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 深度解析队列管理服务 * 控制PDF深度解析的并发数量,避免OCR服务过载 * * @author system */ @Service public class DeepAnalysisQueueService { private static final Logger logger = LoggerFactory.getLogger(DeepAnalysisQueueService.class); // Redis队列相关常量 private static final String DEEP_ANALYSIS_QUEUE = "deep_analysis:queue"; private static final String DEEP_ANALYSIS_PROCESSING = "deep_analysis:processing"; private static final String DEEP_ANALYSIS_TASK_INFO = "deep_analysis:task_info:"; @Resource private RedisTemplate redisTemplate; @Resource private PdfConversionService pdfConversionService; @Resource private DatasetFilesService datasetFilesService; @Resource private TempFileStorageService tempFileStorageService; // 深度解析最大并发数,默认4个 @Value("${deep.analysis.max.concurrent:4}") private int maxConcurrentTasks; // 当前正在处理的任务数量计数器 private final AtomicInteger currentProcessingCount = new AtomicInteger(0); /** * 深度解析任务信息 */ public static class DeepAnalysisTask { private String taskId; private DocUploadReq request; private MultipartFile file; private Integer fileId; private long submitTime; // 构造函数和getter/setter public DeepAnalysisTask(String taskId, DocUploadReq request, MultipartFile file, Integer fileId) { this.taskId = taskId; this.request = request; this.file = file; this.fileId = fileId; this.submitTime = System.currentTimeMillis(); } // getters and setters public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public DocUploadReq getRequest() { return request; } public void setRequest(DocUploadReq request) { this.request = request; } public MultipartFile getFile() { return file; } public void setFile(MultipartFile file) { this.file = file; } public Integer getFileId() { return fileId; } public void setFileId(Integer fileId) { this.fileId = fileId; } public long getSubmitTime() { return submitTime; } public void setSubmitTime(long submitTime) { this.submitTime = submitTime; } } /** * 提交深度解析任务到队列 * * @param request 上传请求 * @param file 文件 * @param fileId 文件ID * @return 任务ID */ public String submitDeepAnalysisTask(DocUploadReq request, MultipartFile file, Integer fileId) { String taskId = UUID.randomUUID().toString(); try { logger.info("提交深度解析任务到队列,任务ID: {}, 文件: {}, 文件ID: {}", taskId, file.getOriginalFilename(), fileId); // 先保存临时文件 String tempFilePath = tempFileStorageService.saveTempFile(file, taskId); // 创建任务信息 Map taskInfo = new HashMap<>(); taskInfo.put("taskId", taskId); taskInfo.put("fileName", file.getOriginalFilename()); taskInfo.put("fileSize", file.getSize()); taskInfo.put("fileId", fileId); taskInfo.put("datasetId", request.getDatasetId()); taskInfo.put("parentId", request.getParentId()); taskInfo.put("analysisStrategyType", request.getAnalysisStrategyType()); taskInfo.put("tempFilePath", tempFilePath); taskInfo.put("submitTime", System.currentTimeMillis()); taskInfo.put("status", "queued"); // 将任务信息存储到Redis Hash String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; redisTemplate.opsForHash().putAll(taskInfoKey, taskInfo); redisTemplate.expire(taskInfoKey, 24, TimeUnit.HOURS); // 将任务ID添加到队列 redisTemplate.opsForList().rightPush(DEEP_ANALYSIS_QUEUE, taskId); // 更新文件状态为排队中 updateFileStatus(fileId, IndexingStatusEnum.QUEUED.getCode()); logger.info("深度解析任务已加入队列,任务ID: {}, 当前队列长度: {}", taskId, redisTemplate.opsForList().size(DEEP_ANALYSIS_QUEUE)); return taskId; } catch (Exception e) { logger.error("提交深度解析任务失败,任务ID: {}, 错误: {}", taskId, e.getMessage(), e); throw new RuntimeException("提交深度解析任务失败: " + e.getMessage(), e); } } /** * 定时任务:处理队列中的深度解析任务 * 每5秒检查一次队列,如果有空闲位置就处理新任务 */ @Scheduled(fixedRate = 5000) public void processQueuedTasks() { try { // 检查当前正在处理的任务数量 long processingCount = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING); if (processingCount >= maxConcurrentTasks) { logger.debug("深度解析任务已达到最大并发数 {}, 当前处理中: {}", maxConcurrentTasks, processingCount); return; } // 计算可以处理的任务数量 long availableSlots = maxConcurrentTasks - processingCount; logger.debug("深度解析队列检查,当前处理中: {}, 可用槽位: {}", processingCount, availableSlots); // 从队列中获取任务并处理 for (int i = 0; i < availableSlots; i++) { String taskId = (String) redisTemplate.opsForList().leftPop(DEEP_ANALYSIS_QUEUE); if (taskId == null) { break; // 队列为空 } logger.info("从队列中获取到深度解析任务: {}", taskId); processTask(taskId); } } catch (Exception e) { logger.error("处理深度解析队列时发生错误: {}", e.getMessage(), e); } } /** * 处理单个深度解析任务 */ @Async public void processTask(String taskId) { try { logger.info("开始处理深度解析任务: {}", taskId); // 将任务添加到处理中集合 redisTemplate.opsForSet().add(DEEP_ANALYSIS_PROCESSING, taskId); redisTemplate.expire(DEEP_ANALYSIS_PROCESSING, 24, TimeUnit.HOURS); // 获取任务信息 String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; Map taskInfo = redisTemplate.opsForHash().entries(taskInfoKey); if (taskInfo.isEmpty()) { logger.warn("任务信息不存在,任务ID: {}", taskId); removeFromProcessing(taskId); return; } Integer fileId = (Integer) taskInfo.get("fileId"); String fileName = (String) taskInfo.get("fileName"); String datasetId = (String) taskInfo.get("datasetId"); String tempFilePath = (String) taskInfo.get("tempFilePath"); String analysisStrategyType = (String) taskInfo.get("analysisStrategyType"); Long parentId = taskInfo.get("parentId") != null ? Long.valueOf(taskInfo.get("parentId").toString()) : null; logger.info("处理深度解析任务详情 - 任务ID: {}, 文件: {}, 数据集: {}", taskId, fileName, datasetId); // 更新任务状态为处理中 redisTemplate.opsForHash().put(taskInfoKey, "status", "processing"); updateFileStatus(fileId, IndexingStatusEnum.PREPROCESSING.getCode()); // 从临时文件加载MultipartFile MultipartFile file = tempFileStorageService.loadTempFile(tempFilePath); // 由于文件名被哈希化,需要创建一个带有原始文件名的MultipartFile包装器 MultipartFile wrappedFile = new MultipartFileWrapper(file, fileName); // 重新构建请求对象 DocUploadReq request = new DocUploadReq(); request.setDatasetId(datasetId); request.setParentId(parentId); request.setAnalysisStrategyType(analysisStrategyType); // 调用PDF转换服务进行深度解析 pdfConversionService.handlePdfConversion(request, wrappedFile, fileId); // 更新任务状态为完成 redisTemplate.opsForHash().put(taskInfoKey, "status", "completed"); logger.info("深度解析任务处理完成,任务ID: {}", taskId); } catch (Exception e) { logger.error("处理深度解析任务失败,任务ID: {}, 错误: {}", taskId, e.getMessage(), e); // 更新任务状态为失败 String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; redisTemplate.opsForHash().put(taskInfoKey, "status", "failed"); redisTemplate.opsForHash().put(taskInfoKey, "error", e.getMessage()); // 更新文件状态为失败 try { Map taskInfo = redisTemplate.opsForHash().entries(taskInfoKey); Integer fileId = (Integer) taskInfo.get("fileId"); if (fileId != null) { updateFileStatus(fileId, IndexingStatusEnum.ERROR.getCode()); } } catch (Exception ex) { logger.error("更新文件状态失败: {}", ex.getMessage(), ex); } } finally { // 清理临时文件 try { String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; Map taskInfo = redisTemplate.opsForHash().entries(taskInfoKey); String tempFilePath = (String) taskInfo.get("tempFilePath"); if (tempFilePath != null) { tempFileStorageService.deleteTempFile(tempFilePath); } } catch (Exception e) { logger.error("清理临时文件失败,任务ID: {}, 错误: {}", taskId, e.getMessage()); } // 从处理中集合移除任务 removeFromProcessing(taskId); } } /** * 从处理中集合移除任务 */ private void removeFromProcessing(String taskId) { try { redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, taskId); currentProcessingCount.decrementAndGet(); logger.info("任务已从处理中集合移除: {}", taskId); } catch (Exception e) { logger.error("移除处理中任务失败: {}", e.getMessage(), e); } } /** * 更新文件状态 */ private void updateFileStatus(Integer fileId, String status) { try { TDatasetFiles file = datasetFilesService.getFileById(fileId); if (file != null) { file.setIndexingStatus(status); datasetFilesService.updateFile(file); logger.debug("文件状态已更新,文件ID: {}, 状态: {}", fileId, status); } } catch (Exception e) { logger.error("更新文件状态失败,文件ID: {}, 状态: {}, 错误: {}", fileId, status, e.getMessage(), e); } } /** * 获取队列状态信息 */ public Map getQueueStatus() { Map status = new HashMap<>(); try { Long queueSize = redisTemplate.opsForList().size(DEEP_ANALYSIS_QUEUE); Long processingSize = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING); status.put("queueSize", queueSize != null ? queueSize : 0); status.put("processingSize", processingSize != null ? processingSize : 0); status.put("maxConcurrent", maxConcurrentTasks); status.put("availableSlots", maxConcurrentTasks - (processingSize != null ? processingSize : 0)); } catch (Exception e) { logger.error("获取队列状态失败: {}", e.getMessage(), e); } return status; } /** * 获取任务详情 */ public Map getTaskInfo(String taskId) { String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId; return redisTemplate.opsForHash().entries(taskInfoKey); } /** * 定时清理过期的临时文件 * 每小时执行一次 */ @Scheduled(fixedRate = 3600000) // 1小时 public void cleanupExpiredTempFiles() { try { logger.info("开始清理过期的深度解析临时文件"); tempFileStorageService.cleanupExpiredTempFiles(); // 记录临时目录大小用于监控 long dirSize = tempFileStorageService.getTempDirectorySize(); if (dirSize >= 0) { logger.info("深度解析临时目录大小: {} MB", dirSize / 1024 / 1024); } } catch (Exception e) { logger.error("清理过期临时文件时发生错误: {}", e.getMessage(), e); } } }