streaming.mjs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import { EventStreamMarshaller } from '@smithy/eventstream-serde-node';
  2. import { fromBase64, toBase64 } from '@smithy/util-base64';
  3. import { streamCollector } from '@smithy/fetch-http-handler';
  4. import { Stream as CoreStream } from '@anthropic-ai/sdk/streaming';
  5. import { AnthropicError } from '@anthropic-ai/sdk/error';
  6. import { APIError } from '@anthropic-ai/sdk';
  7. import { de_ResponseStream } from "../AWS_restJson1.mjs";
  8. import { ReadableStreamToAsyncIterable } from "../internal/shims.mjs";
  9. import { safeJSON } from "../internal/utils/values.mjs";
  10. import { loggerFor } from "../internal/utils/log.mjs";
  /**
   * Decodes a buffer of UTF-8 bytes into a JavaScript string.
   *
   * @param {ArrayBuffer | ArrayBufferView} [input] - The bytes to decode.
   * @returns {string} The decoded text.
   */
  export const toUtf8 = (input) => {
    const decoder = new TextDecoder('utf-8');
    return decoder.decode(input);
  };
  /**
   * Encodes a JavaScript string as UTF-8 bytes.
   *
   * @param {string} [input] - The text to encode.
   * @returns {Uint8Array} The UTF-8 byte representation of `input`.
   */
  export const fromUtf8 = (input) => {
    const encoder = new TextEncoder();
    return encoder.encode(input);
  };
  /**
   * Builds the smallest serde "context" object that `de_ResponseStream` needs.
   *
   * `de_ResponseStream` parses a Bedrock response stream and emits events as they
   * are found. Its context argument has many fields, but for how it is used in
   * this module only the ones assembled here are required.
   *
   * Note the naming: the `utf8Encoder` slot receives `toUtf8` and the
   * `utf8Decoder` slot receives `fromUtf8`, both here and in the marshaller options.
   *
   * @returns {object} A fresh context with base64/UTF-8 codecs, a new
   *   `EventStreamMarshaller`, and the stream collector.
   */
  export const getMinimalSerdeContext = () => ({
    base64Decoder: fromBase64,
    base64Encoder: toBase64,
    utf8Decoder: fromUtf8,
    utf8Encoder: toUtf8,
    // A new marshaller is created on every call, configured with the same UTF-8 codecs.
    eventStreamMarshaller: new EventStreamMarshaller({ utf8Encoder: toUtf8, utf8Decoder: fromUtf8 }),
    streamCollector,
  });
  /**
   * Stream specialisation whose messages are decoded from a Bedrock event-stream
   * response body rather than from plain server-sent events.
   */
  export class Stream extends CoreStream {
    /**
     * Wraps a streaming HTTP response in a `Stream` of parsed JSON messages.
     *
     * @param {object} response - The HTTP response; its `body` is iterated and its
     *   `headers` are attached to any generated API error.
     * @param {AbortController} controller - Aborted when the body is missing or when
     *   iteration ends before the stream has been fully read.
     * @param {object} [client] - When provided, its logger (via `loggerFor`) is used
     *   for diagnostics; otherwise `console` is used.
     * @returns {Stream} A stream that can be iterated once.
     */
    static fromSSEResponse(response, controller, client) {
      const logger = client ? loggerFor(client) : console;
      let consumed = false;

      // Exception members of a decoded event, paired with the text reported for
      // each one. They are probed in this order; the first truthy member wins.
      const exceptionLabels = [
        ['internalServerException', 'InternalServerException'],
        ['modelStreamErrorException', 'ModelStreamErrorException'],
        ['validationException', 'ValidationException'],
        ['throttlingException', 'ThrottlingException'],
      ];

      // Turns decoded events into `{ event, data, raw }` records: 'chunk' for
      // payload bytes, 'error' for a recognised exception member. Events that
      // match neither are skipped.
      async function* iterMessages() {
        if (!response.body) {
          controller.abort();
          throw new AnthropicError(`Attempted to iterate over a response with no body`);
        }
        const bodyIterable = ReadableStreamToAsyncIterable(response.body);
        const decodedEvents = de_ResponseStream(bodyIterable, getMinimalSerdeContext());
        for await (const event of decodedEvents) {
          if (event.chunk && event.chunk.bytes) {
            const text = toUtf8(event.chunk.bytes);
            yield { event: 'chunk', data: text, raw: [] };
            continue;
          }
          const matched = exceptionLabels.find(([member]) => event[member]);
          if (matched) {
            yield { event: 'error', data: matched[1], raw: [] };
          }
        }
      }

      // Note: this function is copied entirely from the core SDK
      async function* iterator() {
        if (consumed) {
          throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
        }
        consumed = true;
        let finished = false;
        try {
          for await (const { event: kind, data, raw } of iterMessages()) {
            switch (kind) {
              case 'chunk':
                // The yield stays inside the try so that anything thrown at this
                // point is logged with the offending payload before propagating.
                try {
                  yield JSON.parse(data);
                }
                catch (e) {
                  logger.error(`Could not parse message into JSON:`, data);
                  logger.error(`From chunk:`, raw);
                  throw e;
                }
                break;
              case 'error': {
                // Prefer a structured error body; fall back to the raw text as the message.
                const parsedError = safeJSON(data);
                const fallbackMessage = parsedError ? undefined : data;
                throw APIError.generate(undefined, parsedError, fallbackMessage, response.headers);
              }
              default:
                break;
            }
          }
          finished = true;
        }
        catch (e) {
          // If the user calls `stream.controller.abort()`, we should exit without throwing.
          if (!isAbortError(e)) {
            throw e;
          }
        }
        finally {
          // If the user `break`s, abort the ongoing request.
          if (!finished) {
            controller.abort();
          }
        }
      }

      return new Stream(iterator, controller);
    }
  }
  /**
   * Reports whether a thrown value looks like a request cancellation.
   *
   * @param {unknown} err - The value caught from a failed iteration.
   * @returns {boolean} `true` when `err` is a non-null object that either has
   *   `name === 'AbortError'` or whose stringified `message` contains
   *   `'FetchRequestCanceledException'`; `false` otherwise.
   */
  function isAbortError(err) {
    if (err === null || typeof err !== 'object') {
      return false;
    }
    // Spec-compliant fetch implementations
    if ('name' in err && err.name === 'AbortError') {
      return true;
    }
    // Expo fetch
    return 'message' in err && String(err.message).includes('FetchRequestCanceledException');
  }
  108. //# sourceMappingURL=streaming.mjs.map