feat: runs + SSE streaming proxy from Hermes /v1/runs
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 39s

This commit is contained in:
WangDL 2026-05-22 11:29:22 +08:00
parent aa0575b71b
commit c31725433d
2 changed files with 167 additions and 32 deletions

View File

@ -1,11 +1,12 @@
import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger';
import { Controller, Post, Get, Body, Req, UseGuards } from '@nestjs/common';
import { Controller, Post, Get, Body, Req, Res, UseGuards } from '@nestjs/common';
import { AdminAiChatService } from './admin-ai-chat.service';
import { AiChatDto } from './dto/ai-chat.dto';
import { AdminAuthGuard } from '../../common/guards/admin-auth.guard';
import { AdminRolesGuard } from '../../common/guards/admin-roles.guard';
import { AdminRoles } from '../../common/decorators/admin-roles.decorator';
import type { AdminRole } from '../../common/types/admin-role.enum';
import type { Request, Response } from 'express';
@ApiTags('admin-ai-chat')
@Controller('admin-api/ai')
@ -16,11 +17,27 @@ export class AdminAiChatController {
@Post('chat')
@AdminRoles('SUPER_ADMIN' as AdminRole)
@ApiBearerAuth()
@ApiOperation({ summary: 'AI 对话(仅超级管理员' })
async chat(@Body() dto: AiChatDto, @Req() req: any) {
@ApiOperation({ summary: 'AI 对话(非流式,兼容旧版' })
async chat(@Body() dto: AiChatDto, @Req() req: Request & { adminUser: any }) {
return this.aiChatService.chat(dto, req.adminUser.id);
}
@Post('chat/stream')
@AdminRoles('SUPER_ADMIN' as AdminRole)
@ApiBearerAuth()
@ApiOperation({ summary: 'AI 对话SSE 流式,支持 run 事件)' })
async streamChat(@Body() dto: AiChatDto, @Req() req: Request & { adminUser: any }, @Res() res: Response) {
return this.aiChatService.streamChat(dto, req.adminUser.id, res);
}
@Post('chat/stop')
@AdminRoles('SUPER_ADMIN' as AdminRole)
@ApiBearerAuth()
@ApiOperation({ summary: '停止正在运行的 AI 任务' })
async stopChat(@Body('runId') runId: string) {
return this.aiChatService.stopRun(runId);
}
@Get('dashboard')
@AdminRoles('SUPER_ADMIN' as AdminRole)
@ApiBearerAuth()

View File

@ -1,13 +1,29 @@
import { Injectable, Logger } from '@nestjs/common';
import type { AiChatDto } from './dto/ai-chat.dto';
import { AdminConversationService } from '../admin-conversation/admin-conversation.service';
import type { Response } from 'express';
const HERMES_API_URL = 'http://10.2.0.7:8642/v1/chat/completions';
const HERMES_API_URL = 'http://10.2.0.7:8642';
const HERMES_API_KEY = 'zhixi-hermes-key-2026';
interface HermesRun {
run_id: string;
status: string;
}
interface HermesSSEEvent {
event: string;
run_id: string;
delta?: string;
text?: string;
output?: string;
usage?: { input_tokens: number; output_tokens: number; total_tokens: number };
error?: { message: string };
}
@Injectable()
export class AdminAiChatService {
private readonly logger = new Logger(AdminAiChatService.name);
private activeRuns = new Map<string, AbortController>();
constructor(private readonly conversationService: AdminConversationService) {}
@ -15,22 +31,16 @@ export class AdminAiChatService {
const sessionId = dto.conversationId
? await this.conversationService.getSessionId(dto.conversationId, adminUserId)
: null;
// Auto-create conversation if none provided
const conversationId = dto.conversationId
?? (await this.conversationService.create(adminUserId)).id;
// Save user message
const userMsg = dto.messages[dto.messages.length - 1];
if (userMsg && userMsg.role === 'user') {
if (userMsg?.role === 'user') {
await this.conversationService.saveMessage(conversationId, 'user', userMsg.content);
}
const result = await this.callHermes(dto.messages, sessionId);
// Save assistant reply
await this.conversationService.saveMessage(conversationId, 'assistant', result.content);
return { ...result, conversationId };
}
@ -43,36 +53,144 @@ export class AdminAiChatService {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + HERMES_API_KEY,
};
if (sessionId) {
headers['X-Hermes-Session-Id'] = sessionId;
}
if (sessionId) headers['X-Hermes-Session-Id'] = sessionId;
const resp = await fetch(HERMES_API_URL, {
method: 'POST',
headers,
body: JSON.stringify({
model: 'hermes-agent',
messages,
temperature: 0.7,
max_tokens: 4096,
}),
const resp = await fetch(`${HERMES_API_URL}/v1/chat/completions`, {
method: 'POST', headers,
body: JSON.stringify({ model: 'hermes-agent', messages, temperature: 0.7, max_tokens: 4096 }),
signal: AbortSignal.timeout(120_000),
});
if (!resp.ok) {
const text = await resp.text().catch(() => '');
throw new Error(`Hermes API error ${resp.status}: ${text}`);
}
if (!resp.ok) throw new Error(`Hermes API error ${resp.status}`);
const data = await resp.json();
const content = data.choices?.[0]?.message?.content || '';
const usage = data.usage || {};
this.logger.log('Hermes chat: ' + (Date.now() - start) + 'ms, tokens: ' +
(usage.prompt_tokens || 0) + '/' + (usage.completion_tokens || 0));
this.logger.log(`Chat: ${Date.now() - start}ms, tokens: ${usage.prompt_tokens || 0}/${usage.completion_tokens || 0}`);
return { content, usage: { model: 'hermes-agent', inputTokens: usage.prompt_tokens, outputTokens: usage.completion_tokens } };
}
// ── Runs + SSE ────────────────────────────────────────────
async streamChat(dto: AiChatDto, adminUserId: string, res: Response) {
const sessionId = dto.conversationId
? await this.conversationService.getSessionId(dto.conversationId, adminUserId)
: null;
const conversationId = dto.conversationId
?? (await this.conversationService.create(adminUserId)).id;
// Save user message
const userMsg = dto.messages[dto.messages.length - 1];
if (userMsg?.role === 'user') {
await this.conversationService.saveMessage(conversationId, 'user', userMsg.content);
}
// Set up SSE response
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
res.write(`data: ${JSON.stringify({ event: 'meta', conversationId })}\n\n`);
try {
// 1. Create run
const headers: Record<string, string> = {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + HERMES_API_KEY,
};
if (sessionId) headers['X-Hermes-Session-Id'] = sessionId;
const runResp = await fetch(`${HERMES_API_URL}/v1/runs`, {
method: 'POST', headers,
body: JSON.stringify({
input: dto.messages.map(m => ({ role: m.role, content: m.content })),
}),
});
if (!runResp.ok) {
const err = await runResp.text().catch(() => '');
throw new Error(`Run creation failed ${runResp.status}: ${err}`);
}
const run = await runResp.json() as HermesRun;
const runId = run.run_id;
// 2. Stream events
const abortController = new AbortController();
this.activeRuns.set(runId, abortController);
const eventResp = await fetch(`${HERMES_API_URL}/v1/runs/${runId}/events`, {
headers: { Authorization: 'Bearer ' + HERMES_API_KEY },
signal: abortController.signal,
});
const reader = eventResp.body?.getReader();
if (!reader) throw new Error('No response body');
const decoder = new TextDecoder();
let buffer = '';
let fullResponse = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const event: HermesSSEEvent = JSON.parse(line.slice(6));
// Forward to client
res.write(`data: ${JSON.stringify({ ...event, runId })}\n\n`);
if (event.event === 'message.delta' && event.delta) {
fullResponse += event.delta;
} else if (event.event === 'run.completed') {
fullResponse = event.output || fullResponse;
} else if (event.event === 'run.error' || event.event === 'error') {
throw new Error(event.error?.message || 'Run error');
}
} catch (e: any) {
if (e.message?.includes('Run error') || e.message?.includes('error')) throw e;
// Ignore parse errors on individual lines
}
}
}
}
// Save assistant message
if (fullResponse) {
await this.conversationService.saveMessage(conversationId, 'assistant', fullResponse);
}
res.write(`data: ${JSON.stringify({ event: 'done', conversationId })}\n\n`);
} catch (err: any) {
if (err.name === 'AbortError') {
res.write(`data: ${JSON.stringify({ event: 'stopped' })}\n\n`);
} else {
this.logger.error('Stream error: ' + err.message);
res.write(`data: ${JSON.stringify({ event: 'error', error: err.message })}\n\n`);
}
} finally {
res.end();
}
}
async stopRun(runId: string) {
const controller = this.activeRuns.get(runId);
if (controller) {
controller.abort();
this.activeRuns.delete(runId);
}
// Also tell Hermes
try {
await fetch(`${HERMES_API_URL}/v1/runs/${runId}/stop`, {
method: 'POST',
headers: { Authorization: 'Bearer ' + HERMES_API_KEY },
});
} catch { /* best effort */ }
return { success: true };
}
getDashboardConfig() {
return {
url: 'http://10.2.0.7:9119',