sse.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. use serde::{Deserialize, Serialize};
  2. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
  3. pub struct SseEvent {
  4. pub event: Option<String>,
  5. pub data: String,
  6. pub id: Option<String>,
  7. pub retry: Option<u64>,
  8. }
  9. #[derive(Debug, Clone, Default)]
  10. pub struct IncrementalSseParser {
  11. buffer: String,
  12. event_name: Option<String>,
  13. data_lines: Vec<String>,
  14. id: Option<String>,
  15. retry: Option<u64>,
  16. }
  17. impl IncrementalSseParser {
  18. #[must_use]
  19. pub fn new() -> Self {
  20. Self::default()
  21. }
  22. pub fn push_chunk(&mut self, chunk: &str) -> Vec<SseEvent> {
  23. self.buffer.push_str(chunk);
  24. let mut events = Vec::new();
  25. while let Some(index) = self.buffer.find('\n') {
  26. let mut line = self.buffer.drain(..=index).collect::<String>();
  27. if line.ends_with('\n') {
  28. line.pop();
  29. }
  30. if line.ends_with('\r') {
  31. line.pop();
  32. }
  33. self.process_line(&line, &mut events);
  34. }
  35. events
  36. }
  37. pub fn finish(&mut self) -> Vec<SseEvent> {
  38. let mut events = Vec::new();
  39. if !self.buffer.is_empty() {
  40. let line = std::mem::take(&mut self.buffer);
  41. self.process_line(line.trim_end_matches('\r'), &mut events);
  42. }
  43. if let Some(event) = self.take_event() {
  44. events.push(event);
  45. }
  46. events
  47. }
  48. fn process_line(&mut self, line: &str, events: &mut Vec<SseEvent>) {
  49. if line.is_empty() {
  50. if let Some(event) = self.take_event() {
  51. events.push(event);
  52. }
  53. return;
  54. }
  55. if line.starts_with(':') {
  56. return;
  57. }
  58. let (field, value) = line.split_once(':').map_or((line, ""), |(field, value)| {
  59. let trimmed = value.strip_prefix(' ').unwrap_or(value);
  60. (field, trimmed)
  61. });
  62. match field {
  63. "event" => self.event_name = Some(value.to_owned()),
  64. "data" => self.data_lines.push(value.to_owned()),
  65. "id" => self.id = Some(value.to_owned()),
  66. "retry" => self.retry = value.parse::<u64>().ok(),
  67. _ => {}
  68. }
  69. }
  70. fn take_event(&mut self) -> Option<SseEvent> {
  71. if self.data_lines.is_empty() && self.event_name.is_none() && self.id.is_none() && self.retry.is_none() {
  72. return None;
  73. }
  74. let data = self.data_lines.join("\n");
  75. self.data_lines.clear();
  76. Some(SseEvent {
  77. event: self.event_name.take(),
  78. data,
  79. id: self.id.take(),
  80. retry: self.retry.take(),
  81. })
  82. }
  83. }
  84. #[cfg(test)]
  85. mod tests {
  86. use super::{IncrementalSseParser, SseEvent};
  87. #[test]
  88. fn parses_streaming_events() {
  89. let mut parser = IncrementalSseParser::new();
  90. let first = parser.push_chunk("event: message\ndata: hel");
  91. assert!(first.is_empty());
  92. let second = parser.push_chunk("lo\n\nid: 1\ndata: world\n\n");
  93. assert_eq!(
  94. second,
  95. vec![
  96. SseEvent {
  97. event: Some(String::from("message")),
  98. data: String::from("hello"),
  99. id: None,
  100. retry: None,
  101. },
  102. SseEvent {
  103. event: None,
  104. data: String::from("world"),
  105. id: Some(String::from("1")),
  106. retry: None,
  107. },
  108. ]
  109. );
  110. }
  111. }