index.ts 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. type TextStreamUpdate = {
  2. done: boolean;
  3. value: string;
  4. };
  5. // createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response,
  6. // and returns an async generator that emits delta updates with large deltas chunked into random sized chunks
  7. export async function createOpenAITextStream(
  8. messageStream: ReadableStreamDefaultReader,
  9. splitLargeDeltas: boolean
  10. ): Promise<AsyncGenerator<TextStreamUpdate>> {
  11. let iterator = openAIStreamToIterator(messageStream);
  12. if (splitLargeDeltas) {
  13. iterator = streamLargeDeltasAsRandomChunks(iterator);
  14. }
  15. return iterator;
  16. }
  17. async function* openAIStreamToIterator(
  18. reader: ReadableStreamDefaultReader
  19. ): AsyncGenerator<TextStreamUpdate> {
  20. while (true) {
  21. const { value, done } = await reader.read();
  22. if (done) {
  23. yield { done: true, value: '' };
  24. break;
  25. }
  26. const lines = value.split('\n');
  27. for (let line of lines) {
  28. if (line.endsWith('\r')) {
  29. // Remove trailing \r
  30. line = line.slice(0, -1);
  31. }
  32. if (line !== '') {
  33. console.log(line);
  34. if (line === 'data: [DONE]') {
  35. yield { done: true, value: '' };
  36. } else if (line.startsWith(':')) {
  37. // Events starting with : are comments https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
  38. // OpenRouter sends heartbeats like ": OPENROUTER PROCESSING"
  39. continue;
  40. } else {
  41. try {
  42. const data = JSON.parse(line.replace(/^data: /, ''));
  43. console.log(data);
  44. yield { done: false, value: data.choices?.[0]?.delta?.content ?? '' };
  45. } catch (e) {
  46. console.error('Error extracting delta from SSE event:', e);
  47. }
  48. }
  49. }
  50. }
  51. }
  52. }
  53. // streamLargeDeltasAsRandomChunks will chunk large deltas (length > 5) into random sized chunks between 1-3 characters
  54. // This is to simulate a more fluid streaming, even though some providers may send large chunks of text at once
  55. async function* streamLargeDeltasAsRandomChunks(
  56. iterator: AsyncGenerator<TextStreamUpdate>
  57. ): AsyncGenerator<TextStreamUpdate> {
  58. for await (const textStreamUpdate of iterator) {
  59. if (textStreamUpdate.done) {
  60. yield textStreamUpdate;
  61. return;
  62. }
  63. let content = textStreamUpdate.value;
  64. if (content.length < 5) {
  65. yield { done: false, value: content };
  66. continue;
  67. }
  68. while (content != '') {
  69. const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length);
  70. const chunk = content.slice(0, chunkSize);
  71. yield { done: false, value: chunk };
  72. // Do not sleep if the tab is hidden
  73. // Timers are throttled to 1s in hidden tabs
  74. if (document?.visibilityState !== 'hidden') {
  75. await sleep(5);
  76. }
  77. content = content.slice(chunkSize);
  78. }
  79. }
  80. }
  81. const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));