From 0ef003e0b10a735282ff59fec9cc3538cbae0161 Mon Sep 17 00:00:00 2001 From: wenjinbo <599483010@qq.com> Date: Mon, 22 Sep 2025 18:09:01 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=87=87=E7=94=A8=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=9F=E5=88=97=E4=BC=98=E5=8C=96=E8=A7=A3=E6=9E=90=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=8A=B6=E6=80=81=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/bjtds/brichat/config/RedisConfig.java | 1 + .../brichat/config/RedisMessageConfig.java | 34 +++ .../task/PdfConversionResultListener.java | 119 ++++++++++ .../task/PdfConversionTaskService.java | 208 +++--------------- 4 files changed, 190 insertions(+), 172 deletions(-) create mode 100644 chat-server/src/main/java/com/bjtds/brichat/config/RedisMessageConfig.java create mode 100644 chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionResultListener.java diff --git a/chat-server/src/main/java/com/bjtds/brichat/config/RedisConfig.java b/chat-server/src/main/java/com/bjtds/brichat/config/RedisConfig.java index f3289a6..cda1945 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/config/RedisConfig.java +++ b/chat-server/src/main/java/com/bjtds/brichat/config/RedisConfig.java @@ -54,6 +54,7 @@ public class RedisConfig { template.afterPropertiesSet(); return template; } + // 新增字符串专用 RedisTemplate // @Bean("stringRedisTemplate") // public RedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) { diff --git a/chat-server/src/main/java/com/bjtds/brichat/config/RedisMessageConfig.java b/chat-server/src/main/java/com/bjtds/brichat/config/RedisMessageConfig.java new file mode 100644 index 0000000..c423a1e --- /dev/null +++ b/chat-server/src/main/java/com/bjtds/brichat/config/RedisMessageConfig.java @@ -0,0 +1,34 @@ +package com.bjtds.brichat.config; + +import com.bjtds.brichat.service.task.PdfConversionResultListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +/** + * Redis消息监听器配置 + * 分离出来避免与RedisConfig产生循环依赖 + */ +@Configuration +public class RedisMessageConfig { + + @Autowired + private PdfConversionResultListener pdfConversionResultListener; + + /** + * Redis消息监听器容器 + */ + @Bean + public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + + // 添加PDF转换结果监听器 + container.addMessageListener(pdfConversionResultListener, new ChannelTopic("ppocr_result")); + + return container; + } +} \ No newline at end of file diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionResultListener.java b/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionResultListener.java new file mode 100644 index 0000000..34c27a0 --- /dev/null +++ b/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionResultListener.java @@ -0,0 +1,119 @@ +package com.bjtds.brichat.service.task; + +import com.bjtds.brichat.entity.dto.PdfTaskDto; +import com.bjtds.brichat.entity.dto.PdfTaskStatusResponse; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +/** + * PDF转换结果消息监听器 + * 监听Redis消息队列ppocr_result,处理PDF转换完成的通知 + */ +@Component +public class PdfConversionResultListener implements MessageListener { + + private static final Logger logger = LoggerFactory.getLogger(PdfConversionResultListener.class); + private static final String PDF_TASK_REDIS_KEY = "pdf:conversion:tasks"; + + @Autowired + @Qualifier("redisTemplate") + private RedisTemplate redisTemplate; + + @Autowired + private PdfConversionTaskService pdfConversionTaskService; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void onMessage(Message message, byte[] pattern) { + try { + String messageBody = new String(message.getBody(), "UTF-8"); + logger.info("收到PDF转换结果消息: {}", messageBody); + + // 解析消息 + JsonNode messageNode = objectMapper.readTree(messageBody); + String taskId = messageNode.get("taskId").asText(); + String status = messageNode.get("status").asText(); + + logger.info("解析消息 - 任务ID: {}, 状态: {}", taskId, status); + + // 从Redis获取任务信息 + String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId; + PdfTaskDto taskInfo = (PdfTaskDto) redisTemplate.opsForHash().get(hashKey, "taskInfo"); + + if (taskInfo == null) { + logger.warn("任务{}的信息在Redis中不存在,可能已被清理", taskId); + return; + } + + // 根据状态处理消息 + if ("success".equals(status)) { + handleSuccessMessage(taskInfo, messageNode); + } else if ("failed".equals(status) || "failure".equals(status)) { + handleFailedMessage(taskInfo, messageNode); + } else { + logger.warn("收到未知状态的消息: taskId={}, status={}", taskId, status); + } + + } catch (Exception e) { + logger.error("处理PDF转换结果消息时发生错误: {}", e.getMessage(), e); + } + } + + /** + * 处理成功消息 + */ + private void handleSuccessMessage(PdfTaskDto taskInfo, JsonNode messageNode) { + try { + logger.info("处理成功消息 - 任务ID: {}", taskInfo.getTaskId()); + + // 构造状态响应对象 + PdfTaskStatusResponse statusResponse = new PdfTaskStatusResponse(); + statusResponse.setStatus("success"); + + // 从result中提取folderUrl + JsonNode resultNode = messageNode.get("result"); + if (resultNode != null && resultNode.get("folderUrl") != null) { + statusResponse.setFolderUrl(resultNode.get("folderUrl").asText()); + } + + // 调用原有的处理成功任务方法 + pdfConversionTaskService.handleSuccessTask(taskInfo, statusResponse); + + } catch (Exception e) { + logger.error("处理成功消息时发生错误: taskId={}, 错误: {}", taskInfo.getTaskId(), e.getMessage(), e); + } + } + + /** + * 处理失败消息 + */ + private void handleFailedMessage(PdfTaskDto taskInfo, JsonNode messageNode) { + try { + logger.error("处理失败消息 - 任务ID: {}", taskInfo.getTaskId()); + + // 从result中提取错误信息 + String errorMessage = "未知错误"; + JsonNode resultNode = messageNode.get("result"); + if (resultNode != null && resultNode.get("error") != null) { + errorMessage = resultNode.get("error").asText(); + } + + logger.error("PDF转换任务失败 - 任务ID: {}, 错误: {}", taskInfo.getTaskId(), errorMessage); + + // 调用原有的处理失败任务逻辑 + pdfConversionTaskService.handleFailedTask(taskInfo, errorMessage); + + } catch (Exception e) { + logger.error("处理失败消息时发生错误: taskId={}, 错误: {}", taskInfo.getTaskId(), e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java b/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java index 441185d..fd79ec5 100644 --- a/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java +++ b/chat-server/src/main/java/com/bjtds/brichat/service/task/PdfConversionTaskService.java @@ -15,14 +15,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; -import org.springframework.web.client.RestTemplate; + import org.springframework.web.multipart.MultipartFile; import javax.annotation.Resource; @@ -34,7 +31,7 @@ import java.util.List; import java.util.Map; /** - * PDF转换任务定时服务 + * PDF转换任务服务 */ @Service public class PdfConversionTaskService { @@ -42,8 +39,7 @@ public class PdfConversionTaskService { private static final Logger logger = LoggerFactory.getLogger(PdfConversionTaskService.class); private static final String PDF_TASK_REDIS_KEY = "pdf:conversion:tasks"; - @Autowired - private RestTemplate restTemplate; + @Autowired @Qualifier("redisTemplate") @@ -52,8 +48,7 @@ public class PdfConversionTaskService { @Autowired private DifyDatasetApiService difyDatasetApiService; - @Value("${ocr.service.url}") - private String pdfConversionServiceUrl; + // @Value("${ocr.service.uploadPath}") // private String uploadPath; @Value("${ocr.service.outputPath}") @@ -72,137 +67,10 @@ public class PdfConversionTaskService { // PDF任务超时时间(毫秒) private static final long PDF_TASK_TIMEOUT_MS = 1 * 60 * 60 * 1000; // 1小时 - /** - * 定时任务:每3秒检查一次PDF转换任务状态 - */ - @Scheduled(fixedRate = 5000) // 5秒执行一次 - public void checkPdfConversionTasks() { - try { - // 获取所有待处理的任务ID - List taskIds = redisTemplate.opsForList().range(PDF_TASK_REDIS_KEY + ":list", 0, -1); - - if (taskIds == null || taskIds.isEmpty()) { - logger.debug("没有待处理的PDF转换任务"); - return; - } - - logger.info("开始检查PDF转换任务状态,共{}个任务", taskIds.size()); - - for (Object taskIdObj : taskIds) { - String taskId = taskIdObj.toString(); - try { - checkSingleTask(taskId); - } catch (Exception e) { - logger.error("检查任务{}状态时发生错误: {}", taskId, e.getMessage(), e); - } - } - - } catch (Exception e) { - logger.error("定时检查PDF转换任务时发生错误: {}", e.getMessage(), e); - } - } - - /** - * 检查单个任务的状态 - */ - private void checkSingleTask(String taskId) { - try { - logger.debug("开始检查任务: {}", taskId); - - // 从Redis获取任务信息 - String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId; - PdfTaskDto taskInfo = (PdfTaskDto) redisTemplate.opsForHash().get(hashKey, "taskInfo"); - - if (taskInfo == null) { - logger.warn("任务{}的信息在Redis中不存在,从队列中移除。Hash Key: {}", taskId, hashKey); - removeTaskFromQueue(taskId); - logger.info("=== 任务{}因信息丢失被移除 ===", taskId); - return; - } - - logger.debug("任务{}信息获取成功: fileId={}, name={}, datasetId={}", - taskId, taskInfo.getFileId(), taskInfo.getName(), taskInfo.getDatasetId()); - - // 检查任务是否超时 - if (isTaskTimeout(taskInfo)) { - logger.warn("任务{}已超时(超过10分钟),开始处理超时逻辑", taskId); - handleTimeoutTask(taskInfo); - return; - } - - // 调用状态查询接口 - PdfTaskStatusResponse statusResponse = queryTaskStatus(taskId); - - if (statusResponse == null) { - logger.warn("无法获取任务{}的状态信息,可能是网络问题或服务异常", taskId); - return; - } - - logger.info("任务{}状态: {}, 文件夹URL: {}", taskId, statusResponse.getStatus(), statusResponse.getFolderUrl()); - - // 根据状态处理任务 - switch (statusResponse.getStatus().toLowerCase()) { - case "success": - logger.info("任务{}状态为success,开始处理完成逻辑", taskId); - handleSuccessTask(taskInfo, statusResponse); - break; - case "running": - //updateTaskProgress(taskInfo, statusResponse); - logger.info("任务{}正在运行,不更新进度", taskId); - break; - case "failed": - case "failure": - logger.error("任务{}执行失败,状态: {}", taskId, statusResponse.getStatus()); - // 更新关联的深度解析任务状态为失败 - updateDeepAnalysisTaskStatus(taskInfo, "failed"); - removeTaskFromQueue(taskId); - logger.info("=== 任务{}因执行失败被移除 ===", taskId); - break; - default: - logger.warn("任务{}状态未知: {},保持在队列中继续监控", taskId, statusResponse.getStatus()); - break; - } - - } catch (Exception e) { - logger.error("检查任务{}时发生错误: {}", taskId, e.getMessage(), e); - } - } - - /** - * 查询任务状态 - */ - private PdfTaskStatusResponse queryTaskStatus(String taskId) { - String url = pdfConversionServiceUrl + "/status?taskId=" + taskId; - - HttpHeaders headers = new HttpHeaders(); - HttpEntity requestEntity = new HttpEntity<>(headers); - - try { - logger.debug("查询任务{}状态,URL: {}", taskId, url); - - ResponseEntity response = restTemplate.exchange( - url, - HttpMethod.GET, - requestEntity, - PdfTaskStatusResponse.class - ); - - PdfTaskStatusResponse body = response.getBody(); - logger.debug("任务{}状态查询成功,响应: {}", taskId, body); - - return body; - - } catch (Exception e) { - logger.error("查询任务{}状态失败,URL: {}, 错误类型: {}, 错误信息: {}", - taskId, url, e.getClass().getSimpleName(), e.getMessage()); - return null; - } - } - /** * 处理成功完成的任务 */ - private void handleSuccessTask(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) { + public void handleSuccessTask(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) { String taskId = taskInfo.getTaskId(); //String result = statusResponse.getResult(); @@ -245,45 +113,41 @@ public class PdfConversionTaskService { logger.info("任务{}已完成并从队列中移除", taskId); } - /** * 处理失败的任务 */ -// private void handleFailedTask(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) { -// String taskId = taskInfo.getTaskId(); -// -// logger.error("=== PDF转换任务失败 ==="); -// logger.error("任务ID: {}", taskId); -// logger.error("文件名: {}", taskInfo.getName()); -// logger.error("数据集ID: {}", taskInfo.getDatasetId()); -// -// if (statusResponse.getError() != null) { -// logger.error("错误信息: {}", statusResponse.getError()); -// } -// -// // 从队列中移除失败的任务 -// removeTaskFromQueue(taskId); -// logger.info("失败任务{}已从队列中移除", taskId); -// } + public void handleFailedTask(PdfTaskDto taskInfo, String errorMessage) { + String taskId = taskInfo.getTaskId(); - /** - * 更新任务进度 - */ -// private void updateTaskProgress(PdfTaskDto taskInfo, PdfTaskStatusResponse statusResponse) { -// String taskId = taskInfo.getTaskId(); -// -// // 更新任务进度 -// if (statusResponse.getProgress() != null) { -// Double newPercent = statusResponse.getProgress().getPercent(); -// taskInfo.setPercent(newPercent); -// -// // 更新Redis中的任务信息 -// String hashKey = PDF_TASK_REDIS_KEY + ":" + taskId; -// redisTemplate.opsForHash().put(hashKey, "taskInfo", taskInfo); -// -// logger.debug("任务{}进度更新: {}%", taskId, String.format("%.1f", newPercent)); -// } -// } + logger.error("=== PDF转换任务失败 ==="); + logger.error("任务ID: {}", taskId); + logger.error("文件名: {}", taskInfo.getName()); + logger.error("数据集ID: {}", taskInfo.getDatasetId()); + logger.error("错误信息: {}", errorMessage); + + // 更新关联的深度解析任务状态为失败 + updateDeepAnalysisTaskStatus(taskInfo, "failed"); + + // 更新文件状态为失败 + if (taskInfo.getFileId() != null) { + try { + TDatasetFiles file = datasetFilesService.getFileById(taskInfo.getFileId()); + if (file != null) { + file.setIndexingStatus(IndexingStatusEnum.FAILED.getCode()); + datasetFilesService.updateFile(file); + logger.info("已将文件{}状态更新为FAILED", taskInfo.getFileId()); + } + } catch (Exception e) { + logger.error("更新失败任务的文件状态失败: {}", e.getMessage(), e); + } + } + + // 从队列中移除失败的任务 + removeTaskFromQueue(taskId); + logger.info("失败任务{}已从队列中移除", taskId); + } + + // ... existing code ... /** * 从队列中移除任务