| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
2773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419 |
- import type {
- BetaContentBlock,
- BetaContentBlockParam,
- BetaImageBlockParam,
- BetaJSONOutputFormat,
- BetaMessage,
- BetaMessageDeltaUsage,
- BetaMessageStreamParams,
- BetaOutputConfig,
- BetaRawMessageStreamEvent,
- BetaRequestDocumentBlock,
- BetaStopReason,
- BetaToolChoiceAuto,
- BetaToolChoiceTool,
- BetaToolResultBlockParam,
- BetaToolUnion,
- BetaUsage,
- BetaMessageParam as MessageParam,
- } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
- import type { TextBlockParam } from '@anthropic-ai/sdk/resources/index.mjs'
- import type { Stream } from '@anthropic-ai/sdk/streaming.mjs'
- import { randomUUID } from 'crypto'
- import {
- getAPIProvider,
- isFirstPartyAnthropicBaseUrl,
- } from 'src/utils/model/providers.js'
- import {
- getAttributionHeader,
- getCLISyspromptPrefix,
- } from '../../constants/system.js'
- import {
- getEmptyToolPermissionContext,
- type QueryChainTracking,
- type Tool,
- type ToolPermissionContext,
- type Tools,
- toolMatchesName,
- } from '../../Tool.js'
- import type { AgentDefinition } from '../../tools/AgentTool/loadAgentsDir.js'
- import {
- type ConnectorTextBlock,
- type ConnectorTextDelta,
- isConnectorTextBlock,
- } from '../../types/connectorText.js'
- import type {
- AssistantMessage,
- Message,
- StreamEvent,
- SystemAPIErrorMessage,
- UserMessage,
- } from '../../types/message.js'
- import {
- type CacheScope,
- logAPIPrefix,
- splitSysPromptPrefix,
- toolToAPISchema,
- } from '../../utils/api.js'
- import { getOauthAccountInfo } from '../../utils/auth.js'
- import {
- getBedrockExtraBodyParamsBetas,
- getMergedBetas,
- getModelBetas,
- } from '../../utils/betas.js'
- import { getOrCreateUserID } from '../../utils/config.js'
- import {
- CAPPED_DEFAULT_MAX_TOKENS,
- getModelMaxOutputTokens,
- getSonnet1mExpTreatmentEnabled,
- } from '../../utils/context.js'
- import { resolveAppliedEffort } from '../../utils/effort.js'
- import { isEnvTruthy } from '../../utils/envUtils.js'
- import { errorMessage } from '../../utils/errors.js'
- import { computeFingerprintFromMessages } from '../../utils/fingerprint.js'
- import { captureAPIRequest, logError } from '../../utils/log.js'
- import {
- createAssistantAPIErrorMessage,
- createUserMessage,
- ensureToolResultPairing,
- normalizeContentFromAPI,
- normalizeMessagesForAPI,
- stripAdvisorBlocks,
- stripCallerFieldFromAssistantMessage,
- stripToolReferenceBlocksFromUserMessage,
- } from '../../utils/messages.js'
- import {
- getDefaultOpusModel,
- getDefaultSonnetModel,
- getSmallFastModel,
- isNonCustomOpusModel,
- } from '../../utils/model/model.js'
- import {
- asSystemPrompt,
- type SystemPrompt,
- } from '../../utils/systemPromptType.js'
- import { tokenCountFromLastAPIResponse } from '../../utils/tokens.js'
- import { getDynamicConfig_BLOCKS_ON_INIT } from '../analytics/growthbook.js'
- import {
- currentLimits,
- extractQuotaStatusFromError,
- extractQuotaStatusFromHeaders,
- } from '../claudeAiLimits.js'
- import { getAPIContextManagement } from '../compact/apiMicrocompact.js'
- /* eslint-disable @typescript-eslint/no-require-imports */
- const autoModeStateModule = feature('TRANSCRIPT_CLASSIFIER')
- ? (require('../../utils/permissions/autoModeState.js') as typeof import('../../utils/permissions/autoModeState.js'))
- : null
- import { feature } from 'bun:bundle'
- import type { ClientOptions } from '@anthropic-ai/sdk'
- import {
- APIConnectionTimeoutError,
- APIError,
- APIUserAbortError,
- } from '@anthropic-ai/sdk/error'
- import {
- getAfkModeHeaderLatched,
- getCacheEditingHeaderLatched,
- getFastModeHeaderLatched,
- getLastApiCompletionTimestamp,
- getPromptCache1hAllowlist,
- getPromptCache1hEligible,
- getSessionId,
- getThinkingClearLatched,
- setAfkModeHeaderLatched,
- setCacheEditingHeaderLatched,
- setFastModeHeaderLatched,
- setLastMainRequestId,
- setPromptCache1hAllowlist,
- setPromptCache1hEligible,
- setThinkingClearLatched,
- } from 'src/bootstrap/state.js'
- import {
- AFK_MODE_BETA_HEADER,
- CONTEXT_1M_BETA_HEADER,
- CONTEXT_MANAGEMENT_BETA_HEADER,
- EFFORT_BETA_HEADER,
- FAST_MODE_BETA_HEADER,
- PROMPT_CACHING_SCOPE_BETA_HEADER,
- REDACT_THINKING_BETA_HEADER,
- STRUCTURED_OUTPUTS_BETA_HEADER,
- TASK_BUDGETS_BETA_HEADER,
- } from 'src/constants/betas.js'
- import type { QuerySource } from 'src/constants/querySource.js'
- import type { Notification } from 'src/context/notifications.js'
- import { addToTotalSessionCost } from 'src/cost-tracker.js'
- import { getFeatureValue_CACHED_MAY_BE_STALE } from 'src/services/analytics/growthbook.js'
- import type { AgentId } from 'src/types/ids.js'
- import {
- ADVISOR_TOOL_INSTRUCTIONS,
- getExperimentAdvisorModels,
- isAdvisorEnabled,
- isValidAdvisorModel,
- modelSupportsAdvisor,
- } from 'src/utils/advisor.js'
- import { getAgentContext } from 'src/utils/agentContext.js'
- import { isClaudeAISubscriber } from 'src/utils/auth.js'
- import {
- getToolSearchBetaHeader,
- modelSupportsStructuredOutputs,
- shouldIncludeFirstPartyOnlyBetas,
- shouldUseGlobalCacheScope,
- } from 'src/utils/betas.js'
- import { CLAUDE_IN_CHROME_MCP_SERVER_NAME } from 'src/utils/claudeInChrome/common.js'
- import { CHROME_TOOL_SEARCH_INSTRUCTIONS } from 'src/utils/claudeInChrome/prompt.js'
- import { getMaxThinkingTokensForModel } from 'src/utils/context.js'
- import { logForDebugging } from 'src/utils/debug.js'
- import { logForDiagnosticsNoPII } from 'src/utils/diagLogs.js'
- import { type EffortValue, modelSupportsEffort } from 'src/utils/effort.js'
- import {
- isFastModeAvailable,
- isFastModeCooldown,
- isFastModeEnabled,
- isFastModeSupportedByModel,
- } from 'src/utils/fastMode.js'
- import { returnValue } from 'src/utils/generators.js'
- import { headlessProfilerCheckpoint } from 'src/utils/headlessProfiler.js'
- import { isMcpInstructionsDeltaEnabled } from 'src/utils/mcpInstructionsDelta.js'
- import { calculateUSDCost } from 'src/utils/modelCost.js'
- import { endQueryProfile, queryCheckpoint } from 'src/utils/queryProfiler.js'
- import {
- modelSupportsAdaptiveThinking,
- modelSupportsThinking,
- type ThinkingConfig,
- } from 'src/utils/thinking.js'
- import {
- extractDiscoveredToolNames,
- isDeferredToolsDeltaEnabled,
- isToolSearchEnabled,
- } from 'src/utils/toolSearch.js'
- import { API_MAX_MEDIA_PER_REQUEST } from '../../constants/apiLimits.js'
- import { ADVISOR_BETA_HEADER } from '../../constants/betas.js'
- import {
- formatDeferredToolLine,
- isDeferredTool,
- TOOL_SEARCH_TOOL_NAME,
- } from '../../tools/ToolSearchTool/prompt.js'
- import { count } from '../../utils/array.js'
- import { insertBlockAfterToolResults } from '../../utils/contentArray.js'
- import { validateBoundedIntEnvVar } from '../../utils/envValidation.js'
- import { safeParseJSON } from '../../utils/json.js'
- import { getInferenceProfileBackingModel } from '../../utils/model/bedrock.js'
- import {
- normalizeModelStringForAPI,
- parseUserSpecifiedModel,
- } from '../../utils/model/model.js'
- import {
- startSessionActivity,
- stopSessionActivity,
- } from '../../utils/sessionActivity.js'
- import { jsonStringify } from '../../utils/slowOperations.js'
- import {
- isBetaTracingEnabled,
- type LLMRequestNewContext,
- startLLMRequestSpan,
- } from '../../utils/telemetry/sessionTracing.js'
- /* eslint-enable @typescript-eslint/no-require-imports */
- import {
- type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- logEvent,
- } from '../analytics/index.js'
- import {
- consumePendingCacheEdits,
- getPinnedCacheEdits,
- markToolsSentToAPIState,
- pinCacheEdits,
- } from '../compact/microCompact.js'
- import { getInitializationStatus } from '../lsp/manager.js'
- import { isToolFromMcpServer } from '../mcp/utils.js'
- import { withStreamingVCR, withVCR } from '../vcr.js'
- import { CLIENT_REQUEST_ID_HEADER, getAnthropicClient } from './client.js'
- import {
- API_ERROR_MESSAGE_PREFIX,
- CUSTOM_OFF_SWITCH_MESSAGE,
- getAssistantMessageFromError,
- getErrorMessageIfRefusal,
- } from './errors.js'
- import {
- EMPTY_USAGE,
- type GlobalCacheStrategy,
- logAPIError,
- logAPIQuery,
- logAPISuccessAndDuration,
- type NonNullableUsage,
- } from './logging.js'
- import {
- CACHE_TTL_1HOUR_MS,
- checkResponseForCacheBreak,
- recordPromptState,
- } from './promptCacheBreakDetection.js'
- import {
- CannotRetryError,
- FallbackTriggeredError,
- is529Error,
- type RetryContext,
- withRetry,
- } from './withRetry.js'
/**
 * Structural types for anything JSON can express: primitives, arrays of
 * JSON values, and string-keyed objects whose values are JSON values.
 */
type JsonArray = Array<JsonValue>
type JsonObject = { [key: string]: JsonValue }
type JsonValue = JsonObject | JsonArray | string | number | boolean | null
/**
 * Assemble the extra body parameters for the API request.
 *
 * The result is built from, in order:
 * 1. The CLAUDE_CODE_EXTRA_BODY environment variable, when it parses to a
 *    JSON object (anything else is logged and ignored).
 * 2. The anti-distillation opt-in (`anti_distillation: ['fake_tools']`),
 *    when the build feature, CLI entrypoint, first-party check and feature
 *    value all allow it.
 * 3. The given beta headers (primarily for Bedrock requests), merged into
 *    `anthropic_beta` without repeating headers the env var already listed.
 *
 * @param betaHeaders - An array of beta headers to include in the request.
 * @returns A JSON object representing the extra body parameters. The object
 *   and its `anthropic_beta` array are freshly created, so callers may mutate
 *   them without affecting `betaHeaders` or the JSON parse cache.
 */
export function getExtraBodyParams(betaHeaders?: string[]): JsonObject {
  // Parse user's extra body parameters first
  const extraBodyStr = process.env.CLAUDE_CODE_EXTRA_BODY
  let result: JsonObject = {}

  if (extraBodyStr) {
    try {
      // Parse as JSON, which can be null, boolean, number, string, array or object
      const parsed = safeParseJSON(extraBodyStr)
      // We expect an object with key-value pairs to spread into API parameters
      if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
        // Shallow clone — safeParseJSON is LRU-cached and returns the same
        // object reference for the same string. Mutating `result` below
        // would poison the cache, causing stale values to persist.
        result = { ...(parsed as JsonObject) }
      } else {
        logForDebugging(
          `CLAUDE_CODE_EXTRA_BODY env var must be a JSON object, but was given ${extraBodyStr}`,
          { level: 'error' },
        )
      }
    } catch (error) {
      logForDebugging(
        `Error parsing CLAUDE_CODE_EXTRA_BODY: ${errorMessage(error)}`,
        { level: 'error' },
      )
    }
  }

  // Anti-distillation: send fake_tools opt-in for 1P CLI only
  if (
    feature('ANTI_DISTILLATION_CC')
      ? process.env.CLAUDE_CODE_ENTRYPOINT === 'cli' &&
        shouldIncludeFirstPartyOnlyBetas() &&
        getFeatureValue_CACHED_MAY_BE_STALE(
          'tengu_anti_distill_fake_tool_injection',
          false,
        )
      : false
  ) {
    result.anti_distillation = ['fake_tools']
  }

  // Handle beta headers if provided
  if (betaHeaders && betaHeaders.length > 0) {
    if (result.anthropic_beta && Array.isArray(result.anthropic_beta)) {
      // Add to existing array, avoiding duplicates
      const existingHeaders = result.anthropic_beta as string[]
      const newHeaders = betaHeaders.filter(
        header => !existingHeaders.includes(header),
      )
      result.anthropic_beta = [...existingHeaders, ...newHeaders]
    } else {
      // Copy rather than alias the caller's array: storing `betaHeaders`
      // itself would let later mutations on either side leak into the other.
      result.anthropic_beta = [...betaHeaders]
    }
  }

  return result
}
/**
 * Decide whether prompt caching should be used for `model`.
 *
 * DISABLE_PROMPT_CACHING turns caching off for every model. The
 * DISABLE_PROMPT_CACHING_HAIKU / _SONNET / _OPUS variables turn it off only
 * when `model` equals the small-fast, default Sonnet or default Opus model
 * respectively.
 *
 * @param model - Model identifier the request will be sent to.
 * @returns `false` when one of the switches applies, otherwise `true`.
 */
export function getPromptCachingEnabled(model: string): boolean {
  // Global disable takes precedence
  if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) {
    return false
  }

  // Per-model switches, checked in order. The model resolver is only invoked
  // when its switch is actually set.
  const perModelSwitches = [
    {
      flag: process.env.DISABLE_PROMPT_CACHING_HAIKU,
      resolveModel: () => getSmallFastModel(),
    },
    {
      flag: process.env.DISABLE_PROMPT_CACHING_SONNET,
      resolveModel: () => getDefaultSonnetModel(),
    },
    {
      flag: process.env.DISABLE_PROMPT_CACHING_OPUS,
      resolveModel: () => getDefaultOpusModel(),
    },
  ]

  for (const { flag, resolveModel } of perModelSwitches) {
    if (isEnvTruthy(flag) && model === resolveModel()) {
      return false
    }
  }

  return true
}
/**
 * Build the `cache_control` value attached to cacheable request blocks.
 *
 * Always `type: 'ephemeral'`. Adds `ttl: '1h'` when should1hCacheTTL accepts
 * the query source, and echoes `scope` only when it is `'global'`.
 *
 * @param scope - Requested cache scope; only `'global'` is forwarded.
 * @param querySource - Origin of the query, used for the 1h TTL decision.
 */
export function getCacheControl({
  scope,
  querySource,
}: {
  scope?: CacheScope
  querySource?: QuerySource
} = {}): {
  type: 'ephemeral'
  ttl?: '1h'
  scope?: CacheScope
} {
  // Keys are added in the order type → ttl → scope so the serialized form is
  // stable.
  const cacheControl: {
    type: 'ephemeral'
    ttl?: '1h'
    scope?: CacheScope
  } = { type: 'ephemeral' }

  if (should1hCacheTTL(querySource)) {
    cacheControl.ttl = '1h'
  }
  if (scope === 'global') {
    cacheControl.scope = scope
  }

  return cacheControl
}
/**
 * Decides whether prompt caching should use the 1h TTL.
 *
 * Bedrock users who set ENABLE_PROMPT_CACHING_1H_BEDROCK always get it.
 * Otherwise both of the following must hold:
 * 1. The user is eligible (ant, or a subscriber who is not using overage).
 * 2. The query source matches a pattern in the GrowthBook allowlist.
 *
 * GrowthBook config shape: { allowlist: string[] }
 * A trailing '*' in a pattern means prefix match; otherwise exact match.
 * Examples:
 * - { allowlist: ["repl_main_thread*", "sdk"] } — main thread + SDK only
 * - { allowlist: ["repl_main_thread*", "sdk", "agent:*"] } — also subagents
 * - { allowlist: ["*"] } — all sources
 *
 * Eligibility and the allowlist are both latched in bootstrap state the first
 * time they are computed, so the TTL stays stable for the whole session.
 */
