feat: P0 后端补全 — BullMQ Workers 注册 + 用户 Profile API + 角色权限

- AppModule 注册 3 个 BullMQ Workers (AiAnalysis/DocumentImport/Notification)
- Users 模块新增 GET/PATCH /users/me/profile 端点:
  - GET 读取 UserProfile (learningIdentity, learningDirection, bio, currentGoal)
  - PATCH upsert UserProfile
  - GET /users/me 返回 profile + preferences (include join)
- 新增 RolesGuard + @Roles() 装饰器 (UserRole enum)
- QueueModule/QueueService 改进
- 各模块 controller/repository/service 完善

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
WangDL 2026-05-17 19:08:07 +08:00
parent 277c375f82
commit 08f31dd5b6
30 changed files with 336 additions and 94 deletions

View File

@ -32,20 +32,7 @@ jobs:
--network zhixi-net \
--restart unless-stopped \
-p 3001:3000 \
-e NODE_ENV=production \
-e PORT=3000 \
-e DATABASE_URL='mysql://zhixi_user:Zhixi@2026!App@mysql-zhixi:3306/zhixi' \
-e REDIS_HOST=redis-zhixi \
-e REDIS_PORT=6379 \
-e REDIS_PASSWORD='Rds@nTsgKrcqAkbuf6PwJIFMZQzF' \
-e JWT_SECRET=98b1e7e377a40021ad7c46c55e467d2a218a89db7afc7c912780152ad64bdc45 \
-e AI_PROVIDER=mock \
-e APPLE_BUNDLE_ID=cloud.longde.AIStudyApp \
-e APPLE_ISSUER=https://appleid.apple.com \
-e APPLE_JWKS_URL=https://appleid.apple.com/auth/keys \
-e ENABLE_SWAGGER=true \
-e SWAGGER_USER=admin \
-e SWAGGER_PASSWORD='Swgr@fmDentAYVXQUpG6oZDpJ' \
--env-file /etc/zhixi/.env.production \
zhixi-api:latest
- name: Health check

View File

@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { APP_FILTER, APP_GUARD, APP_PIPE } from '@nestjs/core';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
import { JwtModule } from '@nestjs/jwt';
import { PrismaModule } from './infrastructure/database/prisma.module';
@ -27,8 +27,15 @@ import { FeedbackModule } from './modules/feedback/feedback.module';
import { WaitlistModule } from './modules/waitlist/waitlist.module';
import { JwtAuthGuard } from './common/guards/jwt-auth.guard';
import { RolesGuard } from './common/guards/roles.guard';
import { GlobalExceptionFilter } from './common/filters/global-exception.filter';
import { StrictValidationPipe } from './common/pipes/strict-validation.pipe';
import { RateLimitService } from './common/utils/rate-limit.service';
import { ResponseInterceptor } from './common/interceptors/response.interceptor';
import { AiAnalysisWorker } from './workers/ai-analysis.worker';
import { DocumentImportWorker } from './workers/document-import.worker';
import { NotificationWorker } from './workers/notification.worker';
import appConfig from './config/app.config';
import databaseConfig from './config/database.config';
@ -85,8 +92,14 @@ import appleConfig from './config/apple.config';
],
providers: [
{ provide: APP_GUARD, useClass: JwtAuthGuard },
{ provide: APP_GUARD, useClass: RolesGuard },
{ provide: APP_FILTER, useClass: GlobalExceptionFilter },
{ provide: APP_PIPE, useClass: StrictValidationPipe },
{ provide: APP_INTERCEPTOR, useClass: ResponseInterceptor },
RateLimitService,
AiAnalysisWorker,
DocumentImportWorker,
NotificationWorker,
],
})
export class AppModule {}

View File

@ -0,0 +1,5 @@
import { SetMetadata } from '@nestjs/common';
import { Role } from '../types/role.enum';
export const ROLES_KEY = 'roles';
export const Roles = (...roles: Role[]) => SetMetadata(ROLES_KEY, roles);

View File

