// streaming.mjs
  1. var _Stream_client;
  2. import { __classPrivateFieldGet, __classPrivateFieldSet } from "../internal/tslib.mjs";
  3. import { AnthropicError } from "./error.mjs";
  4. import { makeReadableStream } from "../internal/shims.mjs";
  5. import { findDoubleNewlineIndex, LineDecoder } from "../internal/decoders/line.mjs";
  6. import { ReadableStreamToAsyncIterable } from "../internal/shims.mjs";
  7. import { isAbortError } from "../internal/errors.mjs";
  8. import { safeJSON } from "../internal/utils/values.mjs";
  9. import { encodeUTF8 } from "../internal/utils/bytes.mjs";
  10. import { loggerFor } from "../internal/utils/log.mjs";
  11. import { APIError } from "./error.mjs";
  12. export class Stream {
  13. constructor(iterator, controller, client) {
  14. this.iterator = iterator;
  15. _Stream_client.set(this, void 0);
  16. this.controller = controller;
  17. __classPrivateFieldSet(this, _Stream_client, client, "f");
  18. }
  19. static fromSSEResponse(response, controller, client) {
  20. let consumed = false;
  21. const logger = client ? loggerFor(client) : console;
  22. async function* iterator() {
  23. if (consumed) {
  24. throw new AnthropicError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  25. }
  26. consumed = true;
  27. let done = false;
  28. try {
  29. for await (const sse of _iterSSEMessages(response, controller)) {
  30. if (sse.event === 'completion') {
  31. try {
  32. yield JSON.parse(sse.data);
  33. }
  34. catch (e) {
  35. logger.error(`Could not parse message into JSON:`, sse.data);
  36. logger.error(`From chunk:`, sse.raw);
  37. throw e;
  38. }
  39. }
  40. if (sse.event === 'message_start' ||
  41. sse.event === 'message_delta' ||
  42. sse.event === 'message_stop' ||
  43. sse.event === 'content_block_start' ||
  44. sse.event === 'content_block_delta' ||
  45. sse.event === 'content_block_stop') {
  46. try {
  47. yield JSON.parse(sse.data);
  48. }
  49. catch (e) {
  50. logger.error(`Could not parse message into JSON:`, sse.data);
  51. logger.error(`From chunk:`, sse.raw);
  52. throw e;
  53. }
  54. }
  55. if (sse.event === 'ping') {
  56. continue;
  57. }
  58. if (sse.event === 'error') {
  59. throw new APIError(undefined, safeJSON(sse.data) ?? sse.data, undefined, response.headers);
  60. }
  61. }
  62. done = true;
  63. }
  64. catch (e) {
  65. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  66. if (isAbortError(e))
  67. return;
  68. throw e;
  69. }
  70. finally {
  71. // If the user `break`s, abort the ongoing request.
  72. if (!done)
  73. controller.abort();
  74. }
  75. }
  76. return new Stream(iterator, controller, client);
  77. }
  78. /**
  79. * Generates a Stream from a newline-separated ReadableStream
  80. * where each item is a JSON value.
  81. */
  82. static fromReadableStream(readableStream, controller, client) {
  83. let consumed = false;
  84. async function* iterLines() {
  85. const lineDecoder = new LineDecoder();
  86. const iter = ReadableStreamToAsyncIterable(readableStream);
  87. for await (const chunk of iter) {
  88. for (const line of lineDecoder.decode(chunk)) {
  89. yield line;
  90. }
  91. }
  92. for (const line of lineDecoder.flush()) {
  93. yield line;
  94. }
  95. }
  96. async function* iterator() {
  97. if (consumed) {
  98. throw new AnthropicError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  99. }
  100. consumed = true;
  101. let done = false;
  102. try {
  103. for await (const line of iterLines()) {
  104. if (done)
  105. continue;
  106. if (line)
  107. yield JSON.parse(line);
  108. }
  109. done = true;
  110. }
  111. catch (e) {
  112. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  113. if (isAbortError(e))
  114. return;
  115. throw e;
  116. }
  117. finally {
  118. // If the user `break`s, abort the ongoing request.
  119. if (!done)
  120. controller.abort();
  121. }
  122. }
  123. return new Stream(iterator, controller, client);
  124. }
  125. [(_Stream_client = new WeakMap(), Symbol.asyncIterator)]() {
  126. return this.iterator();
  127. }
  128. /**
  129. * Splits the stream into two streams which can be
  130. * independently read from at different speeds.
  131. */
  132. tee() {
  133. const left = [];
  134. const right = [];
  135. const iterator = this.iterator();
  136. const teeIterator = (queue) => {
  137. return {
  138. next: () => {
  139. if (queue.length === 0) {
  140. const result = iterator.next();
  141. left.push(result);
  142. right.push(result);
  143. }
  144. return queue.shift();
  145. },
  146. };
  147. };
  148. return [
  149. new Stream(() => teeIterator(left), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
  150. new Stream(() => teeIterator(right), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
  151. ];
  152. }
  153. /**
  154. * Converts this stream to a newline-separated ReadableStream of
  155. * JSON stringified values in the stream
  156. * which can be turned back into a Stream with `Stream.fromReadableStream()`.
  157. */
  158. toReadableStream() {
  159. const self = this;
  160. let iter;
  161. return makeReadableStream({
  162. async start() {
  163. iter = self[Symbol.asyncIterator]();
  164. },
  165. async pull(ctrl) {
  166. try {
  167. const { value, done } = await iter.next();
  168. if (done)
  169. return ctrl.close();
  170. const bytes = encodeUTF8(JSON.stringify(value) + '\n');
  171. ctrl.enqueue(bytes);
  172. }
  173. catch (err) {
  174. ctrl.error(err);
  175. }
  176. },
  177. async cancel() {
  178. await iter.return?.();
  179. },
  180. });
  181. }
  182. }
  183. export async function* _iterSSEMessages(response, controller) {
  184. if (!response.body) {
  185. controller.abort();
  186. if (typeof globalThis.navigator !== 'undefined' &&
  187. globalThis.navigator.product === 'ReactNative') {
  188. throw new AnthropicError(`The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`);
  189. }
  190. throw new AnthropicError(`Attempted to iterate over a response with no body`);
  191. }
  192. const sseDecoder = new SSEDecoder();
  193. const lineDecoder = new LineDecoder();
  194. const iter = ReadableStreamToAsyncIterable(response.body);
  195. for await (const sseChunk of iterSSEChunks(iter)) {
  196. for (const line of lineDecoder.decode(sseChunk)) {
  197. const sse = sseDecoder.decode(line);
  198. if (sse)
  199. yield sse;
  200. }
  201. }
  202. for (const line of lineDecoder.flush()) {
  203. const sse = sseDecoder.decode(line);
  204. if (sse)
  205. yield sse;
  206. }
  207. }
  208. /**
  209. * Given an async iterable iterator, iterates over it and yields full
  210. * SSE chunks, i.e. yields when a double new-line is encountered.
  211. */
  212. async function* iterSSEChunks(iterator) {
  213. let data = new Uint8Array();
  214. for await (const chunk of iterator) {
  215. if (chunk == null) {
  216. continue;
  217. }
  218. const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
  219. : typeof chunk === 'string' ? encodeUTF8(chunk)
  220. : chunk;
  221. let newData = new Uint8Array(data.length + binaryChunk.length);
  222. newData.set(data);
  223. newData.set(binaryChunk, data.length);
  224. data = newData;
  225. let patternIndex;
  226. while ((patternIndex = findDoubleNewlineIndex(data)) !== -1) {
  227. yield data.slice(0, patternIndex);
  228. data = data.slice(patternIndex);
  229. }
  230. }
  231. if (data.length > 0) {
  232. yield data;
  233. }
  234. }
  235. class SSEDecoder {
  236. constructor() {
  237. this.event = null;
  238. this.data = [];
  239. this.chunks = [];
  240. }
  241. decode(line) {
  242. if (line.endsWith('\r')) {
  243. line = line.substring(0, line.length - 1);
  244. }
  245. if (!line) {
  246. // empty line and we didn't previously encounter any messages
  247. if (!this.event && !this.data.length)
  248. return null;
  249. const sse = {
  250. event: this.event,
  251. data: this.data.join('\n'),
  252. raw: this.chunks,
  253. };
  254. this.event = null;
  255. this.data = [];
  256. this.chunks = [];
  257. return sse;
  258. }
  259. this.chunks.push(line);
  260. if (line.startsWith(':')) {
  261. return null;
  262. }
  263. let [fieldname, _, value] = partition(line, ':');
  264. if (value.startsWith(' ')) {
  265. value = value.substring(1);
  266. }
  267. if (fieldname === 'event') {
  268. this.event = value;
  269. }
  270. else if (fieldname === 'data') {
  271. this.data.push(value);
  272. }
  273. return null;
  274. }
  275. }
  276. function partition(str, delimiter) {
  277. const index = str.indexOf(delimiter);
  278. if (index !== -1) {
  279. return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
  280. }
  281. return [str, '', ''];
  282. }
  283. //# sourceMappingURL=streaming.mjs.map