remoteIO.ts 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
  2. import { PassThrough } from 'stream'
  3. import { URL } from 'url'
  4. import { getSessionId } from '../bootstrap/state.js'
  5. import { getPollIntervalConfig } from '../bridge/pollConfig.js'
  6. import { registerCleanup } from '../utils/cleanupRegistry.js'
  7. import { setCommandLifecycleListener } from '../utils/commandLifecycle.js'
  8. import { isDebugMode, logForDebugging } from '../utils/debug.js'
  9. import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
  10. import { isEnvTruthy } from '../utils/envUtils.js'
  11. import { errorMessage } from '../utils/errors.js'
  12. import { gracefulShutdown } from '../utils/gracefulShutdown.js'
  13. import { logError } from '../utils/log.js'
  14. import { writeToStdout } from '../utils/process.js'
  15. import { getSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
  16. import {
  17. setSessionMetadataChangedListener,
  18. setSessionStateChangedListener,
  19. } from '../utils/sessionState.js'
  20. import {
  21. setInternalEventReader,
  22. setInternalEventWriter,
  23. } from '../utils/sessionStorage.js'
  24. import { ndjsonSafeStringify } from './ndjsonSafeStringify.js'
  25. import { StructuredIO } from './structuredIO.js'
  26. import { CCRClient, CCRInitError } from './transports/ccrClient.js'
  27. import { SSETransport } from './transports/SSETransport.js'
  28. import type { Transport } from './transports/Transport.js'
  29. import { getTransportForUrl } from './transports/transportUtils.js'
  30. /**
  31. * Bidirectional streaming for SDK mode with session tracking
  32. * Supports WebSocket transport
  33. */
  34. export class RemoteIO extends StructuredIO {
  35. private url: URL
  36. private transport: Transport
  37. private inputStream: PassThrough
  38. private readonly isBridge: boolean = false
  39. private readonly isDebug: boolean = false
  40. private ccrClient: CCRClient | null = null
  41. private keepAliveTimer: ReturnType<typeof setInterval> | null = null
  42. constructor(
  43. streamUrl: string,
  44. initialPrompt?: AsyncIterable<string>,
  45. replayUserMessages?: boolean,
  46. ) {
  47. const inputStream = new PassThrough({ encoding: 'utf8' })
  48. super(inputStream, replayUserMessages)
  49. this.inputStream = inputStream
  50. this.url = new URL(streamUrl)
  51. // Prepare headers with session token if available
  52. const headers: Record<string, string> = {}
  53. const sessionToken = getSessionIngressAuthToken()
  54. if (sessionToken) {
  55. headers['Authorization'] = `Bearer ${sessionToken}`
  56. } else {
  57. logForDebugging('[remote-io] No session ingress token available', {
  58. level: 'error',
  59. })
  60. }
  61. // Add environment runner version if available (set by Environment Manager)
  62. const erVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
  63. if (erVersion) {
  64. headers['x-environment-runner-version'] = erVersion
  65. }
  66. // Provide a callback that re-reads the session token dynamically.
  67. // When the parent process refreshes the token (via token file or env var),
  68. // the transport can pick it up on reconnection.
  69. const refreshHeaders = (): Record<string, string> => {
  70. const h: Record<string, string> = {}
  71. const freshToken = getSessionIngressAuthToken()
  72. if (freshToken) {
  73. h['Authorization'] = `Bearer ${freshToken}`
  74. }
  75. const freshErVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
  76. if (freshErVersion) {
  77. h['x-environment-runner-version'] = freshErVersion
  78. }
  79. return h
  80. }
  81. // Get appropriate transport based on URL protocol
  82. this.transport = getTransportForUrl(
  83. this.url,
  84. headers,
  85. getSessionId(),
  86. refreshHeaders,
  87. )
  88. // Set up data callback
  89. this.isBridge = process.env.CLAUDE_CODE_ENVIRONMENT_KIND === 'bridge'
  90. this.isDebug = isDebugMode()
  91. this.transport.setOnData((data: string) => {
  92. this.inputStream.write(data)
  93. if (this.isBridge && this.isDebug) {
  94. writeToStdout(data.endsWith('\n') ? data : data + '\n')
  95. }
  96. })
  97. // Set up close callback to handle connection failures
  98. this.transport.setOnClose(() => {
  99. // End the input stream to trigger graceful shutdown
  100. this.inputStream.end()
  101. })
  102. // Initialize CCR v2 client (heartbeats, epoch, state reporting, event writes).
  103. // The CCRClient constructor wires the SSE received-ack handler
  104. // synchronously, so new CCRClient() MUST run before transport.connect() —
  105. // otherwise early SSE frames hit an unwired onEventCallback and their
  106. // 'received' delivery acks are silently dropped.
  107. if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
  108. // CCR v2 is SSE+POST by definition. getTransportForUrl returns
  109. // SSETransport under the same env var, but the two checks live in
  110. // different files — assert the invariant so a future decoupling
  111. // fails loudly here instead of confusingly inside CCRClient.
  112. if (!(this.transport instanceof SSETransport)) {
  113. throw new Error(
  114. 'CCR v2 requires SSETransport; check getTransportForUrl',
  115. )
  116. }
  117. this.ccrClient = new CCRClient(this.transport, this.url)
  118. const init = this.ccrClient.initialize()
  119. this.restoredWorkerState = init.catch(() => null)
  120. init.catch((error: unknown) => {
  121. logForDiagnosticsNoPII('error', 'cli_worker_lifecycle_init_failed', {
  122. reason: error instanceof CCRInitError ? error.reason : 'unknown',
  123. })
  124. logError(
  125. new Error(`CCRClient initialization failed: ${errorMessage(error)}`),
  126. )
  127. void gracefulShutdown(1, 'other')
  128. })
  129. registerCleanup(async () => this.ccrClient?.close())
  130. // Register internal event writer for transcript persistence.
  131. // When set, sessionStorage writes transcript messages as CCR v2
  132. // internal events instead of v1 Session Ingress.
  133. setInternalEventWriter((eventType, payload, options) =>
  134. this.ccrClient!.writeInternalEvent(eventType, payload, options),
  135. )
  136. // Register internal event readers for session resume.
  137. // When set, hydrateFromCCRv2InternalEvents() can fetch foreground
  138. // and subagent internal events to reconstruct conversation state.
  139. setInternalEventReader(
  140. () => this.ccrClient!.readInternalEvents(),
  141. () => this.ccrClient!.readSubagentInternalEvents(),
  142. )
  143. const LIFECYCLE_TO_DELIVERY = {
  144. started: 'processing',
  145. completed: 'processed',
  146. } as const
  147. setCommandLifecycleListener((uuid, state) => {
  148. this.ccrClient?.reportDelivery(uuid, LIFECYCLE_TO_DELIVERY[state])
  149. })
  150. setSessionStateChangedListener((state, details) => {
  151. this.ccrClient?.reportState(state, details)
  152. })
  153. setSessionMetadataChangedListener(metadata => {
  154. this.ccrClient?.reportMetadata(metadata)
  155. })
  156. }
  157. // Start connection only after all callbacks are wired (setOnData above,
  158. // setOnEvent inside new CCRClient() when CCR v2 is enabled).
  159. void this.transport.connect()
  160. // Push a silent keep_alive frame on a fixed interval so upstream
  161. // proxies and the session-ingress layer don't GC an otherwise-idle
  162. // remote control session. The keep_alive type is filtered before
  163. // reaching any client UI (Query.ts drops it; structuredIO.ts drops it;
  164. // web/iOS/Android never see it in their message loop). Interval comes
  165. // from GrowthBook (tengu_bridge_poll_interval_config
  166. // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
  167. // Bridge-only: fixes Envoy idle timeout on bridge-topology sessions
  168. // (#21931). byoc workers ran without this before #21931 and do not
  169. // need it — different network path.
  170. const keepAliveIntervalMs =
  171. getPollIntervalConfig().session_keepalive_interval_v2_ms
  172. if (this.isBridge && keepAliveIntervalMs > 0) {
  173. this.keepAliveTimer = setInterval(() => {
  174. logForDebugging('[remote-io] keep_alive sent')
  175. void this.write({ type: 'keep_alive' }).catch(err => {
  176. logForDebugging(
  177. `[remote-io] keep_alive write failed: ${errorMessage(err)}`,
  178. )
  179. })
  180. }, keepAliveIntervalMs)
  181. this.keepAliveTimer.unref?.()
  182. }
  183. // Register for graceful shutdown cleanup
  184. registerCleanup(async () => this.close())
  185. // If initial prompt is provided, send it through the input stream
  186. if (initialPrompt) {
  187. // Convert the initial prompt to the input stream format.
  188. // Chunks from stdin may already contain trailing newlines, so strip
  189. // them before appending our own to avoid double-newline issues that
  190. // cause structuredIO to parse empty lines. String() handles both
  191. // string chunks and Buffer objects from process.stdin.
  192. const stream = this.inputStream
  193. void (async () => {
  194. for await (const chunk of initialPrompt) {
  195. stream.write(String(chunk).replace(/\n$/, '') + '\n')
  196. }
  197. })()
  198. }
  199. }
  200. override flushInternalEvents(): Promise<void> {
  201. return this.ccrClient?.flushInternalEvents() ?? Promise.resolve()
  202. }
  203. override get internalEventsPending(): number {
  204. return this.ccrClient?.internalEventsPending ?? 0
  205. }
  206. /**
  207. * Send output to the transport.
  208. * In bridge mode, control_request messages are always echoed to stdout so the
  209. * bridge parent can detect permission requests. Other messages are echoed only
  210. * in debug mode.
  211. */
  212. async write(message: StdoutMessage): Promise<void> {
  213. if (this.ccrClient) {
  214. await this.ccrClient.writeEvent(message)
  215. } else {
  216. await this.transport.write(message)
  217. }
  218. if (this.isBridge) {
  219. if (message.type === 'control_request' || this.isDebug) {
  220. writeToStdout(ndjsonSafeStringify(message) + '\n')
  221. }
  222. }
  223. }
  224. /**
  225. * Clean up connections gracefully
  226. */
  227. close(): void {
  228. if (this.keepAliveTimer) {
  229. clearInterval(this.keepAliveTimer)
  230. this.keepAliveTimer = null
  231. }
  232. this.transport.close()
  233. this.inputStream.end()
  234. }
  235. }