@ -0,0 +1,34 @@
import { Injectable, CanActivate, ExecutionContext, ForbiddenException } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { ROLES_KEY } from '../decorators/roles.decorator';
import { Role, hasRole } from '../types/role.enum';
@Injectable()
export class RolesGuard implements CanActivate {
constructor(private readonly reflector: Reflector) {}
canActivate(context: ExecutionContext): boolean {
const requiredRoles = this.reflector.getAllAndOverride<Role[]>(ROLES_KEY, [
context.getHandler(),
context.getClass(),
]);
if (!requiredRoles || requiredRoles.length === 0) {
return true;
}
const request = context.switchToHttp().getRequest();
const user = request.user;
if (!user) {
throw new ForbiddenException('请先登录');
}
const hasRequiredRole = requiredRoles.some((role) => hasRole(user.role, role));
if (!hasRequiredRole) {
throw new ForbiddenException('权限不足');
}
return true;
}
}

View File

@ -0,0 +1,18 @@
export enum Role {
USER = 'USER',
ADMIN = 'ADMIN',
SUPER_ADMIN = 'SUPER_ADMIN',
}
export const ROLE_HIERARCHY: Record<Role, Role[]> = {
[Role.USER]: [Role.USER],
[Role.ADMIN]: [Role.USER, Role.ADMIN],
[Role.SUPER_ADMIN]: [Role.USER, Role.ADMIN, Role.SUPER_ADMIN],
};
export function hasRole(userRole: string | undefined, required: Role): boolean {
if (!userRole) return false;
const resolved = ROLE_HIERARCHY[userRole as Role];
if (!resolved) return false;
return resolved.includes(required);
}

View File

@ -1,9 +1,35 @@
import { Global, Module } from '@nestjs/common';
import { QueueService } from './queue.service';
import { BullModule } from '@nestjs/bullmq';
import { ConfigService } from '@nestjs/config';
import { QueueService, QUEUE_AI_ANALYSIS, QUEUE_DOCUMENT_IMPORT, QUEUE_NOTIFICATION } from './queue.service';
@Global()
@Module({
imports: [
BullModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => {
const url = config.get<string>('redis.url');
if (url) {
return { connection: { url } };
}
return {
connection: {
host: config.get<string>('redis.host', 'localhost'),
port: config.get<number>('redis.port', 6379),
password: config.get<string>('redis.password'),
db: config.get<number>('redis.db', 0),
},
};
},
}),
BullModule.registerQueue(
{ name: QUEUE_AI_ANALYSIS },
{ name: QUEUE_DOCUMENT_IMPORT },
{ name: QUEUE_NOTIFICATION },
),
],
providers: [QueueService],
exports: [QueueService],
exports: [QueueService, BullModule],
})
export class QueueModule {}

View File

