| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- use crate::error::ApiError;
- use crate::types::StreamEvent;
- #[derive(Debug, Default)]
- pub struct SseParser {
- buffer: Vec<u8>,
- }
- impl SseParser {
- #[must_use]
- pub fn new() -> Self {
- Self::default()
- }
- pub fn push(&mut self, chunk: &[u8]) -> Result<Vec<StreamEvent>, ApiError> {
- self.buffer.extend_from_slice(chunk);
- let mut events = Vec::new();
- while let Some(frame) = self.next_frame() {
- if let Some(event) = parse_frame(&frame)? {
- events.push(event);
- }
- }
- Ok(events)
- }
- pub fn finish(&mut self) -> Result<Vec<StreamEvent>, ApiError> {
- if self.buffer.is_empty() {
- return Ok(Vec::new());
- }
- let trailing = std::mem::take(&mut self.buffer);
- match parse_frame(&String::from_utf8_lossy(&trailing))? {
- Some(event) => Ok(vec![event]),
- None => Ok(Vec::new()),
- }
- }
- fn next_frame(&mut self) -> Option<String> {
- let separator = self
- .buffer
- .windows(2)
- .position(|window| window == b"\n\n")
- .map(|position| (position, 2))
- .or_else(|| {
- self.buffer
- .windows(4)
- .position(|window| window == b"\r\n\r\n")
- .map(|position| (position, 4))
- })?;
- let (position, separator_len) = separator;
- let frame = self
- .buffer
- .drain(..position + separator_len)
- .collect::<Vec<_>>();
- let frame_len = frame.len().saturating_sub(separator_len);
- Some(String::from_utf8_lossy(&frame[..frame_len]).into_owned())
- }
- }
- pub fn parse_frame(frame: &str) -> Result<Option<StreamEvent>, ApiError> {
- let trimmed = frame.trim();
- if trimmed.is_empty() {
- return Ok(None);
- }
- let mut data_lines = Vec::new();
- let mut event_name: Option<&str> = None;
- for line in trimmed.lines() {
- if line.starts_with(':') {
- continue;
- }
- if let Some(name) = line.strip_prefix("event:") {
- event_name = Some(name.trim());
- continue;
- }
- if let Some(data) = line.strip_prefix("data:") {
- data_lines.push(data.trim_start());
- }
- }
- if matches!(event_name, Some("ping")) {
- return Ok(None);
- }
- if data_lines.is_empty() {
- return Ok(None);
- }
- let payload = data_lines.join("\n");
- if payload == "[DONE]" {
- return Ok(None);
- }
- serde_json::from_str::<StreamEvent>(&payload)
- .map(Some)
- .map_err(ApiError::from)
- }
- #[cfg(test)]
- mod tests {
- use super::{parse_frame, SseParser};
- use crate::types::{ContentBlockDelta, MessageDelta, OutputContentBlock, StreamEvent, Usage};
- #[test]
- fn parses_single_frame() {
- let frame = concat!(
- "event: content_block_start\n",
- "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"Hi\"}}\n\n"
- );
- let event = parse_frame(frame).expect("frame should parse");
- assert_eq!(
- event,
- Some(StreamEvent::ContentBlockStart(
- crate::types::ContentBlockStartEvent {
- index: 0,
- content_block: OutputContentBlock::Text {
- text: "Hi".to_string(),
- },
- },
- ))
- );
- }
- #[test]
- fn parses_chunked_stream() {
- let mut parser = SseParser::new();
- let first = b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hel";
- let second = b"lo\"}}\n\n";
- assert!(parser
- .push(first)
- .expect("first chunk should buffer")
- .is_empty());
- let events = parser.push(second).expect("second chunk should parse");
- assert_eq!(
- events,
- vec![StreamEvent::ContentBlockDelta(
- crate::types::ContentBlockDeltaEvent {
- index: 0,
- delta: ContentBlockDelta::TextDelta {
- text: "Hello".to_string(),
- },
- }
- )]
- );
- }
- #[test]
- fn ignores_ping_and_done() {
- let mut parser = SseParser::new();
- let payload = concat!(
- ": keepalive\n",
- "event: ping\n",
- "data: {\"type\":\"ping\"}\n\n",
- "event: message_delta\n",
- "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\",\"stop_sequence\":null},\"usage\":{\"input_tokens\":1,\"output_tokens\":2}}\n\n",
- "event: message_stop\n",
- "data: {\"type\":\"message_stop\"}\n\n",
- "data: [DONE]\n\n"
- );
- let events = parser
- .push(payload.as_bytes())
- .expect("parser should succeed");
- assert_eq!(
- events,
- vec![
- StreamEvent::MessageDelta(crate::types::MessageDeltaEvent {
- delta: MessageDelta {
- stop_reason: Some("tool_use".to_string()),
- stop_sequence: None,
- },
- usage: Usage {
- input_tokens: 1,
- cache_creation_input_tokens: 0,
- cache_read_input_tokens: 0,
- output_tokens: 2,
- },
- }),
- StreamEvent::MessageStop(crate::types::MessageStopEvent {}),
- ]
- );
- }
- #[test]
- fn ignores_data_less_event_frames() {
- let frame = "event: ping\n\n";
- let event = parse_frame(frame).expect("frame without data should be ignored");
- assert_eq!(event, None);
- }
- #[test]
- fn parses_split_json_across_data_lines() {
- let frame = concat!(
- "event: content_block_delta\n",
- "data: {\"type\":\"content_block_delta\",\"index\":0,\n",
- "data: \"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n"
- );
- let event = parse_frame(frame).expect("frame should parse");
- assert_eq!(
- event,
- Some(StreamEvent::ContentBlockDelta(
- crate::types::ContentBlockDeltaEvent {
- index: 0,
- delta: ContentBlockDelta::TextDelta {
- text: "Hello".to_string(),
- },
- }
- ))
- );
- }
- #[test]
- fn parses_thinking_content_block_start() {
- let frame = concat!(
- "event: content_block_start\n",
- "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\",\"signature\":null}}\n\n"
- );
- let event = parse_frame(frame).expect("frame should parse");
- assert_eq!(
- event,
- Some(StreamEvent::ContentBlockStart(
- crate::types::ContentBlockStartEvent {
- index: 0,
- content_block: OutputContentBlock::Thinking {
- thinking: String::new(),
- signature: None,
- },
- },
- ))
- );
- }
- #[test]
- fn parses_thinking_related_deltas() {
- let thinking = concat!(
- "event: content_block_delta\n",
- "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"step 1\"}}\n\n"
- );
- let signature = concat!(
- "event: content_block_delta\n",
- "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"signature_delta\",\"signature\":\"sig_123\"}}\n\n"
- );
- let thinking_event = parse_frame(thinking).expect("thinking delta should parse");
- let signature_event = parse_frame(signature).expect("signature delta should parse");
- assert_eq!(
- thinking_event,
- Some(StreamEvent::ContentBlockDelta(
- crate::types::ContentBlockDeltaEvent {
- index: 0,
- delta: ContentBlockDelta::ThinkingDelta {
- thinking: "step 1".to_string(),
- },
- }
- ))
- );
- assert_eq!(
- signature_event,
- Some(StreamEvent::ContentBlockDelta(
- crate::types::ContentBlockDeltaEvent {
- index: 0,
- delta: ContentBlockDelta::SignatureDelta {
- signature: "sig_123".to_string(),
- },
- }
- ))
- );
- }
- }
|