index.ts 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import { EventSourceParserStream } from 'eventsource-parser/stream';
  2. import type { ParsedEvent } from 'eventsource-parser';
  /**
   * One update emitted by the text-stream iterators in this module.
   *
   * `done: true` marks the final update (end of stream, the `[DONE]` sentinel,
   * or an error payload). Metadata-only updates carry an empty `value`.
   */
  type TextStreamUpdate = {
    /** True on the last update of the stream. */
    done: boolean;
    /** Text delta for this update; '' for metadata-only and terminal updates. */
    value: string;
    /** `citations` from the SSE payload, forwarded untouched (shape not validated here). */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    citations?: any;
    /** `selected_model_id` from the SSE payload, forwarded untouched. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    selectedModelId?: any;
    /** `error` from the SSE payload; only set on a terminal (`done: true`) update. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    error?: any;
    /** Token accounting sent alongside a content delta, when the provider reports it. */
    usage?: ResponseUsage;
  };
  /** Token usage block attached to an OpenAI-style response. */
  type ResponseUsage = {
    /** Tokens consumed by the input, counting images and tools if present. */
    prompt_tokens: number;
    /** Tokens produced by the model. */
    completion_tokens: number;
    /** prompt_tokens + completion_tokens. */
    total_tokens: number;
    /** Provider-specific extras that are not part of the base OpenAI schema. */
    [extraField: string]: unknown;
  };
  /**
   * Builds an async iterator of TextStreamUpdate values from an SSE response body.
   *
   * The bytes are decoded to text, parsed into server-sent events, and translated
   * by openAIStreamToIterator. Only when `splitLargeDeltas` is true is the result
   * additionally wrapped by streamLargeDeltasAsRandomChunks, which re-emits large
   * text deltas as small random-sized chunks.
   *
   * The function is `async`, so callers must await it to obtain the generator.
   *
   * @param responseBody - Raw byte stream of the SSE response.
   * @param splitLargeDeltas - Whether to break large deltas into small chunks.
   * @returns A promise resolving to the async generator of updates.
   */
  export async function createOpenAITextStream(
    responseBody: ReadableStream<Uint8Array>,
    splitLargeDeltas: boolean
  ): Promise<AsyncGenerator<TextStreamUpdate>> {
    const parsedEvents = responseBody
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new EventSourceParserStream())
      .getReader();
    const updates = openAIStreamToIterator(parsedEvents);
    return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(updates) : updates;
  }
  /**
   * Reads parsed SSE events from `reader` and turns each JSON payload into a
   * TextStreamUpdate.
   *
   * - End of stream, or an event whose data starts with `[DONE]`, yields
   *   `{ done: true, value: '' }` and stops.
   * - A payload with `error` yields a final `done: true` update carrying it.
   * - A payload with `citations` or `selected_model_id` yields a metadata-only update.
   * - Otherwise yields `choices[0].delta.content` (or '') together with `usage`.
   * - An event that fails to parse or process is logged and skipped.
   *
   * The reader is cancelled when iteration ends for any reason, including the
   * consumer stopping early, so the underlying stream is not left locked and open.
   *
   * @param reader - Reader over events produced by EventSourceParserStream.
   */
  async function* openAIStreamToIterator(
    reader: ReadableStreamDefaultReader<ParsedEvent>
  ): AsyncGenerator<TextStreamUpdate> {
    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) {
          yield { done: true, value: '' };
          break;
        }
        if (!value) {
          continue;
        }
        const data = value.data;
        // `[DONE]` is treated as the end-of-stream sentinel.
        if (data.startsWith('[DONE]')) {
          yield { done: true, value: '' };
          break;
        }
        try {
          const parsedData = JSON.parse(data);
          if (parsedData.error) {
            yield { done: true, value: '', error: parsedData.error };
            break;
          }
          if (parsedData.citations) {
            yield { done: false, value: '', citations: parsedData.citations };
            continue;
          }
          if (parsedData.selected_model_id) {
            yield { done: false, value: '', selectedModelId: parsedData.selected_model_id };
            continue;
          }
          yield {
            done: false,
            value: parsedData.choices?.[0]?.delta?.content ?? '',
            usage: parsedData.usage
          };
        } catch (e) {
          // Best-effort: one malformed event is logged and skipped instead of ending the stream.
          console.error('Error extracting delta from SSE event:', e);
        }
      }
    } finally {
      // Runs on normal completion, on `[DONE]`/error payloads, and when the consumer
      // exits early (e.g. `break` in a for-await loop). Cancelling propagates through
      // the piped streams to the response body. A rejection here (e.g. the stream
      // already errored) is ignored so it cannot mask the original outcome.
      await reader.cancel().catch(() => undefined);
    }
  }
  /**
   * Re-emits text deltas of 5 or more characters as random 1–3 character chunks.
   * This smooths the streaming effect when a provider sends large pieces of text at once.
   *
   * - A terminal (`done`) update is forwarded and ends the iteration.
   * - Updates with `citations`, and updates whose text is shorter than 5
   *   characters, are forwarded unchanged. This keeps metadata such as
   *   `selectedModelId` and `usage`.
   * - When an update is chunked, its remaining fields (e.g. `usage`) are attached
   *   to the last chunk only, so they are reported exactly once.
   *
   * Lengths are counted in Unicode code points, so a surrogate pair (e.g. an
   * emoji) is never split across two chunks.
   *
   * @param iterator - Upstream updates, typically from openAIStreamToIterator.
   */
  async function* streamLargeDeltasAsRandomChunks(
    iterator: AsyncGenerator<TextStreamUpdate>
  ): AsyncGenerator<TextStreamUpdate> {
    for await (const textStreamUpdate of iterator) {
      if (textStreamUpdate.done) {
        yield textStreamUpdate;
        return;
      }
      if (textStreamUpdate.citations) {
        yield textStreamUpdate;
        continue;
      }
      // Array.from iterates by code point, unlike String#slice (UTF-16 code units).
      const codePoints = Array.from(textStreamUpdate.value);
      if (codePoints.length < 5) {
        yield textStreamUpdate;
        continue;
      }
      let offset = 0;
      while (offset < codePoints.length) {
        const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, codePoints.length - offset);
        const chunk = codePoints.slice(offset, offset + chunkSize).join('');
        offset += chunkSize;
        yield offset < codePoints.length
          ? { done: false, value: chunk }
          : { ...textStreamUpdate, value: chunk };
        // Skip the delay in hidden tabs, where timers are throttled to about 1s.
        // The typeof guard matters: `document?.x` still throws a ReferenceError
        // wherever `document` is not declared at all (SSR, web workers).
        if (typeof document === 'undefined' || document.visibilityState !== 'hidden') {
          await sleep(5);
        }
      }
    }
  }
  /** Returns a promise that resolves (with `undefined`) after roughly `ms` milliseconds. */
  const sleep = (ms: number) =>
    new Promise((wake) => {
      setTimeout(wake, ms);
    });