index.ts 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import { EventSourceParserStream } from 'eventsource-parser/stream';
  2. import type { ParsedEvent } from 'eventsource-parser';
  /**
   * A single item produced by the text-stream generators in this module.
   */
  type TextStreamUpdate = {
    /** Set to true on the update that ends the stream. */
    done: boolean;
    /** Text carried by this update; the empty string when there is none. */
    value: string;
    /** Usage payload copied from the event, when the event had one. */
    usage?: ResponseUsage;
    /** Citations payload copied verbatim from the event; its shape is not typed here. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    citations?: any;
    /** Error payload copied verbatim from the event; its shape is not typed here. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    error?: any;
  };
  /**
   * Token accounting reported alongside a response.
   * Three numeric counters are required; any further keys are accepted as `unknown`.
   */
  type ResponseUsage = {
    /** Prompt-side token count, including images and tools if any. */
    prompt_tokens: number;
    /** Number of tokens that were generated. */
    completion_tokens: number;
    /** `prompt_tokens` plus `completion_tokens`. */
    total_tokens: number;
    /** Extra fields that are not part of the base OpenAI spec. */
    [extra: string]: unknown;
  };
  /**
   * Builds an async generator of `TextStreamUpdate`s from an SSE response body.
   *
   * The bytes are piped through a `TextDecoderStream` and then an
   * `EventSourceParserStream`; the resulting event reader is handed to
   * `openAIStreamToIterator`.
   *
   * @param responseBody - Byte stream carrying the SSE response.
   * @param splitLargeDeltas - When true, the updates are additionally routed
   *   through `streamLargeDeltasAsRandomChunks`, which re-emits large deltas as
   *   small randomly sized chunks.
   * @returns A promise resolving to the update generator.
   */
  export async function createOpenAITextStream(
    responseBody: ReadableStream<Uint8Array>,
    splitLargeDeltas: boolean
  ): Promise<AsyncGenerator<TextStreamUpdate>> {
    const decodedText = responseBody.pipeThrough(new TextDecoderStream());
    const sseEvents = decodedText.pipeThrough(new EventSourceParserStream());
    const updates = openAIStreamToIterator(sseEvents.getReader());

    return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(updates) : updates;
  }
  /**
   * Turns a reader of parsed SSE events into a stream of `TextStreamUpdate`s.
   *
   * For each event read:
   * - reader exhausted, or data starting with `[DONE]`: yields a final
   *   `{ done: true, value: '' }` and stops;
   * - JSON with a truthy `error`: yields a final update carrying that error and stops;
   * - JSON with a truthy `citations`: yields an empty-text update carrying them;
   * - anything else: yields `choices[0].delta.content` (or `''`) together with
   *   the event's `usage` field.
   *
   * Events whose data cannot be processed are logged with `console.error` and
   * skipped, so one bad event does not end the stream.
   *
   * However the generator ends (normal completion, an exception from
   * `reader.read()`, or the consumer stopping early), the reader is cancelled
   * and its lock released so the underlying stream is not left open.
   *
   * @param reader - Reader over parsed SSE events.
   * @returns An async generator of updates; the last one has `done: true`.
   */
  async function* openAIStreamToIterator(
    reader: ReadableStreamDefaultReader<ParsedEvent>
  ): AsyncGenerator<TextStreamUpdate> {
    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) {
          yield { done: true, value: '' };
          break;
        }
        if (!value) {
          continue;
        }

        const data = value.data;
        if (data.startsWith('[DONE]')) {
          yield { done: true, value: '' };
          break;
        }

        try {
          const parsedData = JSON.parse(data);

          if (parsedData.error) {
            yield { done: true, value: '', error: parsedData.error };
            break;
          }

          if (parsedData.citations) {
            yield { done: false, value: '', citations: parsedData.citations };
            continue;
          }

          yield {
            done: false,
            value: parsedData.choices?.[0]?.delta?.content ?? '',
            usage: parsedData.usage
          };
        } catch (e) {
          console.error('Error extracting delta from SSE event:', e);
        }
      }
    } finally {
      // Best-effort cleanup: cancel() rejects when the stream has already
      // errored, and that must not mask the error that ended the loop.
      await reader.cancel().catch(() => undefined);
      reader.releaseLock();
    }
  }
  /**
   * Re-emits an update stream so that large text deltas arrive as many small
   * pieces, which makes the output look more fluid when a provider sends big
   * blocks of text at once.
   *
   * - A `done` update is forwarded and ends this generator.
   * - Updates carrying `citations` are forwarded unchanged.
   * - Deltas shorter than 5 UTF-16 code units are forwarded unchanged, so any
   *   `usage` they carry is preserved.
   * - Deltas of 5 or more code units are cut into pieces of 1-3 code units.
   *   A piece is extended by one unit when the cut would otherwise fall in the
   *   middle of a surrogate pair. `usage`, if present, is attached to the last
   *   piece only.
   *
   * After each piece the generator pauses for 5 ms, unless `document` exists
   * and reports the page as hidden.
   *
   * @param iterator - Source stream of updates.
   * @returns An async generator yielding the re-chunked updates.
   */
  async function* streamLargeDeltasAsRandomChunks(
    iterator: AsyncGenerator<TextStreamUpdate>
  ): AsyncGenerator<TextStreamUpdate> {
    for await (const textStreamUpdate of iterator) {
      if (textStreamUpdate.done) {
        yield textStreamUpdate;
        return;
      }
      if (textStreamUpdate.citations) {
        yield textStreamUpdate;
        continue;
      }

      const { value: content, usage } = textStreamUpdate;
      if (content.length < 5) {
        // Forward the original object: rebuilding it would drop `usage`.
        yield textStreamUpdate;
        continue;
      }

      let offset = 0;
      while (offset < content.length) {
        let end = Math.min(offset + Math.floor(Math.random() * 3) + 1, content.length);

        // Never end a piece on a high surrogate; take its low half as well.
        const lastUnit = content.charCodeAt(end - 1);
        if (lastUnit >= 0xd800 && lastUnit <= 0xdbff && end < content.length) {
          end += 1;
        }

        const chunk = content.slice(offset, end);
        const isLastChunk = end >= content.length;
        if (isLastChunk && usage !== undefined) {
          yield { done: false, value: chunk, usage };
        } else {
          yield { done: false, value: chunk };
        }

        // Do not sleep if the tab is hidden: timers are throttled to 1s there.
        // `typeof` is required because `document?.x` still throws a
        // ReferenceError where `document` is not declared at all.
        if (typeof document === 'undefined' || document.visibilityState !== 'hidden') {
          await sleep(5);
        }
        offset = end;
      }
    }
  }
  /**
   * Returns a promise that settles once a `setTimeout` of `ms` milliseconds fires.
   *
   * @param ms - Delay passed to `setTimeout`, in milliseconds.
   * @returns A promise that resolves with no value when the timer fires.
   */
  function sleep(ms: number): Promise<unknown> {
    return new Promise((wake) => {
      setTimeout(wake, ms);
    });
  }