// index.ts (3.8 KB)
  import { EventSourceParserStream } from 'eventsource-parser/stream';
  import type { ParsedEvent } from 'eventsource-parser';
  /**
   * One update emitted by the text-stream generators in this file.
   *
   * An update is either a text delta (`value`), a terminal marker (`done`), or a
   * carrier for a single piece of out-of-band data (`sources`,
   * `selectedModelId`, `usage`, `error`), in which case `value` is `''`.
   */
  type TextStreamUpdate = {
    /** `true` on the last update a generator emits. */
    done: boolean;
    /** Text delta; the empty string when the update carries no text. */
    value: string;
    /** Raw `sources` field of the event payload, forwarded as-is. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    sources?: any;
    /** Raw `selected_model_id` field of the event payload, forwarded as-is. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    selectedModelId?: any;
    /** Raw `error` field of the event payload, forwarded as-is. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    error?: any;
    /** Token accounting taken from the `usage` field of the event payload. */
    usage?: ResponseUsage;
  };
  /** Token usage block reported alongside a completion. */
  type ResponseUsage = {
    /** Tokens in the prompt, including images and tools if any. */
    prompt_tokens: number;
    /** Tokens generated by the model. */
    completion_tokens: number;
    /** Sum of `prompt_tokens` and `completion_tokens`. */
    total_tokens: number;
    /** Any other fields that aren't part of the base OpenAI spec. */
    [other: string]: unknown;
  };
  /**
   * Turns the body of a server-sent-events (SSE) response into an async
   * generator of text stream updates.
   *
   * The byte stream is decoded to text, parsed into SSE events, and each event
   * is mapped to a `TextStreamUpdate` by `openAIStreamToIterator`.
   *
   * @param responseBody - Raw response body containing the SSE stream.
   * @param splitLargeDeltas - When `true`, the updates are additionally passed
   *   through `streamLargeDeltasAsRandomChunks`, which re-emits large deltas as
   *   small random-sized chunks.
   * @returns A promise for the generator. Nothing is awaited here; the promise
   *   exists only because the function is declared `async`.
   */
  export async function createOpenAITextStream(
    responseBody: ReadableStream<Uint8Array>,
    splitLargeDeltas: boolean
  ): Promise<AsyncGenerator<TextStreamUpdate>> {
    const eventReader = responseBody
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new EventSourceParserStream())
      .getReader();

    const updates = openAIStreamToIterator(eventReader);
    return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(updates) : updates;
  }
  /**
   * Maps parsed SSE events to `TextStreamUpdate`s.
   *
   * Per event, in order:
   * - the reader reporting `done`, or event data starting with `[DONE]`, yields
   *   a final `{ done: true }` update and ends the generator;
   * - a payload with `error` yields a final update carrying that error and ends
   *   the generator;
   * - a payload with `sources`, `selected_model_id` or `usage` (checked in that
   *   order) yields an update carrying only that field. Any delta text in the
   *   same payload is not forwarded;
   * - anything else yields `choices[0].delta.content`, or `''` when absent.
   *
   * Event data that is not valid JSON is logged with `console.error` and
   * skipped; the stream keeps going.
   *
   * @param reader - Reader over the parsed SSE event stream.
   * @returns An async generator of updates; the last one has `done: true`.
   */
  async function* openAIStreamToIterator(
    reader: ReadableStreamDefaultReader<ParsedEvent>
  ): AsyncGenerator<TextStreamUpdate> {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        yield { done: true, value: '' };
        return;
      }
      if (!value) {
        continue;
      }

      const data = value.data;
      if (data.startsWith('[DONE]')) {
        yield { done: true, value: '' };
        return;
      }

      try {
        const parsedData = JSON.parse(data);

        if (parsedData.error) {
          yield { done: true, value: '', error: parsedData.error };
          return;
        }

        if (parsedData.sources) {
          yield { done: false, value: '', sources: parsedData.sources };
          continue;
        }

        if (parsedData.selected_model_id) {
          yield { done: false, value: '', selectedModelId: parsedData.selected_model_id };
          continue;
        }

        if (parsedData.usage) {
          yield { done: false, value: '', usage: parsedData.usage };
          continue;
        }

        yield {
          done: false,
          value: parsedData.choices?.[0]?.delta?.content ?? '',
        };
      } catch (e) {
        // Best effort: one malformed event must not abort the whole stream.
        console.error('Error extracting delta from SSE event:', e);
      }
    }
  }
  /**
   * Re-emits large text deltas as a series of small random-sized chunks, to
   * simulate fluid streaming even when a provider sends big pieces of text at
   * once.
   *
   * - A `done` update is forwarded and ends the generator.
   * - Updates carrying `error`, `sources`, `selectedModelId` or `usage` are
   *   forwarded unchanged.
   * - Text shorter than 5 UTF-16 code units is forwarded as a single update.
   * - Text of 5 or more code units is cut into chunks of 1-3 code units. A chunk
   *   is extended by one unit when it would otherwise end in the middle of a
   *   surrogate pair, so a chunk never holds half a character.
   *
   * @param iterator - Source of updates to re-chunk.
   * @returns An async generator whose concatenated `value`s equal the input's.
   */
  async function* streamLargeDeltasAsRandomChunks(
    iterator: AsyncGenerator<TextStreamUpdate>
  ): AsyncGenerator<TextStreamUpdate> {
    for await (const textStreamUpdate of iterator) {
      if (textStreamUpdate.done) {
        yield textStreamUpdate;
        return;
      }

      if (
        textStreamUpdate.error ||
        textStreamUpdate.sources ||
        textStreamUpdate.selectedModelId ||
        textStreamUpdate.usage
      ) {
        yield textStreamUpdate;
        continue;
      }

      let content = textStreamUpdate.value;
      if (content.length < 5) {
        yield { done: false, value: content };
        continue;
      }

      while (content !== '') {
        let chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length);

        // Keep surrogate pairs (e.g. emoji) together: if the chunk would end on
        // a high surrogate, take its low surrogate as well.
        const lastUnit = content.charCodeAt(chunkSize - 1);
        if (chunkSize < content.length && lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
          chunkSize += 1;
        }

        yield { done: false, value: content.slice(0, chunkSize) };

        // Do not sleep if the tab is hidden: timers are throttled to 1s in
        // hidden tabs. `typeof` is required because `document?.` still throws a
        // ReferenceError where `document` is not declared (Node, SSR, workers).
        if (typeof document === 'undefined' || document.visibilityState !== 'hidden') {
          await sleep(5);
        }

        content = content.slice(chunkSize);
      }
    }
  }

  /** Resolves after (at least) `ms` milliseconds. */
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));