ai-manus/chat-server/src/main/java/com/bjtds/brichat/service/DeepAnalysisQueueService.java

package com.bjtds.brichat.service;

import com.bjtds.brichat.entity.dataset.DocUploadReq;
import com.bjtds.brichat.entity.dataset.TDatasetFiles;
import com.bjtds.brichat.entity.dto.PdfTaskDto;
import com.bjtds.brichat.enums.IndexingStatusEnum;
import com.bjtds.brichat.service.dify.PdfConversionService;
import com.bjtds.brichat.service.dify.constants.DifyConstants;
import com.bjtds.brichat.util.MultipartFileWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Deep-analysis queue management service.
 * Limits the number of concurrent PDF deep-analysis tasks to avoid overloading the OCR service.
 *
 * @author system
 */
@Service
public class DeepAnalysisQueueService {
    private static final Logger logger = LoggerFactory.getLogger(DeepAnalysisQueueService.class);

    // Redis keys used by the queue
    private static final String DEEP_ANALYSIS_QUEUE = "deep_analysis:queue";
    private static final String DEEP_ANALYSIS_PROCESSING = "deep_analysis:processing";
    private static final String DEEP_ANALYSIS_TASK_INFO = "deep_analysis:task_info:";
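
    // Redis layout used by this service (as derived from the code below):
    //   deep_analysis:queue            - list of queued task IDs, consumed FIFO
    //   deep_analysis:processing       - set of task IDs currently holding a concurrency slot
    //   deep_analysis:task_info:<id>   - hash with per-task metadata, 24-hour TTL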

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private PdfConversionService pdfConversionService;

    @Resource
    private DatasetFilesService datasetFilesService;

    @Resource
    private TempFileStorageService tempFileStorageService;

    // Maximum number of concurrent deep-analysis tasks (default: 4)
    @Value("${deep.analysis.max.concurrent:4}")
    private int maxConcurrentTasks;

    // Best-effort local counter of in-flight tasks on this instance. Note that slots
    // released externally by PdfConversionTaskService are not reflected here.
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);

    /**
     * Deep-analysis task descriptor.
     */
    public static class DeepAnalysisTask {
        private String taskId;
        private DocUploadReq request;
        private MultipartFile file;
        private Integer fileId;
        private long submitTime;

        public DeepAnalysisTask(String taskId, DocUploadReq request, MultipartFile file, Integer fileId) {
            this.taskId = taskId;
            this.request = request;
            this.file = file;
            this.fileId = fileId;
            this.submitTime = System.currentTimeMillis();
        }

        // getters and setters
        public String getTaskId() { return taskId; }
        public void setTaskId(String taskId) { this.taskId = taskId; }
        public DocUploadReq getRequest() { return request; }
        public void setRequest(DocUploadReq request) { this.request = request; }
        public MultipartFile getFile() { return file; }
        public void setFile(MultipartFile file) { this.file = file; }
        public Integer getFileId() { return fileId; }
        public void setFileId(Integer fileId) { this.fileId = fileId; }
        public long getSubmitTime() { return submitTime; }
        public void setSubmitTime(long submitTime) { this.submitTime = submitTime; }
    }

    /**
     * Submits a deep-analysis task to the queue.
     *
     * @param request upload request
     * @param file    file to analyze
     * @param fileId  file ID
     * @return task ID
     */
    public String submitDeepAnalysisTask(DocUploadReq request, MultipartFile file, Integer fileId) {
        String taskId = UUID.randomUUID().toString();
        try {
            logger.info("Submitting deep-analysis task, taskId: {}, file: {}, fileId: {}",
                    taskId, file.getOriginalFilename(), fileId);

            // Persist the upload to a temp file first, so it is still available when dequeued
            String tempFilePath = tempFileStorageService.saveTempFile(file, taskId);

            // Build the task metadata
            Map<String, Object> taskInfo = new HashMap<>();
            taskInfo.put("taskId", taskId);
            taskInfo.put("fileName", file.getOriginalFilename());
            taskInfo.put("fileSize", file.getSize());
            taskInfo.put("fileId", fileId);
            taskInfo.put("datasetId", request.getDatasetId());
            taskInfo.put("parentId", request.getParentId());
            taskInfo.put("analysisStrategyType", request.getAnalysisStrategyType());
            taskInfo.put("tempFilePath", tempFilePath);
            taskInfo.put("submitTime", System.currentTimeMillis());
            taskInfo.put("status", "queued");

            // Store the task metadata in a Redis hash with a 24-hour TTL
            String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
            redisTemplate.opsForHash().putAll(taskInfoKey, taskInfo);
            redisTemplate.expire(taskInfoKey, 24, TimeUnit.HOURS);

            // Enqueue the task ID
            redisTemplate.opsForList().rightPush(DEEP_ANALYSIS_QUEUE, taskId);

            // Mark the file as queued
            updateFileStatus(fileId, IndexingStatusEnum.QUEUED.getCode());

            logger.info("Deep-analysis task enqueued, taskId: {}, current queue length: {}",
                    taskId, redisTemplate.opsForList().size(DEEP_ANALYSIS_QUEUE));
            return taskId;
        } catch (Exception e) {
            logger.error("Failed to submit deep-analysis task, taskId: {}, error: {}", taskId, e.getMessage(), e);
            throw new RuntimeException("Failed to submit deep-analysis task: " + e.getMessage(), e);
        }
    }
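
    // Note: rightPush on submit combined with leftPop on consume makes the queue strictly
    // FIFO, so tasks are started in submission order.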

    /**
     * Scheduled job that drains the queue.
     * Runs every 5 seconds and starts new tasks while free slots are available.
     */
    @Scheduled(fixedRate = 5000)
    public void processQueuedTasks() {
        try {
            // Check how many tasks are currently being processed (guard against a null size)
            Long processingSize = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING);
            long processingCount = processingSize != null ? processingSize : 0;
            if (processingCount >= maxConcurrentTasks) {
                logger.debug("Deep analysis already at max concurrency {}, currently processing: {}",
                        maxConcurrentTasks, processingCount);
                return;
            }

            // Compute how many tasks can be started
            long availableSlots = maxConcurrentTasks - processingCount;
            logger.debug("Deep-analysis queue check, processing: {}, available slots: {}",
                    processingCount, availableSlots);

            // Pop tasks from the queue and process them
            for (int i = 0; i < availableSlots; i++) {
                String taskId = (String) redisTemplate.opsForList().leftPop(DEEP_ANALYSIS_QUEUE);
                if (taskId == null) {
                    break; // queue is empty
                }
                logger.info("Dequeued deep-analysis task: {}", taskId);
                processTask(taskId);
            }
        } catch (Exception e) {
            logger.error("Error while processing the deep-analysis queue: {}", e.getMessage(), e);
        }
    }
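
    // Note: processTask is annotated @Async but is invoked from processQueuedTasks within the
    // same bean. Spring's @Async is proxy-based, so this self-invocation bypasses the proxy and
    // runs synchronously unless AspectJ weaving or self-injection is used.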

    /**
     * Processes a single deep-analysis task.
     */
    @Async
    public void processTask(String taskId) {
        try {
            logger.info("Starting deep-analysis task: {}", taskId);

            // Claim a slot: add the task to the processing set
            redisTemplate.opsForSet().add(DEEP_ANALYSIS_PROCESSING, taskId);
            redisTemplate.expire(DEEP_ANALYSIS_PROCESSING, 24, TimeUnit.HOURS);
            // Keep the local counter in step with the slot claim (it is decremented
            // in removeFromProcessing)
            currentProcessingCount.incrementAndGet();

            // Load the task metadata
            String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
            Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
            if (taskInfo.isEmpty()) {
                logger.warn("Task info not found, taskId: {}", taskId);
                removeFromProcessing(taskId);
                return;
            }

            Integer fileId = (Integer) taskInfo.get("fileId");
            String fileName = (String) taskInfo.get("fileName");
            String datasetId = (String) taskInfo.get("datasetId");
            String tempFilePath = (String) taskInfo.get("tempFilePath");
            String analysisStrategyType = (String) taskInfo.get("analysisStrategyType");
            Long parentId = taskInfo.get("parentId") != null ? Long.valueOf(taskInfo.get("parentId").toString()) : null;

            logger.info("Deep-analysis task details - taskId: {}, file: {}, dataset: {}", taskId, fileName, datasetId);

            // Mark the task as processing
            redisTemplate.opsForHash().put(taskInfoKey, "status", "processing");
            updateFileStatus(fileId, IndexingStatusEnum.PREPROCESSING.getCode());

            // Load the MultipartFile back from the temp file
            MultipartFile file = tempFileStorageService.loadTempFile(tempFilePath);

            // The temp-file name is hashed, so wrap the file to restore the original file name
            MultipartFile wrappedFile = new MultipartFileWrapper(file, fileName);

            // Rebuild the request object
            DocUploadReq request = new DocUploadReq();
            request.setDatasetId(datasetId);
            request.setParentId(parentId);
            request.setAnalysisStrategyType(analysisStrategyType);

            // Hand off to the PDF conversion service; the deep-analysis task ID is passed along
            // so the conversion result can be correlated back to this task
            pdfConversionService.handlePdfConversion(request, wrappedFile, fileId, taskId);

            // Mark the task as converting; the OCR service completes asynchronously
            redisTemplate.opsForHash().put(taskInfoKey, "status", "pdf_converting");
            logger.info("Deep-analysis task handed off to PDF conversion, awaiting OCR, taskId: {}", taskId);

            // Note: the task is intentionally NOT removed from the processing set here; it keeps
            // holding its slot until the PDF conversion actually finishes (success/failed/timeout).
        } catch (Exception e) {
            logger.error("Deep-analysis task failed, taskId: {}, error: {}", taskId, e.getMessage(), e);

            // Mark the task as failed
            String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
            redisTemplate.opsForHash().put(taskInfoKey, "status", "failed");
            redisTemplate.opsForHash().put(taskInfoKey, "error", e.getMessage());

            // Mark the file as failed
            try {
                Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
                Integer fileId = (Integer) taskInfo.get("fileId");
                if (fileId != null) {
                    updateFileStatus(fileId, IndexingStatusEnum.FAILED.getCode());
                }
            } catch (Exception ex) {
                logger.error("Failed to update file status: {}", ex.getMessage(), ex);
            }

            // Clean up the temp file
            try {
                Map<Object, Object> taskInfo = redisTemplate.opsForHash().entries(taskInfoKey);
                String tempFilePath = (String) taskInfo.get("tempFilePath");
                if (tempFilePath != null) {
                    tempFileStorageService.deleteTempFile(tempFilePath);
                }
            } catch (Exception ex) {
                logger.error("Failed to clean up temp file, taskId: {}, error: {}", taskId, ex.getMessage());
            }

            // On failure, remove the task from the processing set to release the slot
            removeFromProcessing(taskId);
        }
        // On the happy path the slot is released by PdfConversionTaskService once the
        // PDF conversion completes.
    }

    /**
     * Removes a task from the processing set, releasing its slot.
     */
    private void removeFromProcessing(String taskId) {
        try {
            redisTemplate.opsForSet().remove(DEEP_ANALYSIS_PROCESSING, taskId);
            currentProcessingCount.decrementAndGet();
            logger.info("Task removed from the processing set: {}", taskId);
        } catch (Exception e) {
            logger.error("Failed to remove task from the processing set: {}", e.getMessage(), e);
        }
    }

    /**
     * Updates a file's indexing status.
     */
    private void updateFileStatus(Integer fileId, String status) {
        try {
            TDatasetFiles file = datasetFilesService.getFileById(fileId);
            if (file != null) {
                file.setIndexingStatus(status);
                datasetFilesService.updateFile(file);
                logger.debug("File status updated, fileId: {}, status: {}", fileId, status);
            }
        } catch (Exception e) {
            logger.error("Failed to update file status, fileId: {}, status: {}, error: {}",
                    fileId, status, e.getMessage(), e);
        }
    }

    /**
     * Returns queue status information.
     */
    public Map<String, Object> getQueueStatus() {
        Map<String, Object> status = new HashMap<>();
        try {
            Long queueSize = redisTemplate.opsForList().size(DEEP_ANALYSIS_QUEUE);
            Long processingSize = redisTemplate.opsForSet().size(DEEP_ANALYSIS_PROCESSING);
            status.put("queueSize", queueSize != null ? queueSize : 0);
            status.put("processingSize", processingSize != null ? processingSize : 0);
            status.put("maxConcurrent", maxConcurrentTasks);
            status.put("availableSlots", maxConcurrentTasks - (processingSize != null ? processingSize : 0));
        } catch (Exception e) {
            logger.error("Failed to get queue status: {}", e.getMessage(), e);
        }
        return status;
    }

    /**
     * Returns the metadata for a single task.
     */
    public Map<Object, Object> getTaskInfo(String taskId) {
        String taskInfoKey = DEEP_ANALYSIS_TASK_INFO + taskId;
        return redisTemplate.opsForHash().entries(taskInfoKey);
    }

    /**
     * Scheduled cleanup of expired temp files.
     * Runs once per hour.
     */
    @Scheduled(fixedRate = 3600000) // 1 hour
    public void cleanupExpiredTempFiles() {
        try {
            logger.info("Starting cleanup of expired deep-analysis temp files");
            tempFileStorageService.cleanupExpiredTempFiles();

            // Log the temp-directory size for monitoring
            long dirSize = tempFileStorageService.getTempDirectorySize();
            if (dirSize >= 0) {
                logger.info("Deep-analysis temp directory size: {} MB", dirSize / 1024 / 1024);
            }
        } catch (Exception e) {
            logger.error("Error while cleaning up expired temp files: {}", e.getMessage(), e);
        }
    }
}