1
1
import { Observable } from '../../Observable' ;
2
2
import { Subscription } from '../../Subscription' ;
3
+ import { from } from '../../observable/from' ;
4
+ import { ObservableInput } from '../../types' ;
5
+
6
+ export function fromFetch < T > (
7
+ input : string | Request ,
8
+ init : RequestInit & {
9
+ selector : ( response : Response ) => ObservableInput < T >
10
+ }
11
+ ) : Observable < T > ;
12
+
13
+ export function fromFetch (
14
+ input : string | Request ,
15
+ init ?: RequestInit
16
+ ) : Observable < Response > ;
3
17
4
18
/**
5
19
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
@@ -42,7 +56,36 @@ import { Subscription } from '../../Subscription';
42
56
* data$.subscribe({
43
57
* next: result => console.log(result),
44
58
* complete: () => console.log('done')
45
- * })
59
+ * });
60
+ * ```
61
+ *
62
+ * ### Use with Chunked Transfer Encoding
63
+ *
64
+ * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
65
+ * the promise returned by `fetch` will resolve as soon as the response's headers are
66
+ * received.
67
+ *
68
+ * That means the `fromFetch` observable will emit a `Response` - and will
69
+ * then complete - before the body is received. When one of the methods on the
70
+ * `Response` - like `text()` or `json()` - is called, the returned promise will not
71
+ * resolve until the entire body has been received. Unsubscribing from any observable
72
+ * that uses the promise as an observable input will not abort the request.
73
+ *
74
+ * To facilitate aborting the retrieval of responses that use chunked transfer encoding,
75
+ * a `selector` can be specified via the `init` parameter:
76
+ *
77
+ * ```ts
78
+ * import { of } from 'rxjs';
79
+ * import { fromFetch } from 'rxjs/fetch';
80
+ *
81
+ * const data$ = fromFetch('https://api.github.com/users?per_page=5', {
82
+ * selector: response => response.json()
83
+ * });
84
+ *
85
+ * data$.subscribe({
86
+ * next: result => console.log(result),
87
+ * complete: () => console.log('done')
88
+ * });
46
89
* ```
47
90
*
48
91
* @param input The resource you would like to fetch. Can be a url or a request object.
@@ -51,8 +94,14 @@ import { Subscription } from '../../Subscription';
51
94
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
52
95
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
53
96
*/
54
- export function fromFetch ( input : string | Request , init ?: RequestInit ) : Observable < Response > {
55
- return new Observable < Response > ( subscriber => {
97
+ export function fromFetch < T > (
98
+ input : string | Request ,
99
+ initWithSelector : RequestInit & {
100
+ selector ?: ( response : Response ) => ObservableInput < T >
101
+ } = { }
102
+ ) : Observable < Response | T > {
103
+ const { selector, ...init } = initWithSelector ;
104
+ return new Observable < Response | T > ( subscriber => {
56
105
const controller = new AbortController ( ) ;
57
106
const signal = controller . signal ;
58
107
let abortable = true ;
@@ -91,9 +140,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
91
140
}
92
141
93
142
fetch ( input , perSubscriberInit ) . then ( response => {
94
- abortable = false ;
95
- subscriber . next ( response ) ;
96
- subscriber . complete ( ) ;
143
+ if ( selector ) {
144
+ subscription . add ( from ( selector ( response ) ) . subscribe (
145
+ value => subscriber . next ( value ) ,
146
+ err => {
147
+ abortable = false ;
148
+ if ( ! unsubscribed ) {
149
+ // Only forward the error if it wasn't an abort.
150
+ subscriber . error ( err ) ;
151
+ }
152
+ } ,
153
+ ( ) => {
154
+ abortable = false ;
155
+ subscriber . complete ( ) ;
156
+ }
157
+ ) ) ;
158
+ } else {
159
+ abortable = false ;
160
+ subscriber . next ( response ) ;
161
+ subscriber . complete ( ) ;
162
+ }
97
163
} ) . catch ( err => {
98
164
abortable = false ;
99
165
if ( ! unsubscribed ) {
0 commit comments