diff --git a/src/modules/admin-ai-chat/admin-ai-chat.controller.ts b/src/modules/admin-ai-chat/admin-ai-chat.controller.ts index 6d0c990..895151c 100644 --- a/src/modules/admin-ai-chat/admin-ai-chat.controller.ts +++ b/src/modules/admin-ai-chat/admin-ai-chat.controller.ts @@ -1,11 +1,12 @@ import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; -import { Controller, Post, Get, Body, Req, UseGuards } from '@nestjs/common'; +import { Controller, Post, Get, Body, Req, Res, UseGuards } from '@nestjs/common'; import { AdminAiChatService } from './admin-ai-chat.service'; import { AiChatDto } from './dto/ai-chat.dto'; import { AdminAuthGuard } from '../../common/guards/admin-auth.guard'; import { AdminRolesGuard } from '../../common/guards/admin-roles.guard'; import { AdminRoles } from '../../common/decorators/admin-roles.decorator'; import type { AdminRole } from '../../common/types/admin-role.enum'; +import type { Request, Response } from 'express'; @ApiTags('admin-ai-chat') @Controller('admin-api/ai') @@ -16,11 +17,27 @@ export class AdminAiChatController { @Post('chat') @AdminRoles('SUPER_ADMIN' as AdminRole) @ApiBearerAuth() - @ApiOperation({ summary: 'AI 对话(仅超级管理员)' }) - async chat(@Body() dto: AiChatDto, @Req() req: any) { + @ApiOperation({ summary: 'AI 对话(非流式,兼容旧版)' }) + async chat(@Body() dto: AiChatDto, @Req() req: Request & { adminUser: any }) { return this.aiChatService.chat(dto, req.adminUser.id); } + @Post('chat/stream') + @AdminRoles('SUPER_ADMIN' as AdminRole) + @ApiBearerAuth() + @ApiOperation({ summary: 'AI 对话(SSE 流式,支持 run 事件)' }) + async streamChat(@Body() dto: AiChatDto, @Req() req: Request & { adminUser: any }, @Res() res: Response) { + return this.aiChatService.streamChat(dto, req.adminUser.id, res); + } + + @Post('chat/stop') + @AdminRoles('SUPER_ADMIN' as AdminRole) + @ApiBearerAuth() + @ApiOperation({ summary: '停止正在运行的 AI 任务' }) + async stopChat(@Body('runId') runId: string) { + return this.aiChatService.stopRun(runId); + } + @Get('dashboard') @AdminRoles('SUPER_ADMIN' as AdminRole) @ApiBearerAuth() diff --git a/src/modules/admin-ai-chat/admin-ai-chat.service.ts b/src/modules/admin-ai-chat/admin-ai-chat.service.ts index 6385c7a..fa99b0e 100644 --- a/src/modules/admin-ai-chat/admin-ai-chat.service.ts +++ b/src/modules/admin-ai-chat/admin-ai-chat.service.ts @@ -1,13 +1,29 @@ import { Injectable, Logger } from '@nestjs/common'; import type { AiChatDto } from './dto/ai-chat.dto'; import { AdminConversationService } from '../admin-conversation/admin-conversation.service'; +import type { Response } from 'express'; -const HERMES_API_URL = 'http://10.2.0.7:8642/v1/chat/completions'; +const HERMES_API_URL = 'http://10.2.0.7:8642'; const HERMES_API_KEY = 'zhixi-hermes-key-2026'; +interface HermesRun { + run_id: string; + status: string; +} +interface HermesSSEEvent { + event: string; + run_id: string; + delta?: string; + text?: string; + output?: string; + usage?: { input_tokens: number; output_tokens: number; total_tokens: number }; + error?: { message: string }; +} + @Injectable() export class AdminAiChatService { private readonly logger = new Logger(AdminAiChatService.name); + private activeRuns = new Map(); constructor(private readonly conversationService: AdminConversationService) {} @@ -15,22 +31,16 @@ export class AdminAiChatService { const sessionId = dto.conversationId ? await this.conversationService.getSessionId(dto.conversationId, adminUserId) : null; - - // Auto-create conversation if none provided const conversationId = dto.conversationId ?? (await this.conversationService.create(adminUserId)).id; - // Save user message const userMsg = dto.messages[dto.messages.length - 1]; - if (userMsg && userMsg.role === 'user') { + if (userMsg?.role === 'user') { await this.conversationService.saveMessage(conversationId, 'user', userMsg.content); } const result = await this.callHermes(dto.messages, sessionId); - - // Save assistant reply await this.conversationService.saveMessage(conversationId, 'assistant', result.content); - return { ...result, conversationId }; } @@ -43,36 +53,144 @@ export class AdminAiChatService { 'Content-Type': 'application/json', Authorization: 'Bearer ' + HERMES_API_KEY, }; - if (sessionId) { - headers['X-Hermes-Session-Id'] = sessionId; - } + if (sessionId) headers['X-Hermes-Session-Id'] = sessionId; - const resp = await fetch(HERMES_API_URL, { - method: 'POST', - headers, - body: JSON.stringify({ - model: 'hermes-agent', - messages, - temperature: 0.7, - max_tokens: 4096, - }), + const resp = await fetch(`${HERMES_API_URL}/v1/chat/completions`, { + method: 'POST', headers, + body: JSON.stringify({ model: 'hermes-agent', messages, temperature: 0.7, max_tokens: 4096 }), signal: AbortSignal.timeout(120_000), }); - - if (!resp.ok) { - const text = await resp.text().catch(() => ''); - throw new Error(`Hermes API error ${resp.status}: ${text}`); - } - + if (!resp.ok) throw new Error(`Hermes API error ${resp.status}`); const data = await resp.json(); const content = data.choices?.[0]?.message?.content || ''; const usage = data.usage || {}; - - this.logger.log('Hermes chat: ' + (Date.now() - start) + 'ms, tokens: ' + - (usage.prompt_tokens || 0) + '/' + (usage.completion_tokens || 0)); + this.logger.log(`Chat: ${Date.now() - start}ms, tokens: ${usage.prompt_tokens || 0}/${usage.completion_tokens || 0}`); return { content, usage: { model: 'hermes-agent', inputTokens: usage.prompt_tokens, outputTokens: usage.completion_tokens } }; } + // ── Runs + SSE ──────────────────────────────────────────── + + async streamChat(dto: AiChatDto, adminUserId: string, res: Response) { + const sessionId = dto.conversationId + ? await this.conversationService.getSessionId(dto.conversationId, adminUserId) + : null; + const conversationId = dto.conversationId + ?? (await this.conversationService.create(adminUserId)).id; + + // Save user message + const userMsg = dto.messages[dto.messages.length - 1]; + if (userMsg?.role === 'user') { + await this.conversationService.saveMessage(conversationId, 'user', userMsg.content); + } + + // Set up SSE response + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + res.write(`data: ${JSON.stringify({ event: 'meta', conversationId })}\n\n`); + + try { + // 1. Create run + const headers: Record = { + 'Content-Type': 'application/json', + Authorization: 'Bearer ' + HERMES_API_KEY, + }; + if (sessionId) headers['X-Hermes-Session-Id'] = sessionId; + + const runResp = await fetch(`${HERMES_API_URL}/v1/runs`, { + method: 'POST', headers, + body: JSON.stringify({ + input: dto.messages.map(m => ({ role: m.role, content: m.content })), + }), + }); + if (!runResp.ok) { + const err = await runResp.text().catch(() => ''); + throw new Error(`Run creation failed ${runResp.status}: ${err}`); + } + const run = await runResp.json() as HermesRun; + const runId = run.run_id; + + // 2. Stream events + const abortController = new AbortController(); + this.activeRuns.set(runId, abortController); + + const eventResp = await fetch(`${HERMES_API_URL}/v1/runs/${runId}/events`, { + headers: { Authorization: 'Bearer ' + HERMES_API_KEY }, + signal: abortController.signal, + }); + + const reader = eventResp.body?.getReader(); + if (!reader) throw new Error('No response body'); + + const decoder = new TextDecoder(); + let buffer = ''; + let fullResponse = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + const event: HermesSSEEvent = JSON.parse(line.slice(6)); + // Forward to client + res.write(`data: ${JSON.stringify({ ...event, runId })}\n\n`); + + if (event.event === 'message.delta' && event.delta) { + fullResponse += event.delta; + } else if (event.event === 'run.completed') { + fullResponse = event.output || fullResponse; + } else if (event.event === 'run.error' || event.event === 'error') { + throw new Error(event.error?.message || 'Run error'); + } + } catch (e: any) { + if (e.message?.includes('Run error') || e.message?.includes('error')) throw e; + // Ignore parse errors on individual lines + } + } + } + } + + // Save assistant message + if (fullResponse) { + await this.conversationService.saveMessage(conversationId, 'assistant', fullResponse); + } + + res.write(`data: ${JSON.stringify({ event: 'done', conversationId })}\n\n`); + } catch (err: any) { + if (err.name === 'AbortError') { + res.write(`data: ${JSON.stringify({ event: 'stopped' })}\n\n`); + } else { + this.logger.error('Stream error: ' + err.message); + res.write(`data: ${JSON.stringify({ event: 'error', error: err.message })}\n\n`); + } + } finally { + res.end(); + } + } + + async stopRun(runId: string) { + const controller = this.activeRuns.get(runId); + if (controller) { + controller.abort(); + this.activeRuns.delete(runId); + } + // Also tell Hermes + try { + await fetch(`${HERMES_API_URL}/v1/runs/${runId}/stop`, { + method: 'POST', + headers: { Authorization: 'Bearer ' + HERMES_API_KEY }, + }); + } catch { /* best effort */ } + return { success: true }; + } + getDashboardConfig() { return { url: 'http://10.2.0.7:9119',