ai-manus/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java

456 lines
20 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.bjtds.brichat.service;
import com.bjtds.brichat.entity.dataset.DocUploadReq;
import com.bjtds.brichat.entity.dataset.TDatasetFiles;
import com.bjtds.brichat.entity.dto.PdfTaskDto;
import com.bjtds.brichat.enums.IndexingStatusEnum;
import com.bjtds.brichat.service.dify.PdfConversionService;
import com.bjtds.brichat.service.dify.constants.DifyConstants;
import com.bjtds.brichat.util.MultipartFileWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Queue manager for PDF deep-analysis jobs.
 *
 * <p>Each submitted job is described by a Redis hash
 * ({@code deep_analysis:task_info:<taskId>}), waits in a Redis list
 * ({@code deep_analysis:queue}) and, once picked up, is tracked in a Redis set
 * ({@code deep_analysis:processing}). The size of that set is compared with
 * {@code deep.analysis.max.concurrent} to cap the number of jobs in flight, which
 * (per the original author's description) protects the OCR service from overload.
 *
 * @author system
 */
@Service
public class DeepAnalysisQueueService {

    private static final Logger logger = LoggerFactory.getLogger(DeepAnalysisQueueService.class);

    // Redis key names
    private static final String DEEP_ANALYSIS_QUEUE = "deep_analysis:queue";
    private static final String DEEP_ANALYSIS_PROCESSING = "deep_analysis:processing";
    private static final String DEEP_ANALYSIS_TASK_INFO = "deep_analysis:task_info:";

    // A task still in the processing set this long after submission is treated as stuck.
    private static final long PROCESSING_TIMEOUT_MILLIS = TimeUnit.HOURS.toMillis(4);

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private PdfConversionService pdfConversionService;

    @Resource
    private DatasetFilesService datasetFilesService;

    @Resource
    private TempFileStorageService tempFileStorageService;

    // Maximum number of deep-analysis tasks in flight; defaults to 4.
    @Value("${deep.analysis.max.concurrent:4}")
    private int maxConcurrentTasks;

    /**
     * Plain holder describing one deep-analysis task.
     *
     * <p>Not referenced by the rest of this class; kept for external callers.
     */
    public static class DeepAnalysisTask {
        private String taskId;
        private DocUploadReq request;
        private MultipartFile file;
        private Integer fileId;
        private long submitTime;

        /**
         * Creates a task and stamps {@code submitTime} with the current wall-clock time.
         *
         * @param taskId  task identifier
         * @param request upload request the task was created from
         * @param file    uploaded file
         * @param fileId  identifier of the dataset file record
         */
        public DeepAnalysisTask(String taskId, DocUploadReq request, MultipartFile file, Integer fileId) {
            this.taskId = taskId;
            this.request = request;
            this.file = file;
            this.fileId = fileId;
            this.submitTime = System.currentTimeMillis();
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        public DocUploadReq getRequest() {
            return request;
        }

        public void setRequest(DocUploadReq request) {
            this.request = request;
        }

        public MultipartFile getFile() {
            return file;
        }

        public void setFile(MultipartFile file) {
            this.file = file;
        }

        public Integer getFileId() {
            return fileId;
        }

        public void setFileId(Integer fileId) {
            this.fileId = fileId;
        }

        public long getSubmitTime() {
            return submitTime;
        }

        public void setSubmitTime(long submitTime) {
            this.submitTime = submitTime;
        }
    }

    /**
     * Stores the upload as a temp file, records the task metadata in Redis and appends the
     * task to the waiting queue.
     *
     * <p>If anything fails before the task ID reaches the queue, the temp file written for it
     * is deleted again (best effort) so a failed submission leaves no orphan on disk.
     *
     * @param request upload request (dataset, parent and analysis strategy are persisted)
     * @param file    uploaded file
     * @param fileId  identifier of the dataset file record
     * @return the generated task ID
     * @throws RuntimeException wrapping any failure during submission
     */
    public String submitDeepAnalysisTask(DocUploadReq request, MultipartFile file, Integer fileId) {
        String taskId = UUID.randomUUID().toString();
        String tempFilePath = null;
        boolean enqueued = false;
        try {
            logger.info("提交深度解析任务到队列任务ID: {}, 文件: {}, 文件ID: {}",
                    taskId, file.getOriginalFilename(), fileId);

            // Persist the upload first; the worker reloads it from this path later.
            tempFilePath = tempFileStorageService.saveTempFile(file, taskId);

            Map<String, Object> taskInfo = new HashMap<>();
            taskInfo.put("taskId", taskId);
            taskInfo.put("fileName", file.getOriginalFilename());
            taskInfo.put("fileSize", file.getSize());
            taskInfo.put("fileId", fileId);
            taskInfo.put("datasetId", request.getDatasetId());
            taskInfo.put("parentId", request.getParentId());
            taskInfo.put("analysisStrategyType", request.getAnalysisStrategyType());
            taskInfo.put("tempFilePath", tempFilePath);
            taskInfo.put("submitTime", System.currentTimeMillis());
            taskInfo.put("status", "queued");

            // Task metadata lives in a Redis hash that expires after 24 hours.
            String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
            redisTemplate.opsForHash().putAll(taskInfoKey, taskInfo);
            redisTemplate.expire(taskInfoKey, 24, TimeUnit.HOURS);

            redisTemplate.opsForList().rightPush(DEEP_ANALYSIS_QUEUE, taskId);
            enqueued = true;

            // NOTE(review): the status is written after the push, so a scheduler tick that
            // pops the task in between could have its PREPROCESSING status overwritten by
            // QUEUED. Reordering would change what state a failed push leaves behind, so it
            // is left for a deliberate decision.
            updateFileStatus(fileId, IndexingStatusEnum.QUEUED.getCode());

            logger.info("深度解析任务已加入队列任务ID: {}, 当前队列长度: {}",
                    taskId, redisTemplate.opsForList().size(DEEP_ANALYSIS_QUEUE));
            return taskId;
        } catch (Exception e) {
            logger.error("提交深度解析任务失败任务ID: {}, 错误: {}", taskId, e.getMessage(), e);
            // Only clean up when the task never reached the queue; a queued task still needs its file.
            if (!enqueued && tempFilePath != null) {
                try {
                    tempFileStorageService.deleteTempFile(tempFilePath);
                } catch (Exception cleanupError) {
                    logger.error("清理临时文件失败任务ID: {}, 错误: {}", taskId, cleanupError.getMessage());
                }
            }
            throw new RuntimeException("提交深度解析任务失败: " + e.getMessage(), e);
        }
    }

    /**
     * Scheduled every 5 seconds: starts queued tasks while the processing set is below
     * {@code maxConcurrentTasks}.
     *
     * <p>Errors are logged and swallowed so the schedule keeps running.
     */
    @Scheduled(fixedRate = 5000)
    public void processQueuedTasks() {
        try {
            Long processingSize = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING);
            if (processingSize == null) {
                // Spring Data Redis documents a null result inside a pipeline/transaction.
                // Without a count the free capacity is unknown, so skip this cycle.
                logger.warn("无法获取处理中任务数量,跳过本轮队列检查");
                return;
            }
            long processingCount = processingSize;
            if (processingCount >= maxConcurrentTasks) {
                logger.debug("深度解析任务已达到最大并发数 {}, 当前处理中: {}", maxConcurrentTasks, processingCount);
                return;
            }

            long availableSlots = maxConcurrentTasks - processingCount;
            logger.debug("深度解析队列检查,当前处理中: {}, 可用槽位: {}", processingCount, availableSlots);

            for (int i = 0; i < availableSlots; i++) {
                String taskId = (String) redisTemplate.opsForList().leftPop(DEEP_ANALYSIS_QUEUE);
                if (taskId == null) {
                    break; // queue is empty
                }
                logger.info("从队列中获取到深度解析任务: {}", taskId);
                processTask(taskId);
            }
        } catch (Exception e) {
            logger.error("处理深度解析队列时发生错误: {}", e.getMessage(), e);
        }
    }

    /**
     * Runs one deep-analysis task: marks it as processing, reloads the temp file and hands
     * it to {@code PdfConversionService.handlePdfConversion}.
     *
     * <p>On success the task stays in the processing set with status {@code pdf_converting};
     * according to the original author's note the slot is released later by
     * {@code PdfConversionTaskService} (not visible in this file). On failure the task and
     * its file are marked failed, the temp file is deleted and the slot is released here.
     *
     * <p>NOTE(review): {@link #processQueuedTasks()} invokes this method on {@code this}.
     * With Spring's default proxy-based {@code @Async} support such self-invocation is not
     * intercepted, so the call runs synchronously on the scheduler thread unless AspectJ
     * weaving is configured.
     *
     * @param taskId ID of the task to process
     */
    @Async
    public void processTask(String taskId) {
        String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
        try {
            logger.info("开始处理深度解析任务: {}", taskId);

            // Occupy a slot before doing any work.
            redisTemplate.opsForSet().add(DEEP_ANALYSIS_PROCESSING, taskId);
            redisTemplate.expire(DEEP_ANALYSIS_PROCESSING, 24, TimeUnit.HOURS);

            Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
            if (taskInfo.isEmpty()) {
                logger.warn("任务信息不存在任务ID: {}", taskId);
                removeFromProcessing(taskId);
                return;
            }

            Integer fileId = toInteger(taskInfo.get("fileId"));
            String fileName = (String) taskInfo.get("fileName");
            String datasetId = (String) taskInfo.get("datasetId");
            String tempFilePath = (String) taskInfo.get("tempFilePath");
            String analysisStrategyType = (String) taskInfo.get("analysisStrategyType");
            Long parentId = toLong(taskInfo.get("parentId"));
            logger.info("处理深度解析任务详情 - 任务ID: {}, 文件: {}, 数据集: {}", taskId, fileName, datasetId);

            redisTemplate.opsForHash().put(taskInfoKey, "status", "processing");
            updateFileStatus(fileId, IndexingStatusEnum.PREPROCESSING.getCode());

            // The stored temp file does not carry the original name (the original comment says
            // it is hashed), so wrap it to restore the name recorded at submission time.
            MultipartFile storedFile = tempFileStorageService.loadTempFile(tempFilePath);
            MultipartFile wrappedFile = new MultipartFileWrapper(storedFile, fileName);

            DocUploadReq request = new DocUploadReq();
            request.setDatasetId(datasetId);
            request.setParentId(parentId);
            request.setAnalysisStrategyType(analysisStrategyType);

            // The task ID is passed along so the conversion can be correlated with this task.
            pdfConversionService.handlePdfConversion(request, wrappedFile, fileId, taskId);

            redisTemplate.opsForHash().put(taskInfoKey, "status", "pdf_converting");
            logger.info("深度解析任务已提交PDF转换等待OCR处理完成任务ID: {}", taskId);
            // Deliberately no removeFromProcessing here: the slot stays occupied until the
            // conversion reaches a final state.
        } catch (Exception e) {
            logger.error("处理深度解析任务失败任务ID: {}, 错误: {}", taskId, e.getMessage(), e);
            handleTaskFailure(taskId, taskInfoKey, e);
        }
    }

    /**
     * Failure bookkeeping for {@link #processTask(String)}. Every step is guarded on its own
     * so that one failing step (for example Redis being unreachable) cannot prevent the
     * remaining steps, in particular the release of the processing slot.
     */
    private void handleTaskFailure(String taskId, String taskInfoKey, Exception cause) {
        try {
            // A null message would be stored as a null hash value; fall back to toString().
            String message = cause.getMessage() != null ? cause.getMessage() : cause.toString();
            redisTemplate.opsForHash().put(taskInfoKey, "status", "failed");
            redisTemplate.opsForHash().put(taskInfoKey, "error", message);
        } catch (Exception ex) {
            logger.error("更新任务状态失败,任务ID: {}, 错误: {}", taskId, ex.getMessage(), ex);
        }

        Map<Object, Object> taskInfo = Collections.emptyMap();
        try {
            taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
        } catch (Exception ex) {
            logger.error("读取任务信息失败,任务ID: {}, 错误: {}", taskId, ex.getMessage(), ex);
        }

        try {
            Integer fileId = toInteger(taskInfo.get("fileId"));
            if (fileId != null) {
                updateFileStatus(fileId, IndexingStatusEnum.FAILED.getCode());
            }
        } catch (Exception ex) {
            logger.error("更新文件状态失败: {}", ex.getMessage(), ex);
        }

        try {
            String tempFilePath = (String) taskInfo.get("tempFilePath");
            if (tempFilePath != null) {
                tempFileStorageService.deleteTempFile(tempFilePath);
            }
        } catch (Exception ex) {
            logger.error("清理临时文件失败任务ID: {}, 错误: {}", taskId, ex.getMessage());
        }

        // Release the slot so a failed task does not block the queue.
        removeFromProcessing(taskId);
    }

    /**
     * Removes a task from the processing set; failures are logged, not propagated.
     */
    private void removeFromProcessing(String taskId) {
        try {
            redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, taskId);
            logger.info("任务已从处理中集合移除: {}", taskId);
        } catch (Exception e) {
            logger.error("移除处理中任务失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Sets the indexing status of a dataset file record; a missing record is ignored and
     * failures are logged, not propagated.
     */
    private void updateFileStatus(Integer fileId, String status) {
        try {
            TDatasetFiles record = datasetFilesService.getFileById(fileId);
            if (record == null) {
                return;
            }
            record.setIndexingStatus(status);
            datasetFilesService.updateFile(record);
            logger.debug("文件状态已更新文件ID: {}, 状态: {}", fileId, status);
        } catch (Exception e) {
            logger.error("更新文件状态失败文件ID: {}, 状态: {}, 错误: {}", fileId, status, e.getMessage(), e);
        }
    }

    /**
     * Returns a snapshot of the queue.
     *
     * @return map with {@code queueSize}, {@code processingSize}, {@code maxConcurrent} and
     *         {@code availableSlots}; empty if Redis could not be queried
     */
    public Map<String, Object> getQueueStatus() {
        Map<String, Object> status = new HashMap<>();
        try {
            Long queueSize = redisTemplate.opsForList().size(DEEP_ANALYSIS_QUEUE);
            Long processingSize = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING);
            long queued = queueSize != null ? queueSize : 0L;
            long processing = processingSize != null ? processingSize : 0L;

            status.put("queueSize", queued);
            status.put("processingSize", processing);
            status.put("maxConcurrent", maxConcurrentTasks);
            status.put("availableSlots", maxConcurrentTasks - processing);
        } catch (Exception e) {
            logger.error("获取队列状态失败: {}", e.getMessage(), e);
        }
        return status;
    }

    /**
     * Returns the Redis hash stored for a task.
     *
     * @param taskId task ID
     * @return the hash entries as returned by Redis for {@code deep_analysis:task_info:<taskId>}
     */
    public Map<Object, Object> getTaskInfo(String taskId) {
        return redisTemplate.opsForHash().entries(DEEP_ANALYSIS_TASK_INFO + taskId);
    }

    /**
     * Scheduled hourly: asks the temp-file store to purge expired files and logs the size of
     * the temp directory for monitoring.
     */
    @Scheduled(fixedRate = 3600000) // 1 hour
    public void cleanupExpiredTempFiles() {
        try {
            logger.info("开始清理过期的深度解析临时文件");
            tempFileStorageService.cleanupExpiredTempFiles();

            long dirSize = tempFileStorageService.getTempDirectorySize();
            if (dirSize >= 0) {
                logger.info("深度解析临时目录大小: {} MB", dirSize / 1024 / 1024);
            }
        } catch (Exception e) {
            logger.error("清理过期临时文件时发生错误: {}", e.getMessage(), e);
        }
    }

    /**
     * Scheduled every 30 minutes: releases processing slots held by tasks that are no longer
     * valid, i.e. whose metadata is gone, that were submitted more than 4 hours ago, or whose
     * status is already {@code completed} or {@code failed}.
     *
     * <p>A released task that still has metadata gets {@code cleanupReason} and
     * {@code cleanupTime} recorded. Unless its status is {@code completed}, it is also set to
     * {@code cleaned_timeout} and its file is marked failed. A completed task keeps its status
     * and its file is left untouched.
     */
    @Scheduled(fixedRate = 1800000) // 30 minutes
    public void cleanupInvalidProcessingTasks() {
        try {
            logger.info("开始清理DEEP_ANALYSIS_PROCESSING队列中的失效任务");

            Set<Object> processingTaskIds = redisTemplate.opsForSet().members(DEEP_ANALYSIS_PROCESSING);
            if (processingTaskIds == null || processingTaskIds.isEmpty()) {
                logger.debug("DEEP_ANALYSIS_PROCESSING队列为空无需清理");
                return;
            }

            int removedCount = 0;
            long currentTime = System.currentTimeMillis();
            for (Object taskIdObj : processingTaskIds) {
                if (taskIdObj == null) {
                    continue;
                }
                String taskId = taskIdObj.toString();
                try {
                    String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
                    Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);

                    String reason = resolveCleanupReason(taskInfo, currentTime);
                    if (reason == null) {
                        continue; // task is still valid
                    }

                    Long removed = redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, taskId);
                    if (removed == null || removed <= 0) {
                        continue; // someone else released it in the meantime
                    }
                    removedCount++;
                    logger.warn("已清理失效的处理中任务任务ID: {},原因: {}", taskId, reason);

                    if (!taskInfo.isEmpty()) {
                        recordCleanup(taskId, taskInfoKey, taskInfo, reason, currentTime);
                    }
                } catch (Exception e) {
                    logger.error("检查处理中任务{}时发生错误: {}", taskId, e.getMessage(), e);
                }
            }

            if (removedCount > 0) {
                Long remainingTasks = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING);
                logger.info("清理完成,移除了{}个失效任务,剩余处理中任务数: {}", removedCount, remainingTasks);
            } else {
                logger.debug("未发现需要清理的失效任务");
            }
        } catch (Exception e) {
            logger.error("清理DEEP_ANALYSIS_PROCESSING队列中的失效任务时发生错误: {}", e.getMessage(), e);
        }
    }

    /**
     * Decides whether a task in the processing set should be released.
     *
     * @return the reason for releasing it, or {@code null} if the task is still valid; a
     *         terminal status takes precedence over a timeout as the reported reason
     */
    private static String resolveCleanupReason(Map<Object, Object> taskInfo, long now) {
        if (taskInfo.isEmpty()) {
            return "任务信息不存在";
        }

        String reason = null;
        Long submitTime = toLong(taskInfo.get("submitTime"));
        if (submitTime != null) {
            long elapsedTime = now - submitTime;
            if (elapsedTime > PROCESSING_TIMEOUT_MILLIS) {
                reason = String.format("任务超时(运行%d分钟", elapsedTime / (60 * 1000));
            }
        }

        // Tasks in a terminal state should not be holding a slot any more.
        String status = (String) taskInfo.get("status");
        if ("completed".equals(status) || "failed".equals(status)) {
            reason = "任务状态为" + status + ",不应在处理队列中";
        }
        return reason;
    }

    /**
     * Records on the task hash why and when the task was released and, unless the task had
     * already completed, marks the task as {@code cleaned_timeout} and its file as failed.
     */
    private void recordCleanup(String taskId, String taskInfoKey, Map<Object, Object> taskInfo,
                               String reason, long cleanupTime) {
        boolean alreadyCompleted = "completed".equals(taskInfo.get("status"));

        if (!alreadyCompleted) {
            redisTemplate.opsForHash().put(taskInfoKey, "status", "cleaned_timeout");
        }
        redisTemplate.opsForHash().put(taskInfoKey, "cleanupReason", reason);
        redisTemplate.opsForHash().put(taskInfoKey, "cleanupTime", cleanupTime);

        if (alreadyCompleted) {
            // The work finished; only the slot was stale. Do not mark the file as failed.
            return;
        }

        try {
            Integer fileId = toInteger(taskInfo.get("fileId"));
            if (fileId != null) {
                updateFileStatus(fileId, IndexingStatusEnum.FAILED.getCode());
                logger.info("已将清理任务{}关联的文件{}状态更新为FAILED", taskId, fileId);
            }
        } catch (Exception e) {
            logger.error("更新清理任务关联文件状态失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Converts a hash value to {@code Integer} regardless of the numeric type (or string) the
     * Redis value serializer handed back; {@code null} stays {@code null}.
     */
    private static Integer toInteger(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.valueOf(value.toString().trim());
    }

    /**
     * Converts a hash value to {@code Long} regardless of the numeric type (or string) the
     * Redis value serializer handed back; {@code null} stays {@code null}.
     */
    private static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.valueOf(value.toString().trim());
    }
}