function should1hCacheTTL(querySource?: QuerySource): boolean {
  // 3P Bedrock users get 1h TTL when opted in via env var — they manage their own billing
  // No GrowthBook gating needed since 3P users don't have GrowthBook configured
  const bedrockOptIn =
    getAPIProvider() === 'bedrock' &&
    isEnvTruthy(process.env.ENABLE_PROMPT_CACHING_1H_BEDROCK)
  if (bedrockOptIn) {
    return true
  }

  // Latch eligibility for session stability — a mid-session overage flip
  // would otherwise change the cache_control TTL and bust the server-side
  // prompt cache (~20K tokens per flip).
  let eligible = getPromptCache1hEligible()
  if (eligible === null) {
    eligible =
      process.env.USER_TYPE === 'ant' ||
      (isClaudeAISubscriber() && !currentLimits.isUsingOverage)
    setPromptCache1hEligible(eligible)
  }
  if (!eligible) {
    return false
  }

  // Latch the allowlist as well — prevents mixed TTLs when GrowthBook's disk
  // cache updates mid-request.
  let patterns = getPromptCache1hAllowlist()
  if (patterns === null) {
    const { allowlist } = getFeatureValue_CACHED_MAY_BE_STALE<{
      allowlist?: string[]
    }>('tengu_prompt_cache_1h_config', {})
    patterns = allowlist ?? []
    setPromptCache1hAllowlist(patterns)
  }

  if (querySource === undefined) {
    return false
  }

  for (const pattern of patterns) {
    const matches = pattern.endsWith('*')
      ? querySource.startsWith(pattern.slice(0, -1))
      : querySource === pattern
    if (matches) {
      return true
    }
  }
  return false
}
/**
 * Apply the effort setting to an outgoing API request, mutating the
 * arguments in place.
 *
 * Does nothing when the model does not support effort or when `outputConfig`
 * already has an `effort` key. Otherwise:
 * - `undefined` effort: only the effort beta header is added.
 * - string effort: written to `outputConfig.effort`, and the beta header is added.
 * - numeric effort: for USER_TYPE 'ant' only, stored as
 *   `anthropic_internal.effort_override` in the extra body params.
 *
 * @param effortValue - Requested effort, if any.
 * @param outputConfig - The request's output_config object.
 * @param extraBodyParams - Extra body parameters for the request.
 * @param betas - Beta header list for the request.
 * @param model - Model the request targets.
 */
function configureEffortParams(
  effortValue: EffortValue | undefined,
  outputConfig: BetaOutputConfig,
  extraBodyParams: Record<string, unknown>,
  betas: string[],
  model: string,
): void {
  const canConfigure = modelSupportsEffort(model) && !('effort' in outputConfig)
  if (!canConfigure) {
    return
  }

  if (effortValue === undefined) {
    betas.push(EFFORT_BETA_HEADER)
    return
  }

  if (typeof effortValue === 'string') {
    // Send string effort level as is
    outputConfig.effort = effortValue
    betas.push(EFFORT_BETA_HEADER)
    return
  }

  // Numeric effort override - ant-only (uses anthropic_internal)
  if (process.env.USER_TYPE !== 'ant') {
    return
  }
  const priorInternal =
    (extraBodyParams.anthropic_internal as Record<string, unknown>) || {}
  extraBodyParams.anthropic_internal = {
    ...priorInternal,
    effort_override: effortValue,
  }
}
// output_config.task_budget — API-side token budget awareness for the model.
// The Stainless SDK's BetaOutputConfig type has no task_budget field yet, so
// the wire shape is declared here and cast where needed. The API validates on
// receipt; see api/api/schemas/messages/request/output_config.py:12-39 in the
// monorepo.
// Beta: task-budgets-2026-03-13 (EAP, claude-strudel-eap only as of Mar 2026).
type TaskBudgetParam = {
  type: 'tokens'
  total: number
  remaining?: number
}

/**
 * Write the task budget into `outputConfig.task_budget` and make sure the
 * task-budgets beta header is in `betas`. Both arguments are mutated in place.
 *
 * Does nothing when no budget is given, when `outputConfig` already has a
 * `task_budget` key, or when first-party-only betas should not be included.
 *
 * @param taskBudget - Budget from the request options (total, optional remaining).
 * @param outputConfig - The request's output_config object.
 * @param betas - Beta header list for the request.
 */
export function configureTaskBudgetParams(
  taskBudget: Options['taskBudget'],
  outputConfig: BetaOutputConfig & { task_budget?: TaskBudgetParam },
  betas: string[],
): void {
  if (!taskBudget) {
    return
  }
  if ('task_budget' in outputConfig) {
    return
  }
  if (!shouldIncludeFirstPartyOnlyBetas()) {
    return
  }

  const budget: TaskBudgetParam = {
    type: 'tokens',
    total: taskBudget.total,
  }
  // `remaining` is only sent when the caller supplied it.
  if (taskBudget.remaining !== undefined) {
    budget.remaining = taskBudget.remaining
  }
  outputConfig.task_budget = budget

  if (betas.indexOf(TASK_BUDGETS_BETA_HEADER) === -1) {
    betas.push(TASK_BUDGETS_BETA_HEADER)
  }
}
/**
 * Build the `metadata` object sent with API requests.
 *
 * `user_id` is a JSON string holding any extra fields from the
 * CLAUDE_CODE_EXTRA_METADATA environment variable (used only if it parses to
 * a JSON object) followed by `device_id`, `account_uuid` and `session_id`.
 * The three built-in fields come last, so they override same-named extras.
 *
 * @returns An object with a single `user_id` string property.
 */
export function getAPIMetadata() {
  // https://docs.google.com/document/d/1dURO9ycXXQCBS0V4Vhl4poDBRgkelFc5t2BNPoEgH5Q/edit?tab=t.0#heading=h.5g7nec5b09w5
  const extraStr = process.env.CLAUDE_CODE_EXTRA_METADATA
  let extra: JsonObject = {}

  if (extraStr) {
    const candidate = safeParseJSON(extraStr, false)
    const isJsonObject =
      candidate && typeof candidate === 'object' && !Array.isArray(candidate)
    if (isJsonObject) {
      extra = candidate as JsonObject
    } else {
      logForDebugging(
        `CLAUDE_CODE_EXTRA_METADATA env var must be a JSON object, but was given ${extraStr}`,
        { level: 'error' },
      )
    }
  }

  const deviceId = getOrCreateUserID()
  // Empty string when no OAuth account info is available
  const accountUuid = getOauthAccountInfo()?.accountUuid ?? ''
  const sessionId = getSessionId()

  const userId = jsonStringify({
    ...extra,
    device_id: deviceId,
    account_uuid: accountUuid,
    session_id: sessionId,
  })
  return { user_id: userId }
}
/**
 * Check an API key by sending a one-token request to the small-fast model.
 *
 * @param apiKey - The key to verify.
 * @param isNonInteractiveSession - When true (print mode) the check is
 *   skipped and the key is assumed valid.
 * @returns `true` when the request succeeds or the check is skipped, `false`
 *   when the API reports an "invalid x-api-key" authentication error.
 * @throws Any other error raised by the request. A CannotRetryError is
 *   unwrapped to its original error first.
 */
export async function verifyApiKey(
  apiKey: string,
  isNonInteractiveSession: boolean,
): Promise<boolean> {
  // Skip API verification if running in print mode (isNonInteractiveSession)
  if (isNonInteractiveSession) {
    return true
  }

  try {
    // WARNING: if you change this to use a non-Haiku model, this request will fail in 1P unless it uses getCLISyspromptPrefix.
    const model = getSmallFastModel()
    const betas = getModelBetas(model)

    const verification = withRetry(
      () =>
        getAnthropicClient({
          apiKey,
          maxRetries: 3,
          model,
          source: 'verify_api_key',
        }),
      async anthropic => {
        const messages: MessageParam[] = [{ role: 'user', content: 'test' }]
        const betaParams = betas.length > 0 ? { betas } : {}
        // biome-ignore lint/plugin: API key verification is intentionally a minimal direct call
        await anthropic.beta.messages.create({
          model,
          max_tokens: 1,
          messages,
          temperature: 1,
          ...betaParams,
          metadata: getAPIMetadata(),
          ...getExtraBodyParams(),
        })
        return true
      },
      // Use fewer retries for API key verification
      { maxRetries: 2, model, thinkingConfig: { type: 'disabled' } },
    )
    return await returnValue(verification)
  } catch (errorFromRetry) {
    // Report and classify the underlying failure, not the retry wrapper.
    const error =
      errorFromRetry instanceof CannotRetryError
        ? errorFromRetry.originalError
        : errorFromRetry
    logError(error)

    // Check for authentication error
    const isInvalidKey =
      error instanceof Error &&
      error.message.includes(
        '{"type":"error","error":{"type":"authentication_error","message":"invalid x-api-key"}}',
      )
    if (isInvalidKey) {
      return false
    }
    throw error
  }
}
/**
 * Convert an internal user message into an API `MessageParam`.
 *
 * @param message - The user message to convert.
 * @param addCache - When true, the last content block is the cache
 *   breakpoint; string content is first wrapped in a single text block.
 * @param enablePromptCaching - When false, no `cache_control` is attached
 *   even if `addCache` is true.
 * @param querySource - Forwarded to getCacheControl.
 * @returns A `role: 'user'` param. Array content is always a new array.
 */
export function userMessageToMessageParam(
  message: UserMessage,
  addCache = false,
  enablePromptCaching: boolean,
  querySource?: QuerySource,
): MessageParam {
  const { content } = message.message

  if (!addCache) {
    // Clone array content so in-place mutations (e.g., insertCacheEditsBlock's
    // splice) cannot contaminate the original message. Without the clone,
    // repeated addCacheBreakpoints calls would share one array and each would
    // splice in duplicate cache_edits.
    return {
      role: 'user',
      content: Array.isArray(content) ? [...content] : content,
    }
  }

  if (typeof content === 'string') {
    return {
      role: 'user',
      content: [
        {
          type: 'text',
          text: content,
          ...(enablePromptCaching && {
            cache_control: getCacheControl({ querySource }),
          }),
        },
      ],
    }
  }

  // Only the final block carries the cache breakpoint.
  const lastIndex = content.length - 1
  return {
    role: 'user',
    content: content.map((block, index) => ({
      ...block,
      ...(index === lastIndex && enablePromptCaching
        ? { cache_control: getCacheControl({ querySource }) }
        : {}),
    })),
  }
}
/**
 * Converts an internal assistant message into the API `MessageParam` shape.
 *
 * @param message Internal assistant message; its `message.content` is either
 *   a plain string or an array of content blocks.
 * @param addCache When true, the final content block is the candidate for a
 *   `cache_control` marker.
 * @param enablePromptCaching Gates whether the `cache_control` marker is
 *   actually attached when `addCache` is true.
 * @param querySource Forwarded to `getCacheControl`.
 * @returns A `role: 'assistant'` message param. Without `addCache` the
 *   original content value is passed through as-is (not copied).
 */
export function assistantMessageToMessageParam(
  message: AssistantMessage,
  addCache = false,
  enablePromptCaching: boolean,
  querySource?: QuerySource,
): MessageParam {
  const { content } = message.message

  if (!addCache) {
    return {
      role: 'assistant',
      content,
    }
  }

  if (typeof content === 'string') {
    // A bare string is wrapped in a single text block so it can carry the marker.
    return {
      role: 'assistant',
      content: [
        {
          type: 'text',
          text: content,
          ...(enablePromptCaching && {
            cache_control: getCacheControl({ querySource }),
          }),
        },
      ],
    }
  }

  const lastIndex = content.length - 1
  return {
    role: 'assistant',
    content: content.map((block, index) => {
      // Only the last block can be marked, and never a thinking /
      // redacted_thinking block (nor a connector-text block when that
      // feature is on).
      const canCarryMarker =
        index === lastIndex &&
        block.type !== 'thinking' &&
        block.type !== 'redacted_thinking' &&
        (feature('CONNECTOR_TEXT') ? !isConnectorTextBlock(block) : true)
      return canCarryMarker && enablePromptCaching
        ? { ...block, cache_control: getCacheControl({ querySource }) }
        : { ...block }
    }),
  }
}
/**
 * Per-request options accepted by the model query entry points
 * (`queryModelWithStreaming` / `queryModelWithoutStreaming`).
 */
export type Options = {
  /** Async accessor for the tool permission context; handed to tool-search and tool-schema building. */
  getToolPermissionContext: () => Promise<ToolPermissionContext>
  /** Model identifier for the request. */
  model: string
  toolChoice?: BetaToolChoiceTool | BetaToolChoiceAuto | undefined
  /** Passed to the CLI system-prompt prefix builder. */
  isNonInteractiveSession: boolean
  /** Extra tool schemas appended after the schemas built from `tools`. */
  extraToolSchemas?: BetaToolUnion[]
  /** Output-token cap used when the retry context does not supply its own override. */
  maxOutputTokensOverride?: number
  fallbackModel?: string
  onStreamingFallback?: () => void
  /** Identifies the caller; used for agentic-query detection and cache control. */
  querySource: QuerySource
  /** Agent definitions handed to tool-search and tool-schema building. */
  agents: AgentDefinition[]
  allowedAgentTypes?: string[]
  /** Passed to the CLI system-prompt prefix builder. */
  hasAppendSystemPrompt: boolean
  fetchOverride?: ClientOptions['fetch']
  /** Explicit prompt-caching switch; when undefined the per-model default is used. */
  enablePromptCaching?: boolean
  skipCacheWrite?: boolean
  temperatureOverride?: number
  /** Requested effort; resolved against the model before being applied. */
  effortValue?: EffortValue
  mcpTools: Tools
  /** While true, tool search stays available even if no deferred tools exist yet. */
  hasPendingMcpServers?: boolean
  queryTracking?: QueryChainTracking
  /** Only set for subagents. */
  agentId?: AgentId
  /** Structured output format merged into `output_config.format`. */
  outputFormat?: BetaJSONOutputFormat
  /** Caller's request for fast mode; only one of several conditions that must hold. */
  fastMode?: boolean
  /** Advisor model requested by the caller (considered for agentic queries only). */
  advisorModel?: string
  addNotification?: (notif: Notification) => void
  /**
   * API-side task budget (output_config.task_budget). Distinct from the
   * tokenBudget.ts +500k auto-continue feature — this one is sent to the API
   * so the model can pace itself. `remaining` is computed by the caller
   * (query.ts decrements across the agentic loop).
   */
  taskBudget?: { total: number; remaining?: number }
}
/**
 * Runs a model query and resolves with the last assistant message it yields.
 *
 * The underlying generator is always drained to completion rather than
 * stopped at the first assistant message, so that logAPISuccessAndDuration
 * (which runs after all yields) still gets called.
 *
 * @throws APIUserAbortError when no assistant message was produced and
 *   `signal` is aborted.
 * @throws Error when no assistant message was produced for any other reason.
 */
export async function queryModelWithoutStreaming({
  messages,
  systemPrompt,
  thinkingConfig,
  tools,
  signal,
  options,
}: {
  messages: Message[]
  systemPrompt: SystemPrompt
  thinkingConfig: ThinkingConfig
  tools: Tools
  signal: AbortSignal
  options: Options
}): Promise<AssistantMessage> {
  const events = withStreamingVCR(messages, async function* () {
    yield* queryModel(
      messages,
      systemPrompt,
      thinkingConfig,
      tools,
      signal,
      options,
    )
  })

  // Remember the most recent assistant message, but keep consuming.
  let lastAssistant: AssistantMessage | undefined
  for await (const event of events) {
    if (event.type !== 'assistant') continue
    lastAssistant = event
  }

  if (lastAssistant) {
    return lastAssistant
  }
  // Surface an abort as APIUserAbortError instead of a generic error so
  // callers can handle abort scenarios gracefully.
  if (signal.aborted) {
    throw new APIUserAbortError()
  }
  throw new Error('未找到助手消息')
}
/**
 * Streaming variant of the model query: re-yields everything produced by
 * `queryModel`, routed through `withStreamingVCR`.
 */
