feat(axon): add toggleable vLLM provider backend
Introduces a provider abstraction so Axon can proxy to either the Claude SDK
(existing behavior) or a vLLM-compatible endpoint, toggled via the
AXON_PROVIDER env var ("claude" | "vllm"). When set to "vllm", requests pass
through as-is (no prompt translation), and the session pool and OAuth token refresh are skipped.
Closes openova-io/openova#36
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
parent 500aca483e
commit 68fcbe1aed
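
For reviewers, a minimal Helm values override for switching a deployment to the vLLM backend. This is a sketch only: the field names come from the values.yaml change below, the baseUrl is a placeholder, and the referenced secret must already exist with an AXON_VLLM_API_KEY entry.

    axon:
      provider: vllm
      vllm:
        baseUrl: "http://vllm.example.svc.cluster.local:8000"  # placeholder endpoint
        defaultModel: qwen3-coder
        existingSecret: axon-vllm-secret  # must contain key AXON_VLLM_API_KEY

With provider set to vllm, the chart drops the copy-credentials init container, the claude-home volume, and the token-refresh CronJob, and the gateway skips OAuth refresh and session-pool warmup at startup.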
@@ -18,6 +18,7 @@ spec:
       imagePullSecrets:
         - name: {{ .Values.image.pullSecretName }}
       {{- end }}
+      {{- if ne (.Values.axon.provider | default "claude") "vllm" }}
       initContainers:
         - name: copy-credentials
           image: busybox:1.36
@@ -31,6 +32,7 @@ spec:
           securityContext:
             runAsNonRoot: true
             runAsUser: 1001
+      {{- end }}
       containers:
         - name: axon
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
@@ -43,6 +45,8 @@ spec:
               value: /home/axon
             - name: AXON_PORT
               value: "{{ .Values.axon.port }}"
+            - name: AXON_PROVIDER
+              value: "{{ .Values.axon.provider | default "claude" }}"
             - name: AXON_DEFAULT_MODEL
               value: "{{ .Values.axon.defaultModel }}"
            - name: AXON_POOL_SIZE
@@ -58,9 +62,22 @@ spec:
                 secretKeyRef:
                   name: {{ .Values.axon.existingSecret }}
                   key: AXON_API_KEYS
+            {{- if eq (.Values.axon.provider | default "claude") "vllm" }}
+            - name: AXON_VLLM_BASE_URL
+              value: "{{ .Values.axon.vllm.baseUrl }}"
+            - name: AXON_VLLM_DEFAULT_MODEL
+              value: "{{ .Values.axon.vllm.defaultModel | default "qwen3-coder" }}"
+            - name: AXON_VLLM_API_KEY
+              valueFrom:
+                secretKeyRef:
+                  name: {{ .Values.axon.vllm.existingSecret }}
+                  key: AXON_VLLM_API_KEY
+            {{- end }}
+          {{- if ne (.Values.axon.provider | default "claude") "vllm" }}
           volumeMounts:
             - name: claude-home
               mountPath: /home/axon/.claude
+          {{- end }}
           resources:
             {{- toYaml .Values.resources.axon | nindent 12 }}
           livenessProbe:
@@ -80,9 +97,11 @@ spec:
             readOnlyRootFilesystem: false
             runAsNonRoot: true
             runAsUser: 1001
+      {{- if ne (.Values.axon.provider | default "claude") "vllm" }}
       volumes:
         - name: claude-auth-secret
           secret:
             secretName: {{ .Values.axon.claudeAuthSecret }}
         - name: claude-home
           emptyDir: {}
+      {{- end }}

@@ -1,4 +1,4 @@
-{{- if .Values.axon.tokenRefresh.enabled }}
+{{- if and .Values.axon.tokenRefresh.enabled (ne (.Values.axon.provider | default "claude") "vllm") }}
 apiVersion: batch/v1
 kind: CronJob
 metadata:

@@ -6,6 +6,7 @@ image:

 axon:
   port: 3000
+  provider: claude # "claude" or "vllm"
   defaultModel: claude-sonnet-4-6
   poolSize: 2
   conversationTtl: 604800
@@ -14,6 +15,10 @@ axon:
   tokenRefresh:
     enabled: false
     schedule: "0 */4 * * *" # Every 4 hours
+  vllm:
+    baseUrl: ""
+    defaultModel: qwen3-coder
+    existingSecret: axon-vllm-secret

 valkey:
   enabled: true

@@ -1,10 +1,20 @@
+export type Provider = "claude" | "vllm";
+
+export interface VllmConfig {
+  baseUrl: string;
+  apiKey: string;
+  defaultModel: string;
+}
+
 export interface Config {
   port: number;
   apiKeys: string[];
+  provider: Provider;
   defaultModel: string;
   poolSize: number;
   valkeyUrl: string;
   conversationTtl: number; // seconds, default 7 days
+  vllm: VllmConfig;
 }

 export function loadConfig(): Config {
@@ -13,12 +23,27 @@ export function loadConfig(): Config {
     throw new Error("AXON_API_KEYS must be set");
   }

+  const provider = (process.env.AXON_PROVIDER ?? "claude") as Provider;
+  if (provider !== "claude" && provider !== "vllm") {
+    throw new Error(`AXON_PROVIDER must be "claude" or "vllm", got "${provider}"`);
+  }
+
+  const vllmDefaultModel = process.env.AXON_VLLM_DEFAULT_MODEL ?? "qwen3-coder";
+
   return {
     port: parseInt(process.env.AXON_PORT ?? "3000", 10),
     apiKeys: keys.split(",").map((k) => k.trim()),
-    defaultModel: process.env.AXON_DEFAULT_MODEL ?? "claude-sonnet-4-6",
+    provider,
+    defaultModel: provider === "vllm"
+      ? vllmDefaultModel
+      : (process.env.AXON_DEFAULT_MODEL ?? "claude-sonnet-4-6"),
     poolSize: parseInt(process.env.AXON_POOL_SIZE ?? "3", 10),
     valkeyUrl: process.env.AXON_VALKEY_URL ?? "redis://localhost:6379",
-    conversationTtl: parseInt(process.env.AXON_CONVERSATION_TTL ?? "604800", 10), // 7 days
+    conversationTtl: parseInt(process.env.AXON_CONVERSATION_TTL ?? "604800", 10),
+    vllm: {
+      baseUrl: process.env.AXON_VLLM_BASE_URL ?? "",
+      apiKey: process.env.AXON_VLLM_API_KEY ?? "",
+      defaultModel: vllmDefaultModel,
+    },
   };
 }

@@ -7,20 +7,45 @@ import { initPool, shutdownPool, getPool } from "./providers/claude.js";
 import { connectValkey, disconnectValkey, getPoolStats } from "./providers/valkey.js";
 import { ConversationStore } from "./providers/conversation.js";
 import { refreshIfExpired, startPeriodicRefresh, stopPeriodicRefresh } from "./providers/token-refresh.js";
+import { VllmProvider } from "./providers/vllm.js";

 const config = loadConfig();
 const app = Fastify({ logger: true });
 const conversations = new ConversationStore(config.conversationTtl);

-// Health check (no auth)
-app.get("/health", async () => ({ status: "ok" }));
+const isVllm = config.provider === "vllm";
+let vllm: VllmProvider | undefined;
+
+if (isVllm) {
+  vllm = new VllmProvider(config.vllm);
+  app.log.info(`Provider: vllm (${config.vllm.baseUrl}), default model: ${config.vllm.defaultModel}`);
+} else {
+  app.log.info(`Provider: claude, default model: ${config.defaultModel}`);
+}
+
+// Health check (no auth) — proxy to vLLM backend when using vllm provider
+app.get("/health", async () => {
+  if (vllm) {
+    return vllm.health();
+  }
+  return { status: "ok" };
+});

 // Pool + conversation stats (no auth — internal observability)
 app.get("/stats", async () => {
   const poolStats = await getPoolStats();
-  const pool = getPool();
   const convCount = await conversations.count();
+  if (isVllm) {
+    return {
+      provider: "vllm",
+      backend: config.vllm.baseUrl,
+      model: config.vllm.defaultModel,
+      conversations: convCount,
+    };
+  }
+  const pool = getPool();
   return {
+    provider: "claude",
     pool: poolStats ?? "valkey not connected",
     sessions: pool?.stats ?? "pool not initialized",
     conversations: convCount,
@@ -31,8 +56,8 @@ app.get("/stats", async () => {
 app.addHook("onRequest", createAuthHook(config));

 // Register routes
-await modelsRoute(app);
-await chatCompletionsRoute(app, config, conversations);
+await modelsRoute(app, vllm);
+await chatCompletionsRoute(app, config, conversations, vllm);

 // GET /v1/conversations/:id — retrieve conversation history (debugging)
 app.get<{ Params: { id: string } }>(
@@ -60,23 +85,28 @@ app.get<{ Params: { id: string } }>(
 // Connect to Valkey (non-blocking — gateway works without it)
 await connectValkey(config.valkeyUrl);

-// Refresh OAuth token before creating sessions (SDK doesn't handle refresh)
-app.log.info("Checking OAuth token...");
-const tokenOk = await refreshIfExpired();
-if (!tokenOk) {
-  app.log.warn("Token refresh failed — sessions may fail to authenticate");
-}
-startPeriodicRefresh();
+if (!isVllm) {
+  // Claude provider: refresh OAuth token and warm session pool
+  app.log.info("Checking OAuth token...");
+  const tokenOk = await refreshIfExpired();
+  if (!tokenOk) {
+    app.log.warn("Token refresh failed — sessions may fail to authenticate");
+  }
+  startPeriodicRefresh();

-// Pre-warm session pool before accepting traffic
-app.log.info(`Warming session pool (size=${config.poolSize})...`);
-await initPool(config.defaultModel, config.poolSize);
-app.log.info("Session pool ready — accepting requests");
+  app.log.info(`Warming session pool (size=${config.poolSize})...`);
+  await initPool(config.defaultModel, config.poolSize);
+  app.log.info("Session pool ready — accepting requests");
+} else {
+  app.log.info("vLLM provider — skipping session pool and OAuth token refresh");
+}

 // Graceful shutdown
 const shutdown = () => {
-  stopPeriodicRefresh();
-  shutdownPool();
+  if (!isVllm) {
+    stopPeriodicRefresh();
+    shutdownPool();
+  }
   disconnectValkey();
   process.exit(0);
 };
@@ -85,10 +115,10 @@ process.on("SIGINT", shutdown);

 try {
   await app.listen({ port: config.port, host: "0.0.0.0" });
-  app.log.info(`Axon gateway listening on port ${config.port}`);
+  app.log.info(`Axon gateway listening on port ${config.port} (provider: ${config.provider})`);
 } catch (err) {
   app.log.error(err);
-  shutdownPool();
+  if (!isVllm) shutdownPool();
   await disconnectValkey();
   process.exit(1);
 }

products/axon/src/providers/vllm.ts (new file, 70 lines)
@@ -0,0 +1,70 @@
+import type { VllmConfig } from "../config.js";
+import type { ChatCompletionRequest, ModelListResponse } from "../types/openai.js";
+
+export class VllmProvider {
+  private baseUrl: string;
+  private apiKey: string;
+  private defaultModel: string;
+
+  constructor(config: VllmConfig) {
+    if (!config.baseUrl) throw new Error("AXON_VLLM_BASE_URL must be set when provider=vllm");
+    if (!config.apiKey) throw new Error("AXON_VLLM_API_KEY must be set when provider=vllm");
+    this.baseUrl = config.baseUrl.replace(/\/+$/, "");
+    this.apiKey = config.apiKey;
+    this.defaultModel = config.defaultModel;
+  }
+
+  async chat(body: ChatCompletionRequest): Promise<Response> {
+    const payload = { ...body, model: body.model ?? this.defaultModel, stream: false };
+    delete (payload as Record<string, unknown>).conversation_id;
+    delete (payload as Record<string, unknown>).thinking;
+    delete (payload as Record<string, unknown>).effort;
+    delete (payload as Record<string, unknown>).profile;
+
+    return fetch(`${this.baseUrl}/v1/chat/completions`, {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/json",
+        Authorization: `Bearer ${this.apiKey}`,
+      },
+      body: JSON.stringify(payload),
+    });
+  }
+
+  async chatStream(body: ChatCompletionRequest): Promise<Response> {
+    const payload = { ...body, model: body.model ?? this.defaultModel, stream: true };
+    delete (payload as Record<string, unknown>).conversation_id;
+    delete (payload as Record<string, unknown>).thinking;
+    delete (payload as Record<string, unknown>).effort;
+    delete (payload as Record<string, unknown>).profile;
+
+    return fetch(`${this.baseUrl}/v1/chat/completions`, {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/json",
+        Authorization: `Bearer ${this.apiKey}`,
+      },
+      body: JSON.stringify(payload),
+    });
+  }
+
+  async models(): Promise<ModelListResponse> {
+    const res = await fetch(`${this.baseUrl}/v1/models`, {
+      headers: { Authorization: `Bearer ${this.apiKey}` },
+    });
+    if (!res.ok) {
+      throw new Error(`vLLM /v1/models returned ${res.status}: ${await res.text()}`);
+    }
+    return res.json() as Promise<ModelListResponse>;
+  }
+
+  async health(): Promise<{ status: string }> {
+    const res = await fetch(`${this.baseUrl}/health`, {
+      headers: { Authorization: `Bearer ${this.apiKey}` },
+    });
+    if (!res.ok) {
+      return { status: `vllm_unhealthy_${res.status}` };
+    }
+    return { status: "ok" };
+  }
+}

@@ -2,6 +2,7 @@ import type { FastifyInstance } from "fastify";
 import { v4 as uuidv4 } from "uuid";
 import { chat, chatStream, chatV1, chatV1Stream, type ChatOptions, type ChatV1Options } from "../providers/claude.js";
 import { trackPoolMetric } from "../providers/valkey.js";
+import type { VllmProvider } from "../providers/vllm.js";
 import type { ConversationStore } from "../providers/conversation.js";
 import type { Config } from "../config.js";
 import type {
@@ -20,6 +21,7 @@ export async function chatCompletionsRoute(
   app: FastifyInstance,
   config: Config,
   conversations: ConversationStore,
+  vllm?: VllmProvider,
 ): Promise<void> {
   app.post<{ Body: ChatCompletionRequest }>(
     "/v1/chat/completions",
@@ -32,9 +34,6 @@ export async function chatCompletionsRoute(
       const maxTokens = body.max_tokens ?? body.max_completion_tokens;
       const includeUsage = stream && body.stream_options?.include_usage === true;

-      // Route to V1 query() if caller explicitly requests thinking, effort, or profile=deep
-      const useV1 = body.thinking !== undefined || body.effort !== undefined || body.profile === "deep";
-
       if (!newMessages || newMessages.length === 0) {
         return reply.code(400).send({
           error: {
@@ -73,7 +72,103 @@ export async function chatCompletionsRoute(
         convId = await conversations.create(newMessages, model, apiKey);
       }

-      // Build ChatOptions — pass all OpenAI params through
+      // ── vLLM provider: proxy passthrough ──────────────────────
+      if (config.provider === "vllm" && vllm) {
+        const proxyBody: ChatCompletionRequest = { ...body, messages: allMessages, model };
+
+        if (!stream) {
+          const t0 = Date.now();
+          const res = await vllm.chat(proxyBody);
+          if (!res.ok) {
+            const errText = await res.text();
+            app.log.error(`vLLM error: ${res.status} — ${errText}`);
+            return reply.code(res.status).send({
+              error: { message: `vLLM backend error: ${errText}`, type: "server_error" },
+            });
+          }
+          const data = await res.json() as ChatCompletionResponse;
+          trackPoolMetric("request", Date.now() - t0);
+
+          const assistantContent = data.choices?.[0]?.message?.content ?? "";
+          await conversations.append(convId, { role: "assistant", content: assistantContent });
+
+          data.conversation_id = convId;
+          return data;
+        }
+
+        // Streaming: pipe vLLM SSE stream through to client
+        const t0 = Date.now();
+        const res = await vllm.chatStream(proxyBody);
+        if (!res.ok) {
+          const errText = await res.text();
+          app.log.error(`vLLM stream error: ${res.status} — ${errText}`);
+          return reply.code(res.status).send({
+            error: { message: `vLLM backend error: ${errText}`, type: "server_error" },
+          });
+        }
+
+        reply.raw.writeHead(200, {
+          "Content-Type": "text/event-stream",
+          "Cache-Control": "no-cache",
+          Connection: "keep-alive",
+        });
+
+        let fullText = "";
+        const reader = res.body?.getReader();
+        if (!reader) {
+          reply.raw.write("data: [DONE]\n\n");
+          reply.raw.end();
+          return;
+        }
+
+        const decoder = new TextDecoder();
+        let buffer = "";
+
+        try {
+          while (true) {
+            const { done, value } = await reader.read();
+            if (done) break;
+
+            buffer += decoder.decode(value, { stream: true });
+            const lines = buffer.split("\n");
+            buffer = lines.pop() ?? "";
+
+            for (const line of lines) {
+              if (!line.startsWith("data: ")) continue;
+              const payload = line.slice(6);
+
+              if (payload === "[DONE]") {
+                reply.raw.write("data: [DONE]\n\n");
+                continue;
+              }
+
+              try {
+                const chunk = JSON.parse(payload) as ChatCompletionChunk;
+                const delta = chunk.choices?.[0]?.delta?.content;
+                if (delta) fullText += delta;
+                chunk.conversation_id = convId;
+                reply.raw.write(`data: ${JSON.stringify(chunk)}\n\n`);
+              } catch {
+                reply.raw.write(`${line}\n\n`);
+              }
+            }
+          }
+        } catch (err) {
+          app.log.error(err, "vLLM streaming error");
+        }
+
+        reply.raw.end();
+        trackPoolMetric("request", Date.now() - t0);
+
+        if (fullText) {
+          await conversations.append(convId, { role: "assistant", content: fullText });
+        }
+        return;
+      }
+
+      // ── Claude provider (existing logic) ──────────────────────
+      const useV1 = body.thinking !== undefined || body.effort !== undefined || body.profile === "deep";
+
       const chatOpts: ChatOptions = {
         messages: allMessages,
         model,
@@ -88,7 +183,6 @@ export async function chatCompletionsRoute(
       };

       if (!stream) {
-        // ── Non-streaming ────────────────────────────────────────
         const t0 = Date.now();
         let text: string;
         if (useV1) {
@@ -135,7 +229,7 @@ export async function chatCompletionsRoute(
         return response;
       }

-      // ── Streaming ──────────────────────────────────────────────
+      // ── Streaming (Claude) ────────────────────────────────────
       const t0 = Date.now();
       const id = makeCompletionId();
       const created = Math.floor(Date.now() / 1000);
@@ -150,7 +244,6 @@ export async function chatCompletionsRoute(
         reply.raw.write(`data: ${JSON.stringify(chunk)}\n\n`);
       }

-      // Initial chunk with role
       sendChunk({
         id,
         object: "chat.completion.chunk",
@@ -197,7 +290,6 @@ export async function chatCompletionsRoute(
         app.log.error(err, "streaming error");
       }

-      // Final chunk with finish_reason
       sendChunk({
         id,
         object: "chat.completion.chunk",
@@ -214,7 +306,6 @@ export async function chatCompletionsRoute(
         ],
       });

-      // Usage chunk (if stream_options.include_usage)
       if (includeUsage) {
         sendChunk({
           id,

@@ -1,7 +1,8 @@
 import type { FastifyInstance } from "fastify";
 import type { ModelListResponse } from "../types/openai.js";
+import type { VllmProvider } from "../providers/vllm.js";

-const MODELS: ModelListResponse = {
+const CLAUDE_MODELS: ModelListResponse = {
   object: "list",
   data: [
     {
@@ -25,8 +26,11 @@ const MODELS: ModelListResponse = {
   ],
 };

-export async function modelsRoute(app: FastifyInstance): Promise<void> {
+export async function modelsRoute(app: FastifyInstance, vllm?: VllmProvider): Promise<void> {
   app.get("/v1/models", async () => {
-    return MODELS;
+    if (vllm) {
+      return vllm.models();
+    }
+    return CLAUDE_MODELS;
   });
 }