diff --git a/src/modules/document-import/document-import.repository.ts b/src/modules/document-import/document-import.repository.ts index af14b0e..6c77302 100644 --- a/src/modules/document-import/document-import.repository.ts +++ b/src/modules/document-import/document-import.repository.ts @@ -5,13 +5,26 @@ import { PrismaService } from '../../infrastructure/database/prisma.service'; export class DocumentImportRepository { constructor(private readonly prisma: PrismaService) {} - async create(data: { userId?: string; fileName?: string; sourceType?: string }) { + async create(data: { + userId?: string; + knowledgeBaseId?: string; + sourceId?: string; + fileId?: string; + sourceType?: string; + sourceName?: string; + sourceUrl?: string; + status?: string; + }) { return this.prisma.documentImport.create({ data: { userId: data.userId ?? '', - sourceType: data.sourceType ?? 'upload', - sourceName: data.fileName ?? 'unknown', - status: 'pending', + knowledgeBaseId: data.knowledgeBaseId, + sourceId: data.sourceId, + fileId: data.fileId, + sourceType: data.sourceType ?? 'file', + sourceName: data.sourceName ?? 'unknown', + sourceUrl: data.sourceUrl, + status: data.status ?? 'QUEUED', }, }); } @@ -20,10 +33,66 @@ export class DocumentImportRepository { return this.prisma.documentImport.findUnique({ where: { id } }); } - async updateStatus(id: string, status: string) { - await this.prisma.documentImport.update({ + async updateStatus(id: string, status: string, data?: { + step?: string; + progress?: number; + errorCode?: string; + errorMessage?: string; + workerId?: string; + }) { + return this.prisma.documentImport.update({ where: { id }, - data: { status }, + data: { status, ...data }, + }); + } + + async claimNext(workerId: string) { + return this.prisma.documentImport.findFirst({ + where: { status: 'QUEUED' }, + orderBy: { createdAt: 'asc' }, + }); + } + + async claim(id: string, workerId: string) { + return this.prisma.documentImport.updateMany({ + where: { id, status: 'QUEUED' }, + data: { status: 'CLAIMED', workerId, heartbeatAt: new Date(), startedAt: new Date() }, + }); + } + + async heartbeat(id: string) { + return this.prisma.documentImport.update({ + where: { id }, + data: { heartbeatAt: new Date() }, + }); + } + + async findStaleJobs() { + const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); + return this.prisma.documentImport.findMany({ + where: { + status: { in: ['CLAIMED', 'DOWNLOADING', 'PARSING', 'OCR_PROCESSING', 'VISION_PROCESSING', 'CLEANING', 'CHUNKING', 'EMBEDDING', 'INDEXING', 'GENERATING_CANDIDATES'] }, + heartbeatAt: { lt: fiveMinutesAgo }, + }, + }); + } + + async resetStaleJob(id: string, retryCount: number, maxRetries: number) { + const newStatus = retryCount >= maxRetries ? 'FAILED_FINAL' : 'QUEUED'; + return this.prisma.documentImport.update({ + where: { id }, + data: { + status: newStatus, + workerId: null, + retryCount: { increment: 1 }, + }, + }); + } + + async findBySourceId(sourceId: string) { + return this.prisma.documentImport.findFirst({ + where: { sourceId }, + orderBy: { createdAt: 'desc' }, }); } }