export async function* queryModelWithStreaming({
  messages,
  systemPrompt,
  thinkingConfig,
  tools,
  signal,
  options,
}: {
  messages: Message[]
  systemPrompt: SystemPrompt
  thinkingConfig: ThinkingConfig
  tools: Tools
  signal: AbortSignal
  options: Options
}): AsyncGenerator<
  StreamEvent | AssistantMessage | SystemAPIErrorMessage,
  void
> {
  // Thin generator that simply delegates to the real query implementation.
  const runQuery = async function* () {
    yield* queryModel(
      messages,
      systemPrompt,
      thinkingConfig,
      tools,
      signal,
      options,
    )
  }
  return yield* withStreamingVCR(messages, runQuery)
}
/**
 * Tells whether an LSP tool should be sent with defer_loading: true because
 * LSP initialization has not completed yet.
 *
 * Tools without a truthy `isLsp` property are never deferred by this check,
 * and the initialization status is not consulted for them.
 */
function shouldDeferLspTool(tool: Tool): boolean {
  const isLspTool = 'isLsp' in tool && tool.isLsp
  if (!isLspTool) {
    return false
  }
  // Defer while initialization is pending or has not started.
  const { status } = getInitializationStatus()
  return status === 'pending' || status === 'not-started'
}
/**
 * Per-attempt timeout for non-streaming fallback requests, in milliseconds.
 * Reads API_TIMEOUT_MS when set so slow backends and the streaming path
 * share the same ceiling.
 *
 * Remote sessions default to 120s to stay under CCR's container idle-kill
 * (~5min) so a hung fallback to a wedged backend surfaces a clean
 * APIConnectionTimeoutError instead of stalling past SIGKILL.
 *
 * Otherwise defaults to 300s — long enough for slow backends without
 * approaching the API's 10-minute non-streaming boundary.
 *
 * @returns The API_TIMEOUT_MS override when it parses to a strictly positive
 *   integer; otherwise 120_000 when CLAUDE_CODE_REMOTE is truthy, else 300_000.
 */
function getNonstreamingFallbackTimeoutMs(): number {
  const override = parseInt(process.env.API_TIMEOUT_MS || '', 10)
  // Only a strictly positive value is a usable timeout. NaN (unset or
  // unparseable) and 0 already fell through to the defaults; a negative value
  // used to be accepted because it is truthy, so it is rejected explicitly.
  if (override > 0) return override
  return isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) ? 120_000 : 300_000
}
/**
 * Helper generator for non-streaming API requests.
 *
 * Builds a withRetry generator around a single `beta.messages.create` call,
 * forwards the system messages that generator yields, and returns its final
 * value as a BetaMessage.
 *
 * @param clientOptions Settings used to construct the API client per attempt.
 * @param retryOptions Settings forwarded to withRetry (`fastMode` is only
 *   forwarded when fast mode is enabled).
 * @param paramsFromContext Builds the request params for the current retry context.
 * @param onAttempt Called before each attempt with the attempt number, its
 *   start timestamp and the un-adjusted `max_tokens`.
 * @param captureRequest Called with the un-adjusted params of each attempt.
 */
