fix: 完善 DocumentImport 仓库,支持新字段
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 55s
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 55s
This commit is contained in:
parent
9c161db26b
commit
c149b96b04
@ -5,13 +5,26 @@ import { PrismaService } from '../../infrastructure/database/prisma.service';
|
||||
export class DocumentImportRepository {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async create(data: { userId?: string; fileName?: string; sourceType?: string }) {
|
||||
async create(data: {
|
||||
userId?: string;
|
||||
knowledgeBaseId?: string;
|
||||
sourceId?: string;
|
||||
fileId?: string;
|
||||
sourceType?: string;
|
||||
sourceName?: string;
|
||||
sourceUrl?: string;
|
||||
status?: string;
|
||||
}) {
|
||||
return this.prisma.documentImport.create({
|
||||
data: {
|
||||
userId: data.userId ?? '',
|
||||
sourceType: data.sourceType ?? 'upload',
|
||||
sourceName: data.fileName ?? 'unknown',
|
||||
status: 'pending',
|
||||
knowledgeBaseId: data.knowledgeBaseId,
|
||||
sourceId: data.sourceId,
|
||||
fileId: data.fileId,
|
||||
sourceType: data.sourceType ?? 'file',
|
||||
sourceName: data.sourceName ?? 'unknown',
|
||||
sourceUrl: data.sourceUrl,
|
||||
status: data.status ?? 'QUEUED',
|
||||
},
|
||||
});
|
||||
}
|
||||
@ -20,10 +33,66 @@ export class DocumentImportRepository {
|
||||
return this.prisma.documentImport.findUnique({ where: { id } });
|
||||
}
|
||||
|
||||
async updateStatus(id: string, status: string) {
|
||||
await this.prisma.documentImport.update({
|
||||
async updateStatus(id: string, status: string, data?: {
|
||||
step?: string;
|
||||
progress?: number;
|
||||
errorCode?: string;
|
||||
errorMessage?: string;
|
||||
workerId?: string;
|
||||
}) {
|
||||
return this.prisma.documentImport.update({
|
||||
where: { id },
|
||||
data: { status },
|
||||
data: { status, ...data },
|
||||
});
|
||||
}
|
||||
|
||||
async claimNext(workerId: string) {
|
||||
return this.prisma.documentImport.findFirst({
|
||||
where: { status: 'QUEUED' },
|
||||
orderBy: { createdAt: 'asc' },
|
||||
});
|
||||
}
|
||||
|
||||
async claim(id: string, workerId: string) {
|
||||
return this.prisma.documentImport.updateMany({
|
||||
where: { id, status: 'QUEUED' },
|
||||
data: { status: 'CLAIMED', workerId, heartbeatAt: new Date(), startedAt: new Date() },
|
||||
});
|
||||
}
|
||||
|
||||
async heartbeat(id: string) {
|
||||
return this.prisma.documentImport.update({
|
||||
where: { id },
|
||||
data: { heartbeatAt: new Date() },
|
||||
});
|
||||
}
|
||||
|
||||
async findStaleJobs() {
|
||||
const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);
|
||||
return this.prisma.documentImport.findMany({
|
||||
where: {
|
||||
status: { in: ['CLAIMED', 'DOWNLOADING', 'PARSING', 'OCR_PROCESSING', 'VISION_PROCESSING', 'CLEANING', 'CHUNKING', 'EMBEDDING', 'INDEXING', 'GENERATING_CANDIDATES'] },
|
||||
heartbeatAt: { lt: fiveMinutesAgo },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async resetStaleJob(id: string, retryCount: number, maxRetries: number) {
|
||||
const newStatus = retryCount >= maxRetries ? 'FAILED_FINAL' : 'QUEUED';
|
||||
return this.prisma.documentImport.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: newStatus,
|
||||
workerId: null,
|
||||
retryCount: { increment: 1 },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async findBySourceId(sourceId: string) {
|
||||
return this.prisma.documentImport.findFirst({
|
||||
where: { sourceId },
|
||||
orderBy: { createdAt: 'desc' },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user