fix: 采用消息队列优化解析任务状态更新

This commit is contained in:
wenjinbo 2025-09-22 18:09:01 +08:00
parent 10689d81e3
commit 0ef003e0b1
4 changed files with 190 additions and 172 deletions

View File

@ -54,6 +54,7 @@ public class RedisConfig {
template.afterPropertiesSet();
return template;
}
// 新增字符串专用 RedisTemplate
// @Bean("stringRedisTemplate")
// public RedisTemplate<String, String> stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {

View File

@ -0,0 +1,34 @@
package com.bjtds.brichat.config;
import com.bjtds.brichat.service.task.PdfConversionResultListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
 * Registers Redis pub/sub message listeners.
 *
 * <p>Kept apart from {@code RedisConfig} so the two configuration classes do not
 * form a circular dependency.
 */
@Configuration
public class RedisMessageConfig {

    /** Listener that consumes PDF conversion result notifications. */
    @Autowired
    private PdfConversionResultListener pdfConversionResultListener;

    /**
     * Creates the Redis message listener container and subscribes the PDF
     * conversion result listener to the {@code ppocr_result} channel.
     *
     * @param connectionFactory connection factory handed to the container
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        final ChannelTopic resultTopic = new ChannelTopic("ppocr_result");

        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Subscribe the PDF conversion result listener to its channel.
        listenerContainer.addMessageListener(pdfConversionResultListener, resultTopic);
        return listenerContainer;
    }
}

View File

@ -0,0 +1,119 @@
package com.bjtds.brichat.service.task;
import com.bjtds.brichat.entity.dto.PdfTaskDto;
import com.bjtds.brichat.entity.dto.PdfTaskStatusResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
/**
 * Redis pub/sub listener for PDF conversion result notifications.
 *
 * <p>Handles messages published on the {@code ppocr_result} channel. The fields read
 * from the JSON payload are {@code taskId}, {@code status} and, inside an optional
 * {@code result} object, {@code folderUrl} (on success) and {@code error} (on failure).
 */
@Component
public class PdfConversionResultListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(PdfConversionResultListener.class);

    /** Key prefix of the per-task Redis hash; the full key is {@code prefix + ":" + taskId}. */
    private static final String PDF_TASK_REDIS_KEY = "pdf:conversion:tasks";

    @Autowired
    @Qualifier("redisTemplate")
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private PdfConversionTaskService pdfConversionTaskService;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses an incoming result message, loads the matching task from Redis and
     * dispatches to the success or failure handler according to {@code status}.
     *
     * <p>Malformed messages (missing {@code taskId}/{@code status}), unknown tasks and
     * unknown statuses are logged and ignored. No exception escapes this method.
     *
     * @param message the Redis message whose body is a UTF-8 encoded JSON document
     * @param pattern the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("收到PDF转换结果消息: {}", messageBody);

            // Validate the required fields explicitly instead of relying on an NPE
            // from JsonNode.get(...) when a field is absent.
            JsonNode messageNode = objectMapper.readTree(messageBody);
            String taskId = readText(messageNode, "taskId");
            String status = readText(messageNode, "status");
            if (isBlank(taskId) || isBlank(status)) {
                logger.warn("PDF转换结果消息缺少taskId或status字段已忽略: {}", messageBody);
                return;
            }
            logger.info("解析消息 - 任务ID: {}, 状态: {}", taskId, status);

            // Task metadata lives in a per-task hash under the "taskInfo" field.
            String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId;
            Object storedTask = redisTemplate.opsForHash().get(hashKey, "taskInfo");
            if (storedTask == null) {
                logger.warn("任务{}的信息在Redis中不存在可能已被清理", taskId);
                return;
            }
            if (!(storedTask instanceof PdfTaskDto)) {
                logger.error("任务{}在Redis中的taskInfo类型异常: {}", taskId, storedTask.getClass().getName());
                return;
            }
            PdfTaskDto taskInfo = (PdfTaskDto) storedTask;

