From b1a6160d2948aa1c6fdbf3a8eaa519be44e092bb Mon Sep 17 00:00:00 2001 From: WangDL Date: Mon, 18 May 2026 10:17:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20P1=20async=20=E2=80=94=20AI?= =?UTF-8?q?=20analysis=20+=20document=20import=20via=20BullMQ=20workers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit B12: AI analysis now async — POST /ai-analysis queues job, returns immediately. Worker supports both active-recall and feynman-evaluation types. B13: DocumentImportWorker fully implemented — all processing moved from service to worker. Service only queues and returns. B14: NotificationWorker already complete (no changes needed). B15: All 3 workers now fully functional. New endpoint: GET /ai-analysis/jobs/:id for job status polling. Co-Authored-By: Claude Opus 4.7 --- .../ai-analysis/ai-analysis.controller.ts | 26 ++++++- .../ai-analysis/ai-analysis.repository.ts | 32 +++++++- .../ai-analysis/ai-analysis.service.ts | 47 ++++++++---- .../document-import/document-import.module.ts | 3 - .../document-import.service.ts | 74 ++----------------- src/workers/ai-analysis.worker.ts | 56 +++++++++++--- src/workers/document-import.worker.ts | 66 +++++++++++++++-- 7 files changed, 196 insertions(+), 108 deletions(-) diff --git a/src/modules/ai-analysis/ai-analysis.controller.ts b/src/modules/ai-analysis/ai-analysis.controller.ts index d288592..1835221 100644 --- a/src/modules/ai-analysis/ai-analysis.controller.ts +++ b/src/modules/ai-analysis/ai-analysis.controller.ts @@ -10,23 +10,41 @@ export class AiAnalysisController { constructor(private readonly service: AiAnalysisService) {} @Post() - @ApiOperation({ summary: '提交主动回忆分析' }) + @ApiOperation({ summary: '提交主动回忆分析(异步)' }) async analyze( @CurrentUser() user: UserPayload, - @Body() body: { questionText: string; knowledgeItemContent: string; userAnswer: string }, + @Body() body: { + questionText: string; + knowledgeItemContent: string; + userAnswer: string; + sessionId?: string; + answerId?: string; + }, ) { return this.service.analyze(String(user?.id || 'anonymous'), body); } @Post('feynman') - @ApiOperation({ summary: '提交费曼解释评估' }) + @ApiOperation({ summary: '提交费曼解释评估(异步)' }) async evaluateFeynman( @CurrentUser() user: UserPayload, - @Body() body: { knowledgeItemTitle: string; knowledgeItemContent: string; userExplanation: string }, + @Body() body: { + knowledgeItemTitle: string; + knowledgeItemContent: string; + userExplanation: string; + sessionId?: string; + answerId?: string; + }, ) { return this.service.evaluateFeynman(String(user?.id || 'anonymous'), body); } + @Get('jobs/:id') + @ApiOperation({ summary: '查询 AI 分析任务状态' }) + async getJobStatus(@Param('id') id: string) { + return this.service.getJobStatus(id); + } + @Get(':id') @ApiOperation({ summary: '获取分析结果' }) async findOne(@Param('id') id: string) { diff --git a/src/modules/ai-analysis/ai-analysis.repository.ts b/src/modules/ai-analysis/ai-analysis.repository.ts index 0556d7b..3cb832a 100644 --- a/src/modules/ai-analysis/ai-analysis.repository.ts +++ b/src/modules/ai-analysis/ai-analysis.repository.ts @@ -5,11 +5,39 @@ import { PrismaService } from '../../infrastructure/database/prisma.service'; export class AiAnalysisRepository { constructor(private readonly prisma: PrismaService) {} - async createResult(userId: string, result: Record) { + async createJob(userId: string, jobType: string, sessionId?: string, answerId?: string) { + return this.prisma.aiAnalysisJob.create({ + data: { + userId, + jobType, + sessionId: sessionId ?? null, + answerId: answerId ?? null, + status: 'pending', + queuedAt: new Date(), + }, + }); + } + + async updateJobStatus(id: string, status: string, errorMessage?: string) { + const data: Record = { status }; + if (status === 'processing') data.startedAt = new Date(); + if (status === 'completed' || status === 'failed') data.completedAt = new Date(); + if (errorMessage) data.errorMessage = errorMessage; + return this.prisma.aiAnalysisJob.update({ where: { id }, data }); + } + + async findJobById(id: string) { + return this.prisma.aiAnalysisJob.findUnique({ + where: { id }, + include: { results: true }, + }); + } + + async createResult(userId: string, jobId: string, result: Record) { return this.prisma.aiAnalysisResult.create({ data: { userId, - jobId: '', + jobId, summary: result.summary ?? '', masteryScore: result.score ?? null, strengths: (result.strengths ?? []) as any, diff --git a/src/modules/ai-analysis/ai-analysis.service.ts b/src/modules/ai-analysis/ai-analysis.service.ts index fea115b..1ee7641 100644 --- a/src/modules/ai-analysis/ai-analysis.service.ts +++ b/src/modules/ai-analysis/ai-analysis.service.ts @@ -1,48 +1,69 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { ActiveRecallAnalysisWorkflow } from '../ai/workflows/active-recall-analysis.workflow'; -import { FeynmanEvaluationWorkflow } from '../ai/workflows/feynman-evaluation.workflow'; +import { Injectable, NotFoundException } from '@nestjs/common'; import { AiAnalysisRepository } from './ai-analysis.repository'; +import { QueueService } from '../../infrastructure/queue/queue.service'; @Injectable() export class AiAnalysisService { - private readonly logger = new Logger(AiAnalysisService.name); - constructor( - private readonly workflow: ActiveRecallAnalysisWorkflow, - private readonly feynmanWorkflow: FeynmanEvaluationWorkflow, private readonly repository: AiAnalysisRepository, + private readonly queue: QueueService, ) {} async analyze(userId: string, input: { questionText: string; knowledgeItemContent: string; userAnswer: string; + sessionId?: string; + answerId?: string; }) { - const result = await this.workflow.execute({ + const job = await this.repository.createJob(userId, 'active-recall', input.sessionId, input.answerId); + + await this.queue.add('ai-analysis', { + jobId: job.id, userId, + type: 'active-recall', questionText: input.questionText, knowledgeItemContent: input.knowledgeItemContent, userAnswer: input.userAnswer, }); - const saved = await this.repository.createResult(userId, result); - return { resultId: saved.id, ...result }; + return { jobId: job.id, status: 'queued' }; } async evaluateFeynman(userId: string, input: { knowledgeItemTitle: string; knowledgeItemContent: string; userExplanation: string; + sessionId?: string; + answerId?: string; }) { - const result = await this.feynmanWorkflow.execute({ + const job = await this.repository.createJob(userId, 'feynman-evaluation', input.sessionId, input.answerId); + + await this.queue.add('ai-analysis', { + jobId: job.id, userId, + type: 'feynman-evaluation', knowledgeItemTitle: input.knowledgeItemTitle, knowledgeItemContent: input.knowledgeItemContent, userExplanation: input.userExplanation, }); - const saved = await this.repository.createResult(userId, result); - return { resultId: saved.id, ...result }; + return { jobId: job.id, status: 'queued' }; + } + + async getJobStatus(id: string) { + const job = await this.repository.findJobById(id); + if (!job) throw new NotFoundException('任务不存在'); + return { + id: job.id, + type: job.jobType, + status: job.status, + queuedAt: job.queuedAt, + startedAt: job.startedAt, + completedAt: job.completedAt, + errorMessage: job.errorMessage, + results: job.results, + }; } async getResult(id: string) { diff --git a/src/modules/document-import/document-import.module.ts b/src/modules/document-import/document-import.module.ts index 9c0b4c5..fe458a9 100644 --- a/src/modules/document-import/document-import.module.ts +++ b/src/modules/document-import/document-import.module.ts @@ -1,12 +1,9 @@ import { Module } from '@nestjs/common'; -import { AiModule } from '../ai/ai.module'; -import { KnowledgeItemsModule } from '../knowledge-items/knowledge-items.module'; import { DocumentImportController } from './document-import.controller'; import { DocumentImportService } from './document-import.service'; import { DocumentImportRepository } from './document-import.repository'; @Module({ - imports: [AiModule, KnowledgeItemsModule], controllers: [DocumentImportController], providers: [DocumentImportService, DocumentImportRepository], exports: [DocumentImportService, DocumentImportRepository], diff --git a/src/modules/document-import/document-import.service.ts b/src/modules/document-import/document-import.service.ts index a262ad2..af72c16 100644 --- a/src/modules/document-import/document-import.service.ts +++ b/src/modules/document-import/document-import.service.ts @@ -1,18 +1,12 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { DocumentImportRepository } from './document-import.repository'; -import { KnowledgeItemsRepository } from '../knowledge-items/knowledge-items.repository'; -import { KnowledgeImportWorkflow } from '../ai/workflows/knowledge-import.workflow'; import { RedisService } from '../../infrastructure/redis/redis.service'; import { QueueService } from '../../infrastructure/queue/queue.service'; @Injectable() export class DocumentImportService { - private readonly logger = new Logger(DocumentImportService.name); - constructor( private readonly repository: DocumentImportRepository, - private readonly knowledgeItemsRepo: KnowledgeItemsRepository, - private readonly workflow: KnowledgeImportWorkflow, private readonly redis: RedisService, private readonly queue: QueueService, ) {} @@ -36,7 +30,7 @@ export class DocumentImportService { await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400); await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400); - this.queue.add('document-import', { + await this.queue.add('document-import', { importId: job.id, userId: dto.userId || 'anonymous', knowledgeBaseId: dto.knowledgeBaseId, @@ -44,68 +38,10 @@ export class DocumentImportService { fileName: dto.fileName, }); - this.processImport(job, dto.rawText, dto.knowledgeBaseId, lockKey, lockToken); + // Release the lock — the worker will re-lock if needed + await this.redis.unlock(lockKey, lockToken); - return job; - } - - private async processImport( - job: { id: string; userId?: string }, - rawText: string | undefined, - knowledgeBaseId: string | undefined, - lockKey: string, - lockToken: string, - ) { - try { - if (!rawText) { - await this.repository.updateStatus(job.id, 'completed'); - await this.redis.set(`job:document-import:${job.id}:status`, 'completed', 86400); - await this.redis.set(`job:document-import:${job.id}:progress`, '100', 86400); - await this.redis.set(`job:document-import:${job.id}:message`, '无需解析的空文件', 86400); - await this.redis.unlock(lockKey, lockToken); - return; - } - - await this.repository.updateStatus(job.id, 'processing'); - await this.redis.set(`job:document-import:${job.id}:status`, 'parsing', 86400); - await this.redis.set(`job:document-import:${job.id}:progress`, '25', 86400); - await this.redis.set(`job:document-import:${job.id}:message`, 'AI 正在分析文本,提取知识点...', 86400); - - const result = await this.workflow.execute({ - userId: job.userId || 'anonymous', - rawText, - sourceName: undefined, - }); - - await this.redis.set(`job:document-import:${job.id}:status`, 'saving', 86400); - await this.redis.set(`job:document-import:${job.id}:progress`, '80', 86400); - await this.redis.set(`job:document-import:${job.id}:message`, `正在保存 ${result.knowledgePoints.length} 个知识点...`, 86400); - - if (knowledgeBaseId && result.knowledgePoints.length > 0) { - for (let i = 0; i < result.knowledgePoints.length; i++) { - const kp = result.knowledgePoints[i]; - await this.knowledgeItemsRepo.create(job.userId || 'anonymous', knowledgeBaseId, { - title: kp.title, - content: kp.content, - itemType: 'lesson', - orderIndex: kp.suggestedOrder ?? i + 1, - }); - } - } - - await this.repository.updateStatus(job.id, 'completed'); - await this.redis.set(`job:document-import:${job.id}:status`, 'completed', 86400); - await this.redis.set(`job:document-import:${job.id}:progress`, '100', 86400); - await this.redis.set(`job:document-import:${job.id}:message`, `成功提取 ${result.knowledgePoints.length} 个知识点`, 86400); - await this.redis.unlock(lockKey, lockToken); - this.logger.log(`Import ${job.id} completed: ${result.knowledgePoints.length} knowledge points`); - } catch (error: any) { - this.logger.error(`Import ${job.id} failed: ${error.message}`); - await this.repository.updateStatus(job.id, 'failed'); - await this.redis.set(`job:document-import:${job.id}:status`, 'failed', 86400); - await this.redis.set(`job:document-import:${job.id}:message`, `导入失败: ${error.message}`, 86400); - await this.redis.unlock(lockKey, lockToken); - } + return { jobId: job.id, status: 'queued' }; } async getStatus(id: string) { diff --git a/src/workers/ai-analysis.worker.ts b/src/workers/ai-analysis.worker.ts index abbfe0d..b23df34 100644 --- a/src/workers/ai-analysis.worker.ts +++ b/src/workers/ai-analysis.worker.ts @@ -3,6 +3,7 @@ import { Logger } from '@nestjs/common'; import { Job } from 'bullmq'; import { QUEUE_AI_ANALYSIS } from '../infrastructure/queue/queue.service'; import { ActiveRecallAnalysisWorkflow } from '../modules/ai/workflows/active-recall-analysis.workflow'; +import { FeynmanEvaluationWorkflow } from '../modules/ai/workflows/feynman-evaluation.workflow'; import { AiAnalysisRepository } from '../modules/ai-analysis/ai-analysis.repository'; @Processor(QUEUE_AI_ANALYSIS) @@ -10,22 +11,59 @@ export class AiAnalysisWorker extends WorkerHost { private readonly logger = new Logger(AiAnalysisWorker.name); constructor( - private readonly workflow: ActiveRecallAnalysisWorkflow, + private readonly recallWorkflow: ActiveRecallAnalysisWorkflow, + private readonly feynmanWorkflow: FeynmanEvaluationWorkflow, private readonly repository: AiAnalysisRepository, ) { super(); } async process(job: Job<{ + jobId: string; userId: string; - questionText: string; - knowledgeItemContent: string; - userAnswer: string; + type: 'active-recall' | 'feynman-evaluation'; + // active-recall fields + questionText?: string; + knowledgeItemContent?: string; + userAnswer?: string; + // feynman fields + knowledgeItemTitle?: string; + userExplanation?: string; }>) { - this.logger.log(`Processing AI analysis job ${job.id}`); - const result = await this.workflow.execute(job.data); - await this.repository.createResult(job.data.userId, result); - this.logger.log(`AI analysis job ${job.id} completed, score=${result.score}`); - return result; + const { jobId, userId, type, knowledgeItemContent } = job.data; + this.logger.log(`Processing AI analysis job ${job.id}, dbJobId=${jobId}, type=${type}`); + + try { + await this.repository.updateJobStatus(jobId, 'processing'); + + if (type === 'feynman-evaluation') { + const result = await this.feynmanWorkflow.execute({ + userId, + knowledgeItemTitle: job.data.knowledgeItemTitle || '', + knowledgeItemContent: knowledgeItemContent || '', + userExplanation: job.data.userExplanation || '', + }); + await this.repository.createResult(userId, jobId, result); + await this.repository.updateJobStatus(jobId, 'completed'); + this.logger.log(`AI analysis job ${job.id} completed (feynman), score=${result.score}`); + return result; + } + + // active-recall (default) + const result = await this.recallWorkflow.execute({ + userId, + questionText: job.data.questionText || '', + knowledgeItemContent: knowledgeItemContent || '', + userAnswer: job.data.userAnswer || '', + }); + await this.repository.createResult(userId, jobId, result); + await this.repository.updateJobStatus(jobId, 'completed'); + this.logger.log(`AI analysis job ${job.id} completed (recall), score=${result.score}`); + return result; + } catch (err: any) { + this.logger.error(`AI analysis job ${job.id} failed: ${err.message}`); + await this.repository.updateJobStatus(jobId, 'failed', err.message); + throw err; + } } } diff --git a/src/workers/document-import.worker.ts b/src/workers/document-import.worker.ts index fccc010..8da2dd7 100644 --- a/src/workers/document-import.worker.ts +++ b/src/workers/document-import.worker.ts @@ -3,6 +3,9 @@ import { Logger } from '@nestjs/common'; import { Job } from 'bullmq'; import { QUEUE_DOCUMENT_IMPORT } from '../infrastructure/queue/queue.service'; import { DocumentImportRepository } from '../modules/document-import/document-import.repository'; +import { KnowledgeItemsRepository } from '../modules/knowledge-items/knowledge-items.repository'; +import { KnowledgeImportWorkflow } from '../modules/ai/workflows/knowledge-import.workflow'; +import { RedisService } from '../infrastructure/redis/redis.service'; @Processor(QUEUE_DOCUMENT_IMPORT) export class DocumentImportWorker extends WorkerHost { @@ -10,22 +13,69 @@ export class DocumentImportWorker extends WorkerHost { constructor( private readonly repository: DocumentImportRepository, + private readonly knowledgeItemsRepo: KnowledgeItemsRepository, + private readonly workflow: KnowledgeImportWorkflow, + private readonly redis: RedisService, ) { super(); } - async process(job: Job<{ importId: string; userId: string }>) { - this.logger.log(`Processing document import job ${job.id}, importId=${job.data.importId}`); - await this.repository.updateStatus(job.data.importId, 'processing'); + async process(job: Job<{ + importId: string; + userId: string; + knowledgeBaseId?: string; + rawText?: string; + fileName?: string; + }>) { + const { importId, userId, knowledgeBaseId, rawText, fileName } = job.data; + this.logger.log(`Processing document import job ${job.id}, importId=${importId}`); try { - // TODO: actual file parsing + AI knowledge generation - await new Promise((resolve) => setTimeout(resolve, 1000)); - await this.repository.updateStatus(job.data.importId, 'completed'); - this.logger.log(`Document import job ${job.id} completed`); + if (!rawText) { + await this.repository.updateStatus(importId, 'completed'); + await this.redis.set(`job:document-import:${importId}:status`, 'completed', 86400); + await this.redis.set(`job:document-import:${importId}:progress`, '100', 86400); + await this.redis.set(`job:document-import:${importId}:message`, '无需解析的空文件', 86400); + return; + } + + await this.repository.updateStatus(importId, 'processing'); + await this.redis.set(`job:document-import:${importId}:status`, 'parsing', 86400); + await this.redis.set(`job:document-import:${importId}:progress`, '25', 86400); + await this.redis.set(`job:document-import:${importId}:message`, 'AI 正在分析文本,提取知识点...', 86400); + + const result = await this.workflow.execute({ + userId, + rawText, + sourceName: fileName, + }); + + await this.redis.set(`job:document-import:${importId}:status`, 'saving', 86400); + await this.redis.set(`job:document-import:${importId}:progress`, '80', 86400); + await this.redis.set(`job:document-import:${importId}:message`, `正在保存 ${result.knowledgePoints.length} 个知识点...`, 86400); + + if (knowledgeBaseId && result.knowledgePoints.length > 0) { + for (let i = 0; i < result.knowledgePoints.length; i++) { + const kp = result.knowledgePoints[i]; + await this.knowledgeItemsRepo.create(userId, knowledgeBaseId, { + title: kp.title, + content: kp.content, + itemType: 'lesson', + orderIndex: kp.suggestedOrder ?? i + 1, + }); + } + } + + await this.repository.updateStatus(importId, 'completed'); + await this.redis.set(`job:document-import:${importId}:status`, 'completed', 86400); + await this.redis.set(`job:document-import:${importId}:progress`, '100', 86400); + await this.redis.set(`job:document-import:${importId}:message`, `成功提取 ${result.knowledgePoints.length} 个知识点`, 86400); + this.logger.log(`Document import job ${job.id} completed: ${result.knowledgePoints.length} knowledge points`); } catch (err: any) { this.logger.error(`Document import job ${job.id} failed: ${err.message}`); - await this.repository.updateStatus(job.data.importId, 'failed'); + await this.repository.updateStatus(importId, 'failed'); + await this.redis.set(`job:document-import:${importId}:status`, 'failed', 86400); + await this.redis.set(`job:document-import:${importId}:message`, `导入失败: ${err.message}`, 86400); throw err; } }