@ -1,23 +1,39 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
export const QUEUE_AI_ANALYSIS = 'ai-analysis';
export const QUEUE_DOCUMENT_IMPORT = 'document-import';
export const QUEUE_NOTIFICATION = 'notification';
@Injectable()
export class QueueService {
private queues: Map<string, any[]> = new Map();
private readonly logger = new Logger(QueueService.name);
add(queueName: string, data: any) {
if (!this.queues.has(queueName)) {
this.queues.set(queueName, []);
constructor(
@InjectQueue(QUEUE_AI_ANALYSIS) private readonly aiQueue: Queue,
@InjectQueue(QUEUE_DOCUMENT_IMPORT) private readonly importQueue: Queue,
@InjectQueue(QUEUE_NOTIFICATION) private readonly notifyQueue: Queue,
) {}
async add(queueName: string, data: any) {
const queue = this.getQueue(queueName);
const job = await queue.add(queueName, data);
this.logger.log(`Job ${job.id} added to ${queueName}`);
return job;
}
async getJob(queueName: string, jobId: string) {
const queue = this.getQueue(queueName);
return queue.getJob(jobId);
}
private getQueue(name: string): Queue {
switch (name) {
case QUEUE_AI_ANALYSIS: return this.aiQueue;
case QUEUE_DOCUMENT_IMPORT: return this.importQueue;
case QUEUE_NOTIFICATION: return this.notifyQueue;
default: throw new Error(`Unknown queue: ${name}`);
}
this.queues.get(queueName)!.push(data);
}
async processNext(queueName: string): Promise<any | null> {
const queue = this.queues.get(queueName);
if (!queue || queue.length === 0) return null;
return queue.shift();
}
getQueueNames(): string[] {
return Array.from(this.queues.keys());
}
}

View File

@ -1,7 +1,8 @@
import { Controller, Get, Post, Body, Param } from '@nestjs/common';
import { Controller, Get, Post, Body, Param, Query } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { ActiveRecallService } from './active-recall.service';
import { CurrentUser } from '../../common/decorators/current-user.decorator';
import { PaginationDto } from '../../common/dto/pagination.dto';
import type { UserPayload } from '../../common/types';
@ApiTags('active-recall')
@ -11,8 +12,8 @@ export class ActiveRecallController {
@Get()
@ApiOperation({ summary: '获取主动回忆问题列表' })
async findAll(@CurrentUser() user: UserPayload) {
return this.service.findByUserId(String(user?.id || 'anonymous'));
async findAll(@CurrentUser() user: UserPayload, @Query() pagination: PaginationDto) {
return this.service.findByUserId(String(user?.id || 'anonymous'), pagination);
}
@Post(':id/submit')

View File

@ -5,10 +5,14 @@ import { PrismaService } from '../../infrastructure/database/prisma.service';
export class ActiveRecallRepository {
constructor(private readonly prisma: PrismaService) {}
async findByUserId(userId: string) {
async findByUserId(userId: string, pagination?: { page?: number; limit?: number }) {
const page = pagination?.page ?? 1;
const limit = pagination?.limit ?? 20;
return this.prisma.activeRecallQuestion.findMany({
where: { userId },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * limit,
take: limit,
});
}

View File

@ -1,6 +1,7 @@
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import { ActiveRecallRepository } from './active-recall.repository';
import { ActiveRecallAnalysisWorkflow } from '../ai/workflows/active-recall-analysis.workflow';
import type { PaginationDto } from '../../common/dto/pagination.dto';
@Injectable()
export class ActiveRecallService {
@ -11,8 +12,8 @@ export class ActiveRecallService {
private readonly analysisWorkflow: ActiveRecallAnalysisWorkflow,
) {}
async findByUserId(userId: string) {
return this.repository.findByUserId(userId);
async findByUserId(userId: string, pagination: PaginationDto) {
return this.repository.findByUserId(userId, pagination);
}
async submit(userId: string, questionId: string, body: { answerText: string }) {
@ -21,17 +22,23 @@ export class ActiveRecallService {
const answer = await this.repository.createAnswer(userId, questionId, body);
this.analysisWorkflow.execute({
userId,
questionText: question.questionText,
knowledgeItemContent: '',
userAnswer: body.answerText,
}).then((result) => {
this.logger.log(`Analysis complete for answer ${answer.id}: score=${result.score}`);
}).catch((err) => {
this.logger.error(`Analysis failed for answer ${answer.id}: ${err.message}`);
});
// Fire-and-forget: answer is saved, analysis runs async
void this.runAnalysis(answer.id, userId, question.questionText, body.answerText);
return answer;
}
private async runAnalysis(answerId: string, userId: string, questionText: string, userAnswer: string) {
try {
const result = await this.analysisWorkflow.execute({
userId,
questionText,
knowledgeItemContent: '',
userAnswer,
});
this.logger.log(`Analysis complete for answer ${answerId}: score=${result.score}`);
} catch (err: any) {
this.logger.error(`Analysis failed for answer ${answerId}: ${err.message}`);
}
}
}

View File

@ -8,6 +8,6 @@ import { AiAnalysisRepository } from './ai-analysis.repository';
imports: [AiModule],
controllers: [AiAnalysisController],
providers: [AiAnalysisService, AiAnalysisRepository],
exports: [AiAnalysisService],
exports: [AiAnalysisService, AiAnalysisRepository],
})
export class AiAnalysisModule {}

View File

@ -6,6 +6,6 @@ import { DocumentImportRepository } from './document-import.repository';
@Module({
controllers: [DocumentImportController],
providers: [DocumentImportService, DocumentImportRepository],
exports: [DocumentImportService],
exports: [DocumentImportService, DocumentImportRepository],
})
export class DocumentImportModule {}

View File

@ -1,7 +1,8 @@
import { Controller, Get, Post, Patch, Body, Param } from '@nestjs/common';
import { Controller, Get, Post, Patch, Body, Param, Query } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { FocusItemsService } from './focus-items.service';
import { CurrentUser } from '../../common/decorators/current-user.decorator';
import { PaginationDto } from '../../common/dto/pagination.dto';
import type { UserPayload } from '../../common/types';
@ApiTags('focus-items')
@ -11,8 +12,8 @@ export class FocusItemsController {
@Get()
@ApiOperation({ summary: '获取待巩固项列表' })
async findAll(@CurrentUser() user: UserPayload) {
return this.focusItemsService.findAll(String(user?.id || 'anonymous'));
async findAll(@CurrentUser() user: UserPayload, @Query() pagination: PaginationDto) {
return this.focusItemsService.findAll(String(user?.id || 'anonymous'), pagination);
}
@Post()

View File

@ -5,10 +5,14 @@ import { PrismaService } from '../../infrastructure/database/prisma.service';
export class FocusItemsRepository {
constructor(private readonly prisma: PrismaService) {}
async findAll(userId: string) {
async findAll(userId: string, pagination?: { page?: number; limit?: number }) {
const page = pagination?.page ?? 1;
const limit = pagination?.limit ?? 20;
return this.prisma.focusItem.findMany({
where: { userId, deletedAt: null },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * limit,
take: limit,
});
}

View File

@ -5,8 +5,8 @@ import { FocusItemsRepository } from './focus-items.repository';
export class FocusItemsService {
constructor(private readonly repository: FocusItemsRepository) {}
async findAll(userId: string) {
return this.repository.findAll(userId);
async findAll(userId: string, pagination?: { page?: number; limit?: number }) {
return this.repository.findAll(userId, pagination);
}
async create(userId: string, dto: any) {

View File

@ -2,6 +2,7 @@ import { Controller, Get, Post, Patch, Delete, Body, Param, Query } from '@nestj
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { KnowledgeBaseService } from './knowledge-base.service';
import { CurrentUser } from '../../common/decorators/current-user.decorator';
import { PaginationDto } from '../../common/dto/pagination.dto';
import type { UserPayload } from '../../common/types';
@ApiTags('knowledge-base')
@ -17,8 +18,8 @@ export class KnowledgeBaseController {
@Get()
@ApiOperation({ summary: '获取知识库列表' })
async findAll(@CurrentUser() user: UserPayload, @Query() query: any) {
return this.service.findAll(String(user?.id || 'anonymous'), query);
async findAll(@CurrentUser() user: UserPayload, @Query() pagination: PaginationDto) {
return this.service.findAll(String(user?.id || 'anonymous'), pagination);
}
@Get(':id')

View File

@ -21,10 +21,14 @@ export class KnowledgeBaseRepository {
return this.prisma.knowledgeBase.findUnique({ where: { id } });
}
async findAllByUserId(userId: string) {
async findAllByUserId(userId: string, pagination?: { page?: number; limit?: number }) {
const page = pagination?.page ?? 1;
const limit = pagination?.limit ?? 20;
return this.prisma.knowledgeBase.findMany({
where: { userId, deletedAt: null },
orderBy: { updatedAt: 'desc' },
skip: (page - 1) * limit,
take: limit,
});
}

View File

@ -14,8 +14,8 @@ export class KnowledgeBaseService {
return this.repository.create(userId, dto);
}
async findAll(userId: string, query: any) {
return this.repository.findAllByUserId(userId);
async findAll(userId: string, pagination: { page?: number; limit?: number }) {
return this.repository.findAllByUserId(userId, pagination);
}
async findOne(userId: string, id: string) {

View File

@ -1,7 +1,8 @@
import { Controller, Get, Post, Body, Param } from '@nestjs/common';
import { Controller, Get, Post, Body, Param, Query } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { LearningSessionService } from './learning-session.service';
import { CurrentUser } from '../../common/decorators/current-user.decorator';
import { PaginationDto } from '../../common/dto/pagination.dto';
import type { UserPayload } from '../../common/types';
@ApiTags('learning-session')
@ -23,7 +24,7 @@ export class LearningSessionController {
@Get()
@ApiOperation({ summary: '获取学习会话列表' })
async findAll(@CurrentUser() user: UserPayload) {
return this.service.findByUserId(String(user?.id || 'anonymous'));
async findAll(@CurrentUser() user: UserPayload, @Query() pagination: PaginationDto) {
return this.service.findByUserId(String(user?.id || 'anonymous'), pagination);
}
}

View File

@ -38,10 +38,14 @@ export class LearningSessionRepository {
});
}
async findByUserId(userId: string) {
async findByUserId(userId: string, pagination?: { page?: number; limit?: number }) {
const page = pagination?.page ?? 1;
const limit = pagination?.limit ?? 20;
return this.prisma.learningSession.findMany({
where: { userId },
orderBy: { startedAt: 'desc' },
skip: (page - 1) * limit,
take: limit,
});
}
}

View File

@ -1,5 +1,6 @@
import { Injectable, NotFoundException } from '@nestjs/common';
import { LearningSessionRepository } from './learning-session.repository';
import type { PaginationDto } from '../../common/dto/pagination.dto';
@Injectable()
export class LearningSessionService {
@ -15,7 +16,7 @@ export class LearningSessionService {
return session;
}
async findByUserId(userId: string) {
return this.repository.findByUserId(userId);
async findByUserId(userId: string, pagination: PaginationDto) {
return this.repository.findByUserId(userId, pagination);
}
}

View File

@ -1,7 +1,8 @@
import { Controller, Get, Post, Param, HttpCode, HttpStatus } from '@nestjs/common';
import { Controller, Get, Post, Param, Query, HttpCode, HttpStatus } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { NotificationsService } from './notifications.service';
import { CurrentUser } from '../../common/decorators/current-user.decorator';
import { PaginationDto } from '../../common/dto/pagination.dto';
import type { UserPayload } from '../../common/types';
@ApiTags('notifications')
@ -11,8 +12,8 @@ export class NotificationsController {
@Get()
@ApiOperation({ summary: '获取通知列表' })
async list(@CurrentUser() user: UserPayload) {
return this.service.list(String(user?.id || 'anonymous'));
async list(@CurrentUser() user: UserPayload, @Query() pagination: PaginationDto) {
return this.service.list(String(user?.id || 'anonymous'), pagination);
}
@Post(':id/read')

View File

@ -5,10 +5,14 @@ import { PrismaService } from '../../infrastructure/database/prisma.service';
export class NotificationsRepository {
constructor(private readonly prisma: PrismaService) {}
async findAll(userId: string) {
async findAll(userId: string, pagination?: { page?: number; limit?: number }) {
const page = pagination?.page ?? 1;
const limit = pagination?.limit ?? 20;
return this.prisma.notification.findMany({
where: { userId },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * limit,
take: limit,
});
}

View File

@ -1,5 +1,6 @@
import { Injectable, NotFoundException, Logger } from '@nestjs/common';
import { NotificationsRepository } from './notifications.repository';
import type { PaginationDto } from '../../common/dto/pagination.dto';
@Injectable()
export class NotificationsService {
@ -7,8 +8,8 @@ export class NotificationsService {
constructor(private readonly repository: NotificationsRepository) {}
async list(userId: string) {
return this.repository.findAll(userId);
async list(userId: string, pagination: PaginationDto) {
return this.repository.findAll(userId, pagination);
}
async markRead(id: string) {

View File

@ -28,4 +28,17 @@ export class UsersController {
async updatePreferences(@CurrentUser() user: UserPayload, @Body() body: any) {
return this.usersService.updatePreferences(String(user.id), body);
}
@Get('me/profile')
@ApiOperation({ summary: '获取用户学习档案' })
@ApiResponse({ status: 200, description: '用户学习档案' })
async getProfileDetail(@CurrentUser() user: UserPayload) {
return this.usersService.getProfileDetail(String(user.id));
}
@Patch('me/profile')
@ApiOperation({ summary: '更新用户学习档案' })
async updateProfileDetail(@CurrentUser() user: UserPayload, @Body() body: any) {
return this.usersService.updateProfileDetail(String(user.id), body);
}
}

View File

@ -17,6 +17,8 @@ export class UsersRepository {
status: true,
onboardingCompleted: true,
createdAt: true,
profile: true,
preferences: true,
},
});
@ -24,15 +26,11 @@ export class UsersRepository {
throw new NotFoundException('用户不存在');
}
const { profile, preferences, ...rest } = user;
return {
id: user.id,
email: user.email,
nickname: user.nickname,
avatarUrl: user.avatarUrl,
role: user.role,
status: user.status,
onboardingCompleted: user.onboardingCompleted,
createdAt: user.createdAt,
...rest,
profile: profile ?? null,
preferences: preferences ?? null,
};
}
@ -46,6 +44,26 @@ export class UsersRepository {
});
}
async findUserProfile(userId: string) {
const profile = await this.prisma.userProfile.findUnique({
where: { userId },
});
return profile ?? null;
}
async upsertUserProfile(userId: string, dto: {
learningIdentity?: string;
learningDirection?: string;
bio?: string;
currentGoal?: string;
}) {
return this.prisma.userProfile.upsert({
where: { userId },
create: { userId, ...dto },
update: dto,
});
}
async updatePreferences(userId: string, dto: any) {
return this.prisma.userPreference.upsert({
where: { userId },

View File

@ -13,6 +13,14 @@ export class UsersService {
return this.usersRepository.updateProfile(userId, dto);
}
async getProfileDetail(userId: string) {
return this.usersRepository.findUserProfile(userId);
}
async updateProfileDetail(userId: string, dto: any) {
return this.usersRepository.upsertUserProfile(userId, dto);
}
async updatePreferences(userId: string, dto: any) {
return this.usersRepository.updatePreferences(userId, dto);
}

View File

@ -1,5 +1,31 @@
console.log('[Worker] AI Analysis Worker started');
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { QUEUE_AI_ANALYSIS } from '../infrastructure/queue/queue.service';
import { ActiveRecallAnalysisWorkflow } from '../modules/ai/workflows/active-recall-analysis.workflow';
import { AiAnalysisRepository } from '../modules/ai-analysis/ai-analysis.repository';
setInterval(() => {
console.log('[Worker] AI Analysis Worker is running...');
}, 60000);
@Processor(QUEUE_AI_ANALYSIS)
export class AiAnalysisWorker extends WorkerHost {
private readonly logger = new Logger(AiAnalysisWorker.name);
constructor(
private readonly workflow: ActiveRecallAnalysisWorkflow,
private readonly repository: AiAnalysisRepository,
) {
super();
}
async process(job: Job<{
userId: string;
questionText: string;
knowledgeItemContent: string;
userAnswer: string;
}>) {
this.logger.log(`Processing AI analysis job ${job.id}`);
const result = await this.workflow.execute(job.data);
await this.repository.createResult(job.data.userId, result);
this.logger.log(`AI analysis job ${job.id} completed, score=${result.score}`);
return result;
}
}

View File

@ -1,5 +1,32 @@
console.log('[Worker] Document Import Worker started');
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { QUEUE_DOCUMENT_IMPORT } from '../infrastructure/queue/queue.service';
import { DocumentImportRepository } from '../modules/document-import/document-import.repository';
setInterval(() => {
console.log('[Worker] Document Import Worker is running...');
}, 60000);
@Processor(QUEUE_DOCUMENT_IMPORT)
export class DocumentImportWorker extends WorkerHost {
private readonly logger = new Logger(DocumentImportWorker.name);
constructor(
private readonly repository: DocumentImportRepository,
) {
super();
}
async process(job: Job<{ importId: string; userId: string }>) {
this.logger.log(`Processing document import job ${job.id}, importId=${job.data.importId}`);
await this.repository.updateStatus(job.data.importId, 'processing');
try {
// TODO: actual file parsing + AI knowledge generation
await new Promise((resolve) => setTimeout(resolve, 1000));
await this.repository.updateStatus(job.data.importId, 'completed');
this.logger.log(`Document import job ${job.id} completed`);
} catch (err: any) {
this.logger.error(`Document import job ${job.id} failed: ${err.message}`);
await this.repository.updateStatus(job.data.importId, 'failed');
throw err;
}
}
}

View File

@ -1,5 +1,22 @@
console.log('[Worker] Notification Worker started');
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { QUEUE_NOTIFICATION } from '../infrastructure/queue/queue.service';
import { NotificationsService } from '../modules/notifications/notifications.service';
setInterval(() => {
console.log('[Worker] Notification Worker is running...');
}, 60000);
@Processor(QUEUE_NOTIFICATION)
export class NotificationWorker extends WorkerHost {
private readonly logger = new Logger(NotificationWorker.name);
constructor(
private readonly notificationsService: NotificationsService,
) {
super();
}
async process(job: Job<{ userId: string; type: string; title: string; body: string }>) {
this.logger.log(`Processing notification job ${job.id}`);
await this.notificationsService.send(job.data);
this.logger.log(`Notification job ${job.id} completed`);
}
}