| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
// Backing WeakMap slot for the Stream class's private `#client` field
// (tslib downleveled-private-field pattern); the WeakMap itself is created
// inside the class body's computed-key expression below.
var _Stream_client;
- import { __classPrivateFieldGet, __classPrivateFieldSet } from "../internal/tslib.mjs";
- import { AnthropicError } from "./error.mjs";
- import { makeReadableStream } from "../internal/shims.mjs";
- import { findDoubleNewlineIndex, LineDecoder } from "../internal/decoders/line.mjs";
- import { ReadableStreamToAsyncIterable } from "../internal/shims.mjs";
- import { isAbortError } from "../internal/errors.mjs";
- import { safeJSON } from "../internal/utils/values.mjs";
- import { encodeUTF8 } from "../internal/utils/bytes.mjs";
- import { loggerFor } from "../internal/utils/log.mjs";
- import { APIError } from "./error.mjs";
export class Stream {
    /**
     * @param {() => AsyncIterator<any>} iterator - zero-arg factory returning the async iterator of events
     * @param {AbortController} controller - used to abort the in-flight request when iteration stops early
     * @param {object} client - SDK client (may be undefined); retained in the private field for logging
     */
    constructor(iterator, controller, client) {
        this.iterator = iterator;
        _Stream_client.set(this, void 0);
        this.controller = controller;
        __classPrivateFieldSet(this, _Stream_client, client, "f");
    }
    /**
     * Builds a Stream that parses server-sent events from an SSE response body,
     * yielding one JSON-parsed object per known event type.
     * The stream is single-use; a second iteration attempt throws.
     */
    static fromSSEResponse(response, controller, client) {
        let consumed = false;
        // Fall back to `console` for logging when no client is available.
        const logger = client ? loggerFor(client) : console;
        async function* iterator() {
            if (consumed) {
                throw new AnthropicError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const sse of _iterSSEMessages(response, controller)) {
                    if (sse.event === 'completion') {
                        try {
                            yield JSON.parse(sse.data);
                        }
                        catch (e) {
                            logger.error(`Could not parse message into JSON:`, sse.data);
                            logger.error(`From chunk:`, sse.raw);
                            throw e;
                        }
                    }
                    // Message / content-block lifecycle events all carry a JSON payload.
                    if (sse.event === 'message_start' ||
                        sse.event === 'message_delta' ||
                        sse.event === 'message_stop' ||
                        sse.event === 'content_block_start' ||
                        sse.event === 'content_block_delta' ||
                        sse.event === 'content_block_stop') {
                        try {
                            yield JSON.parse(sse.data);
                        }
                        catch (e) {
                            logger.error(`Could not parse message into JSON:`, sse.data);
                            logger.error(`From chunk:`, sse.raw);
                            throw e;
                        }
                    }
                    // Keep-alive pings carry no payload and are skipped.
                    if (sse.event === 'ping') {
                        continue;
                    }
                    // Server-reported errors are surfaced as APIError.
                    if (sse.event === 'error') {
                        throw new APIError(undefined, safeJSON(sse.data) ?? sse.data, undefined, response.headers);
                    }
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if (isAbortError(e))
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller, client);
    }
    /**
     * Generates a Stream from a newline-separated ReadableStream
     * where each item is a JSON value.
     */
    static fromReadableStream(readableStream, controller, client) {
        let consumed = false;
        // Yields complete text lines decoded from the raw byte stream.
        async function* iterLines() {
            const lineDecoder = new LineDecoder();
            const iter = ReadableStreamToAsyncIterable(readableStream);
            for await (const chunk of iter) {
                for (const line of lineDecoder.decode(chunk)) {
                    yield line;
                }
            }
            // Flush any trailing partial line still buffered in the decoder.
            for (const line of lineDecoder.flush()) {
                yield line;
            }
        }
        async function* iterator() {
            if (consumed) {
                throw new AnthropicError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const line of iterLines()) {
                    if (done)
                        continue;
                    if (line)
                        yield JSON.parse(line);
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if (isAbortError(e))
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller, client);
    }
    // NOTE: the comma expression in this computed key is a tslib downleveling
    // artifact — it initializes the `_Stream_client` WeakMap exactly once when
    // the class body is evaluated, then uses Symbol.asyncIterator as the key.
    [(_Stream_client = new WeakMap(), Symbol.asyncIterator)]() {
        return this.iterator();
    }
    /**
     * Splits the stream into two streams which can be
     * independently read from at different speeds.
     */
    tee() {
        const left = [];
        const right = [];
        const iterator = this.iterator();
        // Each branch drains its own queue; pulling on an empty queue advances
        // the shared iterator and pushes the pending result onto BOTH queues,
        // so the slower branch never loses items.
        const teeIterator = (queue) => {
            return {
                next: () => {
                    if (queue.length === 0) {
                        const result = iterator.next();
                        left.push(result);
                        right.push(result);
                    }
                    return queue.shift();
                },
            };
        };
        return [
            new Stream(() => teeIterator(left), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
            new Stream(() => teeIterator(right), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
        ];
    }
    /**
     * Converts this stream to a newline-separated ReadableStream of
     * JSON stringified values in the stream
     * which can be turned back into a Stream with `Stream.fromReadableStream()`.
     */
    toReadableStream() {
        const self = this;
        let iter;
        return makeReadableStream({
            async start() {
                iter = self[Symbol.asyncIterator]();
            },
            async pull(ctrl) {
                try {
                    const { value, done } = await iter.next();
                    if (done)
                        return ctrl.close();
                    // Each value becomes one newline-terminated JSON line of UTF-8 bytes.
                    const bytes = encodeUTF8(JSON.stringify(value) + '\n');
                    ctrl.enqueue(bytes);
                }
                catch (err) {
                    ctrl.error(err);
                }
            },
            async cancel() {
                await iter.return?.();
            },
        });
    }
}
/**
 * Decodes an SSE response body into individual SSE messages.
 * Aborts the controller and throws when the response has no body;
 * otherwise yields each `{ event, data, raw }` message as it completes.
 */
export async function* _iterSSEMessages(response, controller) {
    if (!response.body) {
        controller.abort();
        const isReactNativeFetch = typeof globalThis.navigator !== 'undefined' &&
            globalThis.navigator.product === 'ReactNative';
        if (isReactNativeFetch) {
            throw new AnthropicError(`The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`);
        }
        throw new AnthropicError(`Attempted to iterate over a response with no body`);
    }
    const sseDecoder = new SSEDecoder();
    const lineDecoder = new LineDecoder();
    const bodyIterator = ReadableStreamToAsyncIterable(response.body);
    // Split the byte stream into double-newline-terminated SSE chunks,
    // then into lines, then feed each line to the SSE decoder.
    for await (const sseChunk of iterSSEChunks(bodyIterator)) {
        for (const line of lineDecoder.decode(sseChunk)) {
            const message = sseDecoder.decode(line);
            if (message) {
                yield message;
            }
        }
    }
    // Drain any partial line the line decoder is still holding.
    for (const line of lineDecoder.flush()) {
        const message = sseDecoder.decode(line);
        if (message) {
            yield message;
        }
    }
}
/**
 * Given an async iterable iterator, iterates over it and yields full
 * SSE chunks, i.e. yields when a double new-line is encountered.
 */
async function* iterSSEChunks(iterator) {
    let buffer = new Uint8Array();
    for await (const chunk of iterator) {
        if (chunk == null) {
            continue;
        }
        // Normalize each incoming chunk to a Uint8Array.
        let bytes;
        if (chunk instanceof ArrayBuffer) {
            bytes = new Uint8Array(chunk);
        }
        else if (typeof chunk === 'string') {
            bytes = encodeUTF8(chunk);
        }
        else {
            bytes = chunk;
        }
        // Append the new bytes to the accumulated buffer.
        const combined = new Uint8Array(buffer.length + bytes.length);
        combined.set(buffer);
        combined.set(bytes, buffer.length);
        buffer = combined;
        // Emit every complete chunk (terminated by a double newline) buffered so far.
        for (let boundary = findDoubleNewlineIndex(buffer); boundary !== -1; boundary = findDoubleNewlineIndex(buffer)) {
            yield buffer.slice(0, boundary);
            buffer = buffer.slice(boundary);
        }
    }
    // Emit whatever trailing bytes remain once the source is exhausted.
    if (buffer.length > 0) {
        yield buffer;
    }
}
/**
 * Incremental decoder for individual server-sent-event lines.
 * Buffers `event`/`data` fields until a blank line terminates the event.
 */
class SSEDecoder {
    constructor() {
        this.event = null;
        this.data = [];
        this.chunks = [];
    }
    /**
     * Feeds one line to the decoder. Returns the completed
     * `{ event, data, raw }` message when a blank line ends the current
     * event, otherwise null.
     */
    decode(line) {
        // Tolerate CRLF line endings by dropping a trailing carriage return.
        if (line.endsWith('\r')) {
            line = line.slice(0, -1);
        }
        if (!line) {
            // Blank line with nothing buffered: nothing to emit yet.
            if (!this.event && this.data.length === 0) {
                return null;
            }
            const message = {
                event: this.event,
                data: this.data.join('\n'),
                raw: this.chunks,
            };
            this.event = null;
            this.data = [];
            this.chunks = [];
            return message;
        }
        this.chunks.push(line);
        // Lines beginning with ':' are SSE comments and are ignored.
        if (line.startsWith(':')) {
            return null;
        }
        const [fieldname, , rawValue] = partition(line, ':');
        // A single leading space after the colon is part of the SSE framing.
        const value = rawValue.startsWith(' ') ? rawValue.slice(1) : rawValue;
        if (fieldname === 'event') {
            this.event = value;
        }
        else if (fieldname === 'data') {
            this.data.push(value);
        }
        return null;
    }
}
/**
 * Splits `str` at the first occurrence of `delimiter`, returning
 * `[before, delimiter, after]`; when the delimiter is absent,
 * returns `[str, '', '']`.
 */
function partition(str, delimiter) {
    const index = str.indexOf(delimiter);
    if (index === -1) {
        return [str, '', ''];
    }
    const before = str.substring(0, index);
    const after = str.substring(index + delimiter.length);
    return [before, delimiter, after];
}
- //# sourceMappingURL=streaming.mjs.map
|