            // Case-insensitive matching, consistent with the former polling logic
            // which lower-cased the status before comparing.
            if ("success".equalsIgnoreCase(status)) {
                handleSuccessMessage(taskInfo, messageNode);
            } else if ("failed".equalsIgnoreCase(status) || "failure".equalsIgnoreCase(status)) {
                handleFailedMessage(taskInfo, messageNode);
            } else {
                logger.warn("收到未知状态的消息: taskId={}, status={}", taskId, status);
            }
        } catch (Exception e) {
            logger.error("处理PDF转换结果消息时发生错误: {}", e.getMessage(), e);
        }
    }

    /**
     * Handles a success notification: builds a status response carrying the optional
     * {@code result.folderUrl} and forwards it to
     * {@link PdfConversionTaskService#handleSuccessTask}.
     *
     * @param taskInfo    task metadata loaded from Redis
     * @param messageNode parsed message payload
     */
    private void handleSuccessMessage(PdfTaskDto taskInfo, JsonNode messageNode) {
        try {
            logger.info("处理成功消息 - 任务ID: {}", taskInfo.getTaskId());

            PdfTaskStatusResponse statusResponse = new PdfTaskStatusResponse();
            statusResponse.setStatus("success");

            // A JSON null folderUrl is treated as absent rather than the string "null".
            String folderUrl = readText(messageNode.get("result"), "folderUrl");
            if (folderUrl != null) {
                statusResponse.setFolderUrl(folderUrl);
            }

            pdfConversionTaskService.handleSuccessTask(taskInfo, statusResponse);
        } catch (Exception e) {
            logger.error("处理成功消息时发生错误: taskId={}, 错误: {}", taskInfo.getTaskId(), e.getMessage(), e);
        }
    }

    /**
     * Handles a failure notification: extracts {@code result.error} (falling back to a
     * generic message when it is missing, null or blank) and forwards it to
     * {@link PdfConversionTaskService#handleFailedTask}.
     *
     * @param taskInfo    task metadata loaded from Redis
     * @param messageNode parsed message payload
     */
    private void handleFailedMessage(PdfTaskDto taskInfo, JsonNode messageNode) {
        try {
            logger.error("处理失败消息 - 任务ID: {}", taskInfo.getTaskId());

            String reportedError = readText(messageNode.get("result"), "error");
            String errorMessage = isBlank(reportedError) ? "未知错误" : reportedError;
            logger.error("PDF转换任务失败 - 任务ID: {}, 错误: {}", taskInfo.getTaskId(), errorMessage);

            pdfConversionTaskService.handleFailedTask(taskInfo, errorMessage);
        } catch (Exception e) {
            logger.error("处理失败消息时发生错误: taskId={}, 错误: {}", taskInfo.getTaskId(), e.getMessage(), e);
        }
    }

    /**
     * Returns the text of {@code fieldName} under {@code parent}, or {@code null} when
     * the parent is absent, the field is missing, or its value is JSON null.
     */
    private static String readText(JsonNode parent, String fieldName) {
        if (parent == null || !parent.hasNonNull(fieldName)) {
            return null;
        }
        return parent.get(fieldName).asText();
    }

    /** Returns {@code true} when the value is null, empty or whitespace only. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
}

View File

@ -15,14 +15,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
@ -34,7 +31,7 @@ import java.util.List;
import java.util.Map;
/**
* PDF转换任务定时服务
* PDF转换任务服务
*/
@Service
public class PdfConversionTaskService {
@ -42,8 +39,7 @@ public class PdfConversionTaskService {
private static final Logger logger = LoggerFactory.getLogger(PdfConversionTaskService.class);
private static final String PDF_TASK_REDIS_KEY = "pdf:conversion:tasks";
@Autowired
private RestTemplate restTemplate;
@Autowired
@Qualifier("redisTemplate")
@ -52,8 +48,7 @@ public class PdfConversionTaskService {
@Autowired
private DifyDatasetApiService difyDatasetApiService;
@Value("${ocr.service.url}")
private String pdfConversionServiceUrl;
// @Value("${ocr.service.uploadPath}")
// private String uploadPath;
@Value("${ocr.service.outputPath}")
@ -72,137 +67,10 @@ public class PdfConversionTaskService {
// PDF任务超时时间毫秒
private static final long PDF_TASK_TIMEOUT_MS = 1 * 60 * 60 * 1000; // 1小时
/**
 * Scheduled job that walks the pending PDF conversion queue and checks each task.
 *
 * <p>Runs at a fixed rate of 5000 ms. A failure while checking one task is logged
 * and does not prevent the remaining tasks from being checked.
 */
@Scheduled(fixedRate = 5000) // runs every 5 seconds
public void checkPdfConversionTasks() {
    final String queueKey = PDF_TASK_REDIS_KEY + ":list";
    try {
        // Read every pending task id from the Redis list.
        List<Object> pendingIds = redisTemplate.opsForList().range(queueKey, 0, -1);
        boolean nothingPending = pendingIds == null || pendingIds.isEmpty();
        if (nothingPending) {
            logger.debug("没有待处理的PDF转换任务");
            return;
        }

        logger.info("开始检查PDF转换任务状态共{}个任务", pendingIds.size());
        for (Object rawId : pendingIds) {
            String currentTaskId = rawId.toString();
            try {
                checkSingleTask(currentTaskId);
            } catch (Exception taskError) {
                logger.error("检查任务{}状态时发生错误: {}", currentTaskId, taskError.getMessage(), taskError);
            }
        }
    } catch (Exception jobError) {
        logger.error("定时检查PDF转换任务时发生错误: {}", jobError.getMessage(), jobError);
    }
}
/**
 * Checks one queued PDF conversion task and acts on its current status.
 *
 * <p>Steps: load the task metadata from Redis (dropping the task from the queue when
 * the metadata is gone), run the timeout check, query the conversion service for the
 * status, then dispatch on that status. A {@code null} or unrecognised status leaves
 * the task in the queue. No exception escapes this method.
 *
 * @param taskId id of the task to check
 */
private void checkSingleTask(String taskId) {
    try {
        logger.debug("开始检查任务: {}", taskId);

        // Task metadata lives in a per-task hash under the "taskInfo" field.
        String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId;
        PdfTaskDto taskInfo = (PdfTaskDto) redisTemplate.opsForHash().get(hashKey, "taskInfo");
        if (taskInfo == null) {
            logger.warn("任务{}的信息在Redis中不存在从队列中移除。Hash Key: {}", taskId, hashKey);
            removeTaskFromQueue(taskId);
            logger.info("=== 任务{}因信息丢失被移除 ===", taskId);
            return;
        }
        logger.debug("任务{}信息获取成功: fileId={}, name={}, datasetId={}",
                taskId, taskInfo.getFileId(), taskInfo.getName(), taskInfo.getDatasetId());

        if (isTaskTimeout(taskInfo)) {
            // NOTE(review): the log text says 10 minutes while PDF_TASK_TIMEOUT_MS in this
            // class is one hour; the threshold isTaskTimeout applies is not visible here —
            // confirm and align the message.
            logger.warn("任务{}已超时超过10分钟开始处理超时逻辑", taskId);
            handleTimeoutTask(taskInfo);
            return;
        }

        PdfTaskStatusResponse statusResponse = queryTaskStatus(taskId);
        if (statusResponse == null) {
            logger.warn("无法获取任务{}的状态信息,可能是网络问题或服务异常", taskId);
            return;
        }

        String status = statusResponse.getStatus();
        logger.info("任务{}状态: {}, 文件夹URL: {}", taskId, status, statusResponse.getFolderUrl());

        // equalsIgnoreCase is null-safe and locale-independent, so a missing status falls
        // through to the "unknown" branch instead of raising a NullPointerException, and
        // matching does not depend on the JVM default locale.
        if ("success".equalsIgnoreCase(status)) {
            logger.info("任务{}状态为success开始处理完成逻辑", taskId);
            handleSuccessTask(taskInfo, statusResponse);
        } else if ("running".equalsIgnoreCase(status)) {
            // Progress updates are intentionally not applied while the task runs.
            logger.info("任务{}正在运行,不更新进度", taskId);
        } else if ("failed".equalsIgnoreCase(status) || "failure".equalsIgnoreCase(status)) {
            logger.error("任务{}执行失败,状态: {}", taskId, status);
            // Mark the associated deep-analysis task as failed before dequeuing.
            updateDeepAnalysisTaskStatus(taskInfo, "failed");
            removeTaskFromQueue(taskId);
            logger.info("=== 任务{}因执行失败被移除 ===", taskId);
        } else {
            logger.warn("任务{}状态未知: {},保持在队列中继续监控", taskId, status);
        }
    } catch (Exception e) {
        logger.error("检查任务{}时发生错误: {}", taskId, e.getMessage(), e);
    }
}
/**
 * Fetches the current status of a conversion task from the conversion service.
 *
 * <p>Issues a GET to {@code <service url>/status?taskId=<taskId>} with empty headers.
 *
 * @param taskId id of the task to look up
 * @return the response body, or {@code null} when the request fails
 */
private PdfTaskStatusResponse queryTaskStatus(String taskId) {
    final String statusUrl = pdfConversionServiceUrl + "/status?taskId=" + taskId;
    final HttpEntity<?> emptyRequest = new HttpEntity<>(new HttpHeaders());

    try {
        logger.debug("查询任务{}状态URL: {}", taskId, statusUrl);

        ResponseEntity<PdfTaskStatusResponse> statusReply =
                restTemplate.exchange(statusUrl, HttpMethod.GET, emptyRequest, PdfTaskStatusResponse.class);
        PdfTaskStatusResponse statusBody = statusReply.getBody();

        logger.debug("任务{}状态查询成功,响应: {}", taskId, statusBody);
        return statusBody;
    } catch (Exception queryFailure) {
        // Failures are reported to the caller as a null result.
        logger.error("查询任务{}状态失败URL: {}, 错误类型: {}, 错误信息: {}",
                taskId, statusUrl, queryFailure.getClass().getSimpleName(), queryFailure.getMessage());
        return null;
    }
}
/**
* 处理成功完成的任务
*/
private void handleSuccessTask(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) {
public void handleSuccessTask(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) {
String taskId = taskInfo.getTaskId();
//String result = statusResponse.getResult();
@ -245,45 +113,41 @@ public class PdfConversionTaskService {
logger.info("任务{}已完成并从队列中移除", taskId);
}
// Previous handleFailedTask variant, kept commented out (not compiled):
// private void handleFailedTask(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) {
// String taskId = taskInfo.getTaskId();
//
// logger.error("=== PDF转换任务失败 ===");
// logger.error("任务ID: {}", taskId);
// logger.error("文件名: {}", taskInfo.getName());
// logger.error("数据集ID: {}", taskInfo.getDatasetId());
//
// if (statusResponse.getError() != null) {
// logger.error("错误信息: {}", statusResponse.getError());
// }
//
// // 从队列中移除失败的任务
// removeTaskFromQueue(taskId);
// logger.info("失败任务{}已从队列中移除", taskId);
// }
/**
 * Handles a PDF conversion task that ended in failure.
 *
 * <p>Logs the failure details, marks the associated deep-analysis task as
 * {@code "failed"}, sets the dataset file's indexing status to
 * {@code IndexingStatusEnum.FAILED} when the task carries a file id and the file
 * exists (errors in that step are logged and do not abort the method), and finally
 * removes the task from the queue.
 *
 * @param taskInfo     metadata of the failed task
 * @param errorMessage failure description to log
 */
public void handleFailedTask(PdfTaskDto taskInfo, String errorMessage) {
String taskId = taskInfo.getTaskId();
// Disabled progress-update helper, left commented out here (not compiled and
// unrelated to failure handling):
// private void updateTaskProgress(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) {
// String taskId = taskInfo.getTaskId();
//
// // 更新任务进度
// if (statusResponse.getProgress() != null) {
// Double newPercent = statusResponse.getProgress().getPercent();
// taskInfo.setPercent(newPercent);
//
// // 更新Redis中的任务信息
// String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId;
// redisTemplate.opsForHash().put(hashKey, "taskInfo", taskInfo);
//
// logger.debug("任务{}进度更新: {}%", taskId, String.format("%.1f", newPercent));
// }
// }
logger.error("=== PDF转换任务失败 ===");
logger.error("任务ID: {}", taskId);
logger.error("文件名: {}", taskInfo.getName());
logger.error("数据集ID: {}", taskInfo.getDatasetId());
logger.error("错误信息: {}", errorMessage);
// Mark the associated deep-analysis task as failed.
updateDeepAnalysisTaskStatus(taskInfo, "failed");
// Mark the dataset file as failed; best effort, errors are only logged.
if (taskInfo.getFileId() != null) {
try {
TDatasetFiles file = datasetFilesService.getFileById(taskInfo.getFileId());
if (file != null) {
file.setIndexingStatus(IndexingStatusEnum.FAILED.getCode());
datasetFilesService.updateFile(file);
logger.info("已将文件{}状态更新为FAILED", taskInfo.getFileId());
}
} catch (Exception e) {
logger.error("更新失败任务的文件状态失败: {}", e.getMessage(), e);
}
}
// Remove the failed task from the queue.
removeTaskFromQueue(taskId);
logger.info("失败任务{}已从队列中移除", taskId);
}
// ... existing code ...
/**
* 从队列中移除任务