123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import { EventSourceParserStream } from 'eventsource-parser/stream';
- import type { ParsedEvent } from 'eventsource-parser';
/** One incremental update produced while consuming a streamed completion. */
type TextStreamUpdate = {
  /** Text carried by this update; the stream helpers in this file use '' when there is none. */
  value: string;
  /** True on the final update of a stream. */
  done: boolean;
  /** Token accounting, when the update carries it. */
  usage?: ResponseUsage;
  /** Citations payload, forwarded exactly as it was parsed from the event. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  citations?: any;
  /** Error payload, forwarded exactly as it was parsed from the event. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  error?: any;
};
/** Token accounting reported alongside a response. */
type ResponseUsage = {
  /** Fields outside the base OpenAI spec are allowed and left untyped */
  [other: string]: unknown;
  /** Prompt tokens, including images and tools if any */
  prompt_tokens: number;
  /** Tokens that were generated */
  completion_tokens: number;
  /** prompt_tokens plus completion_tokens */
  total_tokens: number;
};
// createOpenAITextStream wraps an SSE response body in an async generator of text updates.
// The bytes are decoded to text, parsed into SSE events and converted to updates; when
// splitLargeDeltas is set, large deltas are additionally re-emitted as small random sized chunks.
export async function createOpenAITextStream(
  responseBody: ReadableStream<Uint8Array>,
  splitLargeDeltas: boolean
): Promise<AsyncGenerator<TextStreamUpdate>> {
  const textStream = responseBody.pipeThrough(new TextDecoderStream());
  const sseEvents = textStream.pipeThrough(new EventSourceParserStream());
  const updates = openAIStreamToIterator(sseEvents.getReader());

  return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(updates) : updates;
}
// openAIStreamToIterator turns parsed SSE events into text updates.
//
// Per event:
//   - a payload starting with '[DONE]', or the end of the stream, yields a final
//     { done: true } update and stops;
//   - a JSON payload with `error` yields a final update carrying that error and stops;
//   - a JSON payload with `citations` yields a citations-only update;
//   - any other JSON payload yields the first choice's delta content ('' when absent)
//     together with the payload's `usage` field;
//   - a payload that cannot be processed is logged with console.error and skipped.
//
// The reader's lock is released once the generator finishes, including when the consumer
// stops iterating early.
async function* openAIStreamToIterator(
  reader: ReadableStreamDefaultReader<ParsedEvent>
): AsyncGenerator<TextStreamUpdate> {
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        yield { done: true, value: '' };
        break;
      }
      if (!value) {
        continue;
      }

      const data = value.data;
      if (data.startsWith('[DONE]')) {
        yield { done: true, value: '' };
        break;
      }

      try {
        const parsedData = JSON.parse(data);

        if (parsedData.error) {
          yield { done: true, value: '', error: parsedData.error };
          break;
        }

        if (parsedData.citations) {
          yield { done: false, value: '', citations: parsedData.citations };
          continue;
        }

        yield {
          done: false,
          value: parsedData.choices?.[0]?.delta?.content ?? '',
          usage: parsedData.usage
        };
      } catch (e) {
        // Best effort: one malformed event must not abort the whole stream.
        console.error('Error extracting delta from SSE event:', e);
      }
    }
  } finally {
    // Runs on normal completion, on break, and when the consumer calls return()/throw().
    reader.releaseLock();
  }
}
// streamLargeDeltasAsRandomChunks re-emits large deltas (length >= 5) as random sized chunks of
// 1-3 characters. This simulates a more fluid stream, even though some providers may send large
// chunks of text at once.
//
// Updates that are final, or that carry citations or usage, are forwarded untouched so none of
// their fields are lost. Iteration stops after the first final update. A chunk is extended by
// one code unit when it would otherwise end between the two halves of a surrogate pair.
async function* streamLargeDeltasAsRandomChunks(
  iterator: AsyncGenerator<TextStreamUpdate>
): AsyncGenerator<TextStreamUpdate> {
  const minSplitLength = 5;
  const maxChunkSize = 3;
  const chunkDelayMs = 5;

  for await (const textStreamUpdate of iterator) {
    if (textStreamUpdate.done) {
      yield textStreamUpdate;
      return;
    }

    if (textStreamUpdate.citations || textStreamUpdate.usage) {
      yield textStreamUpdate;
      continue;
    }

    let content = textStreamUpdate.value;
    if (content.length < minSplitLength) {
      yield { done: false, value: content };
      continue;
    }

    while (content !== '') {
      let chunkSize = Math.min(Math.floor(Math.random() * maxChunkSize) + 1, content.length);

      // Never cut between a high surrogate and its low surrogate.
      const lastUnit = content.charCodeAt(chunkSize - 1);
      if (chunkSize < content.length && lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
        chunkSize += 1;
      }

      yield { done: false, value: content.slice(0, chunkSize) };

      // Do not sleep if the tab is hidden: timers are throttled to 1s in hidden tabs.
      // `typeof` is required because `document?.` still throws when `document` is undeclared
      // (e.g. outside a browser); in that case the delay is kept.
      if (typeof document === 'undefined' || document.visibilityState !== 'hidden') {
        await sleep(chunkDelayMs);
      }

      content = content.slice(chunkSize);
    }
  }
}
// sleep returns a promise that resolves once a timer of `ms` milliseconds has fired.
const sleep = (ms: number) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
|