export async function* executeNonStreamingRequest(
  clientOptions: {
    model: string
    fetchOverride?: Options['fetchOverride']
    source: string
  },
  retryOptions: {
    model: string
    fallbackModel?: string
    thinkingConfig: ThinkingConfig
    fastMode?: boolean
    signal: AbortSignal
    initialConsecutive529Errors?: number
    querySource?: QuerySource
  },
  paramsFromContext: (context: RetryContext) => BetaMessageStreamParams,
  onAttempt: (attempt: number, start: number, maxOutputTokens: number) => void,
  captureRequest: (params: BetaMessageStreamParams) => void,
  /**
   * Request ID of the failed streaming attempt this fallback is recovering
   * from. Emitted in tengu_nonstreaming_fallback_error for funnel correlation.
   */
  originatingRequestId?: string | null,
): AsyncGenerator<SystemAPIErrorMessage, BetaMessage> {
  const timeoutMs = getNonstreamingFallbackTimeoutMs()

  // Instrumentation: record when the non-streaming request errors (including
  // timeouts). Lets us distinguish "fallback hung past container kill"
  // (no event) from "fallback hit the bounded timeout" (this event).
  const reportAttemptFailure = (failure: unknown, attempt: number): void => {
    logForDiagnosticsNoPII('error', 'cli_nonstreaming_fallback_error')
    logEvent('tengu_nonstreaming_fallback_error', {
      model:
        clientOptions.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      error:
        failure instanceof Error
          ? (failure.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
          : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
      attempt,
      timeout_ms: timeoutMs,
      request_id: (originatingRequestId ??
        'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
    })
  }

  // SDK-level retries are disabled (maxRetries: 0); withRetry drives retries.
  const createClient = () =>
    getAnthropicClient({
      maxRetries: 0,
      model: clientOptions.model,
      fetchOverride: clientOptions.fetchOverride,
      source: clientOptions.source,
    })

  const retryGenerator = withRetry(
    createClient,
    async (anthropic, attempt, context) => {
      const startedAt = Date.now()
      const requestParams = paramsFromContext(context)
      captureRequest(requestParams)
      onAttempt(attempt, startedAt, requestParams.max_tokens)
      const nonStreamingParams = adjustParamsForNonStreaming(
        requestParams,
        MAX_NON_STREAMING_TOKENS,
      )
      try {
        // biome-ignore lint/plugin: non-streaming API call
        return await anthropic.beta.messages.create(
          {
            ...nonStreamingParams,
            model: normalizeModelStringForAPI(nonStreamingParams.model),
          },
          {
            signal: retryOptions.signal,
            timeout: timeoutMs,
          },
        )
      } catch (failure) {
        // User aborts are not errors — re-throw immediately without logging
        if (failure instanceof APIUserAbortError) throw failure
        reportAttemptFailure(failure, attempt)
        throw failure
      }
    },
    {
      model: retryOptions.model,
      fallbackModel: retryOptions.fallbackModel,
      thinkingConfig: retryOptions.thinkingConfig,
      ...(isFastModeEnabled() && { fastMode: retryOptions.fastMode }),
      signal: retryOptions.signal,
      initialConsecutive529Errors: retryOptions.initialConsecutive529Errors,
      querySource: retryOptions.querySource,
    },
  )

  // Drain the retry generator by hand: forward only 'system' messages and
  // hand back the generator's return value once it completes.
  let step = await retryGenerator.next()
  while (!step.done) {
    if (step.value.type === 'system') {
      yield step.value
    }
    step = await retryGenerator.next()
  }
  return step.value as BetaMessage
}
/**
 * Finds the request ID of the most recent assistant message in the
 * conversation that carries one. Used to link consecutive API requests in
 * analytics so they can be joined for cache-hit-rate analysis and
 * incremental token tracking.
 *
 * Reading it from the message array (rather than global state) means each
 * query chain (main thread, subagent, teammate) tracks its own request chain
 * independently, and rollback/undo naturally updates the value.
 *
 * @returns The request ID, or undefined when no assistant message has one.
 */
function getPreviousRequestIdFromMessages(
  messages: Message[],
): string | undefined {
  // Walk backwards so the newest qualifying message wins.
  let index = messages.length
  while (index-- > 0) {
    const candidate = messages[index]!
    if (candidate.type !== 'assistant') continue
    if (candidate.requestId) {
      return candidate.requestId
    }
  }
  return undefined
}
/** Type guard: true for `image` and `document` content blocks. */
function isMedia(
  block: BetaContentBlockParam,
): block is BetaImageBlockParam | BetaRequestDocumentBlock {
  return block.type === 'image' || block.type === 'document'
}

/** Type guard: true for `tool_result` content blocks. */
function isToolResult(
  block: BetaContentBlockParam,
): block is BetaToolResultBlockParam {
  return block.type === 'tool_result'
}

/**
 * Ensures messages contain at most `limit` media items (images + documents),
 * counting both top-level blocks and media nested inside tool_result content.
 * Strips oldest media first to preserve the most recent.
 *
 * "Oldest" means document order: messages front to back and, inside a
 * message, blocks front to back, with a tool_result's nested media visited at
 * the tool_result's own position. (Previously all nested media in a message
 * was stripped before any top-level media, which could drop a later nested
 * item while keeping an earlier top-level one.)
 *
 * Inputs are never mutated. Messages and blocks that lose nothing are
 * returned by reference, and when nothing needs stripping the original
 * `messages` array itself is returned.
 *
 * @param messages Conversation messages to inspect.
 * @param limit Maximum number of media items allowed to remain.
 * @returns Messages with the excess media removed.
 */
export function stripExcessMediaItems(
  messages: (UserMessage | AssistantMessage)[],
  limit: number,
): (UserMessage | AssistantMessage)[] {
  let toRemove = 0
  for (const msg of messages) {
    if (!Array.isArray(msg.message.content)) continue
    for (const block of msg.message.content) {
      if (isMedia(block)) toRemove++
      if (isToolResult(block) && Array.isArray(block.content)) {
        for (const nested of block.content) {
          if (isMedia(nested)) toRemove++
        }
      }
    }
  }
  // From here on `toRemove` is the number of media items still to drop.
  toRemove -= limit
  if (toRemove <= 0) return messages

  return messages.map(msg => {
    if (toRemove <= 0) return msg
    const content = msg.message.content
    if (!Array.isArray(content)) return msg

    const before = toRemove
    const kept: BetaContentBlockParam[] = []
    // Single pass in document order so the earliest media goes first.
    for (const block of content) {
      if (toRemove > 0 && isMedia(block)) {
        toRemove--
        continue
      }
      if (toRemove > 0 && isToolResult(block) && Array.isArray(block.content)) {
        const nestedKept = block.content.filter(nested => {
          if (toRemove > 0 && isMedia(nested)) {
            toRemove--
            return false
          }
          return true
        })
        // Reuse the original block when none of its nested items were dropped.
        kept.push(
          nestedKept.length === block.content.length
            ? block
            : { ...block, content: nestedKept },
        )
        continue
      }
      kept.push(block)
    }

    return before === toRemove
      ? msg
      : {
          ...msg,
          message: { ...msg.message, content: kept },
        }
  }) as (UserMessage | AssistantMessage)[]
}
- async function* queryModel(
- messages: Message[],
- systemPrompt: SystemPrompt,
- thinkingConfig: ThinkingConfig,
- tools: Tools,
- signal: AbortSignal,
- options: Options,
- ): AsyncGenerator<
- StreamEvent | AssistantMessage | SystemAPIErrorMessage,
- void
- > {
- // Check cheap conditions first — the off-switch await blocks on GrowthBook
- // init (~10ms). For non-Opus models (haiku, sonnet) this skips the await
- // entirely. Subscribers don't hit this path at all.
- if (
- !isClaudeAISubscriber() &&
- isNonCustomOpusModel(options.model) &&
- (
- await getDynamicConfig_BLOCKS_ON_INIT<{ activated: boolean }>(
- 'tengu-off-switch',
- {
- activated: false,
- },
- )
- ).activated
- ) {
- logEvent('tengu_off_switch_query', {})
- yield getAssistantMessageFromError(
- new Error(CUSTOM_OFF_SWITCH_MESSAGE),
- options.model,
- )
- return
- }
- // Derive previous request ID from the last assistant message in this query chain.
- // This is scoped per message array (main thread, subagent, teammate each have their own),
- // so concurrent agents don't clobber each other's request chain tracking.
- // Also naturally handles rollback/undo since removed messages won't be in the array.
- const previousRequestId = getPreviousRequestIdFromMessages(messages)
- const resolvedModel =
- getAPIProvider() === 'bedrock' &&
- options.model.includes('application-inference-profile')
- ? ((await getInferenceProfileBackingModel(options.model)) ??
- options.model)
- : options.model
- queryCheckpoint('query_tool_schema_build_start')
- const isAgenticQuery =
- options.querySource.startsWith('repl_main_thread') ||
- options.querySource.startsWith('agent:') ||
- options.querySource === 'sdk' ||
- options.querySource === 'hook_agent' ||
- options.querySource === 'verification_agent'
- const betas = getMergedBetas(options.model, { isAgenticQuery })
- // Always send the advisor beta header when advisor is enabled, so
- // non-agentic queries (compact, side_question, extract_memories, etc.)
- // can parse advisor server_tool_use blocks already in the conversation history.
- if (isAdvisorEnabled()) {
- betas.push(ADVISOR_BETA_HEADER)
- }
- let advisorModel: string | undefined
- if (isAgenticQuery && isAdvisorEnabled()) {
- let advisorOption = options.advisorModel
- const advisorExperiment = getExperimentAdvisorModels()
- if (advisorExperiment !== undefined) {
- if (
- normalizeModelStringForAPI(advisorExperiment.baseModel) ===
- normalizeModelStringForAPI(options.model)
- ) {
- // Override the advisor model if the base model matches. We
- // should only have experiment models if the user cannot
- // configure it themselves.
- advisorOption = advisorExperiment.advisorModel
- }
- }
- if (advisorOption) {
- const normalizedAdvisorModel = normalizeModelStringForAPI(
- parseUserSpecifiedModel(advisorOption),
- )
- if (!modelSupportsAdvisor(options.model)) {
- logForDebugging(
- `[AdvisorTool] Skipping advisor - base model ${options.model} does not support advisor`,
- )
- } else if (!isValidAdvisorModel(normalizedAdvisorModel)) {
- logForDebugging(
- `[AdvisorTool] Skipping advisor - ${normalizedAdvisorModel} is not a valid advisor model`,
- )
- } else {
- advisorModel = normalizedAdvisorModel
- logForDebugging(
- `[AdvisorTool] Server-side tool enabled with ${advisorModel} as the advisor model`,
- )
- }
- }
- }
- // Check if tool search is enabled (checks mode, model support, and threshold for auto mode)
- // This is async because it may need to calculate MCP tool description sizes for TstAuto mode
- let useToolSearch = await isToolSearchEnabled(
- options.model,
- tools,
- options.getToolPermissionContext,
- options.agents,
- 'query',
- )
- // Precompute once — isDeferredTool does 2 GrowthBook lookups per call
- const deferredToolNames = new Set<string>()
- if (useToolSearch) {
- for (const t of tools) {
- if (isDeferredTool(t)) deferredToolNames.add(t.name)
- }
- }
- // Even if tool search mode is enabled, skip if there are no deferred tools
- // AND no MCP servers are still connecting. When servers are pending, keep
- // ToolSearch available so the model can discover tools after they connect.
- if (
- useToolSearch &&
- deferredToolNames.size === 0 &&
- !options.hasPendingMcpServers
- ) {
- logForDebugging(
- 'Tool search disabled: no deferred tools available to search',
- )
- useToolSearch = false
- }
- // Filter out ToolSearchTool if tool search is not enabled for this model
- // ToolSearchTool returns tool_reference blocks which unsupported models can't handle
- let filteredTools: Tools
- if (useToolSearch) {
- // Dynamic tool loading: Only include deferred tools that have been discovered
- // via tool_reference blocks in the message history. This eliminates the need
- // to predeclare all deferred tools upfront and removes limits on tool quantity.
- const discoveredToolNames = extractDiscoveredToolNames(messages)
- filteredTools = tools.filter(tool => {
- // Always include non-deferred tools
- if (!deferredToolNames.has(tool.name)) return true
- // Always include ToolSearchTool (so it can discover more tools)
- if (toolMatchesName(tool, TOOL_SEARCH_TOOL_NAME)) return true
- // Only include deferred tools that have been discovered
- return discoveredToolNames.has(tool.name)
- })
- } else {
- filteredTools = tools.filter(
- t => !toolMatchesName(t, TOOL_SEARCH_TOOL_NAME),
- )
- }
- // Add tool search beta header if enabled - required for defer_loading to be accepted
- // Header differs by provider: 1P/Foundry use advanced-tool-use, Vertex/Bedrock use tool-search-tool
- // For Bedrock, this header must go in extraBodyParams, not the betas array
- const toolSearchHeader = useToolSearch ? getToolSearchBetaHeader() : null
- if (toolSearchHeader && getAPIProvider() !== 'bedrock') {
- if (!betas.includes(toolSearchHeader)) {
- betas.push(toolSearchHeader)
- }
- }
- // Determine if cached microcompact is enabled for this model.
- // Computed once here (in async context) and captured by paramsFromContext.
- // The beta header is also captured here to avoid a top-level import of the
- // ant-only CACHE_EDITING_BETA_HEADER constant.
- let cachedMCEnabled = false
- let cacheEditingBetaHeader = ''
- if (feature('CACHED_MICROCOMPACT')) {
- const {
- isCachedMicrocompactEnabled,
- isModelSupportedForCacheEditing,
- getCachedMCConfig,
- } = await import('../compact/cachedMicrocompact.js')
- const betas = await import('src/constants/betas.js')
- cacheEditingBetaHeader = betas.CACHE_EDITING_BETA_HEADER
- const featureEnabled = isCachedMicrocompactEnabled()
- const modelSupported = isModelSupportedForCacheEditing(options.model)
- cachedMCEnabled = featureEnabled && modelSupported
- const config = getCachedMCConfig()
- logForDebugging(
- `Cached MC gate: enabled=${featureEnabled} modelSupported=${modelSupported} model=${options.model} supportedModels=${jsonStringify(config.supportedModels)}`,
- )
- }
- const useGlobalCacheFeature = shouldUseGlobalCacheScope()
- const willDefer = (t: Tool) =>
- useToolSearch && (deferredToolNames.has(t.name) || shouldDeferLspTool(t))
- // MCP tools are per-user → dynamic tool section → can't globally cache.
- // Only gate when an MCP tool will actually render (not defer_loading).
- const needsToolBasedCacheMarker =
- useGlobalCacheFeature &&
- filteredTools.some(t => t.isMcp === true && !willDefer(t))
- // Ensure prompt_caching_scope beta header is present when global cache is enabled.
- if (
- useGlobalCacheFeature &&
- !betas.includes(PROMPT_CACHING_SCOPE_BETA_HEADER)
- ) {
- betas.push(PROMPT_CACHING_SCOPE_BETA_HEADER)
- }
- // Determine global cache strategy for logging
- const globalCacheStrategy: GlobalCacheStrategy = useGlobalCacheFeature
- ? needsToolBasedCacheMarker
- ? 'none'
- : 'system_prompt'
- : 'none'
- // Build tool schemas, adding defer_loading for MCP tools when tool search is enabled
- // Note: We pass the full `tools` list (not filteredTools) to toolToAPISchema so that
- // ToolSearchTool's prompt can list ALL available MCP tools. The filtering only affects
- // which tools are actually sent to the API, not what the model sees in tool descriptions.
- const toolSchemas = await Promise.all(
- filteredTools.map(tool =>
- toolToAPISchema(tool, {
- getToolPermissionContext: options.getToolPermissionContext,
- tools,
- agents: options.agents,
- allowedAgentTypes: options.allowedAgentTypes,
- model: options.model,
- deferLoading: willDefer(tool),
- }),
- ),
- )
- if (useToolSearch) {
- const includedDeferredTools = count(filteredTools, t =>
- deferredToolNames.has(t.name),
- )
- logForDebugging(
- `Dynamic tool loading: ${includedDeferredTools}/${deferredToolNames.size} deferred tools included`,
- )
- }
- queryCheckpoint('query_tool_schema_build_end')
- // Normalize messages before building system prompt (needed for fingerprinting)
- // Instrumentation: Track message count before normalization
- logEvent('tengu_api_before_normalize', {
- preNormalizedMessageCount: messages.length,
- })
- queryCheckpoint('query_message_normalization_start')
- let messagesForAPI = normalizeMessagesForAPI(messages, filteredTools)
- queryCheckpoint('query_message_normalization_end')
- // Model-specific post-processing: strip tool-search-specific fields if the
- // selected model doesn't support tool search.
- //
- // Why is this needed in addition to normalizeMessagesForAPI?
- // - normalizeMessagesForAPI uses isToolSearchEnabledNoModelCheck() because it's
- // called from ~20 places (analytics, feedback, sharing, etc.), many of which
- // don't have model context. Adding model to its signature would be a large refactor.
- // - This post-processing uses the model-aware isToolSearchEnabled() check
- // - This handles mid-conversation model switching (e.g., Sonnet → Haiku) where
- // stale tool-search fields from the previous model would cause 400 errors
- //
- // Note: For assistant messages, normalizeMessagesForAPI already normalized the
- // tool inputs, so stripCallerFieldFromAssistantMessage only needs to remove the
- // 'caller' field (not re-normalize inputs).
- if (!useToolSearch) {
- messagesForAPI = messagesForAPI.map(msg => {
- switch (msg.type) {
- case 'user':
- // Strip tool_reference blocks from tool_result content
- return stripToolReferenceBlocksFromUserMessage(msg)
- case 'assistant':
- // Strip 'caller' field from tool_use blocks
- return stripCallerFieldFromAssistantMessage(msg)
- default:
- return msg
- }
- })
- }
- // Repair tool_use/tool_result pairing mismatches that can occur when resuming
- // remote/teleport sessions. Inserts synthetic error tool_results for orphaned
- // tool_uses and strips orphaned tool_results referencing non-existent tool_uses.
- messagesForAPI = ensureToolResultPairing(messagesForAPI)
- // Strip advisor blocks — the API rejects them without the beta header.
- if (!betas.includes(ADVISOR_BETA_HEADER)) {
- messagesForAPI = stripAdvisorBlocks(messagesForAPI)
- }
- // Strip excess media items before making the API call.
- // The API rejects requests with >100 media items but returns a confusing error.
- // Rather than erroring (which is hard to recover from in Cowork/CCD), we
- // silently drop the oldest media items to stay within the limit.
- messagesForAPI = stripExcessMediaItems(
- messagesForAPI,
- API_MAX_MEDIA_PER_REQUEST,
- )
- // Instrumentation: Track message count after normalization
- logEvent('tengu_api_after_normalize', {
- postNormalizedMessageCount: messagesForAPI.length,
- })
- // Compute fingerprint from first user message for attribution.
- // Must run BEFORE injecting synthetic messages (e.g. deferred tool names)
- // so the fingerprint reflects the actual user input.
- const fingerprint = computeFingerprintFromMessages(messagesForAPI)
- // When the delta attachment is enabled, deferred tools are announced
- // via persisted deferred_tools_delta attachments instead of this
- // ephemeral prepend (which busts cache whenever the pool changes).
- if (useToolSearch && !isDeferredToolsDeltaEnabled()) {
- const deferredToolList = tools
- .filter(t => deferredToolNames.has(t.name))
- .map(formatDeferredToolLine)
- .sort()
- .join('\n')
- if (deferredToolList) {
- messagesForAPI = [
- createUserMessage({
- content: `<available-deferred-tools>\n${deferredToolList}\n</available-deferred-tools>`,
- isMeta: true,
- }),
- ...messagesForAPI,
- ]
- }
- }
- // Chrome tool-search instructions: when the delta attachment is enabled,
- // these are carried as a client-side block in mcp_instructions_delta
- // (attachments.ts) instead of here. This per-request sys-prompt append
- // busts the prompt cache when chrome connects late.
- const hasChromeTools = filteredTools.some(t =>
- isToolFromMcpServer(t.name, CLAUDE_IN_CHROME_MCP_SERVER_NAME),
- )
- const injectChromeHere =
- useToolSearch && hasChromeTools && !isMcpInstructionsDeltaEnabled()
- // filter(Boolean) works by converting each element to a boolean - empty strings become false and are filtered out.
- systemPrompt = asSystemPrompt(
- [
- getAttributionHeader(fingerprint),
- getCLISyspromptPrefix({
- isNonInteractive: options.isNonInteractiveSession,
- hasAppendSystemPrompt: options.hasAppendSystemPrompt,
- }),
- ...systemPrompt,
- ...(advisorModel ? [ADVISOR_TOOL_INSTRUCTIONS] : []),
- ...(injectChromeHere ? [CHROME_TOOL_SEARCH_INSTRUCTIONS] : []),
- ].filter(Boolean),
- )
- // Prepend system prompt block for easy API identification
- logAPIPrefix(systemPrompt)
- const enablePromptCaching =
- options.enablePromptCaching ?? getPromptCachingEnabled(options.model)
- const system = buildSystemPromptBlocks(systemPrompt, enablePromptCaching, {
- skipGlobalCacheForSystemPrompt: needsToolBasedCacheMarker,
- querySource: options.querySource,
- })
- const useBetas = betas.length > 0
- // Build minimal context for detailed tracing (when beta tracing is enabled)
- // Note: The actual new_context message extraction is done in sessionTracing.ts using
- // hash-based tracking per querySource (agent) from the messagesForAPI array
- const extraToolSchemas = [...(options.extraToolSchemas ?? [])]
- if (advisorModel) {
- // Server tools must be in the tools array by API contract. Appended after
- // toolSchemas (which carries the cache_control marker) so toggling /advisor
- // only churns the small suffix, not the cached prefix.
- extraToolSchemas.push({
- type: 'advisor_20260301',
- name: 'advisor',
- model: advisorModel,
- } as unknown as BetaToolUnion)
- }
- const allTools = [...toolSchemas, ...extraToolSchemas]
- const isFastMode =
- isFastModeEnabled() &&
- isFastModeAvailable() &&
- !isFastModeCooldown() &&
- isFastModeSupportedByModel(options.model) &&
- !!options.fastMode
- // Sticky-on latches for dynamic beta headers. Each header, once first
- // sent, keeps being sent for the rest of the session so mid-session
- // toggles don't change the server-side cache key and bust ~50-70K tokens.
- // Latches are cleared on /clear and /compact via clearBetaHeaderLatches().
- // Per-call gates (isAgenticQuery, querySource===repl_main_thread) stay
- // per-call so non-agentic queries keep their own stable header set.
- let afkHeaderLatched = getAfkModeHeaderLatched() === true
- if (feature('TRANSCRIPT_CLASSIFIER')) {
- if (
- !afkHeaderLatched &&
- isAgenticQuery &&
- shouldIncludeFirstPartyOnlyBetas() &&
- (autoModeStateModule?.isAutoModeActive() ?? false)
- ) {
- afkHeaderLatched = true
- setAfkModeHeaderLatched(true)
- }
- }
- let fastModeHeaderLatched = getFastModeHeaderLatched() === true
- if (!fastModeHeaderLatched && isFastMode) {
- fastModeHeaderLatched = true
- setFastModeHeaderLatched(true)
- }
- let cacheEditingHeaderLatched = getCacheEditingHeaderLatched() === true
- if (feature('CACHED_MICROCOMPACT')) {
- if (
- !cacheEditingHeaderLatched &&
- cachedMCEnabled &&
- getAPIProvider() === 'firstParty' &&
- options.querySource === 'repl_main_thread'
- ) {
- cacheEditingHeaderLatched = true
- setCacheEditingHeaderLatched(true)
- }
- }
- // Only latch from agentic queries so a classifier call doesn't flip the
- // main thread's context_management mid-turn.
- let thinkingClearLatched = getThinkingClearLatched() === true
- if (!thinkingClearLatched && isAgenticQuery) {
- const lastCompletion = getLastApiCompletionTimestamp()
- if (
- lastCompletion !== null &&
- Date.now() - lastCompletion > CACHE_TTL_1HOUR_MS
- ) {
- thinkingClearLatched = true
- setThinkingClearLatched(true)
- }
- }
- const effort = resolveAppliedEffort(options.model, options.effortValue)
- if (feature('PROMPT_CACHE_BREAK_DETECTION')) {
- // Exclude defer_loading tools from the hash -- the API strips them from the
- // prompt, so they never affect the actual cache key. Including them creates
- // false-positive "tool schemas changed" breaks when tools are discovered or
- // MCP servers reconnect.
- const toolsForCacheDetection = allTools.filter(
- t => !('defer_loading' in t && t.defer_loading),
- )
- // Capture everything that could affect the server-side cache key.
- // Pass latched header values (not live state) so break detection
- // reflects what we actually send, not what the user toggled.
- recordPromptState({
- system,
- toolSchemas: toolsForCacheDetection,
- querySource: options.querySource,
- model: options.model,
- agentId: options.agentId,
- fastMode: fastModeHeaderLatched,
- globalCacheStrategy,
- betas,
- autoModeActive: afkHeaderLatched,
- isUsingOverage: currentLimits.isUsingOverage ?? false,
- cachedMCEnabled: cacheEditingHeaderLatched,
- effortValue: effort,
- extraBodyParams: getExtraBodyParams(),
- })
- }
- const newContext: LLMRequestNewContext | undefined = isBetaTracingEnabled()
- ? {
- systemPrompt: systemPrompt.join('\n\n'),
- querySource: options.querySource,
- tools: jsonStringify(allTools),
- }
- : undefined
- // Capture the span so we can pass it to endLLMRequestSpan later
- // This ensures responses are matched to the correct request when multiple requests run in parallel
- const llmSpan = startLLMRequestSpan(
- options.model,
- newContext,
- messagesForAPI,
- isFastMode,
- )
- const startIncludingRetries = Date.now()
- let start = Date.now()
- let attemptNumber = 0
- const attemptStartTimes: number[] = []
- let stream: Stream<BetaRawMessageStreamEvent> | undefined = undefined
- let streamRequestId: string | null | undefined = undefined
- let clientRequestId: string | undefined = undefined
- // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins -- Response is available in Node 18+ and is used by the SDK
- let streamResponse: Response | undefined = undefined
- // Release all stream resources to prevent native memory leaks.
- // The Response object holds native TLS/socket buffers that live outside the
- // V8 heap (observed on the Node.js/npm path; see GH #32920), so we must
- // explicitly cancel and release it regardless of how the generator exits.
- function releaseStreamResources(): void {
- cleanupStream(stream)
- stream = undefined
- if (streamResponse) {
- streamResponse.body?.cancel().catch(() => {})
- streamResponse = undefined
- }
- }
- // Consume pending cache edits ONCE before paramsFromContext is defined.
- // paramsFromContext is called multiple times (logging, retries), so consuming
- // inside it would cause the first call to steal edits from subsequent calls.
- const consumedCacheEdits = cachedMCEnabled ? consumePendingCacheEdits() : null
- const consumedPinnedEdits = cachedMCEnabled ? getPinnedCacheEdits() : []
- // Capture the betas sent in the last API request, including the ones that
- // were dynamically added, so we can log and send it to telemetry.
- let lastRequestBetas: string[] | undefined
- const paramsFromContext = (retryContext: RetryContext) => { // Builds the request params for one attempt; reads retryContext.model, .maxTokensOverride and .fastMode
- const betasParams = [...betas] // per-call copy, so the pushes below never mutate the shared `betas` array
- // Append the 1M-context beta header dynamically when the Sonnet 1M experiment treatment is on for the retry model.
- if (
- !betasParams.includes(CONTEXT_1M_BETA_HEADER) &&
- getSonnet1mExpTreatmentEnabled(retryContext.model)
- ) {
- betasParams.push(CONTEXT_1M_BETA_HEADER)
- }
- // For Bedrock, include both the model-based betas and the dynamically-added tool search header; other providers pass an empty list.
- const bedrockBetas =
- getAPIProvider() === 'bedrock'
- ? [
- ...getBedrockExtraBodyParamsBetas(retryContext.model),
- ...(toolSearchHeader ? [toolSearchHeader] : []),
- ]
- : []
- const extraBodyParams = getExtraBodyParams(bedrockBetas)
- const outputConfig: BetaOutputConfig = { // shallow copy of any output_config already present in extraBodyParams
- ...((extraBodyParams.output_config as BetaOutputConfig) ?? {}),
- }
- configureEffortParams( // helper defined elsewhere; presumably fills outputConfig / extraBodyParams / betasParams in place — confirm
- effort,
- outputConfig,
- extraBodyParams,
- betasParams,
- options.model,
- )
- configureTaskBudgetParams( // helper defined elsewhere; presumably adds task_budget to outputConfig and a beta header — confirm
- options.taskBudget,
- outputConfig as BetaOutputConfig & { task_budget?: TaskBudgetParam },
- betasParams,
- )
- // Merge outputFormat into output_config alongside effort, unless a `format` key is already present.
- // Requires structured-outputs beta header per SDK (see parse() in messages.mjs)
- if (options.outputFormat && !('format' in outputConfig)) {
- outputConfig.format = options.outputFormat as BetaJSONOutputFormat
- // Add the beta header if it is not already present and the model supports structured outputs.
- if (
- modelSupportsStructuredOutputs(options.model) &&
- !betasParams.includes(STRUCTURED_OUTPUTS_BETA_HEADER)
- ) {
- betasParams.push(STRUCTURED_OUTPUTS_BETA_HEADER)
- }
- }
- // Retry context gets preference because it tries to course correct if we exceed the context window limit
- const maxOutputTokens =
- retryContext?.maxTokensOverride || // "||" rather than "??": a 0 override also falls through to the next candidate
- options.maxOutputTokensOverride ||
- getMaxOutputTokensForModel(options.model)
- const hasThinking =
- thinkingConfig.type !== 'disabled' &&
- !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_THINKING)
- let thinking: BetaMessageStreamParams['thinking'] | undefined = undefined
- // IMPORTANT: Do not change the adaptive-vs-budget thinking selection below
- // without notifying the model launch DRI and research. This is a sensitive
- // setting that can greatly affect model quality and bashing.
- if (hasThinking && modelSupportsThinking(options.model)) {
- if (
- !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_ADAPTIVE_THINKING) &&
- modelSupportsAdaptiveThinking(options.model)
- ) {
- // For models that support adaptive thinking (and unless the env var disables it),
- // always use adaptive thinking without a budget.
- thinking = {
- type: 'adaptive',
- } satisfies BetaMessageStreamParams['thinking']
- } else {
- // Otherwise use budgeted thinking: start from the model's default thinking budget
- // and replace it with thinkingConfig.budgetTokens when one is explicitly specified.
- let thinkingBudget = getMaxThinkingTokensForModel(options.model)
- if (
- thinkingConfig.type === 'enabled' &&
- thinkingConfig.budgetTokens !== undefined
- ) {
- thinkingBudget = thinkingConfig.budgetTokens
- }
- thinkingBudget = Math.min(maxOutputTokens - 1, thinkingBudget) // clamp so the budget stays strictly below max_tokens
- thinking = {
- budget_tokens: thinkingBudget,
- type: 'enabled',
- } satisfies BetaMessageStreamParams['thinking']
- }
- }
- // Get API context management strategies if enabled (only sent below when the matching beta header is present).
- const contextManagement = getAPIContextManagement({
- hasThinking,
- isRedactThinkingActive: betasParams.includes(REDACT_THINKING_BETA_HEADER),
- clearAllThinking: thinkingClearLatched,
- })
- const enablePromptCaching =
- options.enablePromptCaching ?? getPromptCachingEnabled(retryContext.model)
- // Fast mode: header is latched session-stable (cache-safe), but
- // `speed='fast'` stays dynamic so cooldown still suppresses the actual
- // fast-mode request without changing the cache key.
- let speed: BetaMessageStreamParams['speed']
- const isFastModeForRetry =
- isFastModeEnabled() &&
- isFastModeAvailable() &&
- !isFastModeCooldown() &&
- isFastModeSupportedByModel(options.model) &&
- !!retryContext.fastMode
- if (isFastModeForRetry) {
- speed = 'fast'
- }
- if (fastModeHeaderLatched && !betasParams.includes(FAST_MODE_BETA_HEADER)) {
- betasParams.push(FAST_MODE_BETA_HEADER)
- }
- // AFK mode beta: latched once auto mode is first activated. Still gated
- // by isAgenticQuery per-call so classifiers/compaction don't get it.
- if (feature('TRANSCRIPT_CLASSIFIER')) {
- if (
- afkHeaderLatched &&
- shouldIncludeFirstPartyOnlyBetas() &&
- isAgenticQuery &&
- !betasParams.includes(AFK_MODE_BETA_HEADER)
- ) {
- betasParams.push(AFK_MODE_BETA_HEADER)
- }
- }
- // Cache editing beta: header is latched session-stable; useCachedMC
- // (controls cache_edits body behavior) stays live so edits stop when
- // the feature disables but the header doesn't flip.
- const useCachedMC =
- cachedMCEnabled &&
- getAPIProvider() === 'firstParty' &&
- options.querySource === 'repl_main_thread'
- if (
- cacheEditingHeaderLatched &&
- getAPIProvider() === 'firstParty' &&
- options.querySource === 'repl_main_thread' &&
- !betasParams.includes(cacheEditingBetaHeader)
- ) {
- betasParams.push(cacheEditingBetaHeader)
- logForDebugging(
- 'Cache editing beta header enabled for cached microcompact',
- )
- }
- // Only send temperature when thinking is disabled — the API requires
- // temperature: 1 when thinking is enabled, which is already the default.
- const temperature = !hasThinking // keyed on hasThinking, so it is also omitted when the model lacks thinking support and `thinking` stays undefined
- ? (options.temperatureOverride ?? 1)
- : undefined
- lastRequestBetas = betasParams // side effect: publishes the final beta list to the enclosing scope; overwritten on every call
- return {
- model: normalizeModelStringForAPI(options.model),
- messages: addCacheBreakpoints(
- messagesForAPI,
- enablePromptCaching,
- options.querySource,
- useCachedMC,
- consumedCacheEdits,
- consumedPinnedEdits,
- options.skipCacheWrite,
- ),
- system,
- tools: allTools,
- tool_choice: options.toolChoice,
- ...(useBetas && { betas: betasParams }),
- metadata: getAPIMetadata(),
- max_tokens: maxOutputTokens,
- thinking,
- ...(temperature !== undefined && { temperature }),
- ...(contextManagement &&
- useBetas &&
- betasParams.includes(CONTEXT_MANAGEMENT_BETA_HEADER) && {
- context_management: contextManagement,
- }),
- ...extraBodyParams, // spread after the explicit fields above, so same-named keys from extraBodyParams win
- ...(Object.keys(outputConfig).length > 0 && { // a non-empty merged outputConfig replaces any output_config spread in from extraBodyParams
- output_config: outputConfig,
- }),
- ...(speed !== undefined && { speed }),
- }
- }
- // Compute the logged values synchronously, up front, so the fire-and-forget .then() closure
- // below captures only these small values instead of paramsFromContext's full closure scope
- // (messagesForAPI, system, allTools, betas — the entire request-building
- // context), which would otherwise be pinned until the promise resolves.
- { // bare block: keeps queryParams and the log* values scoped to this logging step
- const queryParams = paramsFromContext({ // built for logging only; the request itself rebuilds params per attempt inside the retry callback
- model: options.model,
- thinkingConfig,
- })
- const logMessagesLength = queryParams.messages.length
- const logBetas = useBetas ? (queryParams.betas ?? []) : []
- const logThinkingType = queryParams.thinking?.type ?? 'disabled'
- const logEffortValue = queryParams.output_config?.effort
- void options.getToolPermissionContext().then(permissionContext => { // fire-and-forget: not awaited, and no rejection handler is attached here
- logAPIQuery({
- model: options.model,
- messagesLength: logMessagesLength,
- temperature: options.temperatureOverride ?? 1, // NOTE(review): logged even when thinking is on, in which case paramsFromContext omits temperature from the request
- betas: logBetas,
- permissionMode: permissionContext.mode,
- querySource: options.querySource,
- queryTracking: options.queryTracking,
- thinkingType: logThinkingType,
- effortValue: logEffortValue,
- fastMode: isFastMode,
- previousRequestId,
- })
- })
- }
- const newMessages: AssistantMessage[] = []
- let ttftMs = 0
- let partialMessage: BetaMessage | undefined = undefined
- const contentBlocks: (BetaContentBlock | ConnectorTextBlock)[] = []
- let usage: NonNullableUsage = EMPTY_USAGE
- let costUSD = 0
- let stopReason: BetaStopReason | null = null
- let didFallBackToNonStreaming = false
- let fallbackMessage: AssistantMessage | undefined
- let maxOutputTokens = 0
- let responseHeaders: globalThis.Headers | undefined = undefined
- let research: unknown = undefined
- let isFastModeRequest = isFastMode // Keep separate state as it may change if falling back
- let isAdvisorInProgress = false
- try {
- queryCheckpoint('query_client_creation_start')
- const generator = withRetry(
- () =>
- getAnthropicClient({
- maxRetries: 0, // Disabled auto-retry in favor of manual implementation
- model: options.model,
- fetchOverride: options.fetchOverride,
- source: options.querySource,
- }),
- async (anthropic, attempt, context) => {
- attemptNumber = attempt
- isFastModeRequest = context.fastMode ?? false
- start = Date.now()
- attemptStartTimes.push(start)
- // Client has been created by withRetry's getClient() call. This fires
- // once per attempt; on retries the client is usually cached (withRetry
- // only calls getClient() again after auth errors), so the delta from
- // client_creation_start is meaningful on attempt 1.
- queryCheckpoint('query_client_creation_end')
- const params = paramsFromContext(context)
- captureAPIRequest(params, options.querySource) // Capture for bug reports
- maxOutputTokens = params.max_tokens
- // Fire immediately before the fetch is dispatched. .withResponse() below
- // awaits until response headers arrive, so this MUST be before the await
- // or the "Network TTFB" phase measurement is wrong.
- queryCheckpoint('query_api_request_sent')
- if (!options.agentId) {
- headlessProfilerCheckpoint('api_request_sent')
- }
- // Generate and track client request ID so timeouts (which return no
- // server request ID) can still be correlated with server logs.
- // First-party only — 3P providers don't log it (inc-4029 class).
- clientRequestId =
- getAPIProvider() === 'firstParty' && isFirstPartyAnthropicBaseUrl()
- ? randomUUID()
- : undefined
- // Use raw stream instead of BetaMessageStream to avoid O(n²) partial JSON parsing
- // BetaMessageStream calls partialParse() on every input_json_delta, which we don't need
- // since we handle tool input accumulation ourselves
- // biome-ignore lint/plugin: main conversation loop handles attribution separately
- const result = await anthropic.beta.messages
- .create(
- { ...params, stream: true },
- {
- signal,
- ...(clientRequestId && {
- headers: { [CLIENT_REQUEST_ID_HEADER]: clientRequestId },
- }),
- },
- )
- .withResponse()
- queryCheckpoint('query_response_headers_received')
- streamRequestId = result.request_id
- streamResponse = result.response
- return result.data
- },
- {
- model: options.model,
- fallbackModel: options.fallbackModel,
- thinkingConfig,
- ...(isFastModeEnabled() ? { fastMode: isFastMode } : false),
- signal,
- querySource: options.querySource,
- },
- )
- let e
- do {
- e = await generator.next()
- // yield API error messages (the stream has a 'controller' property, error messages don't)
- if (!('controller' in e.value)) {
- yield e.value
- }
- } while (!e.done)
- stream = e.value as Stream<BetaRawMessageStreamEvent>
- // reset state
- newMessages.length = 0
- ttftMs = 0
- partialMessage = undefined
- contentBlocks.length = 0
- usage = EMPTY_USAGE
- stopReason = null
- isAdvisorInProgress = false
- // Streaming idle timeout watchdog: abort the stream if no chunks arrive
- // for STREAM_IDLE_TIMEOUT_MS. Unlike the stall detection below (which only
- // fires when the *next* chunk arrives), this uses setTimeout to actively
- // kill hung streams. Without this, a silently dropped connection can hang
- // the session indefinitely since the SDK's request timeout only covers the
- // initial fetch(), not the streaming body.
- const streamWatchdogEnabled = isEnvTruthy(
- process.env.CLAUDE_ENABLE_STREAM_WATCHDOG,
- )
- const STREAM_IDLE_TIMEOUT_MS =
- parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000
- const STREAM_IDLE_WARNING_MS = STREAM_IDLE_TIMEOUT_MS / 2
- let streamIdleAborted = false
- // performance.now() snapshot when watchdog fires, for measuring abort propagation delay
- let streamWatchdogFiredAt: number | null = null
- let streamIdleWarningTimer: ReturnType<typeof setTimeout> | null = null
- let streamIdleTimer: ReturnType<typeof setTimeout> | null = null
- // Cancels the idle-warning timer and the idle-timeout timer (whichever are armed)
- // and leaves both handles set to null. Calling it when nothing is armed is a no-op.
- function clearStreamIdleTimers(): void {
- // Snapshot the current handles, then null the shared variables before cancelling.
- const pendingWarning = streamIdleWarningTimer
- const pendingTimeout = streamIdleTimer
- streamIdleWarningTimer = null
- streamIdleTimer = null
- // Cancel in the same order as before: warning timer first, then the hard timeout.
- if (pendingWarning !== null) {
- clearTimeout(pendingWarning)
- }
- if (pendingTimeout !== null) {
- clearTimeout(pendingTimeout)
- }
- }
- // Re-arms the stream idle watchdog: cancels any pending timers, then — only when the
- // watchdog is enabled — schedules a fresh warning timer (STREAM_IDLE_WARNING_MS) and a
- // fresh hard-timeout timer (STREAM_IDLE_TIMEOUT_MS).
- function resetStreamIdleTimer(): void {
- clearStreamIdleTimers()
- if (streamWatchdogEnabled) {
- // Soft warning: logs only. The delay is forwarded to the callback as its argument.
- const onIdleWarning = (warnMs: number): void => {
- logForDebugging(
- `Streaming idle warning: no chunks received for ${warnMs / 1000}s`,
- { level: 'warn' },
- )
- logForDiagnosticsNoPII('warn', 'cli_streaming_idle_warning')
- }
- // Hard timeout: records that (and when) the watchdog fired, emits debug/diagnostic/
- // analytics events, then calls releaseStreamResources().
- const onIdleTimeout = (): void => {
- streamIdleAborted = true
- streamWatchdogFiredAt = performance.now()
- logForDebugging(
- `Streaming idle timeout: no chunks received for ${STREAM_IDLE_TIMEOUT_MS / 1000}s, aborting stream`,
- { level: 'error' },
- )
- logForDiagnosticsNoPII('error', 'cli_streaming_idle_timeout')
- logEvent('tengu_streaming_idle_timeout', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- timeout_ms: STREAM_IDLE_TIMEOUT_MS,
- })
- releaseStreamResources()
- }
- // Arm the warning timer first, then the hard timeout, matching the original order.
- streamIdleWarningTimer = setTimeout(
- onIdleWarning,
- STREAM_IDLE_WARNING_MS,
- STREAM_IDLE_WARNING_MS,
- )
- streamIdleTimer = setTimeout(onIdleTimeout, STREAM_IDLE_TIMEOUT_MS)
- }
- }
- resetStreamIdleTimer()
- startSessionActivity('api_call')
- try {
- // stream in and accumulate state
- let isFirstChunk = true
- let lastEventTime: number | null = null // Set after first chunk to avoid measuring TTFB as a stall
- const STALL_THRESHOLD_MS = 30_000 // 30 seconds
- let totalStallTime = 0
- let stallCount = 0
- for await (const part of stream) {
- resetStreamIdleTimer()
- const now = Date.now()
- // Detect and log streaming stalls (only after first event to avoid counting TTFB)
- if (lastEventTime !== null) {
- const timeSinceLastEvent = now - lastEventTime
- if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
- stallCount++
- totalStallTime += timeSinceLastEvent
- logForDebugging(
- `Streaming stall detected: ${(timeSinceLastEvent / 1000).toFixed(1)}s gap between events (stall #${stallCount})`,
- { level: 'warn' },
- )
- logEvent('tengu_streaming_stall', {
- stall_duration_ms: timeSinceLastEvent,
- stall_count: stallCount,
- total_stall_time_ms: totalStallTime,
- event_type:
- part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- }
- }
- lastEventTime = now
- if (isFirstChunk) {
- logForDebugging('Stream started - received first chunk')
- queryCheckpoint('query_first_chunk_received')
- if (!options.agentId) {
- headlessProfilerCheckpoint('first_chunk')
- }
- endQueryProfile()
- isFirstChunk = false
- }
- switch (part.type) {
- case 'message_start': {
- partialMessage = part.message
- ttftMs = Date.now() - start
- usage = updateUsage(usage, part.message?.usage)
- // Capture research from message_start if available (internal only).
- // Always overwrite with the latest value.
- if (
- process.env.USER_TYPE === 'ant' &&
- 'research' in (part.message as unknown as Record<string, unknown>)
- ) {
- research = (part.message as unknown as Record<string, unknown>)
- .research
- }
- break
- }
- case 'content_block_start':
- switch (part.content_block.type) {
- case 'tool_use':
- contentBlocks[part.index] = {
- ...part.content_block,
- input: '',
- }
- break
- case 'server_tool_use':
- contentBlocks[part.index] = {
- ...part.content_block,
- input: '' as unknown as { [key: string]: unknown },
- }
- if ((part.content_block.name as string) === 'advisor') {
- isAdvisorInProgress = true
- logForDebugging(`[AdvisorTool] Advisor tool called`)
- logEvent('tengu_advisor_tool_call', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- advisor_model: (advisorModel ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- }
- break
- case 'text':
- contentBlocks[part.index] = {
- ...part.content_block,
- // awkwardly, the sdk sometimes returns text as part of a
- // content_block_start message, then returns the same text
- // again in a content_block_delta message. we ignore it here
- // since there doesn't seem to be a way to detect when a
- // content_block_delta message duplicates the text.
- text: '',
- }
- break
- case 'thinking':
- contentBlocks[part.index] = {
- ...part.content_block,
- // also awkward
- thinking: '',
- // initialize signature to ensure field exists even if signature_delta never arrives
- signature: '',
- }
- break
- default:
- // even more awkwardly, the sdk mutates the contents of text blocks
- // as it works. we want the blocks to be immutable, so that we can
- // accumulate state ourselves.
- contentBlocks[part.index] = { ...part.content_block }
- if (
- (part.content_block.type as string) === 'advisor_tool_result'
- ) {
- isAdvisorInProgress = false
- logForDebugging(`[AdvisorTool] Advisor tool result received`)
- }
- break
- }
- break
- case 'content_block_delta': {
- const contentBlock = contentBlocks[part.index]
- const delta = part.delta as typeof part.delta | ConnectorTextDelta
- if (!contentBlock) {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_not_found_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- part_type:
- part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- part_index: part.index,
- })
- throw new RangeError('未找到内容块')
- }
- if (
- feature('CONNECTOR_TEXT') &&
- delta.type === 'connector_text_delta'
- ) {
- if (contentBlock.type !== 'connector_text') {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_type_mismatch_connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- expected_type:
- 'connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- actual_type:
- contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('内容块不是 connector_text 块')
- }
- contentBlock.connector_text += delta.connector_text
- } else {
- switch (delta.type) {
- case 'citations_delta':
- // TODO: handle citations
- break
- case 'input_json_delta':
- if (
- contentBlock.type !== 'tool_use' &&
- contentBlock.type !== 'server_tool_use'
- ) {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_type_mismatch_input_json' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- expected_type:
- 'tool_use' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- actual_type:
- contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('内容块不是 input_json 块')
- }
- if (typeof contentBlock.input !== 'string') {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_input_not_string' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- input_type:
- typeof contentBlock.input as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('内容块输入不是字符串')
- }
- contentBlock.input += delta.partial_json
- break
- case 'text_delta':
- if (contentBlock.type !== 'text') {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_type_mismatch_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- expected_type:
- 'text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- actual_type:
- contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('内容块不是文本块')
- }
- contentBlock.text += delta.text
- break
- case 'signature_delta':
- if (
- feature('CONNECTOR_TEXT') &&
- contentBlock.type === 'connector_text'
- ) {
- contentBlock.signature = delta.signature
- break
- }
- if (contentBlock.type !== 'thinking') {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_type_mismatch_thinking_signature' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- expected_type:
- 'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- actual_type:
- contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('内容块不是 thinking 块')
- }
- contentBlock.signature = delta.signature
- break
- case 'thinking_delta':
- if (contentBlock.type !== 'thinking') {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_type_mismatch_thinking_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- expected_type:
- 'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- actual_type:
- contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('内容块不是 thinking 块')
- }
- contentBlock.thinking += delta.thinking
- break
- }
- }
- // Capture research from content_block_delta if available (internal only).
- // Always overwrite with the latest value.
- if (process.env.USER_TYPE === 'ant' && 'research' in part) {
- research = (part as { research: unknown }).research
- }
- break
- }
- case 'content_block_stop': {
- const contentBlock = contentBlocks[part.index]
- if (!contentBlock) {
- logEvent('tengu_streaming_error', {
- error_type:
- 'content_block_not_found_stop' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- part_type:
- part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- part_index: part.index,
- })
- throw new RangeError('未找到内容块')
- }
- if (!partialMessage) {
- logEvent('tengu_streaming_error', {
- error_type:
- 'partial_message_not_found' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- part_type:
- part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('未找到消息')
- }
- const m: AssistantMessage = {
- message: {
- ...partialMessage,
- content: normalizeContentFromAPI(
- [contentBlock] as BetaContentBlock[],
- tools,
- options.agentId,
- ),
- },
- requestId: streamRequestId ?? undefined,
- type: 'assistant',
- uuid: randomUUID(),
- timestamp: new Date().toISOString(),
- ...(process.env.USER_TYPE === 'ant' &&
- research !== undefined && { research }),
- ...(advisorModel && { advisorModel }),
- }
- newMessages.push(m)
- yield m
- break
- }
- case 'message_delta': {
- usage = updateUsage(usage, part.usage)
- // Capture research from message_delta if available (internal only).
- // Always overwrite with the latest value. Also write back to
- // already-yielded messages since message_delta arrives after
- // content_block_stop.
- if (
- process.env.USER_TYPE === 'ant' &&
- 'research' in (part as unknown as Record<string, unknown>)
- ) {
- research = (part as unknown as Record<string, unknown>).research
- for (const msg of newMessages) {
- msg.research = research
- }
- }
- // Write final usage and stop_reason back to the last yielded
- // message. Messages are created at content_block_stop from
- // partialMessage, which was set at message_start before any tokens
- // were generated (output_tokens: 0, stop_reason: null).
- // message_delta arrives after content_block_stop with the real
- // values.
- //
- // IMPORTANT: Use direct property mutation, not object replacement.
- // The transcript write queue holds a reference to message.message
- // and serializes it lazily (100ms flush interval). Object
- // replacement ({ ...lastMsg.message, usage }) would disconnect
- // the queued reference; direct mutation ensures the transcript
- // captures the final values.
- stopReason = part.delta.stop_reason
- const lastMsg = newMessages.at(-1)
- if (lastMsg) {
- lastMsg.message.usage = usage
- lastMsg.message.stop_reason = stopReason
- }
- // Update cost
- const costUSDForPart = calculateUSDCost(resolvedModel, usage)
- costUSD += addToTotalSessionCost(
- costUSDForPart,
- usage,
- options.model,
- )
- const refusalMessage = getErrorMessageIfRefusal(
- part.delta.stop_reason,
- options.model,
- )
- if (refusalMessage) {
- yield refusalMessage
- }
- if (stopReason === 'max_tokens') {
- logEvent('tengu_max_tokens_reached', {
- max_tokens: maxOutputTokens,
- })
- yield createAssistantAPIErrorMessage({
- content: `${API_ERROR_MESSAGE_PREFIX}: Claude's response exceeded the ${
- maxOutputTokens
- } output token maximum. To configure this behavior, set the CLAUDE_CODE_MAX_OUTPUT_TOKENS environment variable.`,
- apiError: 'max_output_tokens',
- error: 'max_output_tokens',
- })
- }
- if (stopReason === 'model_context_window_exceeded') {
- logEvent('tengu_context_window_exceeded', {
- max_tokens: maxOutputTokens,
- output_tokens: usage.output_tokens,
- })
- // Reuse the max_output_tokens recovery path — from the model's
- // perspective, both mean "response was cut off, continue from
- // where you left off."
- yield createAssistantAPIErrorMessage({
- content: `${API_ERROR_MESSAGE_PREFIX}: The model has reached its context window limit.`,
- apiError: 'max_output_tokens',
- error: 'max_output_tokens',
- })
- }
- break
- }
- case 'message_stop':
- break
- }
- yield {
- type: 'stream_event',
- event: part,
- ...(part.type === 'message_start' ? { ttftMs } : undefined),
- }
- }
- // Clear the idle timeout watchdog now that the stream loop has exited
- clearStreamIdleTimers()
- // If the stream was aborted by our idle timeout watchdog, fall back to
- // non-streaming retry rather than treating it as a completed stream.
- if (streamIdleAborted) {
- // Instrumentation: proves the for-await exited after the watchdog fired
- // (vs. hung forever). exit_delay_ms measures abort propagation latency:
- // 0-10ms = abort worked; >>1000ms = something else woke the loop.
- const exitDelayMs =
- streamWatchdogFiredAt !== null
- ? Math.round(performance.now() - streamWatchdogFiredAt)
- : -1
- logForDiagnosticsNoPII(
- 'info',
- 'cli_stream_loop_exited_after_watchdog_clean',
- )
- logEvent('tengu_stream_loop_exited_after_watchdog', {
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- exit_delay_ms: exitDelayMs,
- exit_path:
- 'clean' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- // Prevent double-emit: this throw lands in the catch block below,
- // whose exit_path='error' probe guards on streamWatchdogFiredAt.
- streamWatchdogFiredAt = null
- throw new Error('流空闲超时 - 未收到任何数据块')
- }
- // Detect when the stream completed without producing any assistant messages.
- // This covers two proxy failure modes:
- // 1. No events at all (!partialMessage): proxy returned 200 with non-SSE body
- // 2. Partial events (partialMessage set but no content blocks completed AND
- // no stop_reason received): proxy returned message_start but stream ended
- // before content_block_stop and before message_delta with stop_reason
- // BetaMessageStream had the first check in _endRequest() but the raw Stream
- // does not - without it the generator silently returns no assistant messages,
- // causing "Execution error" in -p mode.
- // Note: We must check stopReason to avoid false positives. For example, with
- // structured output (--json-schema), the model calls a StructuredOutput tool
- // on turn 1, then on turn 2 responds with end_turn and no content blocks.
- // That's a legitimate empty response, not an incomplete stream.
- if (!partialMessage || (newMessages.length === 0 && !stopReason)) {
- logForDebugging(
- !partialMessage
- ? 'Stream completed without receiving message_start event - triggering non-streaming fallback'
- : 'Stream completed with message_start but no content blocks completed - triggering non-streaming fallback',
- { level: 'error' },
- )
- logEvent('tengu_stream_no_events', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw new Error('流结束但未收到任何事件')
- }
- // Log summary if any stalls occurred during streaming
- if (stallCount > 0) {
- logForDebugging(
- `Streaming completed with ${stallCount} stall(s), total stall time: ${(totalStallTime / 1000).toFixed(1)}s`,
- { level: 'warn' },
- )
- logEvent('tengu_streaming_stall_summary', {
- stall_count: stallCount,
- total_stall_time_ms: totalStallTime,
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- }
- // Check if the cache actually broke based on response tokens
- if (feature('PROMPT_CACHE_BREAK_DETECTION')) {
- void checkResponseForCacheBreak(
- options.querySource,
- usage.cache_read_input_tokens,
- usage.cache_creation_input_tokens,
- messages,
- options.agentId,
- streamRequestId,
- )
- }
- // Process fallback percentage header and quota status if available
- // streamResponse is set when the stream is created in the withRetry callback above
- // TypeScript's control flow analysis can't track that streamResponse is set in the callback
- // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
- const resp = streamResponse as unknown as Response | undefined
- if (resp) {
- extractQuotaStatusFromHeaders(resp.headers)
- // Store headers for gateway detection
- responseHeaders = resp.headers
- }
- } catch (streamingError) {
- // Clear the idle timeout watchdog on error path too
- clearStreamIdleTimers()
- // Instrumentation: if the watchdog had already fired and the for-await
- // threw (rather than exiting cleanly), record that the loop DID exit and
- // how long after the watchdog. Distinguishes true hangs from error exits.
- if (streamIdleAborted && streamWatchdogFiredAt !== null) {
- const exitDelayMs = Math.round(
- performance.now() - streamWatchdogFiredAt,
- )
- logForDiagnosticsNoPII(
- 'info',
- 'cli_stream_loop_exited_after_watchdog_error',
- )
- logEvent('tengu_stream_loop_exited_after_watchdog', {
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- exit_delay_ms: exitDelayMs,
- exit_path:
- 'error' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- error_name:
- streamingError instanceof Error
- ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
- : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- }
- if (streamingError instanceof APIUserAbortError) {
- // Check if the abort signal was triggered by the user (ESC key)
- // If the signal is aborted, it's a user-initiated abort
- // If not, it's likely a timeout from the SDK
- if (signal.aborted) {
- // This is a real user abort (ESC key was pressed)
- logForDebugging(
- `Streaming aborted by user: ${errorMessage(streamingError)}`,
- )
- if (isAdvisorInProgress) {
- logEvent('tengu_advisor_tool_interrupted', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- advisor_model: (advisorModel ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- }
- throw streamingError
- } else {
- // The SDK threw APIUserAbortError but our signal wasn't aborted
- // This means it's a timeout from the SDK's internal timeout
- logForDebugging(
- `Streaming timeout (SDK abort): ${streamingError.message}`,
- { level: 'error' },
- )
- // Throw a more specific error for timeout
- throw new APIConnectionTimeoutError({ message: '请求超时' })
- }
- }
- // When the flag is enabled, skip the non-streaming fallback and let the
- // error propagate to withRetry. The mid-stream fallback causes double tool
- // execution when streaming tool execution is active: the partial stream
- // starts a tool, then the non-streaming retry produces the same tool_use
- // and runs it again. See inc-4258.
- const disableFallback =
- isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_NONSTREAMING_FALLBACK) ||
- getFeatureValue_CACHED_MAY_BE_STALE(
- 'tengu_disable_streaming_to_non_streaming_fallback',
- false,
- )
- if (disableFallback) {
- logForDebugging(
- `Error streaming (non-streaming fallback disabled): ${errorMessage(streamingError)}`,
- { level: 'error' },
- )
- logEvent('tengu_streaming_fallback_to_non_streaming', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- error:
- streamingError instanceof Error
- ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
- : (String(
- streamingError,
- ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
- attemptNumber,
- maxOutputTokens,
- thinkingType:
- thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- fallback_disabled: true,
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- fallback_cause: (streamIdleAborted
- ? 'watchdog'
- : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- throw streamingError
- }
- logForDebugging(
- `Error streaming, falling back to non-streaming mode: ${errorMessage(streamingError)}`,
- { level: 'error' },
- )
- didFallBackToNonStreaming = true
- if (options.onStreamingFallback) {
- options.onStreamingFallback()
- }
- logEvent('tengu_streaming_fallback_to_non_streaming', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- error:
- streamingError instanceof Error
- ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
- : (String(
- streamingError,
- ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
- attemptNumber,
- maxOutputTokens,
- thinkingType:
- thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- fallback_disabled: false,
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- fallback_cause: (streamIdleAborted
- ? 'watchdog'
- : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- // Fall back to non-streaming mode with retries.
- // If the streaming failure was itself a 529, count it toward the
- // consecutive-529 budget so total 529s-before-model-fallback is the
- // same whether the overload was hit in streaming or non-streaming mode.
- // This is a speculative fix for https://github.com/anthropics/claude-code/issues/1513
- // Instrumentation: proves executeNonStreamingRequest was entered (vs. the
- // fallback event firing but the call itself hanging at dispatch).
- logForDiagnosticsNoPII('info', 'cli_nonstreaming_fallback_started')
- logEvent('tengu_nonstreaming_fallback_started', {
- request_id: (streamRequestId ??
- 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- fallback_cause: (streamIdleAborted
- ? 'watchdog'
- : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- const result = yield* executeNonStreamingRequest(
- { model: options.model, source: options.querySource },
- {
- model: options.model,
- fallbackModel: options.fallbackModel,
- thinkingConfig,
- ...(isFastModeEnabled() && { fastMode: isFastMode }),
- signal,
- initialConsecutive529Errors: is529Error(streamingError) ? 1 : 0,
- querySource: options.querySource,
- },
- paramsFromContext,
- (attempt, _startTime, tokens) => {
- attemptNumber = attempt
- maxOutputTokens = tokens
- },
- params => captureAPIRequest(params, options.querySource),
- streamRequestId,
- )
- const m: AssistantMessage = {
- message: {
- ...result,
- content: normalizeContentFromAPI(
- result.content,
- tools,
- options.agentId,
- ),
- },
- requestId: streamRequestId ?? undefined,
- type: 'assistant',
- uuid: randomUUID(),
- timestamp: new Date().toISOString(),
- ...(process.env.USER_TYPE === 'ant' &&
- research !== undefined && {
- research,
- }),
- ...(advisorModel && {
- advisorModel,
- }),
- }
- newMessages.push(m)
- fallbackMessage = m
- yield m
- } finally {
- clearStreamIdleTimers()
- }
- } catch (errorFromRetry) {
- // FallbackTriggeredError must propagate to query.ts, which performs the
- // actual model switch. Swallowing it here would turn the fallback into a
- // no-op — the user would just see "Model fallback triggered: X -> Y" as
- // an error message with no actual retry on the fallback model.
- if (errorFromRetry instanceof FallbackTriggeredError) {
- throw errorFromRetry
- }
- // Check if this is a 404 error during stream creation that should trigger
- // non-streaming fallback. This handles gateways that return 404 for streaming
- // endpoints but work fine with non-streaming. Before v2.1.8, BetaMessageStream
- // threw 404s during iteration (caught by inner catch with fallback), but now
- // with raw streams, 404s are thrown during creation (caught here).
- const is404StreamCreationError =
- !didFallBackToNonStreaming &&
- errorFromRetry instanceof CannotRetryError &&
- errorFromRetry.originalError instanceof APIError &&
- errorFromRetry.originalError.status === 404
- if (is404StreamCreationError) {
- // 404 is thrown at .withResponse() before streamRequestId is assigned,
- // and CannotRetryError means every retry failed — so grab the failed
- // request's ID from the error header instead.
- const failedRequestId =
- (errorFromRetry.originalError as APIError).requestID ?? 'unknown'
- logForDebugging(
- 'Streaming endpoint returned 404, falling back to non-streaming mode',
- { level: 'warn' },
- )
- didFallBackToNonStreaming = true
- if (options.onStreamingFallback) {
- options.onStreamingFallback()
- }
- logEvent('tengu_streaming_fallback_to_non_streaming', {
- model:
- options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- error:
- '404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- attemptNumber,
- maxOutputTokens,
- thinkingType:
- thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- request_id:
- failedRequestId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- fallback_cause:
- '404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- try {
- // Fall back to non-streaming mode
- const result = yield* executeNonStreamingRequest(
- { model: options.model, source: options.querySource },
- {
- model: options.model,
- fallbackModel: options.fallbackModel,
- thinkingConfig,
- ...(isFastModeEnabled() && { fastMode: isFastMode }),
- signal,
- },
- paramsFromContext,
- (attempt, _startTime, tokens) => {
- attemptNumber = attempt
- maxOutputTokens = tokens
- },
- params => captureAPIRequest(params, options.querySource),
- failedRequestId,
- )
- const m: AssistantMessage = {
- message: {
- ...result,
- content: normalizeContentFromAPI(
- result.content,
- tools,
- options.agentId,
- ),
- },
- requestId: streamRequestId ?? undefined,
- type: 'assistant',
- uuid: randomUUID(),
- timestamp: new Date().toISOString(),
- ...(process.env.USER_TYPE === 'ant' &&
- research !== undefined && { research }),
- ...(advisorModel && { advisorModel }),
- }
- newMessages.push(m)
- fallbackMessage = m
- yield m
- // Continue to success logging below
- } catch (fallbackError) {
- // Propagate model-fallback signal to query.ts (see comment above).
- if (fallbackError instanceof FallbackTriggeredError) {
- throw fallbackError
- }
- // Fallback also failed, handle as normal error
- logForDebugging(
- `Non-streaming fallback also failed: ${errorMessage(fallbackError)}`,
- { level: 'error' },
- )
- let error = fallbackError
- let errorModel = options.model
- if (fallbackError instanceof CannotRetryError) {
- error = fallbackError.originalError
- errorModel = fallbackError.retryContext.model
- }
- if (error instanceof APIError) {
- extractQuotaStatusFromError(error)
- }
- const requestId =
- streamRequestId ||
- (error instanceof APIError ? error.requestID : undefined) ||
- (error instanceof APIError
- ? (error.error as { request_id?: string })?.request_id
- : undefined)
- logAPIError({
- error,
- model: errorModel,
- messageCount: messagesForAPI.length,
- messageTokens: tokenCountFromLastAPIResponse(messagesForAPI),
- durationMs: Date.now() - start,
- durationMsIncludingRetries: Date.now() - startIncludingRetries,
- attempt: attemptNumber,
- requestId,
- clientRequestId,
- didFallBackToNonStreaming,
- queryTracking: options.queryTracking,
- querySource: options.querySource,
- llmSpan,
- fastMode: isFastModeRequest,
- previousRequestId,
- })
- if (error instanceof APIUserAbortError) {
- releaseStreamResources()
- return
- }
- yield getAssistantMessageFromError(error, errorModel, {
- messages,
- messagesForAPI,
- })
- releaseStreamResources()
- return
- }
- } else {
- // Original error handling for non-404 errors
- logForDebugging(`Error in API request: ${errorMessage(errorFromRetry)}`, {
- level: 'error',
- })
- let error = errorFromRetry
- let errorModel = options.model
- if (errorFromRetry instanceof CannotRetryError) {
- error = errorFromRetry.originalError
- errorModel = errorFromRetry.retryContext.model
- }
- // Extract quota status from error headers if it's a rate limit error
- if (error instanceof APIError) {
- extractQuotaStatusFromError(error)
- }
- // Extract requestId from stream, error header, or error body
- const requestId =
- streamRequestId ||
- (error instanceof APIError ? error.requestID : undefined) ||
- (error instanceof APIError
- ? (error.error as { request_id?: string })?.request_id
- : undefined)
- logAPIError({
- error,
- model: errorModel,
- messageCount: messagesForAPI.length,
- messageTokens: tokenCountFromLastAPIResponse(messagesForAPI),
- durationMs: Date.now() - start,
- durationMsIncludingRetries: Date.now() - startIncludingRetries,
- attempt: attemptNumber,
- requestId,
- clientRequestId,
- didFallBackToNonStreaming,
- queryTracking: options.queryTracking,
- querySource: options.querySource,
- llmSpan,
- fastMode: isFastModeRequest,
- previousRequestId,
- })
- // Don't yield an assistant error message for user aborts
- // The interruption message is handled in query.ts
- if (error instanceof APIUserAbortError) {
- releaseStreamResources()
- return
- }
- yield getAssistantMessageFromError(error, errorModel, {
- messages,
- messagesForAPI,
- })
- releaseStreamResources()
- return
- }
- } finally {
- stopSessionActivity('api_call')
- // Must be in the finally block: if the generator is terminated early
- // via .return() (e.g. consumer breaks out of for-await-of, or query.ts
- // encounters an abort), code after the try/finally never executes.
- // Without this, the Response object's native TLS/socket buffers leak
- // until the generator itself is GC'd (see GH #32920).
- releaseStreamResources()
- // Non-streaming fallback cost: the streaming path tracks cost in the
- // message_delta handler before any yield. Fallback pushes to newMessages
- // then yields, so tracking must be here to survive .return() at the yield.
- if (fallbackMessage) {
- const fallbackUsage = fallbackMessage.message.usage
- usage = updateUsage(EMPTY_USAGE, fallbackUsage)
- stopReason = fallbackMessage.message.stop_reason
- const fallbackCost = calculateUSDCost(resolvedModel, fallbackUsage)
- costUSD += addToTotalSessionCost(
- fallbackCost,
- fallbackUsage,
- options.model,
- )
- }
- }
- // Mark all registered tools as sent to API so they become eligible for deletion
- if (feature('CACHED_MICROCOMPACT') && cachedMCEnabled) {
- markToolsSentToAPIState()
- }
- // Track the last requestId for the main conversation chain so shutdown
- // can send a cache eviction hint to inference. Exclude backgrounded
- // sessions (Ctrl+B) which share the repl_main_thread querySource but
- // run inside an agent context — they are independent conversation chains
- // whose cache should not be evicted when the foreground session clears.
- if (
- streamRequestId &&
- !getAgentContext() &&
- (options.querySource.startsWith('repl_main_thread') ||
- options.querySource === 'sdk')
- ) {
- setLastMainRequestId(streamRequestId)
- }
- // Precompute scalars so the fire-and-forget .then() closure doesn't pin the
- // full messagesForAPI array (the entire conversation up to the context window
- // limit) until getToolPermissionContext() resolves.
- const logMessageCount = messagesForAPI.length
- const logMessageTokens = tokenCountFromLastAPIResponse(messagesForAPI)
- void options.getToolPermissionContext().then(permissionContext => {
- logAPISuccessAndDuration({
- model:
- newMessages[0]?.message.model ?? partialMessage?.model ?? options.model,
- preNormalizedModel: options.model,
- usage,
- start,
- startIncludingRetries,
- attempt: attemptNumber,
- messageCount: logMessageCount,
- messageTokens: logMessageTokens,
- requestId: streamRequestId ?? null,
- stopReason,
- ttftMs,
- didFallBackToNonStreaming,
- querySource: options.querySource,
- headers: responseHeaders,
- costUSD,
- queryTracking: options.queryTracking,
- permissionMode: permissionContext.mode,
- // Pass newMessages for beta tracing - extraction happens in logging.ts
- // only when beta tracing is enabled
- newMessages,
- llmSpan,
- globalCacheStrategy,
- requestSetupMs: start - startIncludingRetries,
- attemptStartTimes,
- fastMode: isFastModeRequest,
- previousRequestId,
- betas: lastRequestBetas,
- })
- })
- // Defensive: also release on normal completion (no-op if finally already ran).
- releaseStreamResources()
- }
/**
 * Best-effort teardown of a raw message stream, used to prevent memory leaks.
 *
 * Aborts the stream through its controller unless the controller's signal
 * already reports `aborted`. A missing stream is a no-op, and anything thrown
 * while aborting is swallowed on purpose.
 *
 * @param stream - Stream to abort, or `undefined` when none exists
 * @internal Exported for testing
 */
export function cleanupStream(
  stream: Stream<BetaRawMessageStreamEvent> | undefined,
): void {
  if (!stream) {
    return
  }
  try {
    const { controller } = stream
    if (controller.signal.aborted) {
      // Nothing left to do: the stream was aborted earlier.
      return
    }
    controller.abort()
  } catch {
    // Deliberately ignored - the stream may already be closed.
  }
}
/**
 * Folds the usage reported by one streaming API event into the usage known so
 * far and returns a new usage object (the inputs are not modified).
 *
 * Anthropic's streaming API reports cumulative totals rather than deltas, so
 * each field is replaced, never summed.
 *
 * Field rules:
 * - input_tokens, cache_creation_input_tokens, cache_read_input_tokens: these
 *   are typically set in message_start and stay constant. message_delta events
 *   may carry explicit 0 values, so the previous value is kept unless the
 *   event has a non-null value greater than zero.
 * - output_tokens, server_tool_use.*, cache_creation.*, iterations, speed:
 *   taken from the event when present (non-nullish), otherwise carried over.
 * - service_tier, inference_geo: always carried over from `usage`.
 *
 * @param usage - Usage accumulated so far
 * @param partUsage - Usage from the current event; when falsy a shallow copy
 *   of `usage` is returned
 * @returns The merged usage
 */
export function updateUsage(
  usage: Readonly<NonNullableUsage>,
  partUsage: BetaMessageDeltaUsage | undefined,
): NonNullableUsage {
  if (!partUsage) {
    return { ...usage }
  }

  // Keep `previous` unless the event carries a strictly positive count, so an
  // explicit 0 / null / missing value never clobbers the real one.
  const positiveOr = (
    candidate: number | null | undefined,
    previous: number,
  ): number => (candidate != null && candidate > 0 ? candidate : previous)

  // SDK type BetaMessageDeltaUsage is missing cache_creation (and speed), but
  // the API really sends them - read those through the wider BetaUsage shape.
  const widePart = partUsage as BetaUsage
  const partServerTools = partUsage.server_tool_use
  const partCacheCreation = widePart.cache_creation

  return {
    input_tokens: positiveOr(partUsage.input_tokens, usage.input_tokens),
    cache_creation_input_tokens: positiveOr(
      partUsage.cache_creation_input_tokens,
      usage.cache_creation_input_tokens,
    ),
    cache_read_input_tokens: positiveOr(
      partUsage.cache_read_input_tokens,
      usage.cache_read_input_tokens,
    ),
    output_tokens: partUsage.output_tokens ?? usage.output_tokens,
    server_tool_use: {
      web_search_requests:
        partServerTools?.web_search_requests ??
        usage.server_tool_use.web_search_requests,
      web_fetch_requests:
        partServerTools?.web_fetch_requests ??
        usage.server_tool_use.web_fetch_requests,
    },
    service_tier: usage.service_tier,
    cache_creation: {
      ephemeral_1h_input_tokens:
        partCacheCreation?.ephemeral_1h_input_tokens ??
        usage.cache_creation.ephemeral_1h_input_tokens,
      ephemeral_5m_input_tokens:
        partCacheCreation?.ephemeral_5m_input_tokens ??
        usage.cache_creation.ephemeral_5m_input_tokens,
    },
    // cache_deleted_input_tokens: returned by the API when cache editing
    // deletes KV cache content, but not in SDK types. Kept off
    // NonNullableUsage - and referenced only inside this feature() branch -
    // so the string is eliminated from external builds by dead code
    // elimination. Uses the same > 0 guard as the other token fields so
    // message_delta cannot overwrite the real value with 0.
    ...(feature('CACHED_MICROCOMPACT')
      ? {
          cache_deleted_input_tokens: positiveOr(
            (partUsage as unknown as { cache_deleted_input_tokens?: number })
              .cache_deleted_input_tokens,
            (usage as unknown as { cache_deleted_input_tokens?: number })
              .cache_deleted_input_tokens ?? 0,
          ),
        }
      : {}),
    inference_geo: usage.inference_geo,
    iterations: partUsage.iterations ?? usage.iterations,
    speed: widePart.speed ?? usage.speed,
  }
}
/**
 * Adds one message's usage onto a running total and returns the new total.
 * Used to track cumulative usage across multiple assistant turns.
 *
 * Token and request counters are summed. service_tier, inference_geo,
 * iterations and speed are not additive, so the value from `messageUsage`
 * (the most recent message) wins.
 *
 * @param totalUsage - Running total before this message
 * @param messageUsage - Usage of the message being added
 * @returns A new usage object; neither input is modified
 */
export function accumulateUsage(
  totalUsage: Readonly<NonNullableUsage>,
  messageUsage: Readonly<NonNullableUsage>,
): NonNullableUsage {
  const totalTools = totalUsage.server_tool_use
  const messageTools = messageUsage.server_tool_use
  const totalCreation = totalUsage.cache_creation
  const messageCreation = messageUsage.cache_creation

  return {
    input_tokens: totalUsage.input_tokens + messageUsage.input_tokens,
    cache_creation_input_tokens:
      totalUsage.cache_creation_input_tokens +
      messageUsage.cache_creation_input_tokens,
    cache_read_input_tokens:
      totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens,
    output_tokens: totalUsage.output_tokens + messageUsage.output_tokens,
    server_tool_use: {
      web_search_requests:
        totalTools.web_search_requests + messageTools.web_search_requests,
      web_fetch_requests:
        totalTools.web_fetch_requests + messageTools.web_fetch_requests,
    },
    // Most recent service tier wins
    service_tier: messageUsage.service_tier,
    cache_creation: {
      ephemeral_1h_input_tokens:
        totalCreation.ephemeral_1h_input_tokens +
        messageCreation.ephemeral_1h_input_tokens,
      ephemeral_5m_input_tokens:
        totalCreation.ephemeral_5m_input_tokens +
        messageCreation.ephemeral_5m_input_tokens,
    },
    // cache_deleted_input_tokens is not on NonNullableUsage; it is referenced
    // only inside this feature() branch to keep the string out of external
    // builds. A missing value counts as 0 on either side.
    ...(feature('CACHED_MICROCOMPACT')
      ? {
          cache_deleted_input_tokens:
            ((totalUsage as unknown as { cache_deleted_input_tokens?: number })
              .cache_deleted_input_tokens ?? 0) +
            ((messageUsage as unknown as { cache_deleted_input_tokens?: number })
              .cache_deleted_input_tokens ?? 0),
        }
      : {}),
    // The remaining fields take the most recent message's value
    inference_geo: messageUsage.inference_geo,
    iterations: messageUsage.iterations,
    speed: messageUsage.speed,
  }
}
/**
 * Type guard for tool_result content blocks.
 *
 * @param block - Any value
 * @returns `true` only for a non-null object whose `type` is `'tool_result'`
 *   and which has a `tool_use_id` property
 */
function isToolResultBlock(
  block: unknown,
): block is { type: 'tool_result'; tool_use_id: string } {
  if (block === null || typeof block !== 'object') {
    return false
  }
  if (!('type' in block)) {
    return false
  }
  if ((block as { type: string }).type !== 'tool_result') {
    return false
  }
  return 'tool_use_id' in block
}
/**
 * A `cache_edits` content block: a list of delete operations, each naming its
 * target by `cache_reference`.
 */
type CachedMCEditsBlock = {
  type: 'cache_edits'
  edits: Array<{ type: 'delete'; cache_reference: string }>
}

/**
 * A cache_edits block paired with the index of the message it is inserted
 * into, so it can be re-inserted at that same position on later requests.
 */
type CachedMCPinnedEdits = {
  // Index into the message list; the block is only re-inserted when the
  // message at this index has role 'user'.
  userMessageIndex: number
  block: CachedMCEditsBlock
}
// Exported for testing cache_reference placement constraints
/**
 * Converts internal user/assistant messages into API message params and
 * decides which single message carries the cache marker.
 *
 * When `useCachedMC` is true it additionally:
 *  1. re-inserts previously pinned cache_edits blocks at their recorded
 *     message index,
 *  2. inserts `newCacheEdits` into the last user message and pins it, and
 *  3. (only if `enablePromptCaching`) adds `cache_reference` to tool_result
 *     blocks in user messages strictly before the last message that contains
 *     a `cache_control` block.
 * Delete edits are de-duplicated by `cache_reference` across all blocks, in
 * insertion order (pinned first, then new).
 *
 * @param messages - Conversation messages to convert
 * @param enablePromptCaching - Forwarded to the per-message converters; also
 *   gates step 3
 * @param querySource - Forwarded to the per-message converters
 * @param useCachedMC - Enables the cache_edits / cache_reference handling
 * @param newCacheEdits - New cache_edits block to insert and pin, if any
 * @param pinnedEdits - Blocks pinned by earlier calls
 * @param skipCacheWrite - Moves the marker to the second-to-last message
 * @returns The converted message params
 */
export function addCacheBreakpoints(
  messages: (UserMessage | AssistantMessage)[],
  enablePromptCaching: boolean,
  querySource?: QuerySource,
  useCachedMC = false,
  newCacheEdits?: CachedMCEditsBlock | null,
  pinnedEdits?: CachedMCPinnedEdits[],
  skipCacheWrite = false,
): MessageParam[] {
  logEvent('tengu_api_cache_breakpoints', {
    totalMessageCount: messages.length,
    cachingEnabled: enablePromptCaching,
    skipCacheWrite,
  })

  // Exactly one message-level cache_control marker per request. Mycro's
  // turn-to-turn eviction (page_manager/index.rs: Index::insert) frees
  // local-attention KV pages at any cached prefix position NOT in
  // cache_store_int_token_boundaries. With two markers the second-to-last
  // position is protected and its locals survive an extra turn even though
  // nothing will ever resume from there - with one marker they're freed
  // immediately. For fire-and-forget forks (skipCacheWrite) we shift the
  // marker to the second-to-last message: that's the last shared-prefix
  // point, so the write is a no-op merge on mycro (entry already exists)
  // and the fork doesn't leave its own tail in the KVCC. Dense pages are
  // refcounted and survive via the new hash either way.
  const markerIndex = messages.length - (skipCacheWrite ? 2 : 1)
  const result = messages.map((msg, index) => {
    const isMarker = index === markerIndex
    return msg.type === 'user'
      ? userMessageToMessageParam(
          msg,
          isMarker,
          enablePromptCaching,
          querySource,
        )
      : assistantMessageToMessageParam(
          msg,
          isMarker,
          enablePromptCaching,
          querySource,
        )
  })

  if (!useCachedMC) {
    return result
  }

  // Every cache_reference already scheduled for deletion in this request;
  // shared by all blocks so no reference is deleted twice.
  const seenDeleteRefs = new Set<string>()

  // Returns a copy of `block` without edits whose reference was already seen,
  // recording the surviving references as seen.
  const deduplicateEdits = (block: CachedMCEditsBlock): CachedMCEditsBlock => {
    const uniqueEdits: CachedMCEditsBlock['edits'] = []
    for (const edit of block.edits) {
      if (seenDeleteRefs.has(edit.cache_reference)) {
        continue
      }
      seenDeleteRefs.add(edit.cache_reference)
      uniqueEdits.push(edit)
    }
    return { ...block, edits: uniqueEdits }
  }

  // Re-insert all previously-pinned cache_edits at their original positions
  for (const pinned of pinnedEdits ?? []) {
    const target = result[pinned.userMessageIndex]
    if (!target || target.role !== 'user') {
      continue
    }
    // String content is promoted to a one-element text-block array so a block
    // can be spliced in.
    if (!Array.isArray(target.content)) {
      target.content = [{ type: 'text', text: target.content as string }]
    }
    const remaining = deduplicateEdits(pinned.block)
    if (remaining.edits.length > 0) {
      insertBlockAfterToolResults(target.content, remaining)
    }
  }

  // Insert new cache_edits into the last user message and pin them
  if (newCacheEdits && result.length > 0) {
    const freshEdits = deduplicateEdits(newCacheEdits)
    if (freshEdits.edits.length > 0) {
      // Walk backwards to the last message with role 'user'
      let lastUserIndex = result.length - 1
      while (lastUserIndex >= 0 && result[lastUserIndex]?.role !== 'user') {
        lastUserIndex--
      }
      if (lastUserIndex >= 0) {
        const lastUser = result[lastUserIndex]!
        if (!Array.isArray(lastUser.content)) {
          lastUser.content = [
            { type: 'text', text: lastUser.content as string },
          ]
        }
        insertBlockAfterToolResults(lastUser.content, freshEdits)
        // Pin (the original, un-deduplicated block) so it is re-sent at the
        // same position in future calls
        pinCacheEdits(lastUserIndex, newCacheEdits)
        logForDebugging(
          `Added cache_edits block with ${freshEdits.edits.length} deletion(s) to message[${lastUserIndex}]: ${freshEdits.edits.map(e => e.cache_reference).join(', ')}`,
        )
      }
    }
  }

  // Add cache_reference to tool_result blocks that are within the cached prefix.
  // Must be done AFTER cache_edits insertion since that modifies content arrays.
  if (enablePromptCaching) {
    // Index of the last message containing a cache_control marker (-1 if none)
    let lastCCMsg = -1
    for (let i = 0; i < result.length; i++) {
      const content = result[i]!.content
      if (
        Array.isArray(content) &&
        content.some(
          block =>
            typeof block === 'object' &&
            block !== null &&
            'cache_control' in block,
        )
      ) {
        lastCCMsg = i
      }
    }

    // Only messages strictly before the last cache_control marker are tagged.
    // The API requires cache_reference to appear "before or on" the last
    // cache_control - strict "before" avoids edge cases where cache_edits
    // splicing shifts block indices. With no marker (lastCCMsg === -1) the
    // loop body never runs.
    //
    // New arrays/objects are created instead of mutating in place, to avoid
    // contaminating blocks reused by secondary queries that use models
    // without cache_editing support.
    for (let i = 0; i < lastCCMsg; i++) {
      const msg = result[i]!
      if (msg.role !== 'user') {
        continue
      }
      const content = msg.content
      if (!Array.isArray(content) || !content.some(isToolResultBlock)) {
        continue
      }
      msg.content = content.map(block =>
        isToolResultBlock(block)
          ? Object.assign({}, block, { cache_reference: block.tool_use_id })
          : block,
      )
    }
  }

  return result
}
/**
 * Turns the system prompt into the text blocks sent to the API.
 *
 * The prompt is split by `splitSysPromptPrefix`; each resulting segment
 * becomes one text block. A block gets `cache_control` only when prompt
 * caching is enabled and the segment's `cacheScope` is not null.
 *
 * @param systemPrompt - System prompt to split
 * @param enablePromptCaching - Whether cache_control may be attached
 * @param options - `skipGlobalCacheForSystemPrompt` is forwarded to the
 *   splitter; `querySource` is forwarded to `getCacheControl`
 * @returns One text block per segment, in splitter order
 */
export function buildSystemPromptBlocks(
  systemPrompt: SystemPrompt,
  enablePromptCaching: boolean,
  options?: {
    skipGlobalCacheForSystemPrompt?: boolean
    querySource?: QuerySource
  },
): TextBlockParam[] {
  // IMPORTANT: Do not add any more blocks for caching or you will get a 400
  const segments = splitSysPromptPrefix(systemPrompt, {
    skipGlobalCacheForSystemPrompt: options?.skipGlobalCacheForSystemPrompt,
  })

  return segments.map(segment => {
    const textBlock: TextBlockParam = { type: 'text', text: segment.text }
    if (enablePromptCaching && segment.cacheScope !== null) {
      textBlock.cache_control = getCacheControl({
        scope: segment.cacheScope,
        querySource: options?.querySource,
      })
    }
    return textBlock
  })
}
type HaikuOptions = Omit<Options, 'model' | 'getToolPermissionContext'>

/**
 * Sends a single user prompt to the small/fast model (`getSmallFastModel()`)
 * without streaming and returns the assistant reply.
 *
 * Thinking is disabled, no tools are offered, prompt caching defaults to off
 * unless `options.enablePromptCaching` is set, and an empty tool permission
 * context is supplied. The call is wrapped in `withVCR`, which receives the
 * system prompt (as text blocks) and the user prompt as two user messages.
 *
 * @param systemPrompt - System prompt; defaults to an empty one
 * @param userPrompt - Text of the single user message
 * @param outputFormat - Optional JSON output format forwarded to the query
 * @param signal - Abort signal forwarded to the query
 * @param options - Remaining query options (model and permission context are
 *   supplied here)
 * @returns The assistant message produced by the model
 */
export async function queryHaiku({
  systemPrompt = asSystemPrompt([]),
  userPrompt,
  outputFormat,
  signal,
  options,
}: {
  systemPrompt: SystemPrompt
  userPrompt: string
  outputFormat?: BetaJSONOutputFormat
  signal: AbortSignal
  options: HaikuOptions
}): Promise<AssistantMessage> {
  const replies = await withVCR(
    [
      createUserMessage({
        content: systemPrompt.map(text => ({ type: 'text', text })),
      }),
      createUserMessage({ content: userPrompt }),
    ],
    async () => {
      const reply = await queryModelWithoutStreaming({
        messages: [createUserMessage({ content: userPrompt })],
        systemPrompt,
        thinkingConfig: { type: 'disabled' },
        tools: [],
        signal,
        options: {
          ...options,
          model: getSmallFastModel(),
          enablePromptCaching: options.enablePromptCaching ?? false,
          outputFormat,
          getToolPermissionContext: async () => getEmptyToolPermissionContext(),
        },
      })
      return [reply]
    },
  )
  // We don't use streaming for Haiku so this is safe
  return replies[0]! as AssistantMessage
}
type QueryWithModelOptions = Omit<Options, 'getToolPermissionContext'>

/**
 * Query a specific model through the Claude Code infrastructure.
 * This goes through the full query pipeline including proper authentication,
 * betas, and headers - unlike direct API calls.
 *
 * The request is non-streaming, has thinking disabled, offers no tools,
 * defaults prompt caching to off unless `options.enablePromptCaching` is set,
 * and uses an empty tool permission context. The model comes from `options`.
 * The call is wrapped in `withVCR`, which receives the system prompt (as text
 * blocks) and the user prompt as two user messages.
 *
 * @param systemPrompt - System prompt; defaults to an empty one
 * @param userPrompt - Text of the single user message
 * @param outputFormat - Optional JSON output format forwarded to the query
 * @param signal - Abort signal forwarded to the query
 * @param options - Query options, including the model to use
 * @returns The assistant message produced by the model
 */
export async function queryWithModel({
  systemPrompt = asSystemPrompt([]),
  userPrompt,
  outputFormat,
  signal,
  options,
}: {
  systemPrompt: SystemPrompt
  userPrompt: string
  outputFormat?: BetaJSONOutputFormat
  signal: AbortSignal
  options: QueryWithModelOptions
}): Promise<AssistantMessage> {
  const replies = await withVCR(
    [
      createUserMessage({
        content: systemPrompt.map(text => ({ type: 'text', text })),
      }),
      createUserMessage({ content: userPrompt }),
    ],
    async () => {
      const reply = await queryModelWithoutStreaming({
        messages: [createUserMessage({ content: userPrompt })],
        systemPrompt,
        thinkingConfig: { type: 'disabled' },
        tools: [],
        signal,
        options: {
          ...options,
          enablePromptCaching: options.enablePromptCaching ?? false,
          outputFormat,
          getToolPermissionContext: async () => getEmptyToolPermissionContext(),
        },
      })
      return [reply]
    },
  )
  return replies[0]! as AssistantMessage
}
// Non-streaming requests have a 10min max per the docs:
// https://platform.claude.com/docs/en/api/errors#long-requests
// The SDK's 21333-token cap is derived from 10min × 128k tokens/hour, but we
// bypass it by setting a client-level timeout, so we can cap higher.
export const MAX_NON_STREAMING_TOKENS = 64_000

/**
 * Caps `max_tokens` for the non-streaming fallback and, when thinking is
 * enabled with a non-zero budget, lowers the thinking budget so the API
 * constraint `max_tokens > thinking.budget_tokens` still holds.
 *
 * The input object is not modified; a shallow copy is returned. Requests
 * without enabled thinking (or with a zero/absent budget) only have
 * `max_tokens` capped.
 *
 * @param params - The parameters that will be sent to the API
 * @param maxTokensCap - The maximum allowed tokens (MAX_NON_STREAMING_TOKENS)
 * @returns Adjusted parameters with max_tokens - and, if needed, the thinking
 *   budget - capped
 */
export function adjustParamsForNonStreaming<
  T extends {
    max_tokens: number
    thinking?: BetaMessageStreamParams['thinking']
  },
>(params: T, maxTokensCap: number): T {
  const max_tokens = Math.min(params.max_tokens, maxTokensCap)
  const { thinking } = params

  // Nothing to rebalance unless thinking is enabled with a truthy budget.
  if (thinking?.type !== 'enabled' || !thinking.budget_tokens) {
    return { ...params, max_tokens }
  }

  return {
    ...params,
    thinking: {
      ...thinking,
      // Must be at least 1 less than max_tokens
      budget_tokens: Math.min(thinking.budget_tokens, max_tokens - 1),
    },
    max_tokens,
  }
}
/**
 * Reports whether the `tengu_otk_slot_v1` feature value is on. The value is
 * read through the cached (possibly stale) feature lookup and defaults to
 * false.
 */
function isMaxTokensCapEnabled(): boolean {
  // 3P default: false (not validated on Bedrock/Vertex)
  const capEnabled = getFeatureValue_CACHED_MAY_BE_STALE(
    'tengu_otk_slot_v1',
    false,
  )
  return capEnabled
}
/**
 * Resolves the max output tokens to request for `model`.
 *
 * Starts from the model's default, optionally lowers it to
 * CAPPED_DEFAULT_MAX_TOKENS when the cap feature is enabled, then lets the
 * CLAUDE_CODE_MAX_OUTPUT_TOKENS environment variable override it, validated
 * against the model's upper limit.
 *
 * @param model - Model identifier
 * @returns The effective max output tokens
 */
export function getMaxOutputTokensForModel(model: string): number {
  const { default: nativeDefault, upperLimit } = getModelMaxOutputTokens(model)

  // Slot-reservation cap: drop default to 8k for all models. BQ p99 output
  // = 4,911 tokens; 32k/64k defaults over-reserve 8-16× slot capacity.
  // Requests hitting the cap get one clean retry at 64k (query.ts
  // max_output_tokens_escalate). Math.min keeps models with lower native
  // defaults (e.g. claude-3-opus at 4k) at their native value. Applied
  // before the env-var override so CLAUDE_CODE_MAX_OUTPUT_TOKENS still wins.
  let defaultTokens = nativeDefault
  if (isMaxTokensCapEnabled()) {
    defaultTokens = Math.min(nativeDefault, CAPPED_DEFAULT_MAX_TOKENS)
  }

  const { effective } = validateBoundedIntEnvVar(
    'CLAUDE_CODE_MAX_OUTPUT_TOKENS',
    process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS,
    defaultTokens,
    upperLimit,
  )
  return effective
}
|