mirror of
https://github.com/anthropics/claude-code-action.git
synced 2026-01-23 06:54:13 +08:00
feat: add repository_dispatch event support
- Add new progress MCP server for reporting task status via API
- Support repository_dispatch events with task description and progress endpoint
- Introduce isDispatch flag to unify dispatch event handling
- Make GitHub data optional for dispatch events without issues/PRs
- Update prompt generation with dispatch-specific instructions
Enables triggering Claude via repository_dispatch with:
{
"event_type": "claude_task",
"client_payload": {
"description": "Task description",
"progress_endpoint": "https://api.example.com/progress"
}
}
This commit is contained in:
@@ -102,7 +102,7 @@ runs:
|
||||
- name: Setup Node.js
|
||||
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # https://github.com/actions/setup-node/releases/tag/v4.4.0
|
||||
with:
|
||||
node-version: ${{ env.NODE_VERSION || '18.x' }}
|
||||
node-version: ${{ env.NODE_VERSION || '22.x' }}
|
||||
cache: ${{ inputs.use_node_cache == 'true' && 'npm' || '' }}
|
||||
|
||||
- name: Install Bun
|
||||
@@ -118,7 +118,9 @@ runs:
|
||||
|
||||
- name: Install Claude Code
|
||||
shell: bash
|
||||
run: bun install -g @anthropic-ai/claude-code@1.0.67
|
||||
run: |
|
||||
# Install Claude Code
|
||||
bun install -g @anthropic-ai/claude-code
|
||||
|
||||
- name: Run Claude Code Action
|
||||
shell: bash
|
||||
|
||||
@@ -30,7 +30,8 @@ async function run() {
|
||||
appendSystemPrompt: process.env.INPUT_APPEND_SYSTEM_PROMPT,
|
||||
claudeEnv: process.env.INPUT_CLAUDE_ENV,
|
||||
fallbackModel: process.env.INPUT_FALLBACK_MODEL,
|
||||
model: process.env.ANTHROPIC_MODEL,
|
||||
resumeEndpoint: process.env.INPUT_RESUME_ENDPOINT,
|
||||
streamConfig: process.env.INPUT_STREAM_CONFIG,
|
||||
});
|
||||
} catch (error) {
|
||||
core.setFailed(`Action failed with error: ${error}`);
|
||||
|
||||
@@ -4,6 +4,7 @@ import { promisify } from "util";
|
||||
import { unlink, writeFile, stat } from "fs/promises";
|
||||
import { createWriteStream } from "fs";
|
||||
import { spawn } from "child_process";
|
||||
import { StreamHandler } from "./stream-handler";
|
||||
|
||||
const execAsync = promisify(exec);
|
||||
|
||||
@@ -21,7 +22,15 @@ export type ClaudeOptions = {
|
||||
claudeEnv?: string;
|
||||
fallbackModel?: string;
|
||||
timeoutMinutes?: string;
|
||||
model?: string;
|
||||
resumeEndpoint?: string;
|
||||
streamConfig?: string;
|
||||
};
|
||||
|
||||
export type StreamConfig = {
|
||||
progress_endpoint?: string;
|
||||
headers?: Record<string, string>;
|
||||
resume_endpoint?: string;
|
||||
session_id?: string;
|
||||
};
|
||||
|
||||
type PreparedConfig = {
|
||||
@@ -95,9 +104,6 @@ export function prepareRunConfig(
|
||||
if (options.fallbackModel) {
|
||||
claudeArgs.push("--fallback-model", options.fallbackModel);
|
||||
}
|
||||
if (options.model) {
|
||||
claudeArgs.push("--model", options.model);
|
||||
}
|
||||
if (options.timeoutMinutes) {
|
||||
const timeoutMinutesNum = parseInt(options.timeoutMinutes, 10);
|
||||
if (isNaN(timeoutMinutesNum) || timeoutMinutesNum <= 0) {
|
||||
@@ -106,6 +112,25 @@ export function prepareRunConfig(
|
||||
);
|
||||
}
|
||||
}
|
||||
if (options.resumeEndpoint) {
|
||||
claudeArgs.push("--teleport", options.resumeEndpoint);
|
||||
}
|
||||
// Parse stream config for session_id and resume_endpoint
|
||||
if (options.streamConfig) {
|
||||
try {
|
||||
const streamConfig: StreamConfig = JSON.parse(options.streamConfig);
|
||||
// Add --session-id if session_id is provided
|
||||
if (streamConfig.session_id) {
|
||||
claudeArgs.push("--session-id", streamConfig.session_id);
|
||||
}
|
||||
// Only add --teleport if we have both session_id AND resume_endpoint
|
||||
if (streamConfig.session_id && streamConfig.resume_endpoint) {
|
||||
claudeArgs.push("--teleport", streamConfig.session_id);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("Failed to parse stream_config JSON:", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Parse custom environment variables
|
||||
const customEnv = parseCustomEnvVars(options.claudeEnv);
|
||||
@@ -120,6 +145,34 @@ export function prepareRunConfig(
|
||||
export async function runClaude(promptPath: string, options: ClaudeOptions) {
|
||||
const config = prepareRunConfig(promptPath, options);
|
||||
|
||||
// Set up streaming if endpoint is provided in stream config
|
||||
let streamHandler: StreamHandler | null = null;
|
||||
let streamConfig: StreamConfig | null = null;
|
||||
if (options.streamConfig) {
|
||||
try {
|
||||
streamConfig = JSON.parse(options.streamConfig);
|
||||
if (streamConfig?.progress_endpoint) {
|
||||
const customHeaders = streamConfig.headers || {};
|
||||
console.log("parsed headers", customHeaders);
|
||||
Object.keys(customHeaders).forEach((key) => {
|
||||
console.log(`Custom header: ${key} = ${customHeaders[key]}`);
|
||||
});
|
||||
streamHandler = new StreamHandler(
|
||||
streamConfig.progress_endpoint,
|
||||
customHeaders,
|
||||
);
|
||||
console.log(`Streaming output to: ${streamConfig.progress_endpoint}`);
|
||||
if (Object.keys(customHeaders).length > 0) {
|
||||
console.log(
|
||||
`Custom streaming headers: ${Object.keys(customHeaders).join(", ")}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("Failed to parse stream_config JSON:", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Create a named pipe
|
||||
try {
|
||||
await unlink(PIPE_PATH);
|
||||
@@ -162,12 +215,31 @@ export async function runClaude(promptPath: string, options: ClaudeOptions) {
|
||||
pipeStream.destroy();
|
||||
});
|
||||
|
||||
// Prepare environment variables
|
||||
const processEnv = {
|
||||
...process.env,
|
||||
...config.env,
|
||||
};
|
||||
|
||||
// If both session_id and resume_endpoint are provided, set environment variables
|
||||
if (streamConfig?.session_id && streamConfig?.resume_endpoint) {
|
||||
processEnv.TELEPORT_RESUME_URL = streamConfig.resume_endpoint;
|
||||
console.log(
|
||||
`Setting TELEPORT_RESUME_URL to: ${streamConfig.resume_endpoint}`,
|
||||
);
|
||||
|
||||
if (streamConfig.headers && Object.keys(streamConfig.headers).length > 0) {
|
||||
processEnv.TELEPORT_HEADERS = JSON.stringify(streamConfig.headers);
|
||||
console.log(`Setting TELEPORT_HEADERS for resume endpoint`);
|
||||
}
|
||||
}
|
||||
|
||||
// Log the full Claude command being executed
|
||||
console.log(`Running Claude with args: ${config.claudeArgs.join(" ")}`);
|
||||
|
||||
const claudeProcess = spawn("claude", config.claudeArgs, {
|
||||
stdio: ["pipe", "pipe", "inherit"],
|
||||
env: {
|
||||
...process.env,
|
||||
...config.env,
|
||||
},
|
||||
env: processEnv,
|
||||
});
|
||||
|
||||
// Handle Claude process errors
|
||||
@@ -178,32 +250,51 @@ export async function runClaude(promptPath: string, options: ClaudeOptions) {
|
||||
|
||||
// Capture output for parsing execution metrics
|
||||
let output = "";
|
||||
claudeProcess.stdout.on("data", (data) => {
|
||||
let lineBuffer = ""; // Buffer for incomplete lines
|
||||
|
||||
claudeProcess.stdout.on("data", async (data) => {
|
||||
const text = data.toString();
|
||||
output += text;
|
||||
|
||||
// Try to parse as JSON and pretty print if it's on a single line
|
||||
const lines = text.split("\n");
|
||||
lines.forEach((line: string, index: number) => {
|
||||
if (line.trim() === "") return;
|
||||
// Add new data to line buffer
|
||||
lineBuffer += text;
|
||||
|
||||
// Split into lines - the last element might be incomplete
|
||||
const lines = lineBuffer.split("\n");
|
||||
|
||||
// The last element is either empty (if text ended with \n) or incomplete
|
||||
lineBuffer = lines.pop() || "";
|
||||
|
||||
// Process complete lines
|
||||
for (let index = 0; index < lines.length; index++) {
|
||||
const line = lines[index];
|
||||
if (!line || line.trim() === "") continue;
|
||||
|
||||
// Try to parse as JSON and pretty print if it's on a single line
|
||||
try {
|
||||
// Check if this line is a JSON object
|
||||
const parsed = JSON.parse(line);
|
||||
const prettyJson = JSON.stringify(parsed, null, 2);
|
||||
process.stdout.write(prettyJson);
|
||||
if (index < lines.length - 1 || text.endsWith("\n")) {
|
||||
process.stdout.write("\n");
|
||||
process.stdout.write("\n");
|
||||
|
||||
// Send valid JSON to stream handler if available
|
||||
if (streamHandler) {
|
||||
try {
|
||||
// Send the original line (which is valid JSON) with newline for proper splitting
|
||||
const dataToSend = line + "\n";
|
||||
await streamHandler.addOutput(dataToSend);
|
||||
} catch (error) {
|
||||
core.warning(`Failed to stream output: ${error}`);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// Not a JSON object, print as is
|
||||
process.stdout.write(line);
|
||||
if (index < lines.length - 1 || text.endsWith("\n")) {
|
||||
process.stdout.write("\n");
|
||||
}
|
||||
process.stdout.write("\n");
|
||||
// Don't send non-JSON lines to stream handler
|
||||
}
|
||||
});
|
||||
|
||||
output += text;
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stdout errors
|
||||
@@ -257,8 +348,33 @@ export async function runClaude(promptPath: string, options: ClaudeOptions) {
|
||||
}
|
||||
}, timeoutMs);
|
||||
|
||||
claudeProcess.on("close", (code) => {
|
||||
claudeProcess.on("close", async (code) => {
|
||||
if (!resolved) {
|
||||
// Process any remaining data in the line buffer
|
||||
if (lineBuffer.trim()) {
|
||||
// Try to parse and print the remaining line
|
||||
try {
|
||||
const parsed = JSON.parse(lineBuffer);
|
||||
const prettyJson = JSON.stringify(parsed, null, 2);
|
||||
process.stdout.write(prettyJson);
|
||||
process.stdout.write("\n");
|
||||
|
||||
// Send valid JSON to stream handler if available
|
||||
if (streamHandler) {
|
||||
try {
|
||||
const dataToSend = lineBuffer + "\n";
|
||||
await streamHandler.addOutput(dataToSend);
|
||||
} catch (error) {
|
||||
core.warning(`Failed to stream final output: ${error}`);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
process.stdout.write(lineBuffer);
|
||||
process.stdout.write("\n");
|
||||
// Don't send non-JSON lines to stream handler
|
||||
}
|
||||
}
|
||||
|
||||
clearTimeout(timeoutId);
|
||||
resolved = true;
|
||||
resolve(code || 0);
|
||||
@@ -275,6 +391,15 @@ export async function runClaude(promptPath: string, options: ClaudeOptions) {
|
||||
});
|
||||
});
|
||||
|
||||
// Clean up streaming
|
||||
if (streamHandler) {
|
||||
try {
|
||||
await streamHandler.close();
|
||||
} catch (error) {
|
||||
core.warning(`Failed to close stream handler: ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up processes
|
||||
try {
|
||||
catProcess.kill("SIGTERM");
|
||||
|
||||
152
base-action/src/stream-handler.ts
Normal file
152
base-action/src/stream-handler.ts
Normal file
@@ -0,0 +1,152 @@
|
||||
import * as core from "@actions/core";
|
||||
|
||||
export function parseStreamHeaders(
|
||||
headersInput?: string,
|
||||
): Record<string, string> {
|
||||
if (!headersInput || headersInput.trim() === "") {
|
||||
return {};
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.parse(headersInput);
|
||||
} catch (e) {
|
||||
console.error("Failed to parse stream headers as JSON:", e);
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
export type TokenGetter = (audience: string) => Promise<string>;
|
||||
|
||||
export class StreamHandler {
|
||||
private endpoint: string;
|
||||
private customHeaders: Record<string, string>;
|
||||
private tokenGetter: TokenGetter;
|
||||
private token: string | null = null;
|
||||
private tokenFetchTime: number = 0;
|
||||
private buffer: string[] = [];
|
||||
private flushTimer: NodeJS.Timeout | null = null;
|
||||
private isClosed = false;
|
||||
|
||||
private readonly TOKEN_LIFETIME_MS = 4 * 60 * 1000; // 4 minutes
|
||||
private readonly BATCH_SIZE = 10;
|
||||
private readonly BATCH_TIMEOUT_MS = 1000;
|
||||
private readonly REQUEST_TIMEOUT_MS = 5000;
|
||||
|
||||
constructor(
|
||||
endpoint: string,
|
||||
customHeaders: Record<string, string> = {},
|
||||
tokenGetter?: TokenGetter,
|
||||
) {
|
||||
this.endpoint = endpoint;
|
||||
this.customHeaders = customHeaders;
|
||||
this.tokenGetter = tokenGetter || ((audience) => core.getIDToken(audience));
|
||||
}
|
||||
|
||||
async addOutput(data: string): Promise<void> {
|
||||
if (this.isClosed) return;
|
||||
|
||||
// Split by newlines and add to buffer
|
||||
const lines = data.split("\n").filter((line) => line.length > 0);
|
||||
this.buffer.push(...lines);
|
||||
|
||||
// Check if we should flush
|
||||
if (this.buffer.length >= this.BATCH_SIZE) {
|
||||
await this.flush();
|
||||
} else {
|
||||
// Set or reset the timer
|
||||
this.resetFlushTimer();
|
||||
}
|
||||
}
|
||||
|
||||
private resetFlushTimer(): void {
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
}
|
||||
this.flushTimer = setTimeout(() => {
|
||||
this.flush().catch((err) => {
|
||||
core.warning(`Failed to flush stream buffer: ${err}`);
|
||||
});
|
||||
}, this.BATCH_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
private async getToken(): Promise<string> {
|
||||
const now = Date.now();
|
||||
|
||||
// Check if we need a new token
|
||||
if (!this.token || now - this.tokenFetchTime >= this.TOKEN_LIFETIME_MS) {
|
||||
try {
|
||||
this.token = await this.tokenGetter("claude-code-github-action");
|
||||
this.tokenFetchTime = now;
|
||||
core.debug("Fetched new OIDC token for streaming");
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to get OIDC token: ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
return this.token;
|
||||
}
|
||||
|
||||
private async flush(): Promise<void> {
|
||||
if (this.buffer.length === 0) return;
|
||||
|
||||
// Clear the flush timer
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
|
||||
// Get the current buffer and clear it
|
||||
const output = [...this.buffer];
|
||||
this.buffer = [];
|
||||
|
||||
try {
|
||||
const token = await this.getToken();
|
||||
|
||||
const payload = {
|
||||
timestamp: new Date().toISOString(),
|
||||
output: output,
|
||||
};
|
||||
|
||||
// Create an AbortController for timeout
|
||||
const controller = new AbortController();
|
||||
const timeoutId = setTimeout(
|
||||
() => controller.abort(),
|
||||
this.REQUEST_TIMEOUT_MS,
|
||||
);
|
||||
|
||||
try {
|
||||
await fetch(this.endpoint, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Authorization: `Bearer ${token}`,
|
||||
...this.customHeaders,
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
signal: controller.signal,
|
||||
});
|
||||
} finally {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
} catch (error) {
|
||||
// Log but don't throw - we don't want to interrupt Claude's execution
|
||||
core.warning(`Failed to stream output: ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
// Clear any pending timer
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
|
||||
// Flush any remaining output
|
||||
if (this.buffer.length > 0) {
|
||||
await this.flush();
|
||||
}
|
||||
|
||||
// Mark as closed after flushing
|
||||
this.isClosed = true;
|
||||
}
|
||||
}
|
||||
97
base-action/test/resume-endpoint.test.ts
Normal file
97
base-action/test/resume-endpoint.test.ts
Normal file
@@ -0,0 +1,97 @@
|
||||
import { describe, it, expect } from "bun:test";
|
||||
import { prepareRunConfig } from "../src/run-claude";
|
||||
|
||||
describe("resume endpoint functionality", () => {
|
||||
it("should add --teleport flag when both session_id and resume_endpoint are provided", () => {
|
||||
const streamConfig = JSON.stringify({
|
||||
session_id: "12345",
|
||||
resume_endpoint: "https://example.com/resume/12345",
|
||||
});
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
streamConfig,
|
||||
});
|
||||
|
||||
expect(config.claudeArgs).toContain("--teleport");
|
||||
expect(config.claudeArgs).toContain("12345");
|
||||
});
|
||||
|
||||
it("should not add --teleport flag when no streamConfig is provided", () => {
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
allowedTools: "Edit",
|
||||
});
|
||||
|
||||
expect(config.claudeArgs).not.toContain("--teleport");
|
||||
});
|
||||
|
||||
it("should not add --teleport flag when only session_id is provided without resume_endpoint", () => {
|
||||
const streamConfig = JSON.stringify({
|
||||
session_id: "12345",
|
||||
// No resume_endpoint
|
||||
});
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
streamConfig,
|
||||
});
|
||||
|
||||
expect(config.claudeArgs).not.toContain("--teleport");
|
||||
});
|
||||
|
||||
it("should not add --teleport flag when only resume_endpoint is provided without session_id", () => {
|
||||
const streamConfig = JSON.stringify({
|
||||
resume_endpoint: "https://example.com/resume/12345",
|
||||
// No session_id
|
||||
});
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
streamConfig,
|
||||
});
|
||||
|
||||
expect(config.claudeArgs).not.toContain("--teleport");
|
||||
});
|
||||
|
||||
it("should maintain order of arguments with session_id", () => {
|
||||
const streamConfig = JSON.stringify({
|
||||
session_id: "12345",
|
||||
resume_endpoint: "https://example.com/resume/12345",
|
||||
});
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
allowedTools: "Edit",
|
||||
streamConfig,
|
||||
maxTurns: "5",
|
||||
});
|
||||
|
||||
const teleportIndex = config.claudeArgs.indexOf("--teleport");
|
||||
const maxTurnsIndex = config.claudeArgs.indexOf("--max-turns");
|
||||
|
||||
expect(teleportIndex).toBeGreaterThan(-1);
|
||||
expect(maxTurnsIndex).toBeGreaterThan(-1);
|
||||
});
|
||||
|
||||
it("should handle progress_endpoint and headers in streamConfig", () => {
|
||||
const streamConfig = JSON.stringify({
|
||||
progress_endpoint: "https://example.com/progress",
|
||||
headers: { "X-Test": "value" },
|
||||
});
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
streamConfig,
|
||||
});
|
||||
|
||||
// This test just verifies parsing doesn't fail - actual streaming logic
|
||||
// is tested elsewhere as it requires environment setup
|
||||
expect(config.claudeArgs).toBeDefined();
|
||||
});
|
||||
|
||||
it("should handle session_id with resume_endpoint and headers", () => {
|
||||
const streamConfig = JSON.stringify({
|
||||
session_id: "abc123",
|
||||
resume_endpoint: "https://example.com/resume/abc123",
|
||||
headers: { Authorization: "Bearer token" },
|
||||
progress_endpoint: "https://example.com/progress",
|
||||
});
|
||||
const config = prepareRunConfig("/path/to/prompt", {
|
||||
streamConfig,
|
||||
});
|
||||
|
||||
expect(config.claudeArgs).toContain("--teleport");
|
||||
expect(config.claudeArgs).toContain("abc123");
|
||||
// Note: Environment variable setup (TELEPORT_RESUME_URL, TELEPORT_HEADERS) is tested in integration tests
|
||||
});
|
||||
});
|
||||
364
base-action/test/stream-handler.test.ts
Normal file
364
base-action/test/stream-handler.test.ts
Normal file
@@ -0,0 +1,364 @@
|
||||
import { describe, it, expect, beforeEach, mock } from "bun:test";
|
||||
import {
|
||||
StreamHandler,
|
||||
parseStreamHeaders,
|
||||
type TokenGetter,
|
||||
} from "../src/stream-handler";
|
||||
|
||||
describe("parseStreamHeaders", () => {
|
||||
it("should return empty object for empty input", () => {
|
||||
expect(parseStreamHeaders("")).toEqual({});
|
||||
expect(parseStreamHeaders(undefined)).toEqual({});
|
||||
expect(parseStreamHeaders(" ")).toEqual({});
|
||||
});
|
||||
|
||||
it("should parse single header", () => {
|
||||
const result = parseStreamHeaders('{"X-Correlation-Id": "12345"}');
|
||||
expect(result).toEqual({ "X-Correlation-Id": "12345" });
|
||||
});
|
||||
|
||||
it("should parse multiple headers", () => {
|
||||
const headers = JSON.stringify({
|
||||
"X-Correlation-Id": "12345",
|
||||
"X-Custom-Header": "custom-value",
|
||||
Authorization: "Bearer token123",
|
||||
});
|
||||
|
||||
const result = parseStreamHeaders(headers);
|
||||
expect(result).toEqual({
|
||||
"X-Correlation-Id": "12345",
|
||||
"X-Custom-Header": "custom-value",
|
||||
Authorization: "Bearer token123",
|
||||
});
|
||||
});
|
||||
|
||||
it("should handle headers with spaces", () => {
|
||||
const headers = JSON.stringify({
|
||||
"X-Header-One": "value with spaces",
|
||||
"X-Header-Two": "another value",
|
||||
});
|
||||
|
||||
const result = parseStreamHeaders(headers);
|
||||
expect(result).toEqual({
|
||||
"X-Header-One": "value with spaces",
|
||||
"X-Header-Two": "another value",
|
||||
});
|
||||
});
|
||||
|
||||
it("should skip empty lines and comments", () => {
|
||||
const headers = JSON.stringify({
|
||||
"X-Header-One": "value1",
|
||||
"X-Header-Two": "value2",
|
||||
"X-Header-Three": "value3",
|
||||
});
|
||||
|
||||
const result = parseStreamHeaders(headers);
|
||||
expect(result).toEqual({
|
||||
"X-Header-One": "value1",
|
||||
"X-Header-Two": "value2",
|
||||
"X-Header-Three": "value3",
|
||||
});
|
||||
});
|
||||
|
||||
it("should skip lines without colons", () => {
|
||||
const headers = JSON.stringify({
|
||||
"X-Header-One": "value1",
|
||||
"X-Header-Two": "value2",
|
||||
});
|
||||
|
||||
const result = parseStreamHeaders(headers);
|
||||
expect(result).toEqual({
|
||||
"X-Header-One": "value1",
|
||||
"X-Header-Two": "value2",
|
||||
});
|
||||
});
|
||||
|
||||
it("should handle headers with colons in values", () => {
|
||||
const headers = JSON.stringify({
|
||||
"X-URL": "https://example.com:8080/path",
|
||||
"X-Time": "10:30:45",
|
||||
});
|
||||
|
||||
const result = parseStreamHeaders(headers);
|
||||
expect(result).toEqual({
|
||||
"X-URL": "https://example.com:8080/path",
|
||||
"X-Time": "10:30:45",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("StreamHandler", () => {
|
||||
let handler: StreamHandler;
|
||||
let mockFetch: ReturnType<typeof mock>;
|
||||
let mockTokenGetter: TokenGetter;
|
||||
const mockEndpoint = "https://test.example.com/stream";
|
||||
const mockToken = "mock-oidc-token";
|
||||
|
||||
beforeEach(() => {
|
||||
// Mock fetch
|
||||
mockFetch = mock(() => Promise.resolve({ ok: true }));
|
||||
global.fetch = mockFetch as any;
|
||||
|
||||
// Mock token getter
|
||||
mockTokenGetter = mock(() => Promise.resolve(mockToken));
|
||||
});
|
||||
|
||||
describe("basic functionality", () => {
|
||||
it("should batch lines up to BATCH_SIZE", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// Add 9 lines (less than batch size of 10)
|
||||
for (let i = 1; i <= 9; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Should not have sent anything yet
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
|
||||
// Add the 10th line to trigger flush
|
||||
await handler.addOutput("line 10\n");
|
||||
|
||||
// Should have sent the batch
|
||||
expect(mockFetch).toHaveBeenCalledTimes(1);
|
||||
expect(mockFetch).toHaveBeenCalledWith(mockEndpoint, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Authorization: `Bearer ${mockToken}`,
|
||||
},
|
||||
body: expect.stringContaining(
|
||||
'"output":["line 1","line 2","line 3","line 4","line 5","line 6","line 7","line 8","line 9","line 10"]',
|
||||
),
|
||||
signal: expect.any(AbortSignal),
|
||||
});
|
||||
});
|
||||
|
||||
it("should flush on timeout", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// Add a few lines
|
||||
await handler.addOutput("line 1\n");
|
||||
await handler.addOutput("line 2\n");
|
||||
|
||||
// Should not have sent anything yet
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
|
||||
// Wait for the timeout to trigger
|
||||
await new Promise((resolve) => setTimeout(resolve, 1100));
|
||||
|
||||
// Should have sent the batch
|
||||
expect(mockFetch).toHaveBeenCalledTimes(1);
|
||||
const call = mockFetch.mock.calls[0];
|
||||
expect(call).toBeDefined();
|
||||
const body = JSON.parse(call![1].body);
|
||||
expect(body.output).toEqual(["line 1", "line 2"]);
|
||||
});
|
||||
|
||||
it("should include custom headers", async () => {
|
||||
const customHeaders = {
|
||||
"X-Correlation-Id": "12345",
|
||||
"X-Custom": "value",
|
||||
};
|
||||
handler = new StreamHandler(mockEndpoint, customHeaders, mockTokenGetter);
|
||||
|
||||
// Trigger a batch
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
expect(mockFetch).toHaveBeenCalledWith(mockEndpoint, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Authorization: `Bearer ${mockToken}`,
|
||||
"X-Correlation-Id": "12345",
|
||||
"X-Custom": "value",
|
||||
},
|
||||
body: expect.any(String),
|
||||
signal: expect.any(AbortSignal),
|
||||
});
|
||||
});
|
||||
|
||||
it("should include timestamp in payload", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
const beforeTime = new Date().toISOString();
|
||||
|
||||
// Trigger a batch
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
const afterTime = new Date().toISOString();
|
||||
|
||||
const call = mockFetch.mock.calls[0];
|
||||
expect(call).toBeDefined();
|
||||
const body = JSON.parse(call![1].body);
|
||||
|
||||
expect(body).toHaveProperty("timestamp");
|
||||
expect(new Date(body.timestamp).toISOString()).toBe(body.timestamp);
|
||||
expect(body.timestamp >= beforeTime).toBe(true);
|
||||
expect(body.timestamp <= afterTime).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("token management", () => {
|
||||
it("should fetch token on first request", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// Trigger a flush
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
expect(mockTokenGetter).toHaveBeenCalledWith("claude-code-github-action");
|
||||
expect(mockTokenGetter).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should reuse token within 4 minutes", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// First batch
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Second batch immediately (within 4 minutes)
|
||||
for (let i = 11; i <= 20; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Should have only fetched token once
|
||||
expect(mockTokenGetter).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should handle token fetch errors", async () => {
|
||||
const errorTokenGetter = mock(() =>
|
||||
Promise.reject(new Error("Token fetch failed")),
|
||||
);
|
||||
handler = new StreamHandler(mockEndpoint, {}, errorTokenGetter);
|
||||
|
||||
// Try to send data
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Should not have made fetch request
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("error handling", () => {
|
||||
it("should handle fetch errors gracefully", async () => {
|
||||
mockFetch.mockImplementation(() =>
|
||||
Promise.reject(new Error("Network error")),
|
||||
);
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// Send data - should not throw
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Should have attempted to fetch
|
||||
expect(mockFetch).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should continue processing after errors", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// First batch - make it fail
|
||||
let callCount = 0;
|
||||
mockFetch.mockImplementation(() => {
|
||||
callCount++;
|
||||
if (callCount === 1) {
|
||||
return Promise.reject(new Error("First batch failed"));
|
||||
}
|
||||
return Promise.resolve({ ok: true });
|
||||
});
|
||||
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Second batch - should work
|
||||
for (let i = 11; i <= 20; i++) {
|
||||
await handler.addOutput(`line ${i}\n`);
|
||||
}
|
||||
|
||||
// Should have attempted both batches
|
||||
expect(mockFetch).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("close functionality", () => {
|
||||
it("should flush remaining data on close", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
// Add some data but not enough to trigger batch
|
||||
await handler.addOutput("line 1\n");
|
||||
await handler.addOutput("line 2\n");
|
||||
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
|
||||
// Close should flush
|
||||
await handler.close();
|
||||
|
||||
expect(mockFetch).toHaveBeenCalledTimes(1);
|
||||
const call = mockFetch.mock.calls[0];
|
||||
expect(call).toBeDefined();
|
||||
const body = JSON.parse(call![1].body);
|
||||
expect(body.output).toEqual(["line 1", "line 2"]);
|
||||
});
|
||||
|
||||
it("should not accept new data after close", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
await handler.close();
|
||||
|
||||
// Try to add data after close
|
||||
await handler.addOutput("should not be sent\n");
|
||||
|
||||
// Should not have sent anything
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("data handling", () => {
|
||||
it("should filter out empty lines", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
await handler.addOutput("line 1\n\n\nline 2\n\n");
|
||||
await handler.close();
|
||||
|
||||
const call = mockFetch.mock.calls[0];
|
||||
expect(call).toBeDefined();
|
||||
const body = JSON.parse(call![1].body);
|
||||
expect(body.output).toEqual(["line 1", "line 2"]);
|
||||
});
|
||||
|
||||
it("should handle data without newlines", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
await handler.addOutput("single line");
|
||||
await handler.close();
|
||||
|
||||
const call = mockFetch.mock.calls[0];
|
||||
expect(call).toBeDefined();
|
||||
const body = JSON.parse(call![1].body);
|
||||
expect(body.output).toEqual(["single line"]);
|
||||
});
|
||||
|
||||
it("should handle multi-line input correctly", async () => {
|
||||
handler = new StreamHandler(mockEndpoint, {}, mockTokenGetter);
|
||||
|
||||
await handler.addOutput("line 1\nline 2\nline 3");
|
||||
await handler.close();
|
||||
|
||||
const call = mockFetch.mock.calls[0];
|
||||
expect(call).toBeDefined();
|
||||
const body = JSON.parse(call![1].body);
|
||||
expect(body.output).toEqual(["line 1", "line 2", "line 3"]);
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user