From 254b7df844a4056e50940a9c0226643aa82feea7 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Mon, 23 Sep 2024 04:56:13 -0700 Subject: [PATCH 1/5] Break stream on done event without enqueuing it --- lib/stream.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index 2f72e2c..6cb1c30 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -79,6 +79,10 @@ function createReadableStream({ url, fetch, options = {} }) { .pipeThrough(new EventSourceParserStream()); for await (const event of streamAsyncIterator(stream)) { + if (event.event === "done") { + break; + } + if (event.event === "error") { controller.error(new Error(event.data)); break; @@ -87,10 +91,6 @@ function createReadableStream({ url, fetch, options = {} }) { controller.enqueue( new ServerSentEvent(event.event, event.data, event.id) ); - - if (event.event === "done") { - break; - } } controller.close(); From 4430e6b90abe10b95a6423ad794a4aece65dec68 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Mon, 23 Sep 2024 04:56:35 -0700 Subject: [PATCH 2/5] Fix doc comment for fetch parameter --- lib/stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stream.js b/lib/stream.js index 6cb1c30..e74154e 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -104,7 +104,7 @@ function createReadableStream({ url, fetch, options = {} }) { * * @param {object} config * @param {string} config.url The URL to connect to. - * @param {typeof fetch} [config.fetch] The URL to connect to. + * @param {typeof fetch} [config.fetch] The fetch function. * @returns {ReadableStream} */ function createFileOutput({ url, fetch }) { From 139fb8945fdcf026b101b51ceef884f60cce5685 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Mon, 23 Sep 2024 04:58:28 -0700 Subject: [PATCH 3/5] Transform URLs in streaming responses when useFileOutput is enabled --- lib/stream.js | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index e74154e..e865937 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -48,15 +48,18 @@ class ServerSentEvent { * @param {string} config.url The URL to connect to. * @param {typeof fetch} [config.fetch] The URL to connect to. * @param {object} [config.options] The EventSource options. + * @param {boolean} [config.options.useFileOutput] Whether to use the file output stream. * @returns {ReadableStream & AsyncIterable} */ function createReadableStream({ url, fetch, options = {} }) { + const { useFileOutput = false, headers = {}, ...initOptions } = options; + return new ReadableStream({ async start(controller) { const init = { - ...options, + ...initOptions, headers: { - ...options.headers, + ...headers, Accept: "text/event-stream", }, }; @@ -88,9 +91,15 @@ function createReadableStream({ url, fetch, options = {} }) { break; } - controller.enqueue( - new ServerSentEvent(event.event, event.data, event.id) - ); + let data = event.data; + if ( + useFileOutput && + typeof data === "string" && + (data.startsWith("https:") || data.startsWith("data:")) + ) { + data = createFileOutput({ data, fetch }); + } + controller.enqueue(new ServerSentEvent(event.event, data, event.id)); } controller.close(); From 6a2156d7efbffb0cdaddecac226f0e3d5be8cfd5 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Mon, 23 Sep 2024 05:00:27 -0700 Subject: [PATCH 4/5] Revert 'Break stream on done event without enqueuing it' --- lib/stream.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index e865937..3241ffe 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -82,10 +82,6 @@ function createReadableStream({ url, fetch, options = {} }) { .pipeThrough(new EventSourceParserStream()); for await (const event of streamAsyncIterator(stream)) { - if (event.event === "done") { - break; - } - if (event.event === "error") { controller.error(new Error(event.data)); break; @@ -100,6 +96,10 @@ function createReadableStream({ url, fetch, options = {} }) { data = createFileOutput({ data, fetch }); } controller.enqueue(new ServerSentEvent(event.event, data, event.id)); + + if (event.event === "done") { + break; + } } controller.close(); From 51d300b96b086a33ed3e3d41ba202eb5bd1f2430 Mon Sep 17 00:00:00 2001 From: Mattt Date: Wed, 25 Sep 2024 10:42:09 -0700 Subject: [PATCH 5/5] Default to useFileOutput = true for readable streams --- lib/stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stream.js b/lib/stream.js index 3241ffe..2c899bd 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -52,7 +52,7 @@ class ServerSentEvent { * @returns {ReadableStream & AsyncIterable} */ function createReadableStream({ url, fetch, options = {} }) { - const { useFileOutput = false, headers = {}, ...initOptions } = options; + const { useFileOutput = true, headers = {}, ...initOptions } = options; return new ReadableStream({ async start(controller) {