1. Introduction
This section is non-normative.
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams , writable streams , and transform streams .
These APIs have been designed to efficiently map to low-level I/O primitives, including specializations for byte streams where appropriate. They allow easy composition of multiple streams into pipe chains , or can be used directly via readers and writers . Finally, they are designed to automatically provide backpressure and queuing.
This standard provides the base stream primitives which other parts of the web platform can use to expose their streaming data. For example, [FETCH] exposes Response bodies as ReadableStream instances. More generally, the platform is full of streaming abstractions waiting to be expressed as streams: multimedia streams, file streams, inter-global communication, and more benefit from being able to process data incrementally instead of buffering it all into memory and processing it in one go. By providing the foundation for these streams to be exposed to developers, the Streams Standard enables use cases like:
- Video effects: piping a readable video stream through a transform stream that applies effects in real time.
- Decompression: piping a file stream through a transform stream that selectively decompresses files from a .tgz archive, turning them into img elements as the user scrolls through an image gallery.
- Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform that translates bitmaps into PNGs. If installed inside the fetch hook of a service worker, this would allow developers to transparently polyfill new image formats. [SERVICE-WORKERS]
Web developers can also use the APIs described here to create their own streams, with the same APIs as those provided by the platform. Other developers can then transparently compose platform-provided streams with those supplied by libraries. In this way, the APIs described here provide a unifying abstraction for all streams, encouraging an ecosystem to grow around these shared and composable interfaces.
2. Model
A chunk is a single piece of data that is written to or read from a stream. It can be of any type; streams can even contain chunks of different types. A chunk will often not be the most atomic unit of data for a given stream; for example, a byte stream might contain chunks consisting of 16 KiB Uint8Arrays, instead of single bytes.
2.1. Readable Streams
A readable stream represents a source of data, from which you can read. In other words, data comes out of a readable stream. Concretely, a readable stream is an instance of the ReadableStream class.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source, called the underlying source . There are two types of underlying source: push sources and pull sources.
Push sources push data at you, whether or not you are listening for it. They may also provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.
Pull sources require you to request data from them. The data may be available synchronously, e.g. if it is held by the operating system’s in-memory buffers, or asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface. The implementation details of a source are provided by an object with certain methods and properties. It is passed to the ReadableStream() constructor.
Chunks are enqueued into the stream by the stream’s underlying source. They can then be read one at a time via the stream’s public interface, in particular by using a readable stream reader acquired using the stream’s getReader() method.
Code that reads from a readable stream using its public interface is known as a consumer .
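As a minimal sketch of the relationship described above, the following creates a readable stream whose underlying source enqueues three chunks from its start() method and then closes, together with a consumer that drains the stream through a default reader. The names greetingStream and readAll are illustrative only and are not part of the standard.

```javascript
// An underlying source that enqueues a fixed set of chunks, then closes.
const greetingStream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("streaming");
    controller.enqueue("world");
    controller.close();
  }
});

// A consumer: reads chunks one at a time until the stream reports done.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      return chunks;
    }
    chunks.push(value);
  }
}

const greetingChunks = readAll(greetingStream);
greetingChunks.then(chunks => console.log(chunks.join(" ")));
```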
Consumers also have the ability to cancel a readable stream, using its cancel() method. This indicates that the consumer has lost interest in the stream, and will immediately close the stream, throw away any queued chunks, and execute any cancellation mechanism of the underlying source.
Consumers can also tee a readable stream using its tee() method. This will lock the stream, making it no longer directly usable; however, it will create two new streams, called branches, which can be consumed independently.
For streams representing bytes, an extended version of the readable stream is provided to handle bytes efficiently, in particular by minimizing copies. The underlying source for such a readable stream is called an underlying byte source. A readable stream whose underlying source is an underlying byte source is sometimes called a readable byte stream. Consumers of a readable byte stream can acquire a BYOB reader using the stream’s getReader() method.
2.2. Writable Streams
A writable stream represents a destination for data, into which you can write. In other words, data goes in to a writable stream. Concretely, a writable stream is an instance of the WritableStream class.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the underlying sink . Writable streams work to abstract away some of the complexity of the underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by one.
Chunks are written to the stream via its public interface, and are passed one at a time to the stream’s underlying sink. The implementation details of the sink are provided by an object with certain methods that is passed to the WritableStream() constructor.
Code that writes into a writable stream using its public interface is known as a producer .
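The producer and underlying sink roles can be illustrated with a small sketch. Here the sink simply records what it receives in an array; recordedBySink, recordingStream and produce are hypothetical names chosen for this example.

```javascript
// An underlying sink that records every chunk it is handed.
const recordedBySink = [];
const recordingStream = new WritableStream({
  write(chunk) {
    recordedBySink.push(chunk);
  },
  close() {
    recordedBySink.push("<closed>");
  }
});

// A producer: writes through a writer, then closes the stream.
async function produce(stream) {
  const writer = stream.getWriter();
  writer.write("a"); // queued, then delivered to the sink one by one
  writer.write("b");
  await writer.close(); // fulfills once all queued writes are delivered
}

const produced = produce(recordingStream);
```

Note that the producer does not need to wait for each write to finish before issuing the next one; the stream queues the writes and hands them to the sink in order.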
Producers also have the ability to abort a writable stream, using its abort() method. This indicates that the producer believes something has gone wrong, and that future writes should be discontinued. It puts the stream in an errored state, even without a signal from the underlying sink.
2.3. Transform Streams
A transform stream consists of a pair of streams: a writable stream, known as its writable side, and a readable stream, known as its readable side. In a manner specific to the transform stream in question, writes to the writable side result in new data being made available for reading from the readable side.
Concretely, any object with a writable property and a readable property can serve as a transform stream. However, the standard TransformStream class makes it much easier to create such a pair that is properly entangled. It wraps a transformer object which defines the specific transformation to be performed.
An identity transform stream is a type of transform stream which forwards all chunks written to its writable side to its readable side, without any changes. This can be useful in a variety of scenarios. By default, the TransformStream constructor will create an identity transform stream, when no transform method is present on the transformer object.
Some examples of potential transform streams include:
- A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are read;
- A video decoder, to which encoded bytes are written and from which uncompressed video frames are read;
- A text decoder, to which bytes are written and from which strings are read;
- A CSV-to-JSON converter, to which strings representing lines of a CSV file are written and from which corresponding JavaScript objects are read.
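A much simpler transform than those listed above is enough to show the shape of a transformer object. This sketch, which assumes string chunks, pipes a small source through an identity transform and then through an upper-casing transform; upperCaser, sourceOf and collect are names invented for the example.

```javascript
// A transformer that upper-cases every string chunk written to it.
const upperCaser = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  }
});

// With no transform() method, TransformStream acts as an identity transform.
const identity = new TransformStream();

// Helper: a readable stream that yields the given chunks and then closes.
function sourceOf(...chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    }
  });
}

// Helper: read a stream to completion and return its chunks as an array.
async function collect(readable) {
  const reader = readable.getReader();
  const out = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return out;
    out.push(value);
  }
}

const transformed = collect(
  sourceOf("csv", "gzip").pipeThrough(identity).pipeThrough(upperCaser)
);
transformed.then(chunks => console.log(chunks));
```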
2.4. Pipe Chains and Backpressure
Streams are primarily used by piping them to each other. A readable stream can be piped directly to a writable stream, using its pipeTo() method, or it can be piped through one or more transform streams first, using its pipeThrough() method.
A set of streams piped together in this way is referred to as a pipe chain . In a pipe chain, the original source is the underlying source of the first readable stream in the chain; the ultimate sink is the underlying sink of the final writable stream in the chain.
Once a pipe chain is constructed, it will propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from the original source according to how fast the chain can process chunks is called backpressure .
Concretely, the original source is given the controller.desiredSize (or byteController.desiredSize) value, and can then adjust its rate of data flow accordingly. This value is derived from the writer.desiredSize corresponding to the ultimate sink, which gets updated as the ultimate sink finishes writing chunks. The pipeTo() method used to construct the chain automatically ensures this information propagates back through the pipe chain.
When teeing a readable stream, the backpressure signals from its two branches will aggregate, such that if neither branch is read from, a backpressure signal will be sent to the underlying source of the original stream.
Piping locks the readable and writable streams, preventing them from being manipulated for the duration of the pipe operation. This allows the implementation to perform important optimizations, such as directly shuttling data from the underlying source to the underlying sink while bypassing many of the intermediate queues.
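The locking behavior of a pipe can be observed through the locked property of each stream. The following sketch uses a trivial source and sink (pipeSource and pipeDest are illustrative names) and records the lock state while the pipe is running and after it has finished.

```javascript
const pipeSource = new ReadableStream({
  start(controller) {
    controller.enqueue("chunk");
    controller.close();
  }
});
const pipeDest = new WritableStream({ write() {} });

const pipePromise = pipeSource.pipeTo(pipeDest);

// Both ends are locked as soon as the pipe starts.
const lockedDuringPipe = [pipeSource.locked, pipeDest.locked];

// The locks are released once the pipe operation has finished.
const lockedAfterPipe = pipePromise.then(
  () => [pipeSource.locked, pipeDest.locked]
);
```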
2.5. Internal Queues and Queuing Strategies
Both readable and writable streams maintain internal queues , which they use for similar purposes. In the case of a readable stream, the internal queue contains chunks that have been enqueued by the underlying source , but not yet read by the consumer. In the case of a writable stream, the internal queue contains chunks which have been written to the stream by the producer, but not yet processed and acknowledged by the underlying sink .
A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue . The queuing strategy assigns a size to each chunk , and compares the total size of all chunks in the queue to a specified number, known as the high water mark . The resulting difference, high water mark minus total size, is used to determine the desired size to fill the stream’s queue .
For readable streams, an underlying source can use this desired size as a backpressure signal, slowing down chunk generation so as to try to keep the desired size above or at zero. For writable streams, a producer can behave similarly, avoiding writes that would cause the desired size to go negative.
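For the writable case, the desired size is visible to a producer as writer.desiredSize. The sketch below deliberately uses a sink whose writes never complete, so that every written chunk stays in the queue and the arithmetic is easy to follow; stalledStream and observedSizes are names made up for this illustration.

```javascript
// A sink whose writes never complete, so written chunks stay queued.
const stalledStream = new WritableStream(
  { write() { return new Promise(() => {}); } },
  new CountQueuingStrategy({ highWaterMark: 2 })
);

const stalledWriter = stalledStream.getWriter();

// Desired size = high water mark minus total size of queued chunks.
const observedSizes = [stalledWriter.desiredSize]; // empty queue
for (const chunk of ["a", "b", "c"]) {
  stalledWriter.write(chunk); // queued; the returned promise stays pending
  observedSizes.push(stalledWriter.desiredSize);
}
console.log(observedSizes); // one entry per queue state: 2, 1, 0, -1
```

A well-behaved producer would avoid the final write here, typically by waiting for the writer.ready promise, which fulfills once the desired size is positive again.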
Concretely, a queuing strategy is given by any JavaScript object with a highWaterMark property and a size() method. In JavaScript, such a strategy could be written manually as { highWaterMark: 3, size() { return 1; }}, or using the built-in CountQueuingStrategy class, as new CountQueuingStrategy({ highWaterMark: 3 }).
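To make the "high water mark minus total size" computation concrete, this sketch records controller.desiredSize inside an underlying source’s start() method as chunks are enqueued, once with a hand-written strategy, once with CountQueuingStrategy and once with ByteLengthQueuingStrategy. The helper desiredSizesFor is hypothetical and exists only for the demonstration.

```javascript
// Enqueue the given chunks under a strategy, recording the desired size
// before the first enqueue and after each subsequent one.
function desiredSizesFor(strategy, chunks) {
  const sizes = [];
  new ReadableStream({
    start(controller) {
      sizes.push(controller.desiredSize);
      for (const chunk of chunks) {
        controller.enqueue(chunk);
        sizes.push(controller.desiredSize);
      }
    }
  }, strategy);
  return sizes;
}

// Each chunk counts as 1, so the desired size drops by one per chunk.
const manualSizes = desiredSizesFor(
  { highWaterMark: 3, size() { return 1; } }, ["a", "b"]);
const countedSizes = desiredSizesFor(
  new CountQueuingStrategy({ highWaterMark: 3 }), ["a", "b"]);

// Each chunk counts as its byteLength.
const byteSizes = desiredSizesFor(
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 }),
  [new Uint8Array(300), new Uint8Array(500)]);

console.log(manualSizes, countedSizes, byteSizes);
```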
2.6. Locking
A readable stream reader, or simply reader, is an object that allows direct reading of chunks from a readable stream. Without a reader, a consumer can only perform high-level operations on the readable stream: canceling the stream, or piping the readable stream to a writable stream. A reader is acquired via the stream’s getReader() method.
A readable byte stream has the ability to vend two types of readers: default readers and BYOB readers. BYOB ("bring your own buffer") readers allow reading into a developer-supplied buffer, thus minimizing copies. A non-byte readable stream can only vend default readers. Default readers are instances of the ReadableStreamDefaultReader class, while BYOB readers are instances of ReadableStreamBYOBReader.
Similarly, a writable stream writer, or simply writer, is an object that allows direct writing of chunks to a writable stream. Without a writer, a producer can only perform the high-level operations of aborting the stream or piping a readable stream to the writable stream. Writers are represented by the WritableStreamDefaultWriter class.
Under the covers, these high-level operations actually use a reader or writer themselves.
A given readable or writable stream only has at most one reader or writer at a time. We say in this case the stream is locked, and that the reader or writer is active. This state can be determined using the readableStream.locked or writableStream.locked properties.
A reader or writer also has the capability to release its lock, which makes it no longer active, and allows further readers or writers to be acquired. This is done via the defaultReader.releaseLock(), byobReader.releaseLock(), or writer.releaseLock() method, as appropriate.
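The lock life cycle can be sketched in a few lines. The stream below has no data at all; it is only used to observe the locked property and the failure of a second getReader() call. All variable names are illustrative.

```javascript
const lockDemo = new ReadableStream();
const lockStates = [lockDemo.locked];           // no reader yet

const firstReader = lockDemo.getReader();
lockStates.push(lockDemo.locked);               // locked to firstReader

let secondAcquireError = null;
try {
  lockDemo.getReader();                         // a second reader is refused
} catch (e) {
  secondAcquireError = e;
}

firstReader.releaseLock();
lockStates.push(lockDemo.locked);               // unlocked again

const replacementReader = lockDemo.getReader(); // now succeeds
lockStates.push(lockDemo.locked);
```

The same pattern applies to writable streams, with getWriter() and writer.releaseLock() in place of the reader calls.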
3. Readable Streams
3.1. Using Readable Streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));
By returning promises from your write implementation, you can signal backpressure to the readable stream.
You can also read from a readable stream directly, by acquiring a reader and using its read() method to get successive chunks. For example, this code logs the next chunk in the stream, if available:
const reader = readableStream.getReader();
reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);
This more manual method of reading a stream is mainly useful for library authors building new high-level operations on streams, beyond the provided ones of piping and teeing .
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1024);
readInto(startingAB)
  .then(buffer => console.log("The first 1024 bytes:", buffer))
  .catch(e => console.error("Something went wrong!", e));
function readInto(buffer, offset = 0) {
  if (offset === buffer.byteLength) {
    return Promise.resolve(buffer);
  }
  const view = new Uint8Array(buffer, offset, buffer.byteLength - offset);
  // read() fulfills with a { value, done } pair; value is the filled view.
  return reader.read(view).then(({ value: newView, done }) => {
    if (done) {
      // The stream ended before the buffer was filled; stop recursing.
      return newView ? newView.buffer : buffer;
    }
    return readInto(newView.buffer, offset + newView.byteLength);
  });
}
An important thing to note here is that the final buffer value is different from the startingAB, but it (and all intermediate buffers) shares the same backing memory allocation. At each step, the buffer is transferred to a new ArrayBuffer object. The newView is a new Uint8Array, with that ArrayBuffer object as its buffer property, the offset that bytes were written to as its byteOffset property, and the number of bytes that were written as its byteLength property.
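The buffer transfer described here can be observed directly. In the following sketch, a small readable byte stream supplies three bytes, and a single BYOB read is performed into an 8-byte buffer; the names byteStream and demonstrateTransfer are assumptions made for the example. After the read, the original ArrayBuffer object has been detached and the returned view refers to a different ArrayBuffer object.

```javascript
const byteStream = new ReadableStream({
  type: "bytes",
  start(controller) {
    controller.enqueue(new Uint8Array([1, 2, 3]));
    controller.close();
  }
});

async function demonstrateTransfer() {
  const byobReader = byteStream.getReader({ mode: "byob" });
  const originalBuffer = new ArrayBuffer(8);
  const { value: filledView } =
    await byobReader.read(new Uint8Array(originalBuffer));
  return {
    // A detached ArrayBuffer reports a byteLength of 0.
    originalLength: originalBuffer.byteLength,
    sameObject: filledView.buffer === originalBuffer,
    byteOffset: filledView.byteOffset,   // where the bytes were written
    bytesWritten: filledView.byteLength, // how many bytes were written
    contents: Array.from(filledView)
  };
}

const transferResult = demonstrateTransfer();
transferResult.then(result => console.log(result));
```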
3.2. Class ReadableStream
The ReadableStream class is a concrete instance of the general readable stream concept. It is adaptable to any chunk type, and maintains an internal queue to keep track of data supplied by the underlying source but not yet read by any consumer.
3.2.1. Class Definition
This section is non-normative.
If one were to write the ReadableStream class in something close to the syntax of [ECMASCRIPT], it would look like
class ReadableStream {
  constructor(underlyingSource = {}, { size, highWaterMark } = {})
  get locked()
  cancel(reason)
  getReader({ mode } = {})
  pipeThrough({ writable, readable }, options)
  pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
  tee()
}
3.2.2. Internal Slots
Instances of ReadableStream are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[disturbed]] | A boolean flag set to true when the stream has been read from or canceled |
[[readableStreamController]] | A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream; also used for the IsReadableStream brand check |
[[reader]] | A ReadableStreamDefaultReader or ReadableStreamBYOBReader instance, if the stream is locked to a reader, or undefined if it is not |
[[state]] | A string containing the stream’s current state, used internally; one of "readable", "closed", or "errored" |
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on an errored stream |
3.2.3. new ReadableStream( underlyingSource = {}, { size , highWaterMark } = {})
The underlyingSource object passed to the constructor can implement any of the following methods to govern how the constructed stream instance behaves:
- start(controller) is called immediately, and is typically used to adapt a push source by setting up relevant event listeners, or to acquire access to a pull source. If this process is asynchronous, it can return a promise to signal success or failure.
- pull(controller) is called when the stream’s internal queue of chunks is not full, and will be called repeatedly until the queue reaches its high water mark. If pull returns a promise, then pull will not be called again until that promise fulfills; if the promise rejects, the stream will become errored.
- cancel(reason) is called when the consumer signals that they are no longer interested in the stream. It can perform any actions necessary to release access to the underlying source. If this process is asynchronous, it can return a promise to signal success or failure.
Both start and pull are given the ability to manipulate the stream’s internal queue and state via the passed controller object. This is an example of the revealing constructor pattern.
If the underlyingSource object contains a property type set to "bytes", this readable stream is a readable byte stream, and can successfully vend BYOB readers. In that case, the passed controller object will be an instance of ReadableByteStreamController. Otherwise, it will be an instance of ReadableStreamDefaultController.
For readable byte streams, underlyingSource can also contain a property autoAllocateChunkSize, which can be set to a positive integer to enable the auto-allocation feature for this stream. In that case, when a consumer uses a default reader, the stream implementation will automatically allocate an ArrayBuffer of the given size, and call the underlying source code as if the consumer were using a BYOB reader. This can cut down on the amount of code needed when writing the underlying source implementation, as can be seen by comparing §8.3 A readable byte stream with an underlying push source (no backpressure support) without auto-allocation to §8.5 A readable byte stream with an underlying pull source with auto-allocation.
The constructor also accepts a second argument containing the queuing strategy object with two properties: a non-negative number highWaterMark, and a function size(chunk). The supplied strategy could be an instance of the built-in CountQueuingStrategy or ByteLengthQueuingStrategy classes, or it could be custom. If no strategy is supplied, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.
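As a small sketch of the constructor arguments described above, the following builds a stream over a pull source that produces a bounded sequence of integers on demand, with an explicit queuing strategy. The function names counterStream and drain, and the chosen high water mark, are assumptions for the sake of the example.

```javascript
// A pull source that produces the integers 0..limit-1 on demand.
function counterStream(limit) {
  let next = 0;
  return new ReadableStream({
    pull(controller) {
      // Called whenever the internal queue is below its high water mark.
      if (next < limit) {
        controller.enqueue(next++);
      } else {
        controller.close();
      }
    },
    cancel(reason) {
      // Nothing to release here; a real source might close a file handle.
      console.log("Counter canceled:", reason);
    }
  }, new CountQueuingStrategy({ highWaterMark: 2 }));
}

// Read the stream to completion through a default reader.
async function drain(stream) {
  const reader = stream.getReader();
  const values = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return values;
    values.push(value);
  }
}

const counted = drain(counterStream(3));
counted.then(values => console.log(values));
```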
- Set this.[[state]] to "readable".
- Set this.[[reader]] and this.[[storedError]] to undefined.
- Set this.[[disturbed]] to false.
- Set this.[[readableStreamController]] to undefined.
- Let type be ? GetV(underlyingSource, "type").
- Let typeString be ? ToString(type).
- If typeString is "bytes",
  - If highWaterMark is undefined, let highWaterMark be 0.
  - Set this.[[readableStreamController]] to ? Construct(ReadableByteStreamController, « this, underlyingSource, highWaterMark »).
- Otherwise, if type is undefined,
  - If highWaterMark is undefined, let highWaterMark be 1.
  - Set this.[[readableStreamController]] to ? Construct(ReadableStreamDefaultController, « this, underlyingSource, size, highWaterMark »).
- Otherwise, throw a RangeError exception.
3.2.4. Properties of the ReadableStream Prototype
3.2.4.1. get locked
The locked getter returns whether or not the readable stream is locked to a reader.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Return ! IsReadableStreamLocked(this).
3.2.4.2. cancel( reason )
The cancel method cancels the stream, signaling a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source, which might or might not use it.
- If ! IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamCancel(this, reason).
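The observable effects of these steps can be sketched as follows: the reason reaches the underlying source’s cancel() method, the stream ends up closed with its queue discarded, and a locked stream refuses to be canceled through the stream object. The names cancellable and cancelDemo are illustrative.

```javascript
let cancelReason;
const cancellable = new ReadableStream({
  start(controller) {
    controller.enqueue("queued chunk");
  },
  cancel(reason) {
    cancelReason = reason; // the reason supplied by the consumer
  }
});

async function cancelDemo() {
  await cancellable.cancel("no longer needed");

  // The stream is now closed and its queued chunk was discarded.
  const { done } = await cancellable.getReader().read();

  // A locked stream cannot be canceled through the stream itself.
  const lockedStream = new ReadableStream();
  lockedStream.getReader();
  const lockedError = await lockedStream.cancel().catch(e => e);

  return {
    cancelReason,
    done,
    lockedIsTypeError: lockedError instanceof TypeError
  };
}

const cancelResult = cancelDemo();
cancelResult.then(result => console.log(result));
```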
3.2.4.3. getReader({ mode } = {})
The getReader method creates a reader of the type specified by the mode option and locks the stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released.
This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel the stream, which would interfere with your abstraction.
When mode is undefined, the getReader method creates a default reader (an instance of ReadableStreamDefaultReader). The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method.
When mode is "byob", the getReader method creates a BYOB reader (an instance of ReadableStreamBYOBReader). This feature only works on readable byte streams, i.e. streams which were constructed specifically with the ability to handle "bring your own buffer" reading. The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method, into developer-supplied buffers, allowing more precise control over allocation.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- If mode is undefined, return ? AcquireReadableStreamDefaultReader(this).
- Set mode to ? ToString(mode).
- If mode is "byob", return ? AcquireReadableStreamBYOBReader(this).
- Throw a RangeError exception.
function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];
  return pump();
  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }
      chunks.push(value);
      return pump();
    });
  }
}
Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures that no other consumer can interfere with the stream, either by reading chunks or by canceling the stream.
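The effect of the mode option on getReader() can be checked with a short sketch. It creates one ordinary stream and one readable byte stream, acquires a reader from each, and confirms that asking an ordinary stream for a BYOB reader fails. Variable names are illustrative.

```javascript
// No mode: a default reader, available on any readable stream.
const plainReader = new ReadableStream().getReader();

// mode "byob": only available on readable byte streams.
const bytesReader =
  new ReadableStream({ type: "bytes" }).getReader({ mode: "byob" });

// Requesting a BYOB reader from a non-byte stream throws.
let byobOnPlainError = null;
try {
  new ReadableStream().getReader({ mode: "byob" });
} catch (e) {
  byobOnPlainError = e;
}

const modeSummary = {
  plainIsDefaultReader: plainReader instanceof ReadableStreamDefaultReader,
  bytesIsByobReader: bytesReader instanceof ReadableStreamBYOBReader,
  plainStreamRejectsByob: byobOnPlainError instanceof TypeError
};
console.log(modeSummary);
```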
3.2.4.4. pipeThrough({ writable , readable }, options )
The pipeThrough method provides a convenient, chainable way of piping this readable stream through a transform stream (or any other { writable, readable } pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.
Piping a stream will generally lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
This method is intentionally generic; it does not require that its this value be a ReadableStream object. It also does not require that its writable argument be a WritableStream instance, or that its readable argument be a ReadableStream instance.
- If writable is undefined or readable is undefined, throw a TypeError exception indicating that undefined values are not permitted.
  undefined is special-cased here to make it easier to debug errors such as rs.pipeThrough(writable, readable) which otherwise would silently do nothing. pipeThrough() is otherwise agnostic about the values that it passes through.
- Let promise be ? Invoke(this, "pipeTo", « writable, options »).
- If Type(promise) is Object and promise has a [[PromiseIsHandled]] internal slot, set promise.[[PromiseIsHandled]] to true.
- Return readable.
A typical pipe chain constructed using pipeThrough(transform, options) would look like
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
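Because pipeThrough only pipes into the writable side and returns the readable side, its behavior can be approximated in author code. The function below is a hypothetical stand-alone equivalent, not the specified algorithm: it cannot set the [[PromiseIsHandled]] internal slot, so it attaches a no-op rejection handler instead.

```javascript
// Hypothetical stand-alone equivalent of source.pipeThrough(pair, options).
function pipeThroughSketch(source, { writable, readable }, options) {
  if (writable === undefined || readable === undefined) {
    throw new TypeError("writable and readable must not be undefined");
  }
  const promise = source.pipeTo(writable, options);
  // Mark the promise as handled so a failed pipe is not also reported
  // as an unhandled rejection.
  promise.catch(() => {});
  return readable;
}

// Usage: double every number flowing through the pair.
const numbers = new ReadableStream({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  }
});
const doubler = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk * 2);
  }
});

async function readDoubled() {
  const reader = pipeThroughSketch(numbers, doubler).getReader();
  const out = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return out;
    out.push(value);
  }
}

const doubled = readDoubled();
doubled.then(values => console.log(values));
```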
3.2.4.5. pipeTo( dest , { preventClose , preventAbort , preventCancel } = {})
The pipeTo method pipes this readable stream to a given writable stream. The way in which the piping process behaves under various error conditions can be customized with a number of passed options. It returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.
Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
Errors and closures of the source and destination streams propagate as follows:
- An error in the source readable stream will abort the destination writable stream, unless preventAbort is truthy. The returned promise will be rejected with the source’s error, or with any error that occurs during aborting the destination.
- An error in the destination writable stream will cancel the source readable stream, unless preventCancel is truthy. The returned promise will be rejected with the destination’s error, or with any error that occurs during canceling the source.
- When the source readable stream closes, the destination writable stream will be closed, unless preventClose is truthy. The returned promise will be fulfilled once this process completes, unless an error is encountered while closing the destination, in which case it will be rejected with that error.
- If the destination writable stream starts out closed or closing, the source readable stream will be canceled, unless preventCancel is truthy. The returned promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs during canceling the source.
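Two of these propagation rules can be sketched briefly. The first function pipes with preventClose set, so the destination stays open and can accept further writes afterwards; the second pipes from an already-errored source and checks that the same error both rejects the returned promise and reaches the sink’s abort() method. The helper names are assumptions made for this example.

```javascript
// Helper: a readable stream that yields the given chunks and then closes.
function sourceFrom(chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    }
  });
}

async function preventCloseDemo() {
  const written = [];
  const dest = new WritableStream({ write(chunk) { written.push(chunk); } });
  await sourceFrom(["a", "b"]).pipeTo(dest, { preventClose: true });
  // dest is still open and unlocked, so more data can follow.
  const writer = dest.getWriter();
  await writer.write("c");
  await writer.close();
  return written;
}

async function sourceErrorDemo() {
  const failure = new Error("source failed");
  let abortedWith;
  const dest = new WritableStream({ abort(reason) { abortedWith = reason; } });
  const broken = new ReadableStream({
    start(controller) { controller.error(failure); }
  });
  const rejection = await broken.pipeTo(dest).catch(e => e);
  return {
    rejectedWithSourceError: rejection === failure,
    sinkAbortedWithSourceError: abortedWith === failure
  };
}

const preventCloseResult = preventCloseDemo();
const sourceErrorResult = sourceErrorDemo();
```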
- If ! IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsWritableStream(dest) is false, return a promise rejected with a TypeError exception.
- Set preventClose to ! ToBoolean(preventClose), set preventAbort to ! ToBoolean(preventAbort), and set preventCancel to ! ToBoolean(preventCancel).
- If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- If ! IsWritableStreamLocked(dest) is true, return a promise rejected with a TypeError exception.
- If ! IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agent’s discretion.
- Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this).
- Let writer be ! AcquireWritableStreamDefaultWriter(dest).
- Let shuttingDown be false.
- Let promise be a new promise.
- In parallel, using reader and writer, read all chunks from this and write them to dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not observable to author code, and so there is flexibility in how this is done. The following constraints apply regardless of the exact algorithm used:
  - Public API must not be used: while reading or writing, or performing any of the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly.
  - Backpressure must be enforced:
    - While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader.
    - If reader is a BYOB reader, WritableStreamDefaultWriterGetDesiredSize(writer) should be used to determine the size of the chunks read from reader.
    - Reads or writes should not be delayed for reasons other than these backpressure signals.
      An implementation that waits for each write to successfully complete before proceeding to the next read/write operation violates this recommendation. In doing so, such an implementation makes the internal queue of dest useless, as it ensures dest always contains at most one queued chunk.
  - Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
  - Error and close states must be propagated: the following conditions must be applied in order.
    - Errors must be propagated forward: if this.[[state]] is or becomes "errored", then
      - If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, this.[[storedError]]) and with this.[[storedError]].
      - Otherwise, shutdown with this.[[storedError]].
    - Errors must be propagated backward: if dest.[[state]] is or becomes "errored", then
      - If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(this, dest.[[storedError]]) and with dest.[[storedError]].
      - Otherwise, shutdown with dest.[[storedError]].
    - Closing must be propagated forward: if this.[[state]] is or becomes "closed", then
      - If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
      - Otherwise, shutdown.
    - Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is "closed", then
      - Assert: no chunks have been read or written.
      - Let destClosed be a new TypeError.
      - If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(this, destClosed) and with destClosed.
      - Otherwise, shutdown with destClosed.
  - Shutdown with an action: if any of the above requirements ask to shutdown with an action action, optionally with an error originalError, then:
    - If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
      - If any chunks have been read but not yet written, write them to dest.
      - Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
    - If shuttingDown is true, abort these substeps.
    - Set shuttingDown to true.
    - Let p be the result of performing action.
    - Upon fulfillment of p, finalize, passing along originalError if it was given.
    - Upon rejection of p with reason newError, finalize with newError.
  - Shutdown: if any of the above requirements or steps ask to shutdown, optionally with an error error, then:
    - If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
      - If any chunks have been read but not yet written, write them to dest.
      - Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
    - If shuttingDown is true, abort these substeps.
    - Set shuttingDown to true.
    - Finalize, passing along error if it was given.
  - Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error error, which means to perform the following steps:
    - Perform ! WritableStreamDefaultWriterRelease(writer).
    - Perform ! ReadableStreamReaderGenericRelease(reader).
    - If error was given, reject promise with error.
    - Otherwise, resolve promise with undefined.
- Return promise.
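For intuition only, the overall shape of the read/write loop can be approximated in author code. The sketch below is deliberately simplified and is not a conforming implementation: it uses the public reader and writer APIs (which the constraints above forbid for real implementations), ignores the prevent* options, and handles errors coarsely. The name simplePipe is hypothetical.

```javascript
// Simplified illustration only; see the caveats in the lead-in.
async function simplePipe(source, dest) {
  const reader = source.getReader();
  const writer = dest.getWriter();
  try {
    for (;;) {
      await writer.ready; // respect backpressure from dest
      const { value, done } = await reader.read();
      if (done) break;
      // Do not await each write: that would defeat dest's internal queue.
      writer.write(value).catch(() => {});
    }
    await writer.close(); // propagate closing forward
  } catch (error) {
    // Propagate the error to both ends, ignoring secondary failures.
    await Promise.allSettled([reader.cancel(error), writer.abort(error)]);
    throw error;
  } finally {
    reader.releaseLock();
    writer.releaseLock();
  }
}

// Usage: pipe three chunks into a sink that records them.
const pipedChunks = [];
let sinkClosed = false;
const simpleSource = new ReadableStream({
  start(controller) {
    [1, 2, 3].forEach(n => controller.enqueue(n));
    controller.close();
  }
});
const simpleDest = new WritableStream({
  write(chunk) { pipedChunks.push(chunk); },
  close() { sinkClosed = true; }
});
const simplePipeDone = simplePipe(simpleSource, simpleDest);
```

Waiting on writer.ready rather than on each individual write promise mirrors the recommendation above that reads and writes be delayed only by backpressure signals.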
3.2.4.6. tee()
The tee method tees this readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances.
Teeing a stream will lock it, preventing any other consumer from acquiring a reader. To cancel the stream, cancel both of the resulting branches; a composite cancellation reason will then be propagated to the stream’s underlying source .
Note that the chunks seen in each branch will be the same object. If the chunks are not immutable, this could allow interference between the two branches.
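The sharing of chunk objects between branches can be demonstrated with a mutable chunk. In this sketch (sharedChunk and teeIdentityDemo are illustrative names), a change made to the chunk after reading it from one branch is visible when the same chunk is later read from the other branch.

```javascript
const sharedChunk = { label: "original" };
const [branchA, branchB] = new ReadableStream({
  start(controller) {
    controller.enqueue(sharedChunk);
    controller.close();
  }
}).tee();

async function teeIdentityDemo() {
  const { value: fromA } = await branchA.getReader().read();
  fromA.label = "mutated via branch A";
  const { value: fromB } = await branchB.getReader().read();
  // Both branches received the very same object, not a copy.
  return { sameObject: fromA === fromB, seenByB: fromB.label };
}

const teeResult = teeIdentityDemo();
teeResult.then(result => console.log(result));
```

If such interference matters, one option is to make chunks immutable or to copy them in each branch before modifying them.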
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Let branches be ? ReadableStreamTee(this, false).
- Return ! CreateArrayFromList(branches).
For example, given a writable stream cacheEntry representing an on-disk file, and another writable stream httpRequestBody representing an upload to a remote server, you could pipe the same readable stream to both destinations at once:
const [forLocal, forRemote] = readableStream.tee();
Promise.all([
forLocal.pipeTo(cacheEntry),
forRemote.pipeTo(httpRequestBody)
])
.then(() => console.log("Saved the stream to the cache and also uploaded it!"))
.catch(e => console.error("Either caching or uploading failed: ", e));
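The locking and chunk-sharing behavior of tee() can be observed with a short, self-contained sketch. It assumes an environment that exposes a global ReadableStream (recent browsers, or Node.js 18 and later); makeStream, readAll, and teeDemo are hypothetical helper names introduced only for this illustration.

```javascript
// Hypothetical helper: a readable stream that enqueues the given chunks, then closes.
function makeStream(chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    }
  });
}

// Hypothetical helper: reads a stream to completion and collects its chunks.
async function readAll(stream) {
  const reader = stream.getReader();
  const out = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) return out;
    out.push(value);
  }
}

async function teeDemo() {
  const source = makeStream([{ id: 1 }, { id: 2 }]);
  const [branch1, branch2] = source.tee();

  // Teeing locks the original stream.
  const lockedAfterTee = source.locked;

  const [seen1, seen2] = await Promise.all([readAll(branch1), readAll(branch2)]);

  // Each branch sees the very same chunk objects, not copies.
  const sameObjects = seen1.every((chunk, i) => chunk === seen2[i]);
  return { lockedAfterTee, sameObjects, count: seen1.length };
}
```

Because the chunks are shared, a consumer that mutates a chunk in one branch would affect what the other branch observes.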
3.3. General Readable Stream Abstract Operations
The following abstract operations, unlike most in this specification, are meant to be generally useful by other specifications, instead of just being part of the implementation of this spec’s classes.
3.3.1. AcquireReadableStreamBYOBReader ( stream ) throws
This abstract operation is meant to be called from other specifications that may wish to acquire a BYOB reader for a given stream.
- Return ? Construct(ReadableStreamBYOBReader, « stream »).
3.3.2. AcquireReadableStreamDefaultReader ( stream ) throws
This abstract operation is meant to be called from other specifications that may wish to acquire a default reader for a given stream.
- Return ? Construct(ReadableStreamDefaultReader, « stream »).
3.3.3. IsReadableStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readableStreamController]] internal slot, return false.
- Return true.
3.3.4. IsReadableStreamDisturbed ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream has ever been read from or canceled.
- Assert: ! IsReadableStream(stream) is true.
- Return stream.[[disturbed]].
3.3.5. IsReadableStreamLocked ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream is locked to a reader .
- Assert: ! IsReadableStream(stream) is true.
- If stream.[[reader]] is undefined, return false.
- Return true.
3.3.6. ReadableStreamTee ( stream , cloneForBranch2 ) throws
This abstract operation is meant to be called from other specifications that may wish to tee a given readable stream.
The second argument, cloneForBranch2, governs whether or not the data from the original stream will be cloned (using HTML’s serializable objects framework) before appearing in the second of the returned branches. This is useful for scenarios where both branches are to be consumed in such a way that they might otherwise interfere with each other, such as by transferring their chunks. However, it does introduce a noticeable asymmetry between the two branches, and limits the possible chunks to serializable ones. [HTML]
In this standard
"pull"
,
pull
).
"cancel"
,
cancel1
).
ReadableStream
,
underlyingSource1
).
"pull"
,
pull
).
"cancel"
,
cancel2
).
ReadableStream
,
underlyingSource2
).
A ReadableStreamTee pull function is an anonymous built-in function that pulls data from a given readable stream reader and enqueues it into two other streams ("branches" of the associated tee). Each
"value"
).
"done"
).
A ReadableStreamTee branch 1 cancel function is an anonymous built-in function that reacts to the cancellation of the first of the two branches of the associated tee. Each
A ReadableStreamTee branch 2 cancel function is an anonymous built-in function that reacts to the cancellation of the second of the two branches of the associated tee. Each
3.4. The Interface Between Readable Streams and Controllers
In terms of specification factoring, the way that the ReadableStream class encapsulates the behavior of both simple readable streams and readable byte streams into a single class is by centralizing most of the potentially-varying logic inside the two controller classes, ReadableStreamDefaultController and ReadableByteStreamController. Those classes define most of the stateful internal slots and abstract operations for how a stream’s internal queue is managed and how it interfaces with its underlying source or underlying byte source.
Each controller class defines two internal methods, which are called by the ReadableStream algorithms:
- [[CancelSteps]]( reason )
- The controller’s steps that run in reaction to the stream being canceled , used to clean up the state stored in the controller and inform the underlying source .
- [[PullSteps]]()
- The controller’s steps that run when a default reader is read from, used to pull from the controller any queued chunks , or pull from the underlying source to get more chunks.
(These are defined as internal methods, instead of as abstract operations, so that they can be called polymorphically by the ReadableStream algorithms, without having to branch on which type of controller is present.)
The rest of this section concerns abstract operations that go in the other direction: they are used by the controller implementations to affect their associated ReadableStream object. This translates internal state changes of the controller into developer-facing results visible through the ReadableStream’s public API.
3.4.1. ReadableStreamAddReadIntoRequest ( stream ) nothrow
- Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
- Assert: stream.[[state]] is "readable" or "closed".
- Let promise be a new promise.
- Let readIntoRequest be Record {[[promise]]: promise}.
- Append readIntoRequest as the last element of stream.[[reader]].[[readIntoRequests]].
- Return promise.
3.4.2. ReadableStreamAddReadRequest ( stream ) nothrow
- Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
- Assert: stream.[[state]] is "readable".
- Let promise be a new promise.
- Let readRequest be Record {[[promise]]: promise}.
- Append readRequest as the last element of stream.[[reader]].[[readRequests]].
- Return promise.
3.4.3. ReadableStreamCancel ( stream , reason ) nothrow
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "closed", return a promise resolved with undefined.
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Perform ! ReadableStreamClose(stream).
- Let sourceCancelPromise be ! stream.[[readableStreamController]].[[CancelSteps]](reason).
- Return the result of transforming sourceCancelPromise with a fulfillment handler that returns undefined.
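The three outcomes of the steps above can be observed through the public cancel() method. The following is a minimal sketch, assuming an environment with a global ReadableStream (recent browsers, or Node.js 18 and later); cancelOutcomes is a hypothetical name used only for this illustration.

```javascript
async function cancelOutcomes() {
  // Already closed: cancel() fulfills with undefined and the reason is ignored.
  const closed = new ReadableStream({ start(c) { c.close(); } });
  const closedResult = await closed.cancel('ignored');

  // Already errored: cancel() rejects with the stored error.
  const boom = new Error('boom');
  const errored = new ReadableStream({ start(c) { c.error(boom); } });
  let rejection;
  try {
    await errored.cancel();
  } catch (e) {
    rejection = e;
  }

  // Still readable: the underlying source's cancel() receives the reason,
  // and whatever it returns is replaced by undefined.
  let seenReason;
  const readable = new ReadableStream({
    cancel(reason) {
      seenReason = reason;
      return 'discarded';
    }
  });
  const readableResult = await readable.cancel('no longer needed');

  return { closedResult, rejection, boom, seenReason, readableResult };
}
```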
3.4.4. ReadableStreamClose ( stream ) nothrow
- Assert: stream.[[state]] is "readable".
- Set stream.[[state]] to "closed".
- Let reader be stream.[[reader]].
- If reader is undefined, return.
- If ! IsReadableStreamDefaultReader(reader) is true,
  - Repeat for each readRequest that is an element of reader.[[readRequests]],
    - Resolve readRequest.[[promise]] with ! CreateIterResultObject(undefined, true).
  - Set reader.[[readRequests]] to an empty List.
- Resolve reader.[[closedPromise]] with undefined.
The case where stream.[[state]] is "closed", but stream.[[closeRequested]] is false, occurs when the stream was closed without its controller’s close method ever being called, i.e., by a call to cancel(reason). In this case we allow the controller’s close method to be called and silently do nothing, since the cancelation was outside the control of the underlying source.
3.4.5. ReadableStreamError ( stream , e ) nothrow
- Assert: ! IsReadableStream(stream) is true.
- Assert: stream.[[state]] is "readable".
- Set stream.[[state]] to "errored".
- Set stream.[[storedError]] to e.
- Let reader be stream.[[reader]].
- If reader is undefined, return.
- If ! IsReadableStreamDefaultReader(reader) is true,
  - Repeat for each readRequest that is an element of reader.[[readRequests]],
    - Reject readRequest.[[promise]] with e.
  - Set reader.[[readRequests]] to a new empty List.
- Otherwise,
  - Assert: ! IsReadableStreamBYOBReader(reader).
  - Repeat for each readIntoRequest that is an element of reader.[[readIntoRequests]],
    - Reject readIntoRequest.[[promise]] with e.
  - Set reader.[[readIntoRequests]] to a new empty List.
- Reject reader.[[closedPromise]] with e.
- Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
3.4.6. ReadableStreamFulfillReadIntoRequest ( stream , chunk , done ) nothrow
- Let reader be stream .[[reader]].
- Let readIntoRequest be the first element of reader .[[readIntoRequests]].
- Remove readIntoRequest from reader .[[readIntoRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Resolve readIntoRequest.[[promise]] with ! CreateIterResultObject(chunk, done).
3.4.7. ReadableStreamFulfillReadRequest ( stream , chunk , done ) nothrow
- Let reader be stream .[[reader]].
- Let readRequest be the first element of reader .[[readRequests]].
- Remove readRequest from reader .[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Resolve readRequest.[[promise]] with ! CreateIterResultObject(chunk, done).
3.4.8. ReadableStreamGetNumReadIntoRequests ( stream ) nothrow
- Return the number of elements in stream .[[reader]].[[readIntoRequests]].
3.4.9. ReadableStreamGetNumReadRequests ( stream ) nothrow
- Return the number of elements in stream .[[reader]].[[readRequests]].
3.4.10. ReadableStreamHasBYOBReader ( stream ) nothrow
- Let reader be stream .[[reader]].
- If reader is undefined, return false.
- If ! IsReadableStreamBYOBReader(reader) is false, return false.
- Return true.
3.4.11. ReadableStreamHasDefaultReader ( stream ) nothrow
- Let reader be stream .[[reader]].
- If reader is undefined, return false.
- If ! IsReadableStreamDefaultReader(reader) is false, return false.
- Return true.
3.5. Class ReadableStreamDefaultReader
The ReadableStreamDefaultReader class represents a default reader designed to be vended by a ReadableStream instance.
3.5.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamDefaultReader class in something close to the syntax of [ECMASCRIPT], it would look like
class ReadableStreamDefaultReader {
constructor(stream)
get closed()
cancel(reason)
read()
releaseLock()
}
3.5.2. Internal Slots
Instances of ReadableStreamDefaultReader are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[closedPromise]] | A promise returned by the reader’s closed getter |
[[ownerReadableStream]] | A ReadableStream instance that owns this reader |
[[readRequests]] | A List of promises returned by calls to the reader’s read() method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamDefaultReader brand check |
3.5.3. new ReadableStreamDefaultReader( stream )
The ReadableStreamDefaultReader constructor is generally not meant to be used directly; instead, a stream’s getReader() method ought to be used.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericInitialize(this, stream).
- Set this.[[readRequests]] to a new empty List.
3.5.4. Properties of the ReadableStreamDefaultReader Prototype
3.5.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the reader’s lock is released before the stream finishes closing.
- If ! IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
3.5.4.2. cancel( reason )
The cancel method behaves the same as that for the associated stream.
- If ! IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamReaderGenericCancel(this, reason).
3.5.4.3. read()
The read method will return a promise that allows access to the next chunk from the stream’s internal queue, if available.
- If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false }.
- If the stream becomes closed, the promise will be fulfilled with an object of the form { value: undefined, done: true }.
- If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source .
- If ! IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamDefaultReaderRead(this).
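A typical consumer calls read() in a loop until the result reports done. The following is a minimal sketch of that pattern, assuming a global ReadableStream (recent browsers, or Node.js 18 and later); drain is a hypothetical helper name, not part of this standard.

```javascript
// Hypothetical helper: reads every chunk from a stream with a default reader.
async function drain(stream) {
  const reader = stream.getReader();
  const chunks = [];
  try {
    for (;;) {
      // Fulfills with { value, done: false } for a chunk, or
      // { value: undefined, done: true } once the stream is closed.
      // Rejects if the stream becomes errored.
      const { value, done } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
  } finally {
    // No read is pending at this point, so the lock can be released.
    reader.releaseLock();
  }
  return chunks;
}
```

Releasing the lock in a finally block leaves the stream available to another reader whether the loop ended normally or through a rejection.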
3.5.4.4. releaseLock()
The releaseLock method releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active. If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.
A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the reader’s read() method has not yet been settled. Attempting to do so will throw a TypeError.
- If ! IsReadableStreamDefaultReader(this) is false, throw a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return.
- If this.[[readRequests]] is not empty, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericRelease(this).
3.6. Class ReadableStreamBYOBReader
The ReadableStreamBYOBReader class represents a BYOB reader designed to be vended by a ReadableStream instance.
3.6.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamBYOBReader class in something close to the syntax of [ECMASCRIPT], it would look like
class ReadableStreamBYOBReader {
constructor(stream)
get closed()
cancel(reason)
read(view)
releaseLock()
}
3.6.2. Internal Slots
Instances of ReadableStreamBYOBReader are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[closedPromise]] | A promise returned by the reader’s closed getter |
[[ownerReadableStream]] | A ReadableStream instance that owns this reader |
[[readIntoRequests]] | A List of promises returned by calls to the reader’s read(view) method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamBYOBReader brand check |
3.6.3. new ReadableStreamBYOBReader( stream )
The ReadableStreamBYOBReader constructor is generally not meant to be used directly; instead, a stream’s getReader() method ought to be used.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If ! IsReadableByteStreamController(stream.[[readableStreamController]]) is false, throw a TypeError exception.
- If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericInitialize(this, stream).
- Set this.[[readIntoRequests]] to a new empty List.
3.6.4. Properties of the ReadableStreamBYOBReader Prototype
3.6.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the reader’s lock is released before the stream finishes closing.
- If ! IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
3.6.4.2. cancel( reason )
The cancel method behaves the same as that for the associated stream.
- If ! IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamReaderGenericCancel(this, reason).
3.6.4.3. read( view )
The read method will write read bytes into view and return a promise resolved with a possibly transferred buffer as described below.
- If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false }.
- If the stream becomes closed, the promise will be fulfilled with an object of the form { value: undefined, done: true }.
- If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying byte source .
- If ! IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- If Type(view) is not Object, return a promise rejected with a TypeError exception.
- If view does not have a [[ViewedArrayBuffer]] internal slot, return a promise rejected with a TypeError exception.
- If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, return a promise rejected with a TypeError exception.
- If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamBYOBReaderRead(this, view).
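The argument checks and the basic fill behavior of read(view) can be sketched as follows. This assumes an environment with a global ReadableStream that supports readable byte streams (for example Node.js 18 and later); byobDemo is a hypothetical name, and the byte values are arbitrary.

```javascript
async function byobDemo() {
  // A readable byte stream whose underlying byte source enqueues three bytes and closes.
  const stream = new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3]));
      controller.close();
    }
  });
  const reader = stream.getReader({ mode: 'byob' });

  // A zero-length view is rejected with a TypeError.
  let emptyViewError;
  try {
    await reader.read(new Uint8Array(0));
  } catch (e) {
    emptyViewError = e;
  }

  // A larger view is filled with the queued bytes. The fulfilled value is a
  // view onto a possibly transferred buffer, so use it rather than the original.
  const { value, done } = await reader.read(new Uint8Array(8));
  return { emptyViewError, bytes: Array.from(value), done };
}
```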
3.6.4.4. releaseLock()
The releaseLock method releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active. If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.
A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the reader’s read() method has not yet been settled. Attempting to do so will throw a TypeError.
- If ! IsReadableStreamBYOBReader(this) is false, throw a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return.
- If this.[[readIntoRequests]] is not empty, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericRelease(this).
3.7. Readable Stream Reader Abstract Operations
3.7.1. IsReadableStreamDefaultReader ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readRequests]] internal slot, return false.
- Return true.
3.7.2. IsReadableStreamBYOBReader ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readIntoRequests]] internal slot, return false.
- Return true.
3.7.3. ReadableStreamReaderGenericCancel ( reader , reason ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not undefined.
- Return ! ReadableStreamCancel(stream, reason).
3.7.4. ReadableStreamReaderGenericInitialize ( reader , stream ) nothrow
- Set reader.[[ownerReadableStream]] to stream.
- Set stream.[[reader]] to reader.
- If stream.[[state]] is "readable",
  - Set reader.[[closedPromise]] to a new promise.
- Otherwise, if stream.[[state]] is "closed",
  - Set reader.[[closedPromise]] to a promise resolved with undefined.
- Otherwise,
  - Assert: stream.[[state]] is "errored".
  - Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
  - Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
3.7.5. ReadableStreamReaderGenericRelease ( reader ) nothrow
- Assert: reader.[[ownerReadableStream]] is not undefined.
- Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
- If reader.[[ownerReadableStream]].[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
- Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
- Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
- Set reader.[[ownerReadableStream]].[[reader]] to undefined.
- Set reader.[[ownerReadableStream]] to undefined.
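The effect of these steps is visible from script: releasing a reader while the stream is still readable rejects the promise previously returned by its closed getter and unlocks the stream. The following is a minimal sketch, assuming a global ReadableStream (recent browsers, or Node.js 18 and later); releaseDemo is a hypothetical name.

```javascript
async function releaseDemo() {
  const stream = new ReadableStream(); // stays in the "readable" state
  const reader = stream.getReader();
  const closed = reader.closed;

  reader.releaseLock();

  // The promise obtained before the release is rejected with a TypeError.
  let closedError;
  try {
    await closed;
  } catch (e) {
    closedError = e;
  }

  // The stream is no longer locked, so a new reader can be acquired.
  const unlockedAfterRelease = !stream.locked;
  stream.getReader();
  const lockedAgain = stream.locked;

  return {
    closedIsTypeError: closedError instanceof TypeError,
    unlockedAfterRelease,
    lockedAgain
  };
}
```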
3.7.6. ReadableStreamBYOBReaderRead ( reader , view ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not undefined.
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Return ! ReadableByteStreamControllerPullInto(stream.[[readableStreamController]], view).
3.7.7. ReadableStreamDefaultReaderRead ( reader ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not undefined.
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "closed", return a promise resolved with ! CreateIterResultObject(undefined, true).
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Assert: stream.[[state]] is "readable".
- Return ! stream.[[readableStreamController]].[[PullSteps]]().
3.8. Class ReadableStreamDefaultController
The ReadableStreamDefaultController class has methods that allow control of a ReadableStream’s state and internal queue. When constructing a ReadableStream that is not a readable byte stream, the underlying source is given a corresponding ReadableStreamDefaultController instance to manipulate.
3.8.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamDefaultController class in something close to the syntax of [ECMASCRIPT], it would look like
class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark)
get desiredSize()
close()
enqueue(chunk)
error(e)
}
3.8.2. Internal Slots
Instances of ReadableStreamDefaultController are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying source, but still has chunks in its internal queue that have not yet been read |
[[controlledReadableStream]] | The ReadableStream instance controlled |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source’s pull method to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pulling]] | A boolean flag set to true while the underlying source’s pull method is executing and has not yet fulfilled, used to prevent reentrant calls |
[[queue]] | A List representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see §6.3 Queue-with-Sizes Operations) |
[[started]] | A boolean flag indicating whether the underlying source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying source |
[[strategySize]] | A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of enqueued chunks; can be undefined for the default behavior |
[[underlyingSource]] | An object representation of the stream’s underlying source ; also used for the IsReadableStreamDefaultController brand check |
3.8.3. new ReadableStreamDefaultController( stream , underlyingSource , size , highWaterMark )
The ReadableStreamDefaultController constructor cannot be used directly; it only works on a ReadableStream that is in the middle of being constructed.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If stream.[[readableStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledReadableStream]] to stream.
- Set this.[[underlyingSource]] to underlyingSource.
- Perform ! ResetQueue(this).
- Set this.[[started]], this.[[closeRequested]], this.[[pullAgain]], and this.[[pulling]] to false.
- Let normalizedStrategy be ? ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
- Set this.[[strategySize]] to normalizedStrategy.[[size]] and this.[[strategyHWM]] to normalizedStrategy.[[highWaterMark]].
- Let controller be this.
- Let startResult be ? InvokeOrNoop(underlyingSource, "start", « this »).
- Let startPromise be a promise resolved with startResult:
  - Upon fulfillment of startPromise,
    - Set controller.[[started]] to true.
    - Assert: controller.[[pulling]] is false.
    - Assert: controller.[[pullAgain]] is false.
    - Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
  - Upon rejection of startPromise with reason r,
    - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, r).
3.8.4. Properties of the ReadableStreamDefaultController Prototype
3.8.4.1. get desiredSize
The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying source ought to use this information to determine when and how to apply backpressure.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- Return ! ReadableStreamDefaultControllerGetDesiredSize(this).
3.8.4.2. close()
The close method will close the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
- Perform ! ReadableStreamDefaultControllerClose(this).
3.8.4.3. enqueue( chunk )
The enqueue method will enqueue a given chunk in the controlled readable stream.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
- Return ? ReadableStreamDefaultControllerEnqueue(this, chunk).
3.8.4.4. error( e )
The error method will error the readable stream, making all future interactions with it fail with the given error e.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- Let stream be this.[[controlledReadableStream]].
- If stream.[[state]] is not "readable", throw a TypeError exception.
- Perform ! ReadableStreamDefaultControllerError(this, e).
3.8.5. Readable Stream Default Controller Internal Methods
The following are additional internal methods implemented by each ReadableStreamDefaultController instance. The readable stream implementation will polymorphically call to either these or their counterparts for BYOB controllers.
3.8.5.1. [[CancelSteps]] ( reason )
- Perform ! ResetQueue(this).
- Return ! PromiseInvokeOrNoop(this.[[underlyingSource]], "cancel", « reason »).
3.8.5.2. [[PullSteps]] ()
- Let stream be this.[[controlledReadableStream]].
- If this.[[queue]] is not empty,
  - Let chunk be ! DequeueValue(this).
  - If this.[[closeRequested]] is true and this.[[queue]] is empty, perform ! ReadableStreamClose(stream).
  - Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
  - Return a promise resolved with ! CreateIterResultObject(chunk, false).
- Let pendingPromise be ! ReadableStreamAddReadRequest(stream).
- Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
- Return pendingPromise.
3.9. Readable Stream Default Controller Abstract Operations
3.9.1. IsReadableStreamDefaultController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[underlyingSource]] internal slot, return false.
- Return true.
3.9.2. ReadableStreamDefaultControllerCallPullIfNeeded ( controller ) nothrow
- Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
- If shouldPull is false, return.
- If controller.[[pulling]] is true,
  - Set controller.[[pullAgain]] to true.
  - Return.
- Assert: controller.[[pullAgain]] is false.
- Set controller.[[pulling]] to true.
- Let pullPromise be ! PromiseInvokeOrNoop(controller.[[underlyingSource]], "pull", « controller »).
- Upon fulfillment of pullPromise,
  - Set controller.[[pulling]] to false.
  - If controller.[[pullAgain]] is true,
    - Set controller.[[pullAgain]] to false.
    - Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
- Upon rejection of pullPromise with reason e,
  - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, e).
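The interplay of [[pulling]] and [[pullAgain]] in the steps above amounts to a small reentrancy guard: at most one pull is in flight, and any number of requests made in the meantime collapse into a single follow-up pull. The following is a simplified model of that bookkeeping, not the standard’s implementation. makePullScheduler and its shouldPull, pull, and onError callbacks are hypothetical names standing in for ReadableStreamDefaultControllerShouldCallPull, the underlying source’s pull method, and ReadableStreamDefaultControllerErrorIfNeeded.

```javascript
// Simplified model of the pulling / pullAgain flags (all names are hypothetical).
function makePullScheduler({ shouldPull, pull, onError }) {
  const state = { pulling: false, pullAgain: false };

  function callPullIfNeeded() {
    if (!shouldPull()) return;

    // A pull is already in flight: remember that another one is wanted.
    if (state.pulling) {
      state.pullAgain = true;
      return;
    }

    state.pulling = true;

    // Call pull synchronously and convert its result (or a thrown error) into a promise.
    let pullPromise;
    try {
      pullPromise = Promise.resolve(pull());
    } catch (e) {
      pullPromise = Promise.reject(e);
    }

    pullPromise.then(
      () => {
        state.pulling = false;
        if (state.pullAgain) {
          state.pullAgain = false;
          callPullIfNeeded();
        }
      },
      e => onError(e)
    );
  }

  return { state, callPullIfNeeded };
}
```

Under this model, three back-to-back calls to callPullIfNeeded invoke pull once immediately and once more after the first pull settles.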
3.9.3. ReadableStreamDefaultControllerShouldCallPull ( controller ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return false.
- If controller.[[started]] is false, return false.
- If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
- Let desiredSize be ReadableStreamDefaultControllerGetDesiredSize(controller).
- If desiredSize > 0, return true.
- Return false.
3.9.4. ReadableStreamDefaultControllerClose ( controller ) nothrow
This abstract operation can be called by other specifications that wish to close a readable stream, in the same way a developer-created stream would be closed by its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).
- Let stream be controller.[[controlledReadableStream]].
- Assert: ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
- Set controller.[[closeRequested]] to true.
- If controller.[[queue]] is empty, perform ! ReadableStreamClose(stream).
3.9.5. ReadableStreamDefaultControllerEnqueue ( controller , chunk ) throws
This abstract operation can be called by other specifications that wish to enqueue chunks in a readable stream, in the same way a developer would enqueue chunks using the stream’s associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).
- Let stream be controller.[[controlledReadableStream]].
- Assert: ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
- If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
- Otherwise,
  - Let chunkSize be 1.
  - If controller.[[strategySize]] is not undefined,
    - Set chunkSize to Call(controller.[[strategySize]], undefined, « chunk »).
    - If chunkSize is an abrupt completion,
      - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, chunkSize.[[Value]]).
      - Return chunkSize.
    - Let chunkSize be chunkSize.[[Value]].
  - Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
  - If enqueueResult is an abrupt completion,
    - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
    - Return enqueueResult.
- Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
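One consequence of the steps above is that a queuing strategy whose size function throws both errors the stream and causes enqueue to rethrow the same error. The following is a minimal sketch of that behavior, assuming a global ReadableStream (recent browsers, or Node.js 18 and later); enqueueWithThrowingSize is a hypothetical name.

```javascript
function enqueueWithThrowingSize() {
  const sizeError = new Error('size() failed');
  let thrown;

  const stream = new ReadableStream(
    {
      start(controller) {
        try {
          // No read request is pending, so the strategy's size() is consulted.
          controller.enqueue('chunk');
        } catch (e) {
          thrown = e;
        }
      }
    },
    {
      highWaterMark: 1,
      size() {
        throw sizeError;
      }
    }
  );

  return { stream, sizeError, thrown };
}
```

After this call the stream is errored, so a subsequent read() on it rejects with the same error object.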
3.9.6. ReadableStreamDefaultControllerError ( controller , e ) nothrow
This abstract operation can be called by other specifications that wish to move a readable stream to an errored state, in the same way a developer would error a stream using its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the precondition (listed here as an assert).
- Let stream be controller.[[controlledReadableStream]].
- Assert: stream.[[state]] is "readable".
- Perform ! ResetQueue(controller).
- Perform ! ReadableStreamError(stream, e).
3.9.7. ReadableStreamDefaultControllerErrorIfNeeded ( controller , e ) nothrow
- If controller.[[controlledReadableStream]].[[state]] is "readable", perform ! ReadableStreamDefaultControllerError(controller, e).
3.9.8. ReadableStreamDefaultControllerGetDesiredSize ( controller ) nothrow
This abstract operation can be called by other specifications that wish to determine the desired size to fill this stream’s internal queue, similar to how a developer would consult the desiredSize property of the stream’s associated controller object. Specifications should not use this on streams they did not create.
- Let stream be controller .[[controlledReadableStream]].
- Let state be stream .[[state]].
-
If
state
is
"errored"
, returnnull . -
If
state
is
"closed"
, return0 . - Return controller .[[strategyHWM]] − controller .[[queueTotalSize]].
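The steps above can be sketched in JavaScript as follows. This is only an illustrative model, not how an implementation stores its state: the `controller` object and its `stream`, `strategyHWM`, and `queueTotalSize` properties are hypothetical stand-ins for the internal slots.

```javascript
// Hypothetical plain-object model: ordinary properties stand in for the
// [[controlledReadableStream]], [[strategyHWM]] and [[queueTotalSize]] slots.
function getDesiredSize(controller) {
  const state = controller.stream.state;
  if (state === "errored") return null; // no meaningful size once errored
  if (state === "closed") return 0;     // nothing more can be enqueued
  return controller.strategyHWM - controller.queueTotalSize;
}

const overfull = { stream: { state: "readable" }, strategyHWM: 16, queueTotalSize: 20 };
const errored = { stream: { state: "errored" }, strategyHWM: 16, queueTotalSize: 0 };

console.log(getDesiredSize(overfull)); // -4: the queue is over-full
console.log(getDesiredSize(errored));  // null
```

A negative result is the signal that the source has enqueued more than the high water mark asks for.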
3.9.9. ReadableStreamDefaultControllerHasBackpressure ( controller ) nothrow
This abstract operation is used in the implementation of TransformStream.
- If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
- Otherwise, return true.
3.9.10. ReadableStreamDefaultControllerCanCloseOrEnqueue ( controller ) nothrow
- Let state be controller.[[controlledReadableStream]].[[state]].
- If controller.[[closeRequested]] is false and state is "readable", return true.
- Otherwise, return false.
The case where controller.[[closeRequested]] is false, but state is not "readable", happens when the stream is errored via error(e), or when it is closed without its controller’s close method ever being called: e.g., if the stream was closed by a call to cancel(reason).
3.10. Class ReadableByteStreamController
The ReadableByteStreamController class has methods that allow control of a ReadableStream's state and internal queue. When constructing a ReadableStream, the underlying byte source is given a corresponding ReadableByteStreamController instance to manipulate.
3.10.1. Class Definition
This section is non-normative.
If one were to write the ReadableByteStreamController class in something close to the syntax of [ECMASCRIPT], it would look like
class ReadableByteStreamController {
  constructor(stream, underlyingByteSource, highWaterMark)
  get byobRequest()
  get desiredSize()
  close()
  enqueue(chunk)
  error(e)
}
3.10.2. Internal Slots
Instances of ReadableByteStreamController are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[autoAllocateChunkSize]] | A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise. |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read |
[[controlledReadableStream]] | The ReadableStream instance controlled |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull method to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pulling]] | A boolean flag set to true while the underlying byte source’s pull method is executing and has not yet fulfilled, used to prevent reentrant calls |
[[byobRequest]] | A ReadableStreamBYOBRequest instance representing the current BYOB pull request |
[[pendingPullIntos]] | A List of descriptors representing pending BYOB pull requests |
[[queue]] | A List representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size (in bytes) of all the chunks stored in [[queue]] |
[[started]] | A boolean flag indicating whether the underlying source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source |
[[underlyingByteSource]] | An object representation of the stream’s underlying byte source; also used for the IsReadableByteStreamController brand check |
Although ReadableByteStreamController instances have [[queue]] and [[queueTotalSize]] slots, we do not use most of the abstract operations in §6.3 Queue-with-Sizes Operations on them, as the way in which we manipulate this queue is rather different than the others in the spec. Instead, we update the two slots together manually.
This might be cleaned up in a future spec refactoring.
3.10.3. new ReadableByteStreamController( stream , underlyingByteSource , highWaterMark )
The ReadableByteStreamController constructor cannot be used directly; it only works on a ReadableStream that is in the middle of being constructed.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If stream.[[readableStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledReadableStream]] to stream.
- Set this.[[underlyingByteSource]] to underlyingByteSource.
- Set this.[[pullAgain]] and this.[[pulling]] to false.
- Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
- Perform ! ResetQueue(this).
- Set this.[[started]] and this.[[closeRequested]] to false.
- Set this.[[strategyHWM]] to ? ValidateAndNormalizeHighWaterMark(highWaterMark).
- Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
- If autoAllocateChunkSize is not undefined,
  - If ! IsInteger(autoAllocateChunkSize) is false, or if autoAllocateChunkSize ≤ 0, throw a RangeError exception.
- Set this.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
- Set this.[[pendingPullIntos]] to a new empty List.
- Let controller be this.
- Let startResult be ? InvokeOrNoop(underlyingByteSource, "start", « this »).
- Let startPromise be a promise resolved with startResult:
  - Upon fulfillment of startPromise,
    - Set controller.[[started]] to true.
    - Assert: controller.[[pulling]] is false.
    - Assert: controller.[[pullAgain]] is false.
    - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
  - Upon rejection of startPromise with reason r,
    - If stream.[[state]] is "readable", perform ! ReadableByteStreamControllerError(controller, r).
3.10.4. Properties of the ReadableByteStreamController Prototype
3.10.4.1. get byobRequest
The byobRequest getter returns the current BYOB pull request.
- If IsReadableByteStreamController(this) is false, throw a TypeError exception.
- If this.[[byobRequest]] is undefined and this.[[pendingPullIntos]] is not empty,
  - Let firstDescriptor be the first element of this.[[pendingPullIntos]].
  - Let view be ! Construct(%Uint8Array%, « firstDescriptor.[[buffer]], firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]], firstDescriptor.[[byteLength]] − firstDescriptor.[[bytesFilled]] »).
  - Set this.[[byobRequest]] to ! Construct(ReadableStreamBYOBRequest, « this, view »).
- Return this.[[byobRequest]].
3.10.4.2. get desiredSize
The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying source ought to use this information to determine when and how to apply backpressure.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- Return ! ReadableByteStreamControllerGetDesiredSize(this).
3.10.4.3. close()
The close method will close the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- If this.[[closeRequested]] is true, throw a TypeError exception.
- If this.[[controlledReadableStream]].[[state]] is not "readable", throw a TypeError exception.
- Perform ? ReadableByteStreamControllerClose(this).
3.10.4.4. enqueue( chunk )
The enqueue method will enqueue a given chunk in the controlled readable stream.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- If this.[[closeRequested]] is true, throw a TypeError exception.
- If this.[[controlledReadableStream]].[[state]] is not "readable", throw a TypeError exception.
- If Type(chunk) is not Object, throw a TypeError exception.
- If chunk does not have a [[ViewedArrayBuffer]] internal slot, throw a TypeError exception.
- If ! IsDetachedBuffer(chunk.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
- Return ! ReadableByteStreamControllerEnqueue(this, chunk).
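As a rough illustration of the enqueue and close methods from an author's point of view, the sketch below builds a readable byte stream whose underlying byte source enqueues a few views and then closes. It assumes an environment that exposes a global ReadableStream with byte stream support; the helper names `makeByteStream` and `readAllBytes` are made up for this example.

```javascript
// Hypothetical helper: a byte stream that enqueues the given views and closes.
function makeByteStream(chunks) {
  return new ReadableStream({
    type: "bytes",
    start(controller) {
      // Each chunk must be a view onto a non-detached ArrayBuffer.
      for (const chunk of chunks) controller.enqueue(chunk);
      // Already-enqueued chunks remain readable after close().
      controller.close();
    }
  });
}

// Hypothetical helper: drain a stream with a default reader.
async function readAllBytes(stream) {
  const reader = stream.getReader();
  const bytes = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    bytes.push(...value);
  }
  return bytes;
}

readAllBytes(makeByteStream([new Uint8Array([1, 2]), new Uint8Array([3])]))
  .then(bytes => console.log(bytes));
```

Because the enqueue operation transfers the chunk's buffer, the source should not reuse a view after handing it to enqueue.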
3.10.4.5. error( e )
The error method will error the readable stream, making all future interactions with it fail with the given error e.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- Let stream be this.[[controlledReadableStream]].
- If stream.[[state]] is not "readable", throw a TypeError exception.
- Perform ! ReadableByteStreamControllerError(this, e).
3.10.5. Readable Stream BYOB Controller Internal Methods
The following are additional internal methods implemented by each ReadableByteStreamController instance. The readable stream implementation will polymorphically call to either these or their counterparts for default controllers.
3.10.5.1. [[CancelSteps]] ( reason )
- If this.[[pendingPullIntos]] is not empty,
  - Let firstDescriptor be the first element of this.[[pendingPullIntos]].
  - Set firstDescriptor.[[bytesFilled]] to 0.
- Perform ! ResetQueue(this).
- Return ! PromiseInvokeOrNoop(this.[[underlyingByteSource]], "cancel", « reason »).
3.10.5.2. [[PullSteps]] ()
- Let stream be this.[[controlledReadableStream]].
- Assert: ! ReadableStreamHasDefaultReader(stream) is true.
- If this.[[queueTotalSize]] > 0,
  - Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
  - Let entry be the first element of this.[[queue]].
  - Remove entry from this.[[queue]], shifting all other elements downward (so that the second becomes the first, and so on).
  - Set this.[[queueTotalSize]] to this.[[queueTotalSize]] − entry.[[byteLength]].
  - Perform ! ReadableByteStreamControllerHandleQueueDrain(this).
  - Let view be ! Construct(%Uint8Array%, « entry.[[buffer]], entry.[[byteOffset]], entry.[[byteLength]] »).
  - Return a promise resolved with ! CreateIterResultObject(view, false).
- Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
- If autoAllocateChunkSize is not undefined,
  - Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
  - If buffer is an abrupt completion, return a promise rejected with buffer.[[Value]].
  - Let pullIntoDescriptor be Record {[[buffer]]: buffer.[[Value]], [[byteOffset]]: 0, [[byteLength]]: autoAllocateChunkSize, [[bytesFilled]]: 0, [[elementSize]]: 1, [[ctor]]: %Uint8Array%, [[readerType]]: "default"}.
  - Append pullIntoDescriptor as the last element of this.[[pendingPullIntos]].
- Let promise be ! ReadableStreamAddReadRequest(stream).
- Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
- Return promise.
3.11. Class ReadableStreamBYOBRequest
The ReadableStreamBYOBRequest class represents a pull into request in a ReadableByteStreamController.
3.11.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamBYOBRequest class in something close to the syntax of [ECMASCRIPT], it would look like
class ReadableStreamBYOBRequest {
  constructor(controller, view)
  get view()
  respond(bytesWritten)
  respondWithNewView(view)
}
3.11.2. Internal Slots
Instances of ReadableStreamBYOBRequest are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[associatedReadableByteStreamController]] | The parent ReadableByteStreamController instance |
[[view]] | A typed array representing the destination region to which the controller can write generated data |
3.11.3. new ReadableStreamBYOBRequest( controller , view )
- If ! IsReadableByteStreamController(controller) is false, throw a TypeError exception.
- Set this.[[associatedReadableByteStreamController]] to controller.
- Set this.[[view]] to view.
3.11.4. Properties of the ReadableStreamBYOBRequest Prototype
3.11.4.1. get view
- If ! IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
- Return this.[[view]].
3.11.4.2. respond( bytesWritten )
- If ! IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
- If this.[[associatedReadableByteStreamController]] is undefined, throw a TypeError exception.
- If ! IsDetachedBuffer(this.[[view]].[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
- Return ? ReadableByteStreamControllerRespond(this.[[associatedReadableByteStreamController]], bytesWritten).
3.11.4.3. respondWithNewView( view )
- If ! IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
- If this.[[associatedReadableByteStreamController]] is undefined, throw a TypeError exception.
- If Type(view) is not Object, throw a TypeError exception.
- If view does not have a [[ViewedArrayBuffer]] internal slot, throw a TypeError exception.
- If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
- Return ? ReadableByteStreamControllerRespondWithNewView(this.[[associatedReadableByteStreamController]], view).
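To show how the view getter and respond(bytesWritten) fit together, here is a small sketch of an underlying byte source that writes one byte into the consumer-supplied buffer. It assumes an environment with a global ReadableStream that supports byte streams and BYOB readers; the function name `readOneByteWithByob` is invented for the example.

```javascript
// Hypothetical demo: the source fills the consumer's buffer via byobRequest.
async function readOneByteWithByob() {
  const stream = new ReadableStream({
    type: "bytes",
    pull(controller) {
      const request = controller.byobRequest;
      // request.view covers the still-unfilled part of the consumer's buffer.
      request.view[0] = 42;
      // Report how many bytes were written into the view.
      request.respond(1);
    }
  });

  const reader = stream.getReader({ mode: "byob" });
  const { value, done } = await reader.read(new Uint8Array(4));
  await reader.cancel();
  // value is a view over the filled bytes only.
  return { bytes: Array.from(value), done };
}

readOneByteWithByob().then(result => console.log(result));
```

The view handed back to the consumer covers only the bytes reported through respond, not the whole buffer that was passed to read.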
3.12. Readable Stream BYOB Controller Abstract Operations
3.12.1. IsReadableStreamBYOBRequest ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[associatedReadableByteStreamController]] internal slot, return false.
- Return true.
3.12.2. IsReadableByteStreamController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[underlyingByteSource]] internal slot, return false.
- Return true.
3.12.3. ReadableByteStreamControllerCallPullIfNeeded ( controller ) nothrow
- Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
- If shouldPull is false, return.
- If controller.[[pulling]] is true,
  - Set controller.[[pullAgain]] to true.
  - Return.
- Assert: controller.[[pullAgain]] is false.
- Set controller.[[pulling]] to true.
- Let pullPromise be ! PromiseInvokeOrNoop(controller.[[underlyingByteSource]], "pull", « controller »).
- Upon fulfillment of pullPromise,
  - Set controller.[[pulling]] to false.
  - If controller.[[pullAgain]] is true,
    - Set controller.[[pullAgain]] to false.
    - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
- Upon rejection of pullPromise with reason e,
  - If controller.[[controlledReadableStream]].[[state]] is "readable", perform ! ReadableByteStreamControllerError(controller, e).
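The interplay of [[pulling]] and [[pullAgain]] in ReadableByteStreamControllerCallPullIfNeeded can be modelled with plain objects, as in the sketch below. It is a simplified illustration rather than an implementation: the `controller` object, its `shouldCallPull`, `source`, and `error` members, and the `pullCount` counter are all hypothetical, and the rejection branch skips the "readable" state check.

```javascript
// Hypothetical model: plain properties stand in for [[pulling]] and [[pullAgain]].
function callPullIfNeeded(controller) {
  if (!controller.shouldCallPull()) return;
  if (controller.pulling) {
    // A pull is already in flight; remember to pull once more afterwards.
    controller.pullAgain = true;
    return;
  }
  controller.pulling = true;
  // Call pull() synchronously and turn a thrown exception into a rejection,
  // roughly like PromiseInvokeOrNoop does.
  const pullPromise = new Promise(resolve => resolve(controller.source.pull(controller)));
  pullPromise.then(
    () => {
      controller.pulling = false;
      if (controller.pullAgain) {
        controller.pullAgain = false;
        callPullIfNeeded(controller);
      }
    },
    e => controller.error(e)
  );
}

let pullCount = 0;
const controller = {
  pulling: false,
  pullAgain: false,
  shouldCallPull: () => true,
  source: { pull() { pullCount++; } },
  error(e) { console.error(e); }
};

// Three requests in the same turn collapse into two pull() calls:
// one immediately, and one more after the first has fulfilled.
callPullIfNeeded(controller);
callPullIfNeeded(controller);
callPullIfNeeded(controller);
```

The flag pair guarantees that pull is never re-entered, while still ensuring that a request made during a pull is not lost.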
3.12.4. ReadableByteStreamControllerClearPendingPullIntos ( controller ) nothrow
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
- Set controller.[[pendingPullIntos]] to a new empty List.
3.12.5. ReadableByteStreamControllerClose ( controller ) throws
- Let stream be controller.[[controlledReadableStream]].
- Assert: controller.[[closeRequested]] is false.
- Assert: stream.[[state]] is "readable".
- If controller.[[queueTotalSize]] > 0,
  - Set controller.[[closeRequested]] to true.
  - Return.
- If controller.[[pendingPullIntos]] is not empty,
  - Let firstPendingPullInto be the first element of controller.[[pendingPullIntos]].
  - If firstPendingPullInto.[[bytesFilled]] > 0,
    - Let e be a new TypeError exception.
    - Perform ! ReadableByteStreamControllerError(controller, e).
    - Throw e.
- Perform ! ReadableStreamClose(stream).
3.12.6. ReadableByteStreamControllerCommitPullIntoDescriptor ( stream , pullIntoDescriptor ) nothrow
- Assert: stream.[[state]] is not "errored".
- Let done be false.
- If stream.[[state]] is "closed",
  - Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
  - Set done to true.
- Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
- If pullIntoDescriptor.[[readerType]] is "default",
  - Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
- Otherwise,
  - Assert: pullIntoDescriptor.[[readerType]] is "byob".
  - Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
3.12.7. ReadableByteStreamControllerConvertPullIntoDescriptor ( pullIntoDescriptor ) nothrow
- Let bytesFilled be pullIntoDescriptor.[[bytesFilled]].
- Let elementSize be pullIntoDescriptor.[[elementSize]].
- Assert: bytesFilled ≤ pullIntoDescriptor.[[byteLength]].
- Assert: bytesFilled mod elementSize is 0.
- Return ! Construct(pullIntoDescriptor.[[ctor]], « pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]], bytesFilled ÷ elementSize »).
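A minimal sketch of this conversion, using a plain object in place of the pull-into descriptor Record, might look like the following. The property names mirror the Record's fields but are otherwise an assumption of this example, and the asserts are rendered as thrown errors.

```javascript
// Hypothetical model of a pull-into descriptor as a plain object.
function convertPullIntoDescriptor(descriptor) {
  const { buffer, byteOffset, byteLength, bytesFilled, elementSize, ctor } = descriptor;
  if (bytesFilled > byteLength) throw new Error("assert: bytesFilled <= byteLength");
  if (bytesFilled % elementSize !== 0) throw new Error("assert: bytesFilled is element-aligned");
  // The view's length is counted in elements, not bytes.
  return new ctor(buffer, byteOffset, bytesFilled / elementSize);
}

const halfFilled = {
  buffer: new ArrayBuffer(8),
  byteOffset: 0,
  byteLength: 8,
  bytesFilled: 4,
  elementSize: 2,
  ctor: Uint16Array
};
const filledView = convertPullIntoDescriptor(halfFilled);
console.log(filledView.length, filledView.byteLength); // 2 4
```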
3.12.8. ReadableByteStreamControllerEnqueue ( controller , chunk ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Assert: controller.[[closeRequested]] is false.
- Assert: stream.[[state]] is "readable".
- Let buffer be chunk.[[ViewedArrayBuffer]].
- Let byteOffset be chunk.[[ByteOffset]].
- Let byteLength be chunk.[[ByteLength]].
- Let transferredBuffer be ! TransferArrayBuffer(buffer).
- If ! ReadableStreamHasDefaultReader(stream) is true,
  - If ! ReadableStreamGetNumReadRequests(stream) is 0,
    - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
  - Otherwise,
    - Assert: controller.[[queue]] is empty.
    - Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
    - Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
- Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
  - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
  - Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
- Otherwise,
  - Assert: ! IsReadableStreamLocked(stream) is false.
  - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
3.12.9. ReadableByteStreamControllerEnqueueChunkToQueue ( controller , buffer , byteOffset , byteLength ) nothrow
- Append Record {[[buffer]]: buffer, [[byteOffset]]: byteOffset, [[byteLength]]: byteLength} as the last element of controller.[[queue]].
- Add byteLength to controller.[[queueTotalSize]].
3.12.10. ReadableByteStreamControllerError ( controller , e ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Assert: stream.[[state]] is "readable".
- Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
- Perform ! ResetQueue(controller).
- Perform ! ReadableStreamError(stream, e).
3.12.11. ReadableByteStreamControllerFillHeadPullIntoDescriptor ( controller , size , pullIntoDescriptor ) nothrow
- Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
- Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] + size.
3.12.12. ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( controller , pullIntoDescriptor ) nothrow
- Let elementSize be pullIntoDescriptor.[[elementSize]].
- Let currentAlignedBytes be pullIntoDescriptor.[[bytesFilled]] − (pullIntoDescriptor.[[bytesFilled]] mod elementSize).
- Let maxBytesToCopy be min(controller.[[queueTotalSize]], pullIntoDescriptor.[[byteLength]] − pullIntoDescriptor.[[bytesFilled]]).
- Let maxBytesFilled be pullIntoDescriptor.[[bytesFilled]] + maxBytesToCopy.
- Let maxAlignedBytes be maxBytesFilled − (maxBytesFilled mod elementSize).
- Let totalBytesToCopyRemaining be maxBytesToCopy.
- Let ready be false.
- If maxAlignedBytes > currentAlignedBytes,
  - Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor.[[bytesFilled]].
  - Set ready to true.
- Let queue be controller.[[queue]].
- Repeat the following steps while totalBytesToCopyRemaining > 0,
  - Let headOfQueue be the first element of queue.
  - Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue.[[byteLength]]).
  - Let destStart be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
  - Perform ! CopyDataBlockBytes(pullIntoDescriptor.[[buffer]].[[ArrayBufferData]], destStart, headOfQueue.[[buffer]].[[ArrayBufferData]], headOfQueue.[[byteOffset]], bytesToCopy).
  - If headOfQueue.[[byteLength]] is bytesToCopy,
    - Remove the first element of queue, shifting all other elements downward (so that the second becomes the first, and so on).
  - Otherwise,
    - Set headOfQueue.[[byteOffset]] to headOfQueue.[[byteOffset]] + bytesToCopy.
    - Set headOfQueue.[[byteLength]] to headOfQueue.[[byteLength]] − bytesToCopy.
  - Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
  - Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor).
  - Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
- If ready is false,
  - Assert: controller.[[queueTotalSize]] is 0.
  - Assert: pullIntoDescriptor.[[bytesFilled]] > 0.
  - Assert: pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]].
- Return ready.
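The alignment arithmetic above is easier to follow with concrete numbers. The sketch below models the algorithm with plain objects: `controller` and `descriptor` are hypothetical stand-ins for the controller's slots and the pull-into descriptor Record, the byte copy uses Uint8Array in place of CopyDataBlockBytes, and the BYOB request invalidation performed by ReadableByteStreamControllerFillHeadPullIntoDescriptor is omitted.

```javascript
// Hypothetical plain-object model of the queue and a pull-into descriptor.
function fillPullIntoDescriptorFromQueue(controller, descriptor) {
  const elementSize = descriptor.elementSize;
  const currentAlignedBytes = descriptor.bytesFilled - (descriptor.bytesFilled % elementSize);
  const maxBytesToCopy = Math.min(controller.queueTotalSize,
                                  descriptor.byteLength - descriptor.bytesFilled);
  const maxBytesFilled = descriptor.bytesFilled + maxBytesToCopy;
  const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);

  let remaining = maxBytesToCopy;
  let ready = false;
  if (maxAlignedBytes > currentAlignedBytes) {
    // At least one whole element can be completed: copy only up to alignment.
    remaining = maxAlignedBytes - descriptor.bytesFilled;
    ready = true;
  }

  const queue = controller.queue;
  while (remaining > 0) {
    const head = queue[0];
    const bytesToCopy = Math.min(remaining, head.byteLength);
    const destStart = descriptor.byteOffset + descriptor.bytesFilled;
    new Uint8Array(descriptor.buffer, destStart, bytesToCopy)
      .set(new Uint8Array(head.buffer, head.byteOffset, bytesToCopy));
    if (head.byteLength === bytesToCopy) {
      queue.shift();
    } else {
      head.byteOffset += bytesToCopy;
      head.byteLength -= bytesToCopy;
    }
    controller.queueTotalSize -= bytesToCopy;
    descriptor.bytesFilled += bytesToCopy;
    remaining -= bytesToCopy;
  }
  return ready;
}

// Seven queued bytes in two chunks, read into an 8-byte buffer of 4-byte elements.
const byteController = {
  queue: [
    { buffer: new Uint8Array([1, 2, 3]).buffer, byteOffset: 0, byteLength: 3 },
    { buffer: new Uint8Array([4, 5, 6, 7]).buffer, byteOffset: 0, byteLength: 4 }
  ],
  queueTotalSize: 7
};
const descriptor = {
  buffer: new ArrayBuffer(8), byteOffset: 0, byteLength: 8,
  bytesFilled: 0, elementSize: 4
};
const ready = fillPullIntoDescriptorFromQueue(byteController, descriptor);
console.log(ready, descriptor.bytesFilled, byteController.queueTotalSize); // true 4 3
```

Only four of the seven queued bytes are copied, because the remaining three could not complete a second 4-byte element; they stay in the queue for a later read.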
3.12.13. ReadableByteStreamControllerGetDesiredSize ( controller ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Let state be stream.[[state]].
- If state is "errored", return null.
- If state is "closed", return 0.
- Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
3.12.14. ReadableByteStreamControllerHandleQueueDrain ( controller ) nothrow
- Assert: controller.[[controlledReadableStream]].[[state]] is "readable".
- If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
  - Perform ! ReadableStreamClose(controller.[[controlledReadableStream]]).
- Otherwise,
  - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
3.12.15. ReadableByteStreamControllerInvalidateBYOBRequest ( controller ) nothrow
- If controller.[[byobRequest]] is undefined, return.
- Set controller.[[byobRequest]].[[associatedReadableByteStreamController]] to undefined.
- Set controller.[[byobRequest]].[[view]] to undefined.
- Set controller.[[byobRequest]] to undefined.
3.12.16. ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ) nothrow
- Assert: controller.[[closeRequested]] is false.
- Repeat the following steps while controller.[[pendingPullIntos]] is not empty,
  - If controller.[[queueTotalSize]] is 0, return.
  - Let pullIntoDescriptor be the first element of controller.[[pendingPullIntos]].
  - If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
    - Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
    - Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[controlledReadableStream]], pullIntoDescriptor).
3.12.17. ReadableByteStreamControllerPullInto ( controller , view ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Let elementSize be 1.
- Let ctor be %DataView%.
- If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
  - Set elementSize to the element size specified in the typed array constructors table for view.[[TypedArrayName]].
  - Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
- Let buffer be ! TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
- Let pullIntoDescriptor be Record {[[buffer]]: buffer, [[byteOffset]]: view.[[ByteOffset]], [[byteLength]]: view.[[ByteLength]], [[bytesFilled]]: 0, [[elementSize]]: elementSize, [[ctor]]: ctor, [[readerType]]: "byob"}.
- If controller.[[pendingPullIntos]] is not empty,
  - Append pullIntoDescriptor as the last element of controller.[[pendingPullIntos]].
  - Return ! ReadableStreamAddReadIntoRequest(stream).
- If stream.[[state]] is "closed",
  - Let emptyView be ! Construct(ctor, « pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]], 0 »).
  - Return a promise resolved with ! CreateIterResultObject(emptyView, true).
- If controller.[[queueTotalSize]] > 0,
  - If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
    - Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
    - Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
    - Return a promise resolved with ! CreateIterResultObject(filledView, false).
  - If controller.[[closeRequested]] is true,
    - Let e be a TypeError exception.
    - Perform ! ReadableByteStreamControllerError(controller, e).
    - Return a promise rejected with e.
- Append pullIntoDescriptor as the last element of controller.[[pendingPullIntos]].
- Let promise be ! ReadableStreamAddReadIntoRequest(stream).
- Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
- Return promise.
3.12.18. ReadableByteStreamControllerRespond ( controller , bytesWritten ) throws
- Let bytesWritten be ? ToNumber(bytesWritten).
- If ! IsFiniteNonNegativeNumber(bytesWritten) is false,
  - Throw a RangeError exception.
- Assert: controller.[[pendingPullIntos]] is not empty.
- Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
3.12.19. ReadableByteStreamControllerRespondInClosedState ( controller , firstDescriptor ) nothrow
- Set firstDescriptor.[[buffer]] to ! TransferArrayBuffer(firstDescriptor.[[buffer]]).
- Assert: firstDescriptor.[[bytesFilled]] is 0.
- Let stream be controller.[[controlledReadableStream]].
- If ReadableStreamHasBYOBReader(stream) is true,
  - Repeat the following steps while ! ReadableStreamGetNumReadIntoRequests(stream) > 0,
    - Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
    - Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor).
3.12.20. ReadableByteStreamControllerRespondInReadableState ( controller , bytesWritten , pullIntoDescriptor ) throws
- If pullIntoDescriptor.[[bytesFilled]] + bytesWritten > pullIntoDescriptor.[[byteLength]], throw a RangeError exception.
- Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor).
- If pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]], return.
- Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
- Let remainderSize be pullIntoDescriptor.[[bytesFilled]] mod pullIntoDescriptor.[[elementSize]].
- If remainderSize > 0,
  - Let end be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
  - Let remainder be ? CloneArrayBuffer(pullIntoDescriptor.[[buffer]], end − remainderSize, remainderSize, %ArrayBuffer%).
  - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.[[ByteLength]]).
- Set pullIntoDescriptor.[[buffer]] to ! TransferArrayBuffer(pullIntoDescriptor.[[buffer]]).
- Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] − remainderSize.
- Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[controlledReadableStream]], pullIntoDescriptor).
- Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
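The remainder handling in the steps above comes down to a little modular arithmetic. The following sketch isolates just that part; the function name `splitFilledBytes` and its return shape are invented for illustration and do not correspond to any operation in this standard.

```javascript
// Hypothetical helper: how many filled bytes are committed to the reader and
// how many trailing bytes are cloned back into the queue.
function splitFilledBytes(bytesFilled, elementSize) {
  // Not even one whole element yet: nothing is committed at this point.
  if (bytesFilled < elementSize) return null;
  const remainderSize = bytesFilled % elementSize;
  return {
    committedBytes: bytesFilled - remainderSize,
    requeuedBytes: remainderSize
  };
}

console.log(splitFilledBytes(7, 4)); // { committedBytes: 4, requeuedBytes: 3 }
console.log(splitFilledBytes(3, 4)); // null
```

With 7 bytes filled into a view of 4-byte elements, one element is delivered and the 3 trailing bytes go back to the queue for the next pull-into request.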
3.12.21. ReadableByteStreamControllerRespondInternal ( controller , bytesWritten ) throws
- Let firstDescriptor be the first element of controller.[[pendingPullIntos]].
- Let stream be controller.[[controlledReadableStream]].
- If stream.[[state]] is "closed",
  - If bytesWritten is not 0, throw a TypeError exception.
  - Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
- Otherwise,
  - Assert: stream.[[state]] is "readable".
  - Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
3.12.22. ReadableByteStreamControllerRespondWithNewView ( controller , view ) throws
- Assert: controller.[[pendingPullIntos]] is not empty.
- Let firstDescriptor be the first element of controller.[[pendingPullIntos]].
- If firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]] is not view.[[ByteOffset]], throw a RangeError exception.
- If firstDescriptor.[[byteLength]] is not view.[[ByteLength]], throw a RangeError exception.
- Set firstDescriptor.[[buffer]] to view.[[ViewedArrayBuffer]].
- Perform ? ReadableByteStreamControllerRespondInternal(controller, view.[[ByteLength]]).
3.12.23. ReadableByteStreamControllerShiftPendingPullInto ( controller ) nothrow
- Let descriptor be the first element of controller.[[pendingPullIntos]].
- Remove descriptor from controller.[[pendingPullIntos]], shifting all other elements downward (so that the second becomes the first, and so on).
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
- Return descriptor.
3.12.24. ReadableByteStreamControllerShouldCallPull ( controller ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- If stream.[[state]] is not "readable", return false.
- If controller.[[closeRequested]] is true, return false.
- If controller.[[started]] is false, return false.
- If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
- If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
- If ! ReadableByteStreamControllerGetDesiredSize(controller) > 0, return true.
- Return false.
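The decision above can be summarized as a predicate over a handful of values. The sketch below is a hypothetical flattening of the relevant state into one plain object (`state`, `closeRequested`, `started`, `readerType`, pending request counts, and the queue figures); a real implementation would consult the stream and its reader instead.

```javascript
// Hypothetical flattened model of the state consulted by the steps above.
function shouldCallPull(c) {
  if (c.state !== "readable") return false;
  if (c.closeRequested) return false;
  if (!c.started) return false;
  // A waiting consumer always justifies a pull.
  if (c.readerType === "default" && c.numReadRequests > 0) return true;
  if (c.readerType === "byob" && c.numReadIntoRequests > 0) return true;
  // Otherwise pull only while the queue is below the high water mark.
  return c.strategyHWM - c.queueTotalSize > 0;
}

const idle = {
  state: "readable", closeRequested: false, started: true,
  readerType: null, numReadRequests: 0, numReadIntoRequests: 0,
  strategyHWM: 0, queueTotalSize: 0
};
console.log(shouldCallPull(idle));                                              // false
console.log(shouldCallPull({ ...idle, readerType: "byob", numReadIntoRequests: 1 })); // true
console.log(shouldCallPull({ ...idle, strategyHWM: 1024 }));                    // true
```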
4. Writable Streams
4.1. Using Writable Streams
readableStream.pipeTo(writableStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
You can also write directly to writable streams by acquiring a writer and using its write() and close() methods. Since writable streams queue any incoming writes, and take care internally to forward them to the underlying sink in sequence, you can indiscriminately write to a writable stream without much ceremony:
function writeArrayToStream(array, writableStream) {
  const writer = writableStream.getWriter();
  array.forEach(chunk => writer.write(chunk));
  return writer.close();
}
writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
In the previous example we only paid attention to the success or failure of the entire stream, by looking at the promise returned by the writer’s close() method. That promise will reject if anything goes wrong with the stream: initializing it, writing to it, or closing it. And it will fulfill once the stream is successfully closed. Often this is all you care about.
However, if you care about the success of writing a specific chunk, you can use the promise returned by the writer’s write() method:
writer.write("i am a chunk of data")
.then(() => console.log("chunk successfully written!"))
.catch(e => console.error(e));
What "success" means is up to a given stream instance (or more precisely, its underlying sink ) to decide. For example, for a file stream it could simply mean that the OS has accepted the write, and not necessarily that the chunk has been flushed to disk.
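To make the point about per-chunk success concrete, the sketch below uses an underlying sink whose write method returns a promise, so that the promise from the writer's write() fulfills only once the sink has accepted the chunk. It assumes an environment with a global WritableStream; `makeSlowSink`, `demoWriteSuccess`, and the `accepted` array are names invented for this example.

```javascript
// Hypothetical sink: "accepts" each chunk asynchronously by recording it.
function makeSlowSink(accepted) {
  return new WritableStream({
    write(chunk) {
      return new Promise(resolve => {
        setTimeout(() => {
          accepted.push(chunk);
          resolve(); // this is what "success" means for this particular sink
        }, 0);
      });
    }
  });
}

async function demoWriteSuccess() {
  const accepted = [];
  const writer = makeSlowSink(accepted).getWriter();

  // Waiting on write() tells us this specific chunk was accepted.
  await writer.write("first");
  const seenAfterFirstWrite = accepted.slice();

  // Not waiting is fine too: close() only fulfills after queued writes finish.
  writer.write("second");
  await writer.close();
  return { seenAfterFirstWrite, accepted };
}

demoWriteSuccess().then(result => console.log(result));
```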
The desiredSize and ready properties of writable stream writers allow producers to more precisely respond to flow control signals from the stream, to keep memory usage below the stream’s specified high water mark. The following example writes an infinite sequence of random bytes to a stream, using desiredSize to determine how many bytes to generate at a given time, and using ready to wait for the backpressure to subside.
async function writeRandomBytesForever(writableStream) {
  const writer = writableStream.getWriter();
  while (true) {
    await writer.ready;
    const bytes = new Uint8Array(writer.desiredSize);
    window.crypto.getRandomValues(bytes);
    await writer.write(bytes);
  }
}
writeRandomBytesForever(myWritableStream).catch(e => console.error("Something broke", e));
4.2. Class WritableStream
4.2.1. Class Definition
This section is non-normative.
If one were to write the WritableStream class in something close to the syntax of [ECMASCRIPT], it would look like
class WritableStream {
  constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {})
  get locked()
  abort(reason)
  getWriter()
}
4.2.2. Internal Slots
Instances of WritableStream are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[backpressure]] | The backpressure signal set by the controller |
[[closeRequest]] | The promise returned from the writer close() method |
[[inFlightWriteRequest]] | A slot set to the promise for the current in-flight write operation while the underlying sink’s write method is executing and has not yet fulfilled, used to prevent reentrant calls |
[[inFlightCloseRequest]] | A slot set to the promise for the current in-flight close operation while the underlying sink’s close method is executing and has not yet fulfilled, used to prevent the abort() method from interrupting close |
[[pendingAbortRequest]] | A Record containing the promise returned from abort() and the reason passed to abort() |
[[state]] | A string containing the stream’s current state, used internally; one of "writable", "closed", "erroring", or "errored" |
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on the stream while in the "errored" state |
[[writableStreamController]] | A WritableStreamDefaultController created with the ability to control the state and queue of this stream; also used for the IsWritableStream brand check |
[[writer]] | A WritableStreamDefaultWriter instance, if the stream is locked to a writer, or undefined if it is not |
[[writeRequests]] | A List of promises representing the write requests that have been queued but are not yet in flight |
The [[inFlightCloseRequest]] slot and [[closeRequest]] slot are mutually exclusive. Similarly, no element will be removed from [[writeRequests]] while [[inFlightWriteRequest]] is not undefined. Implementations can optimize storage for these slots based on these invariants.
4.2.3. new WritableStream( underlyingSink = {}, { size , highWaterMark = 1 } = {})
The underlyingSink object passed to the constructor can implement any of the following methods to govern how the constructed stream instance behaves:
- start(controller) is called immediately, and can perform any actions necessary to acquire access to the underlying sink. If this process is asynchronous, it can return a promise to signal success or failure.
- write(chunk, controller) is called when a new chunk of data is ready to be written to the underlying sink. It can return a promise to signal success or failure of the write operation. The stream implementation guarantees that this method will be called only after previous writes have succeeded, and never after close or abort is called.
- close() is called after the producer signals that they are done writing chunks to the stream, and all queued-up writes successfully complete. It can perform any actions necessary to finalize writes to the underlying sink, and release access to it. If this process is asynchronous, it can return a promise to signal success or failure. The stream implementation guarantees that this method will be called only after all queued-up writes have succeeded.
- abort(reason) is called when the producer signals they wish to abruptly close the stream and put it in an errored state. It can clean up any held resources, much like close, but perhaps with some custom handling. Unlike close, abort will be called even if writes are queued up; those chunks will be thrown away. If this process is asynchronous, it can return a promise to signal success or failure.
The controller object passed to start, write and close is an instance of WritableStreamDefaultController, and has the ability to error the stream.
The constructor also accepts a second argument containing the queuing strategy object with two properties: a non-negative number highWaterMark, and a function size(chunk). The supplied strategy could be an instance of the built-in CountQueuingStrategy or ByteLengthQueuingStrategy classes, or it could be custom. If no strategy is supplied, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.
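As a rough illustration of the constructor arguments described above, the following sketch passes a small underlying sink and a CountQueuingStrategy. The log array and the bodies of the sink methods are made up for this example, and it assumes an environment that exposes WritableStream and CountQueuingStrategy as globals.

```javascript
// Hypothetical in-memory sink: it only records which methods were called.
const log = [];

const loggingStream = new WritableStream({
  start(controller) {
    // Called synchronously, while the stream is being constructed.
    log.push("start");
  },
  write(chunk, controller) {
    log.push("write: " + chunk);
  },
  close() {
    log.push("close");
  },
  abort(reason) {
    log.push("abort: " + reason);
  }
}, new CountQueuingStrategy({ highWaterMark: 2 }));

log.push("constructed");

// Writes are queued, then forwarded to the sink asynchronously and in order.
const writer = loggingStream.getWriter();
writer.write("a");
writer.write("b");
const finished = writer.close().then(() => log.slice());
```

The finished promise is only a convenience for inspecting the log once the stream has closed; a real sink would typically wrap a file, socket, or similar resource instead of an array.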
- Set this.[[state]] to "writable".
- Set this.[[storedError]], this.[[writer]], this.[[writableStreamController]], this.[[inFlightWriteRequest]], this.[[closeRequest]], this.[[inFlightCloseRequest]] and this.[[pendingAbortRequest]] to undefined.
- Set this.[[writeRequests]] to a new empty List.
- Set this.[[backpressure]] to false.
- Let type be ? GetV(underlyingSink, "type").
- If type is not undefined, throw a RangeError exception.
  This is to allow us to add new potential types in the future, without backward-compatibility concerns.
- Set this.[[writableStreamController]] to ? Construct(WritableStreamDefaultController, « this, underlyingSink, size, highWaterMark »).
- Perform ? this.[[writableStreamController]].[[StartSteps]]().
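The failure cases in the constructor steps can be observed from script. The sketch below is illustrative only: constructionError is a hypothetical helper, and it assumes a global WritableStream. The negative high water mark is rejected during controller construction, when the queuing strategy is validated, and not by the type check itself.

```javascript
// Hypothetical helper: reports which kind of exception, if any, the
// WritableStream constructor throws for the given arguments.
function constructionError(underlyingSink, strategy) {
  try {
    new WritableStream(underlyingSink, strategy);
    return null;
  } catch (e) {
    return e instanceof RangeError ? "RangeError" : "other error";
  }
}

// A plain sink object is accepted.
const plainSink = constructionError({}, undefined);

// Any "type" other than undefined is reserved for future use.
const reservedType = constructionError({ type: "bytes" }, undefined);

// The high water mark has to be a non-negative number.
const negativeHighWaterMark = constructionError({}, { highWaterMark: -1 });
```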
4.2.4. Properties of the WritableStream Prototype
4.2.4.1. get locked
The locked getter returns whether or not the writable stream is locked to a writer.
- If ! IsWritableStream(this) is false, throw a TypeError exception.
- Return ! IsWritableStreamLocked(this).
4.2.4.2. abort( reason )
The abort method aborts the stream, signaling that the producer can no longer successfully write to the stream and it is to be immediately moved to an errored state, with any queued-up writes discarded. This will also execute any abort mechanism of the underlying sink.
- If ! IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsWritableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamAbort(this, reason).
4.2.4.3. getWriter()
The getWriter method creates a writer (an instance of WritableStreamDefaultWriter) and locks the stream to the new writer. While the stream is locked, no other writer can be acquired until this one is released.
This functionality is especially useful for creating abstractions that desire the ability to write to a stream without interruption or interleaving. By getting a writer for the stream, you can ensure nobody else can write at the same time, which would cause the resulting written data to be unpredictable and probably useless.
- If ! IsWritableStream(this) is false, throw a TypeError exception.
- Return ? AcquireWritableStreamDefaultWriter(this).
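The locking behavior of getWriter() and the locked getter can be seen in a few lines. This is a minimal sketch that assumes a global WritableStream; the variable names are illustrative.

```javascript
const stream = new WritableStream();

const lockedBefore = stream.locked;   // no writer has been acquired yet
const writer = stream.getWriter();
const lockedDuring = stream.locked;   // the stream is now locked to `writer`

// A second writer cannot be acquired while the first one holds the lock.
let secondAttempt;
try {
  stream.getWriter();
  secondAttempt = "acquired";
} catch (e) {
  secondAttempt = e instanceof TypeError ? "TypeError" : "other error";
}

// Releasing the lock makes the stream available to a new writer.
writer.releaseLock();
const lockedAfter = stream.locked;
const replacement = stream.getWriter();
```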
4.3. General Writable Stream Abstract Operations
The following abstract operations, unlike most in this specification, are meant to be generally useful by other specifications, instead of just being part of the implementation of this spec’s classes.
4.3.1. AcquireWritableStreamDefaultWriter ( stream ) throws
- Return ? Construct(WritableStreamDefaultWriter, « stream »).
4.3.2. IsWritableStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[writableStreamController]] internal slot, return false.
- Return true.
4.3.3. IsWritableStreamLocked ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a writable stream is locked to a writer.
- Assert: ! IsWritableStream(stream) is true.
- If stream.[[writer]] is undefined, return false.
- Return true.
4.3.4. WritableStreamAbort ( stream , reason ) nothrow
- Let state be stream.[[state]].
- If state is "closed", return a promise resolved with undefined.
- If state is "errored", return a promise rejected with stream.[[storedError]].
- Let error be a new TypeError indicating that the stream has been requested to abort.
- If stream.[[pendingAbortRequest]] is not undefined, return a promise rejected with error.
- Assert: state is "writable" or "erroring".
- Let wasAlreadyErroring be false.
- If state is "erroring",
  - Set wasAlreadyErroring to true.
  - Set reason to undefined.
- Let promise be a new promise.
- Set stream.[[pendingAbortRequest]] to Record {[[promise]]: promise, [[reason]]: reason, [[wasAlreadyErroring]]: wasAlreadyErroring}.
- If wasAlreadyErroring is false, perform ! WritableStreamStartErroring(stream, error).
- Return promise.
4.4. Writable Stream Abstract Operations Used by Controllers
To allow future flexibility to add different writable stream behaviors (similar to the distinction between default readable streams and readable byte streams), much of the internal state of a writable stream is encapsulated by the WritableStreamDefaultController class.
The abstract operations in this section are interfaces that are used by the controller implementation to affect its associated WritableStream object, translating the controller’s internal state changes into developer-facing results visible through the WritableStream’s public API.
4.4.1. WritableStreamAddWriteRequest ( stream ) nothrow
- Assert: ! IsWritableStreamLocked(stream) is true.
- Assert: stream.[[state]] is "writable".
- Let promise be a new promise.
- Append promise as the last element of stream.[[writeRequests]].
- Return promise.
4.4.2. WritableStreamDealWithRejection ( stream , error ) nothrow
- Let state be stream.[[state]].
- If state is "writable",
  - Perform ! WritableStreamStartErroring(stream, error).
  - Return.
- Assert: state is "erroring".
- Perform ! WritableStreamFinishErroring(stream).
4.4.3. WritableStreamStartErroring ( stream , reason ) nothrow
- Assert: stream.[[storedError]] is undefined.
- Assert: stream.[[state]] is "writable".
- Let controller be stream.[[writableStreamController]].
- Assert: controller is not undefined.
- Set stream.[[state]] to "erroring".
- Set stream.[[storedError]] to reason.
- Let writer be stream.[[writer]].
- If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason).
- If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true, perform ! WritableStreamFinishErroring(stream).
4.4.4. WritableStreamFinishErroring ( stream ) nothrow
- Assert: stream.[[state]] is "erroring".
- Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
- Set stream.[[state]] to "errored".
- Perform ! stream.[[writableStreamController]].[[ErrorSteps]]().
- Let storedError be stream.[[storedError]].
- Repeat for each writeRequest that is an element of stream.[[writeRequests]],
  - Reject writeRequest with storedError.
- Set stream.[[writeRequests]] to an empty List.
- If stream.[[pendingAbortRequest]] is undefined,
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
  - Return.
- Let abortRequest be stream.[[pendingAbortRequest]].
- Set stream.[[pendingAbortRequest]] to undefined.
- If abortRequest.[[wasAlreadyErroring]] is true,
  - Reject abortRequest.[[promise]] with storedError.
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
  - Return.
- Let promise be ! stream.[[writableStreamController]].[[AbortSteps]](abortRequest.[[reason]]).
- Upon fulfillment of promise,
  - Resolve abortRequest.[[promise]] with undefined.
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
- Upon rejection of promise with reason reason,
  - Reject abortRequest.[[promise]] with reason.
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
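The two-phase move from "writable" through "erroring" to "errored" in 4.4.2 to 4.4.4 can be easier to follow as a toy model. The ToyStreamState class below is hypothetical and heavily simplified: it tracks only the state, the stored error, the started flag, and whether an operation is in flight, and it leaves out promises, write requests, and abort requests entirely.

```javascript
// Toy model of the [[state]] transitions only; not an implementation.
class ToyStreamState {
  constructor({ started = true, operationInFlight = false } = {}) {
    this.state = "writable";
    this.storedError = undefined;
    this.started = started;                     // controller.[[started]]
    this.operationInFlight = operationInFlight; // in-flight write or close
  }

  startErroring(reason) {
    if (this.state !== "writable") throw new Error("assertion failed");
    this.state = "erroring";
    this.storedError = reason;
    // Finish right away only if nothing is in flight and start has completed.
    if (!this.operationInFlight && this.started) this.finishErroring();
  }

  finishErroring() {
    if (this.state !== "erroring" || this.operationInFlight) {
      throw new Error("assertion failed");
    }
    this.state = "errored";
  }

  dealWithRejection(error) {
    if (this.state === "writable") {
      this.startErroring(error);
      return;
    }
    if (this.state !== "erroring") throw new Error("assertion failed");
    this.finishErroring();
  }
}

// With nothing in flight, a rejection errors the stream in one step.
const idle = new ToyStreamState();
idle.dealWithRejection(new Error("sink failed"));

// With an operation in flight, the stream waits in "erroring" until it settles.
const busy = new ToyStreamState({ operationInFlight: true });
busy.startErroring(new Error("abort requested"));
const busyStateWhileInFlight = busy.state;
busy.operationInFlight = false;
busy.dealWithRejection(new Error("write failed"));
```

In this model, as in the steps above, the error stored when erroring starts is the one that is kept; the later rejection only completes the transition.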
4.4.5. WritableStreamFinishInFlightWrite ( stream ) nothrow
- Assert: stream.[[inFlightWriteRequest]] is not undefined.
- Resolve stream.[[inFlightWriteRequest]] with undefined.
- Set stream.[[inFlightWriteRequest]] to undefined.
4.4.6. WritableStreamFinishInFlightWriteWithError ( stream , error ) nothrow
- Assert: stream.[[inFlightWriteRequest]] is not undefined.
- Reject stream.[[inFlightWriteRequest]] with error.
- Set stream.[[inFlightWriteRequest]] to undefined.
- Assert: stream.[[state]] is "writable" or "erroring".
- Perform ! WritableStreamDealWithRejection(stream, error).
4.4.7. WritableStreamFinishInFlightClose ( stream ) nothrow
- Assert: stream.[[inFlightCloseRequest]] is not undefined.
- Resolve stream.[[inFlightCloseRequest]] with undefined.
- Set stream.[[inFlightCloseRequest]] to undefined.
- Let state be stream.[[state]].
- Assert: stream.[[state]] is "writable" or "erroring".
- If state is "erroring",
  - Set stream.[[storedError]] to undefined.
  - If stream.[[pendingAbortRequest]] is not undefined,
    - Resolve stream.[[pendingAbortRequest]].[[promise]] with undefined.
    - Set stream.[[pendingAbortRequest]] to undefined.
- Set stream.[[state]] to "closed".
- Let writer be stream.[[writer]].
- If writer is not undefined, resolve writer.[[closedPromise]] with undefined.
- Assert: stream.[[pendingAbortRequest]] is undefined.
- Assert: stream.[[storedError]] is undefined.
4.4.8. WritableStreamFinishInFlightCloseWithError ( stream , error ) nothrow
- Assert: stream.[[inFlightCloseRequest]] is not undefined.
- Reject stream.[[inFlightCloseRequest]] with error.
- Set stream.[[inFlightCloseRequest]] to undefined.
- Assert: stream.[[state]] is "writable" or "erroring".
- If stream.[[pendingAbortRequest]] is not undefined,
  - Reject stream.[[pendingAbortRequest]].[[promise]] with error.
  - Set stream.[[pendingAbortRequest]] to undefined.
- Perform ! WritableStreamDealWithRejection(stream, error).
4.4.9. WritableStreamCloseQueuedOrInFlight ( stream ) nothrow
- If stream.[[closeRequest]] is undefined and stream.[[inFlightCloseRequest]] is undefined, return false.
- Return true.
4.4.10. WritableStreamHasOperationMarkedInFlight ( stream ) nothrow
- If stream.[[inFlightWriteRequest]] is undefined and stream.[[inFlightCloseRequest]] is undefined, return false.
- Return true.
4.4.11. WritableStreamMarkCloseRequestInFlight ( stream ) nothrow
- Assert: stream.[[inFlightCloseRequest]] is undefined.
- Assert: stream.[[closeRequest]] is not undefined.
- Set stream.[[inFlightCloseRequest]] to stream.[[closeRequest]].
- Set stream.[[closeRequest]] to undefined.
4.4.12. WritableStreamMarkFirstWriteRequestInFlight ( stream ) nothrow
- Assert: stream.[[inFlightWriteRequest]] is undefined.
- Assert: stream.[[writeRequests]] is not empty.
- Let writeRequest be the first element of stream.[[writeRequests]].
- Remove writeRequest from stream.[[writeRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Set stream.[[inFlightWriteRequest]] to writeRequest.
4.4.13. WritableStreamRejectCloseAndClosedPromiseIfNeeded ( stream ) nothrow
- Assert: stream.[[state]] is "errored".
- If stream.[[closeRequest]] is not undefined,
  - Assert: stream.[[inFlightCloseRequest]] is undefined.
  - Reject stream.[[closeRequest]] with stream.[[storedError]].
  - Set stream.[[closeRequest]] to undefined.
- Let writer be stream.[[writer]].
- If writer is not undefined,
  - Reject writer.[[closedPromise]] with stream.[[storedError]].
  - Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
4.4.14. WritableStreamUpdateBackpressure ( stream , backpressure ) nothrow
- Assert: stream.[[state]] is "writable".
- Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
- Let writer be stream.[[writer]].
- If writer is not undefined and backpressure is not stream.[[backpressure]],
  - If backpressure is true, set writer.[[readyPromise]] to a new promise.
  - Otherwise,
    - Assert: backpressure is false.
    - Resolve writer.[[readyPromise]] with undefined.
- Set stream.[[backpressure]] to backpressure.
4.5. Class WritableStreamDefaultWriter
The WritableStreamDefaultWriter class represents a writable stream writer designed to be vended by a WritableStream instance.
4.5.1. Class Definition
This section is non-normative.
If one were to write the WritableStreamDefaultWriter class in something close to the syntax of [ECMASCRIPT], it would look like
class WritableStreamDefaultWriter {
  constructor(stream)
  get closed()
  get desiredSize()
  get ready()
  abort(reason)
  close()
  releaseLock()
  write(chunk)
}
4.5.2. Internal Slots
Instances of WritableStreamDefaultWriter are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[closedPromise]] | A promise returned by the writer’s closed getter |
[[ownerWritableStream]] | A WritableStream instance that owns this writer |
[[readyPromise]] | A promise returned by the writer’s ready getter |
4.5.3. new WritableStreamDefaultWriter( stream )
The WritableStreamDefaultWriter constructor is generally not meant to be used directly; instead, a stream’s getWriter() method ought to be used.
- If ! IsWritableStream(stream) is false, throw a TypeError exception.
- If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
- Set this.[[ownerWritableStream]] to stream.
- Set stream.[[writer]] to this.
- Let state be stream.[[state]].
- If state is "writable",
  - If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[backpressure]] is true, set this.[[readyPromise]] to a new promise.
  - Otherwise, set this.[[readyPromise]] to a promise resolved with undefined.
  - Set this.[[closedPromise]] to a new promise.
- Otherwise, if state is "erroring",
  - Set this.[[readyPromise]] to a promise rejected with stream.[[storedError]].
  - Set this.[[readyPromise]].[[PromiseIsHandled]] to true.
  - Set this.[[closedPromise]] to a new promise.
- Otherwise, if state is "closed",
  - Set this.[[readyPromise]] to a promise resolved with undefined.
  - Set this.[[closedPromise]] to a promise resolved with undefined.
- Otherwise,
  - Assert: state is "errored".
  - Let storedError be stream.[[storedError]].
  - Set this.[[readyPromise]] to a promise rejected with storedError.
  - Set this.[[readyPromise]].[[PromiseIsHandled]] to true.
  - Set this.[[closedPromise]] to a promise rejected with storedError.
  - Set this.[[closedPromise]].[[PromiseIsHandled]] to true.
4.5.4. Properties of the WritableStreamDefaultWriter Prototype
4.5.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the writer’s lock is released before the stream finishes closing.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
4.5.4.2. get desiredSize
The desiredSize getter returns the desired size to fill the stream’s internal queue. It can be negative, if the queue is over-full. A producer can use this information to determine the right amount of data to write. It will be null if the stream cannot be successfully written to (because it is erroring or errored), and zero if the stream is closed. The getter will throw an exception if invoked when the writer’s lock is released.
- If ! IsWritableStreamDefaultWriter(this) is false, throw a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, throw a TypeError exception.
- Return ! WritableStreamDefaultWriterGetDesiredSize(this).
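To make the arithmetic concrete, the sketch below records desiredSize as chunks are queued against a high water mark of 3. It assumes globals WritableStream and CountQueuingStrategy, and it relies on the chunks still sitting in the queue when each reading is taken, because the underlying sink only finishes starting asynchronously.

```javascript
const stream = new WritableStream(
  {},                                           // sink with no methods
  new CountQueuingStrategy({ highWaterMark: 3 })
);
const writer = stream.getWriter();

// Start with the full high water mark, then queue four chunks of size 1.
const sizes = [writer.desiredSize];
for (const chunk of ["a", "b", "c", "d"]) {
  writer.write(chunk);                          // enqueued synchronously
  sizes.push(writer.desiredSize);
}
// sizes is now [3, 2, 1, 0, -1]: the last write over-fills the queue.
```

A producer that respects backpressure would stop writing once the value reaches zero and wait on the writer’s ready promise before continuing.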
4.5.4.3. get ready
The ready getter returns a promise that will be fulfilled when the desired size to fill the stream’s internal queue transitions from nonpositive to positive, signaling that it is no longer applying backpressure. Once the desired size to fill the stream’s internal queue dips back to zero or below, the getter will return a new promise that stays pending until the next transition.
If the stream becomes errored or aborted, or the writer’s lock is released, the returned promise will become rejected.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[readyPromise]].
4.5.4.4. abort( reason )
If the writer is active, the abort method behaves the same as that for the associated stream. (Otherwise, it returns a rejected promise.)
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterAbort(this, reason).
4.5.4.5. close()
If the writer is active, the close method will close the associated writable stream. The underlying sink will finish processing any previously-written chunks, before invoking its close behavior. During this time any further attempts to write will fail (without erroring the stream).
The method returns a promise that is fulfilled with undefined if all remaining chunks are successfully written and the stream successfully closes, or rejects if an error is encountered during this process.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return a promise rejected with a TypeError exception.
- If ! WritableStreamCloseQueuedOrInFlight(stream) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterClose(this).
4.5.4.6. releaseLock()
The releaseLock method releases the writer’s lock on the corresponding stream. After the lock is released, the writer is no longer active. If the associated stream is errored when the lock is released, the writer will appear errored in the same way from now on; otherwise, the writer will appear closed.
Note that the lock can still be released even if some ongoing writes have not yet finished (i.e. even if the promises returned from previous calls to write() have not yet settled). It’s not necessary to hold the lock on the writer for the duration of the write; the lock instead simply prevents other producers from writing in an interleaved manner.
- If ! IsWritableStreamDefaultWriter(this) is false, throw a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return.
- Assert: stream.[[writer]] is not undefined.
- Perform ! WritableStreamDefaultWriterRelease(this).
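The effect of releasing a lock while a write is still pending can be sketched as follows. The example assumes a global WritableStream; the variable names are illustrative.

```javascript
const stream = new WritableStream();
const writer = stream.getWriter();

// A write may still be pending when the lock is released.
const pendingWrite = writer.write("still queued");
writer.releaseLock();

// The released writer is no longer active, so desiredSize throws.
let desiredSizeOutcome;
try {
  writer.desiredSize;
  desiredSizeOutcome = "returned";
} catch (e) {
  desiredSizeOutcome = e instanceof TypeError ? "TypeError" : "other error";
}

// Releasing again is a no-op, and the stream can be locked by a new writer.
writer.releaseLock();
const unlockedAfterRelease = stream.locked === false;
const nextWriter = stream.getWriter();
```

The promise held in pendingWrite still settles according to the outcome of that write; releasing the lock only affects what the released writer itself can do afterwards.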
4.5.4.7. write( chunk )
The write method writes the given chunk to the writable stream, by waiting until any previous writes have finished successfully, and then sending the chunk to the underlying sink. It will return a promise that fulfills with undefined upon a successful write, or rejects if the write fails or the stream becomes errored before the writing process is initiated.
Note that what "success" means is up to the underlying sink; it might indicate simply that the chunk has been accepted, and not necessarily that it is safely saved to its ultimate destination.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterWrite(this, chunk).
4.6. Writable Stream Writer Abstract Operations
4.6.1. IsWritableStreamDefaultWriter ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[ownerWritableStream]] internal slot, return false.
- Return true.
4.6.2. WritableStreamDefaultWriterAbort ( writer , reason ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Return ! WritableStreamAbort(stream, reason).
4.6.3. WritableStreamDefaultWriterClose ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Let state be stream.[[state]].
- If state is "closed" or "errored", return a promise rejected with a TypeError exception.
- Assert: state is "writable" or "erroring".
- Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
- Let promise be a new promise.
- Set stream.[[closeRequest]] to promise.
- If stream.[[backpressure]] is true and state is "writable", resolve writer.[[readyPromise]] with undefined.
- Perform ! WritableStreamDefaultControllerClose(stream.[[writableStreamController]]).
- Return promise.
4.6.4. WritableStreamDefaultWriterCloseWithErrorPropagation ( writer ) nothrow
This abstract operation helps implement the error propagation semantics of pipeTo().
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Let state be stream.[[state]].
- If ! WritableStreamCloseQueuedOrInFlight(stream) is true or state is "closed", return a promise resolved with undefined.
- If state is "errored", return a promise rejected with stream.[[storedError]].
- Assert: state is "writable" or "erroring".
- Return ! WritableStreamDefaultWriterClose(writer).
4.6.5. WritableStreamDefaultWriterEnsureClosedPromiseRejected( writer , error ) nothrow
- If writer.[[closedPromise]].[[PromiseState]] is "pending", reject writer.[[closedPromise]] with error.
- Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
- Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
4.6.6. WritableStreamDefaultWriterEnsureReadyPromiseRejected( writer , error ) nothrow
- If writer.[[readyPromise]].[[PromiseState]] is "pending", reject writer.[[readyPromise]] with error.
- Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
- Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
4.6.7. WritableStreamDefaultWriterGetDesiredSize ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Let state be stream.[[state]].
- If state is "errored" or "erroring", return null.
- If state is "closed", return 0.
- Return ! WritableStreamDefaultControllerGetDesiredSize(stream.[[writableStreamController]]).
4.6.8. WritableStreamDefaultWriterRelease ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Assert: stream.[[writer]] is writer.
- Let releasedError be a new TypeError.
- Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
- Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
- Set stream.[[writer]] to undefined.
- Set writer.[[ownerWritableStream]] to undefined.
4.6.9. WritableStreamDefaultWriterWrite ( writer , chunk ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Let controller be stream.[[writableStreamController]].
- Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
- If stream is not equal to writer.[[ownerWritableStream]], return a promise rejected with a TypeError exception.
- Let state be stream.[[state]].
- If state is "errored", return a promise rejected with stream.[[storedError]].
- If ! WritableStreamCloseQueuedOrInFlight(stream) is true or state is "closed", return a promise rejected with a TypeError exception indicating that the stream is closing or closed.
- If state is "erroring", return a promise rejected with stream.[[storedError]].
- Assert: state is "writable".
- Let promise be ! WritableStreamAddWriteRequest(stream).
- Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
- Return promise.
4.7. Class WritableStreamDefaultController
The WritableStreamDefaultController class has methods that allow control of a WritableStream’s state. When constructing a WritableStream, the underlying sink is given a corresponding WritableStreamDefaultController instance to manipulate.
4.7.1. Class Definition
This section is non-normative.
If one were to write the WritableStreamDefaultController class in something close to the syntax of [ECMASCRIPT], it would look like
class WritableStreamDefaultController {
  constructor(stream, underlyingSink, size, highWaterMark)
  error(e)
}
4.7.2. Internal Slots
Instances of WritableStreamDefaultController are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[controlledWritableStream]] | The WritableStream instance controlled |
[[queue]] | A List representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see §6.3 Queue-with-Sizes Operations) |
[[started]] | A boolean flag indicating whether the underlying sink has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink |
[[strategySize]] | A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of enqueued chunks; can be undefined for the default behavior |
[[underlyingSink]] | An object representation of the stream’s underlying sink; also used for the IsWritableStreamDefaultController brand check |
4.7.3. new WritableStreamDefaultController( stream , underlyingSink , size , highWaterMark )
The WritableStreamDefaultController constructor cannot be used directly; it only works on a WritableStream that is in the middle of being constructed.
- If ! IsWritableStream(stream) is false, throw a TypeError exception.
- If stream.[[writableStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledWritableStream]] to stream.
- Set this.[[underlyingSink]] to underlyingSink.
- Perform ! ResetQueue(this).
- Set this.[[started]] to false.
- Let normalizedStrategy be ? ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
- Set this.[[strategySize]] to normalizedStrategy.[[size]] and this.[[strategyHWM]] to normalizedStrategy.[[highWaterMark]].
- Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(this).
- Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
4.7.4. Properties of the WritableStreamDefaultController Prototype
4.7.4.1. error( e )
The error method will error the writable stream, making all future interactions with it fail with the given error e.
This method is rarely used, since usually it suffices to return a rejected promise from one of the underlying sink’s methods. However, it can be useful for suddenly shutting down a stream in response to an event outside the normal lifecycle of interactions with the underlying sink.
- If ! IsWritableStreamDefaultController(this) is false, throw a TypeError exception.
- Let state be this.[[controlledWritableStream]].[[state]].
- If state is not "writable", return.
- Perform ! WritableStreamDefaultControllerError(this, e).
4.7.5. Writable Stream Default Controller Internal Methods
The following are additional internal methods implemented by each WritableStreamDefaultController instance. The writable stream implementation will call into these.
The reason these are in method form, instead of as abstract operations, is to make it clear that the writable stream implementation is decoupled from the controller implementation, and could in the future be expanded with other controllers, as long as those controllers implemented such internal methods. A similar scenario is seen for readable streams, where there actually are multiple controller types and as such the counterpart internal methods are used polymorphically.
4.7.5.1. [[AbortSteps]]( reason )
- Return ! PromiseInvokeOrNoop(this.[[underlyingSink]], "abort", « reason »).
4.7.5.2. [[ErrorSteps]]()
- Perform ! ResetQueue(this).
4.7.5.3. [[StartSteps]]()
- Let startResult be ? InvokeOrNoop(this.[[underlyingSink]], "start", « this »).
- Let stream be this.[[controlledWritableStream]].
- Let startPromise be a promise resolved with startResult:
  - Upon fulfillment of startPromise,
    - Assert: stream.[[state]] is "writable" or "erroring".
    - Set this.[[started]] to true.
    - Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(this).
  - Upon rejection of startPromise with reason r,
    - Assert: stream.[[state]] is "writable" or "erroring".
    - Set this.[[started]] to true.
    - Perform ! WritableStreamDealWithRejection(stream, r).
4.8. Writable Stream Default Controller Abstract Operations
4.8.1. IsWritableStreamDefaultController ( x ) nothrow
-
If
Type ( x ) is not Object, returnfalse . -
If
x
does
not
have
an
[[underlyingSink]]
internal
slot,
return
false . -
Return
true .
4.8.2. WritableStreamDefaultControllerClose ( controller ) nothrow
- Perform ! EnqueueValueWithSize(controller, "close", 0).
- Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
4.8.3. WritableStreamDefaultControllerGetChunkSize ( controller , chunk ) nothrow
- Let strategySize be controller.[[strategySize]].
- If strategySize is undefined, return 1.
- Let returnValue be Call(strategySize, undefined, « chunk »).
- If returnValue is an abrupt completion,
  - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
  - Return 1.
- Return returnValue.[[Value]].
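The steps above can be sketched in JavaScript roughly as follows. This is only an illustration: the controller is modelled as a plain object with a `strategySize` property (real internal slots are not reachable from script), and `errorIfNeeded` is a hypothetical callback standing in for WritableStreamDefaultControllerErrorIfNeeded.

```javascript
// Sketch of WritableStreamDefaultControllerGetChunkSize.
// `controller` is a hypothetical plain object; `errorIfNeeded` is a stand-in
// for WritableStreamDefaultControllerErrorIfNeeded.
function getChunkSize(controller, chunk, errorIfNeeded) {
  const strategySize = controller.strategySize;
  if (strategySize === undefined) {
    return 1; // no size function: every chunk counts as 1
  }
  try {
    // Call(strategySize, undefined, « chunk »)
    return strategySize.call(undefined, chunk);
  } catch (e) {
    // A throwing size function errors the stream (if still writable)
    // and the chunk is treated as having size 1.
    errorIfNeeded(controller, e);
    return 1;
  }
}
```

Note that the operation never throws to its caller; a failing size function is reported through the erroring path instead.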
4.8.4. WritableStreamDefaultControllerGetDesiredSize ( controller ) nothrow
- Return controller .[[strategyHWM]] − controller .[[queueTotalSize]].
4.8.5. WritableStreamDefaultControllerWrite ( controller , chunk , chunkSize ) nothrow
- Let writeRecord be Record {[[chunk]]: chunk}.
- Let enqueueResult be EnqueueValueWithSize(controller, writeRecord, chunkSize).
- If enqueueResult is an abrupt completion,
  - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
  - Return.
- Let stream be controller.[[controlledWritableStream]].
- If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
  - Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
  - Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
- Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
4.8.6. WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- If controller.[[started]] is false, return.
- If stream.[[inFlightWriteRequest]] is not undefined, return.
- Let state be stream.[[state]].
- If state is "closed" or "errored", return.
- If state is "erroring",
  - Perform ! WritableStreamFinishErroring(stream).
  - Return.
- If controller.[[queue]] is empty, return.
- Let writeRecord be ! PeekQueueValue(controller).
- If writeRecord is "close", perform ! WritableStreamDefaultControllerProcessClose(controller).
- Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, writeRecord.[[chunk]]).
4.8.7. WritableStreamDefaultControllerErrorIfNeeded ( controller , error ) nothrow
- If controller.[[controlledWritableStream]].[[state]] is "writable", perform ! WritableStreamDefaultControllerError(controller, error).
4.8.8. WritableStreamDefaultControllerProcessClose ( controller ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Perform ! WritableStreamMarkCloseRequestInFlight(stream).
- Perform ! DequeueValue(controller).
- Assert: controller.[[queue]] is empty.
- Let sinkClosePromise be ! PromiseInvokeOrNoop(controller.[[underlyingSink]], "close", « »).
- Upon fulfillment of sinkClosePromise,
  - Perform ! WritableStreamFinishInFlightClose(stream).
- Upon rejection of sinkClosePromise with reason reason,
  - Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
4.8.9. WritableStreamDefaultControllerProcessWrite ( controller , chunk ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
- Let sinkWritePromise be ! PromiseInvokeOrNoop(controller.[[underlyingSink]], "write", « chunk, controller »).
- Upon fulfillment of sinkWritePromise,
  - Perform ! WritableStreamFinishInFlightWrite(stream).
  - Let state be stream.[[state]].
  - Assert: state is "writable" or "erroring".
  - Perform ! DequeueValue(controller).
  - If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
    - Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
    - Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
  - Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
- Upon rejection of sinkWritePromise with reason reason,
  - Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
4.8.10. WritableStreamDefaultControllerGetBackpressure ( controller ) nothrow
- Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
- Return desiredSize ≤ 0.
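As a rough illustration of how WritableStreamDefaultControllerGetDesiredSize and WritableStreamDefaultControllerGetBackpressure relate, the sketch below models a controller as a plain object. The property names `strategyHWM` and `queueTotalSize` mirror the internal slots but are ordinary properties here, which is an assumption made purely for illustration.

```javascript
// Hypothetical plain-object model of a controller; real internal slots
// are not accessible from script.
function getDesiredSize(controller) {
  // High-water mark minus what is already queued.
  return controller.strategyHWM - controller.queueTotalSize;
}

function getBackpressure(controller) {
  // Backpressure applies once the queue has reached the high-water mark.
  return getDesiredSize(controller) <= 0;
}

const controller = { strategyHWM: 3, queueTotalSize: 1 };
console.log(getDesiredSize(controller), getBackpressure(controller)); // 2 false
controller.queueTotalSize = 3;
console.log(getDesiredSize(controller), getBackpressure(controller)); // 0 true
```

One consequence visible in this sketch is that a high-water mark of 0 means backpressure is applied even when the queue is empty.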
4.8.11. WritableStreamDefaultControllerError ( controller , error ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Assert: stream.[[state]] is "writable".
- Perform ! WritableStreamStartErroring(stream, error).
5. Transform Streams
5.1. Using Transform Streams
readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream)
  .then(() => console.log("All data successfully transformed!"))
  .catch(e => console.error("Something went wrong!", e));
The readable and writable properties of a transform stream can also be used directly to access the usual interfaces of a readable stream and writable stream. In this example we supply input data to the stream using the writer interface. The output is then piped to anotherWritableStream.
const writer = transformStream.writable.getWriter();
writer.write("input chunk");
transformStream.readable.pipeTo(anotherWritableStream);
The fetch() API accepts a readable stream request body, but it can be more convenient to write data for uploading via a writable stream interface. Using an identity transform stream addresses this:
const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);
// A WritableStream has no write() method of its own; acquire a writer first.
const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
writer.close();
Another use of identity transform streams is to add additional buffering to a pipe. In this example we add extra buffering between readableStream and writableStream.
const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 });
readableStream
  .pipeThrough(new TransformStream(undefined, writableStrategy))
  .pipeTo(writableStream);
5.2. Class TransformStream
5.2.1. Class Definition
This section is non-normative.
If one were to write the TransformStream class in something close to the syntax of [ECMASCRIPT], it would look like
class TransformStream {
  constructor(transformer = {}, writableStrategy = {}, readableStrategy = {})
  get readable()
  get writable()
}
5.2.2. Internal Slots
Instances of TransformStream are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[backpressure]] | Whether there was backpressure on [[readable]] the last time it was observed |
[[backpressureChangePromise]] | A promise which is fulfilled and replaced every time the value of [[backpressure]] changes |
[[readable]] | The ReadableStream instance controlled by this object |
[[transformer]] | The transformer object that was passed to the constructor |
[[transformStreamController]] | A TransformStreamDefaultController created with the ability to control [[readable]] and [[writable]]; also used for the IsTransformStream brand check |
[[writable]] | The WritableStream instance controlled by this object |
5.2.3. new TransformStream( transformer = {}, writableStrategy = undefined, { size , highWaterMark = 0 } = {})
The transformer object passed to the constructor can implement any of the following methods to govern how the constructed stream instance behaves:
- start(controller) is called immediately, and is typically used to enqueue prefix chunks that will be read from the readable side but don’t depend on any writes to the writable side. If this process is asynchronous, it can return a promise to signal success or failure.
- transform(chunk, controller) is called when a new chunk originally written to the writable side is ready to be transformed. It can return a promise to signal success or failure of the transformation. The results of the transformation can be enqueued to the readable side using the controller.enqueue() method. This permits a single chunk written to the writable side to result in zero or multiple chunks on the readable side. The stream implementation guarantees that transform will be called only after previous transformations have succeeded, and never before start has completed or after flush is called. When no transform method is supplied, the identity transform is used, which enqueues chunks unchanged from the writable side to the readable side.
- flush(controller) is called after all chunks written to the writable side have been transformed and the writable side has been closed. It is typically used to enqueue suffix chunks to the readable side, before that too becomes closed. If this process is asynchronous, it can return a promise to signal success or failure.
The controller object passed to start, transform and flush is an instance of TransformStreamDefaultController, and has the ability to enqueue chunks to the readable side, or to terminate or error the stream.
The second and third arguments to the constructor are the queuing strategy objects for the writable and readable sides respectively. These are used in the construction of the WritableStream and ReadableStream objects and can be used to add buffering to a TransformStream, in order to smooth out variations in the speed of the transformation, or to increase the amount of buffering in a pipe.
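To make the roles of start, transform and flush concrete, here is a minimal sketch of a transformer object. The function name makeBracketingTransformer and its behaviour (upper-casing string chunks and wrapping the output in "[" and "]") are invented for illustration and are not part of this standard.

```javascript
// Hypothetical transformer: emits "[" as a prefix chunk, upper-cases every
// chunk written to the writable side, and emits "]" as a suffix chunk.
function makeBracketingTransformer() {
  return {
    start(controller) {
      // Prefix chunk: does not depend on anything being written.
      controller.enqueue("[");
    },
    transform(chunk, controller) {
      // One written chunk results in exactly one chunk on the readable side.
      controller.enqueue(String(chunk).toUpperCase());
    },
    flush(controller) {
      // Suffix chunk: runs after the writable side has been closed.
      controller.enqueue("]");
    }
  };
}

// Possible usage, in an environment that exposes TransformStream:
//   readableStream
//     .pipeThrough(new TransformStream(makeBracketingTransformer()))
//     .pipeTo(writableStream);
```

Because each method only talks to the controller it is handed, the transformer can be exercised with a stub controller without constructing a stream at all.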
- Set this.[[transformer]] to transformer.
- Set this.[[transformStreamController]] to undefined.
- Set this.[[backpressure]] and this.[[backpressureChangePromise]] to undefined.
- Let readableType be ? GetV(transformer, "readableType").
- If readableType is not undefined, throw a RangeError exception.
- Let writableType be ? GetV(transformer, "writableType").
- If writableType is not undefined, throw a RangeError exception.
- Let controller be ? Construct(TransformStreamDefaultController, « this »).
- Set this.[[transformStreamController]] to controller.
- Let startPromise be a new promise.
- Let source be ! Construct(TransformStreamDefaultSource, « this, startPromise »).
- Let readableStrategy be ! ObjectCreate(%ObjectPrototype%).
- Perform ! CreateDataProperty(readableStrategy, "size", size).
- Perform ! CreateDataProperty(readableStrategy, "highWaterMark", highWaterMark).
- Set this.[[readable]] to ? Construct(ReadableStream, « source, readableStrategy »).
- Let sink be ! Construct(TransformStreamDefaultSink, « this, startPromise »).
- Set this.[[writable]] to ? Construct(WritableStream, « sink, writableStrategy »).
- Perform ! TransformStreamSetBackpressure(this, true).
- Let startResult be ? InvokeOrNoop(transformer, "start", « controller »).
- Resolve startPromise with startResult.
5.2.4. Properties of the TransformStream Prototype
5.2.4.1. get readable
The readable getter gives access to the readable side of the transform stream.
- If ! IsTransformStream(this) is false, throw a TypeError exception.
- Return this.[[readable]].
5.2.4.2. get writable
The writable getter gives access to the writable side of the transform stream.
- If ! IsTransformStream(this) is false, throw a TypeError exception.
- Return this.[[writable]].
5.3. General Transform Stream Abstract Operations
5.3.1. IsTransformStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[transformStreamController]] internal slot, return false.
- Return true.
5.3.2. TransformStreamError ( stream , e ) nothrow
- Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[writableStreamController]], e).
- If stream.[[readable]].[[state]] is "readable", perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[readableStreamController]], e).
- If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
5.3.3. TransformStreamSetBackpressure ( stream , backpressure ) nothrow
- Assert: stream.[[backpressure]] is not backpressure.
- If stream.[[backpressureChangePromise]] is not undefined, resolve stream.[[backpressureChangePromise]] with undefined.
- Set stream.[[backpressureChangePromise]] to a new promise.
- Set stream.[[backpressure]] to backpressure.
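The "fulfill the old promise, then replace it" pattern used by TransformStreamSetBackpressure can be sketched as below. The stream is modelled as a plain object, and the extra `resolveBackpressureChange` property is a hypothetical helper needed only because script code cannot resolve a promise from outside its executor.

```javascript
// Sketch of TransformStreamSetBackpressure on a hypothetical plain-object stream.
function setBackpressure(stream, backpressure) {
  // Assert: the value is actually changing.
  if (stream.backpressure === backpressure) {
    throw new Error("assertion failed: backpressure did not change");
  }
  // Fulfill the promise that waiters on the previous state are holding.
  if (stream.resolveBackpressureChange !== undefined) {
    stream.resolveBackpressureChange(undefined);
  }
  // Install a fresh promise for the next change. The executor runs
  // synchronously, so the resolver is stored before this function returns.
  stream.backpressureChangePromise = new Promise(resolve => {
    stream.resolveBackpressureChange = resolve;
  });
  stream.backpressure = backpressure;
}
```

Anything that captured the previous promise before the call is notified of the change, while later callers always observe a pending promise tied to the current state.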
5.4. Class TransformStreamDefaultController
The TransformStreamDefaultController class has methods that allow manipulation of the associated ReadableStream and WritableStream. When constructing a TransformStream, the transformer is given a corresponding TransformStreamDefaultController instance to manipulate.
5.4.1. Class Definition
This section is non-normative.
If one were to write the TransformStreamDefaultController class in something close to the syntax of [ECMASCRIPT], it would look like
class TransformStreamDefaultController {
  constructor(stream)
  get desiredSize()
  enqueue(chunk)
  error(reason)
  terminate()
}
5.4.2. Internal Slots
Instances of TransformStreamDefaultController are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[controlledTransformStream]] | The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check |
5.4.3. new TransformStreamDefaultController( stream )
The TransformStreamDefaultController constructor cannot be used directly; it only works on a TransformStream that is in the middle of being constructed.
- If ! IsTransformStream(stream) is false, throw a TypeError exception.
- If stream.[[transformStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledTransformStream]] to stream.
5.4.4. Properties of the TransformStreamDefaultController Prototype
5.4.4.1. get desiredSize
The desiredSize getter returns the desired size to fill the readable side’s internal queue. It can be negative, if the queue is over-full.
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Let readableController be this.[[controlledTransformStream]].[[readable]].[[readableStreamController]].
- Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
5.4.4.2. enqueue( chunk )
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
5.4.4.3. error( reason )
The error method will error both the readable side and the writable side of the controlled transform stream, making all future interactions fail with the given reason. Any chunks queued for transformation will be discarded.
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Perform ! TransformStreamDefaultControllerError(this, reason).
5.4.4.4. terminate()
The terminate method will close the readable side and error the writable side of the controlled transform stream. This is useful when the transformer only needs to consume a portion of the chunks written to the writable side.
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Perform ! TransformStreamDefaultControllerTerminate(this).
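As an illustration of the "consume only a portion of the chunks" use case, the following sketch shows a transformer that forwards a fixed number of chunks and then calls terminate(). The name makeTakeTransformer and its limit parameter are hypothetical, and the sketch assumes limit is at least 1.

```javascript
// Hypothetical transformer that forwards at most `limit` chunks (limit >= 1)
// and then terminates the transform stream.
function makeTakeTransformer(limit) {
  let seen = 0;
  return {
    transform(chunk, controller) {
      controller.enqueue(chunk);
      seen += 1;
      if (seen >= limit) {
        // Closes the readable side and errors the writable side, so any
        // further writes by the producer are rejected.
        controller.terminate();
      }
    }
  };
}

// Possible usage, in an environment that exposes TransformStream:
//   readableStream.pipeThrough(new TransformStream(makeTakeTransformer(3)));
```

Consumers of the readable side see a normal end of stream after the last forwarded chunk, while the producer learns through the errored writable side that no more input is wanted.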
5.5. Transform Stream Default Controller Abstract Operations
5.5.1. IsTransformStreamDefaultController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[controlledTransformStream]] internal slot, return false.
- Return true.
5.5.2. TransformStreamDefaultControllerEnqueue ( controller , chunk ) throws
This abstract operation can be called by other specifications that wish to enqueue chunks in the readable side , in the same way a developer would enqueue chunks using the stream’s associated controller object. Specifications should not do this to streams they did not create.
- Let stream be controller.[[controlledTransformStream]].
- Let readableController be stream.[[readable]].[[readableStreamController]].
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) is false, throw a TypeError exception.
- Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
- If enqueueResult is an abrupt completion,
  - Perform ! TransformStreamError(stream, enqueueResult.[[Value]]).
  - Throw stream.[[readable]].[[storedError]].
- Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
- If backpressure is not stream.[[backpressure]],
  - Assert: backpressure is true.
  - Perform ! TransformStreamSetBackpressure(stream, true).
5.5.3. TransformStreamDefaultControllerError ( controller , e ) nothrow
This abstract operation can be called by other specifications that wish to move a transform stream to an errored state, in the same way a developer would error a stream using its associated controller object. Specifications should not do this to streams they did not create.
- Let stream be controller.[[controlledTransformStream]].
- Perform ! TransformStreamError(stream, e).
5.5.4. TransformStreamDefaultControllerTerminate ( controller ) nothrow
This abstract operation can be called by other specifications that wish to terminate a transform stream, in the same way a developer-created stream would be closed by its associated controller object. Specifications should not do this to streams they did not create.
- Let stream be controller.[[controlledTransformStream]].
- Let readableController be stream.[[readable]].[[readableStreamController]].
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) is true, perform ! ReadableStreamDefaultControllerClose(readableController).
- Let error be a TypeError exception indicating that the stream has been terminated.
- Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[writableStreamController]], error).
- If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
5.6. Class TransformStreamDefaultSink
The TransformStreamDefaultSink class is used internally as the underlying sink that is passed to the WritableStream constructor when constructing the [[writable]] slot of a TransformStream. It delegates from the WritableStream back to the TransformStream implementation for simplicity.
Since the TransformStreamDefaultSink class is not observable, implementations do not need to implement it, but could instead do something else observably equivalent. See #813 for an alternate specification approach that will make this clearer in the future.
5.6.1. Class Definition
This section is non-normative.
If one were to write the TransformStreamDefaultSink class in something close to the syntax of [ECMASCRIPT], it would look like
class TransformStreamDefaultSink {
  constructor(stream, startPromise)
  start()
  write(chunk)
  abort()
  close()
}
5.6.2. Internal Slots
Instances of TransformStreamDefaultSink are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[ownerTransformStream]] | The TransformStream instance |
[[startPromise]] | The startPromise parameter that was passed to the constructor |
5.6.3. new TransformStreamDefaultSink( stream , startPromise )
- Set this.[[ownerTransformStream]] to stream.
- Set this.[[startPromise]] to startPromise.
5.6.4. Properties of the TransformStreamDefaultSink Prototype
5.6.4.1. start()
- Return this.[[startPromise]].
5.6.4.2. write( chunk )
- Let stream be this.[[ownerTransformStream]].
- Assert: stream.[[writable]].[[state]] is "writable".
- If stream.[[backpressure]] is true,
  - Let backpressureChangePromise be stream.[[backpressureChangePromise]].
  - Assert: backpressureChangePromise is not undefined.
  - Return the result of transforming backpressureChangePromise with a fulfillment handler which performs the following steps:
    - Let writable be stream.[[writable]].
    - Let state be writable.[[state]].
    - If state is "erroring", throw writable.[[storedError]].
    - Assert: state is "writable".
    - Return ! TransformStreamDefaultSinkTransform(this, chunk).
- Return ! TransformStreamDefaultSinkTransform(this, chunk).
5.6.4.3. abort()
-
Let
e
be
a
TypeError exception indicating that the writable stream was aborted. -
Perform
!
TransformStreamError (this .[[ownerTransformStream]], e ).
5.6.4.4. close()
-
Let
stream
be
this .[[ownerTransformStream]]. - Let readable be stream .[[readable]].
-
Let
flushPromise
be
!
PromiseInvokeOrNoop ( stream .[[transformer]],"flush"
, « stream .[[transformStreamController]] »). -
Return
the
result
of
transforming
flushPromise
with:
-
A
fulfullment
handler
that
performs
the
following
steps:
-
If
readable
.[[state]]
is
"errored"
, throw readable .[[storedError]]. - Let readableController be readable .[[readableStreamController]].
-
If
!
ReadableStreamDefaultControllerCanCloseOrEnqueue ( readableController ) istrue , perform !ReadableStreamDefaultControllerClose ( readableController ).
-
If
readable
.[[state]]
is
-
A
rejection
handler
that,
when
called
with
argument
r
,
performs
the
following
steps:
-
Perform
!
TransformStreamError ( stream , r ). - Throw readable .[[storedError]].
-
Perform
!
-
A
fulfullment
handler
that
performs
the
following
steps:
5.7. Transform Stream Default Sink Abstract Operations
5.7.1. TransformStreamDefaultSinkInvokeTransform ( stream , chunk ) throws
- Let controller be stream.[[transformStreamController]].
- Let transformer be stream.[[transformer]].
- Let method be ? GetV(transformer, "transform").
- If method is undefined,
  - Perform ? TransformStreamDefaultControllerEnqueue(controller, chunk).
  - Return undefined.
- Return ? Call(method, transformer, « chunk, controller »).
5.7.2. TransformStreamDefaultSinkTransform ( sink , chunk ) nothrow
- Let stream be sink.[[ownerTransformStream]].
- Assert: stream.[[readable]].[[state]] is not "errored".
- Assert: stream.[[backpressure]] is false.
- Let transformPromise be undefined.
- Let transformResult be TransformStreamDefaultSinkInvokeTransform(stream, chunk).
- If transformResult is an abrupt completion, set transformPromise to a promise rejected with transformResult.[[Value]].
- Otherwise, set transformPromise to a promise resolved with transformResult.[[Value]].
- Return the result of transforming transformPromise with a rejection handler that, when called with argument e, performs the following steps:
  - Perform ! TransformStreamError(stream, e).
  - Throw e.
5.8. Class TransformStreamDefaultSource
The TransformStreamDefaultSource class is used internally as the underlying source that is passed to the ReadableStream constructor when constructing the [[readable]] slot. It delegates from the ReadableStream back to the TransformStream implementation for simplicity.
Since the TransformStreamDefaultSource class is not observable, implementations do not need to implement it, but could instead do something else observably equivalent. See #813 for an alternate specification approach that will make this clearer in the future.
5.8.1. Class Definition
This section is non-normative.
If one were to write the TransformStreamDefaultSource class in something close to the syntax of [ECMASCRIPT], it would look like
class TransformStreamDefaultSource {
  constructor(stream, startPromise)
  start()
  pull()
  cancel(reason)
}
5.8.2. Internal Slots
Instances of TransformStreamDefaultSource are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[ownerTransformStream]] | The TransformStream instance |
[[startPromise]] | The startPromise parameter that was passed to the constructor |
5.8.3. new TransformStreamDefaultSource( stream , startPromise )
- Set this.[[ownerTransformStream]] to stream.
- Set this.[[startPromise]] to startPromise.
5.8.4. Properties of the TransformStreamDefaultSource Prototype
5.8.4.1. start()
- Return this.[[startPromise]].
5.8.4.2. pull()
- Let stream be this.[[ownerTransformStream]].
- Assert: stream.[[backpressure]] is true.
- Assert: stream.[[backpressureChangePromise]] is not undefined.
- Perform ! TransformStreamSetBackpressure(stream, false).
- Return stream.[[backpressureChangePromise]].
5.8.4.3. cancel( reason )
- Perform ! TransformStreamError(this.[[ownerTransformStream]], reason).
6. Other Stream APIs and Operations
6.1. Class ByteLengthQueuingStrategy
A common queuing strategy when dealing with bytes is to wait until the accumulated byteLength properties of the incoming chunks reaches a specified high-water mark. As such, this is provided as a built-in queuing strategy that can be used when constructing streams.
const stream = new ReadableStream(
{ ... },
new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 })
);
In this case, 16 KiB worth of chunks can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream(
{ ... },
new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 })
);
In this case, 32 KiB worth of chunks can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers .
6.1.1. Class Definition
This section is non-normative.
If one were to write the ByteLengthQueuingStrategy class in something close to the syntax of [ECMASCRIPT], it would look like
class ByteLengthQueuingStrategy {
  constructor({ highWaterMark })
  size(chunk)
}
Each ByteLengthQueuingStrategy instance will additionally have an own data property highWaterMark set by its constructor.
6.1.2. new ByteLengthQueuingStrategy({ highWaterMark })
- Perform ! CreateDataProperty(this, "highWaterMark", highWaterMark).
6.1.3. Properties of the ByteLengthQueuingStrategy Prototype
6.1.3.1. size( chunk )
The size method returns the given chunk’s byteLength property. (If the chunk doesn’t have one, it will return undefined, causing the stream using this strategy to error.)
This method is intentionally generic; it does not require that its this value be a ByteLengthQueuingStrategy object.
- Return ? GetV(chunk, "byteLength").
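The constructor and size method above can be approximated in a few lines of JavaScript. The class name SketchByteLengthQueuingStrategy is hypothetical and chosen so as not to be confused with the built-in; the sketch only mirrors the two algorithms in this section.

```javascript
// Illustrative stand-in for ByteLengthQueuingStrategy; not the built-in class.
class SketchByteLengthQueuingStrategy {
  constructor({ highWaterMark }) {
    // Plain assignment creates an own data property, in the spirit of
    // CreateDataProperty(this, "highWaterMark", highWaterMark).
    this.highWaterMark = highWaterMark;
  }

  size(chunk) {
    // GetV(chunk, "byteLength"): no reference to `this`, so the method is generic.
    return chunk.byteLength;
  }
}

const strategy = new SketchByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 });
const { size } = strategy; // detached from the instance on purpose
console.log(size(new Uint8Array(8))); // 8
console.log(size({}));                // undefined (no byteLength property)
```

Detaching size from the instance, as done here, works precisely because the method never looks at its this value.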
6.2. Class CountQueuingStrategy
A common queuing strategy when dealing with streams of generic objects is to simply count the number of chunks that have been accumulated so far, waiting until this number reaches a specified high-water mark. As such, this strategy is also provided out of the box.
const stream = new ReadableStream(
{ ... },
new CountQueuingStrategy({ highWaterMark: 10 })
);
In this case, 10 chunks (of any kind) can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream(
{ ... },
new CountQueuingStrategy({ highWaterMark: 5 })
);
In this case, five chunks (of any kind) can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers .
6.2.1. Class Definition
This section is non-normative.
If one were to write the CountQueuingStrategy class in something close to the syntax of [ECMASCRIPT], it would look like
class CountQueuingStrategy {
  constructor({ highWaterMark })
  size()
}
Each CountQueuingStrategy instance will additionally have an own data property highWaterMark set by its constructor.
6.2.2. new CountQueuingStrategy({ highWaterMark })
- Perform ! CreateDataProperty(this, "highWaterMark", highWaterMark).
6.2.3. Properties of the CountQueuingStrategy Prototype
6.2.3.1. size()
The size method returns one always, so that the total queue size is a count of the number of chunks in the queue.
This method is intentionally generic; it does not require that its this value be a CountQueuingStrategy object.
- Return 1.
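A small sketch shows why a constant size of 1 turns the queue total into a chunk count. The class name SketchCountQueuingStrategy and the helper totalQueueSize are hypothetical and exist only for this illustration.

```javascript
// Illustrative stand-in for CountQueuingStrategy; not the built-in class.
class SketchCountQueuingStrategy {
  constructor({ highWaterMark }) {
    this.highWaterMark = highWaterMark;
  }

  size() {
    return 1; // every chunk counts as one, whatever it contains
  }
}

// Hypothetical helper: sums the strategy's size over a list of chunks,
// which is what a queue total would hold after enqueuing them all.
function totalQueueSize(strategy, chunks) {
  return chunks.reduce((total, chunk) => total + strategy.size(chunk), 0);
}

const strategy = new SketchCountQueuingStrategy({ highWaterMark: 5 });
console.log(totalQueueSize(strategy, ["a", new Uint8Array(1024), { any: "object" }])); // 3
```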
6.3. Queue-with-Sizes Operations
The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their determined sizes. Various specification objects contain a queue-with-sizes, represented by the object having two paired internal slots, always named [[queue]] and [[queueTotalSize]]. [[queue]] is a List of Records with [[value]] and [[size]] fields, and [[queueTotalSize]] is a JavaScript Number, i.e. a double-precision floating point number.
The following abstract operations are used when operating on objects that contain queues-with-sizes, in order to ensure that the two internal slots stay synchronized.
Due to the limited precision of floating-point arithmetic, the framework specified here, of keeping a running total in the [[queueTotalSize]] slot, is not equivalent to adding up the size of all chunks in [[queue]]. (However, this only makes a difference when there is a huge (~10^15) variance in size between chunks, or when trillions of chunks are enqueued.)
6.3.1. DequeueValue ( container ) nothrow
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Assert: container .[[queue]] is not empty.
- Let pair be the first element of container .[[queue]].
- Remove pair from container .[[queue]], shifting all other elements downward (so that the second becomes the first, and so on).
- Set container .[[queueTotalSize]] to container .[[queueTotalSize]] − pair .[[size]].
- If container.[[queueTotalSize]] < 0, set container.[[queueTotalSize]] to 0. (This can occur due to rounding errors.)
- Return pair.[[value]].
6.3.2. EnqueueValueWithSize ( container , value , size ) throws
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Let size be ? ToNumber(size).
- If ! IsFiniteNonNegativeNumber(size) is false, throw a RangeError exception.
- Append Record {[[value]]: value, [[size]]: size} as the last element of container.[[queue]].
- Set container.[[queueTotalSize]] to container.[[queueTotalSize]] + size.
6.3.3. PeekQueueValue ( container ) nothrow
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Assert: container .[[queue]] is not empty.
- Let pair be the first element of container .[[queue]].
- Return pair .[[value]].
6.3.4. ResetQueue ( container ) nothrow
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Set container.[[queue]] to a new empty List.
- Set container.[[queueTotalSize]] to 0.
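Taken together, the four queue-with-sizes operations can be sketched as follows. The container is modelled as a plain object with `queue` and `queueTotalSize` properties standing in for the internal slots, and the function names are simply camel-cased versions of the abstract operation names; both are assumptions for the sake of illustration.

```javascript
// Sketch of the queue-with-sizes operations on a hypothetical plain-object container.
function resetQueue(container) {
  container.queue = [];
  container.queueTotalSize = 0;
}

function enqueueValueWithSize(container, value, size) {
  size = +size; // unary plus performs ToNumber
  if (!Number.isFinite(size) || size < 0) {
    throw new RangeError("Size must be a finite, non-negative number.");
  }
  container.queue.push({ value, size });
  container.queueTotalSize += size;
}

function dequeueValue(container) {
  const pair = container.queue.shift();
  container.queueTotalSize -= pair.size;
  if (container.queueTotalSize < 0) {
    container.queueTotalSize = 0; // guard against floating-point rounding
  }
  return pair.value;
}

function peekQueueValue(container) {
  return container.queue[0].value;
}

const container = {};
resetQueue(container);
enqueueValueWithSize(container, "a", 2);
enqueueValueWithSize(container, "b", 3);
console.log(peekQueueValue(container), container.queueTotalSize); // a 5
console.log(dequeueValue(container), container.queueTotalSize);   // a 3
```

The sketch omits the "queue is not empty" assertions, so callers are expected to check for emptiness before dequeuing or peeking, as the specification's callers do.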
6.4. Miscellaneous Operations
A few abstract operations are used in this specification for utility purposes. We define them here.
6.4.1. InvokeOrNoop ( O , P , args ) throws
- Assert: O is not undefined.
- Assert: ! IsPropertyKey(P) is true.
- Assert: args is a List.
- Let method be ? GetV(O, P).
- If method is undefined, return undefined.
- Return ? Call(method, O, args).
6.4.2. IsFiniteNonNegativeNumber ( v ) nothrow
- If v is NaN, return false.
- If v is +∞, return false.
- If v < 0, return false.
- Return true.
6.4.3. PromiseInvokeOrNoop ( O , P , args ) nothrow
- Assert: O is not undefined.
- Assert: ! IsPropertyKey(P) is true.
- Assert: args is a List.
- Let returnValue be InvokeOrNoop(O, P, args).
- If returnValue is an abrupt completion, return a promise rejected with returnValue.[[Value]].
- Otherwise, return a promise resolved with returnValue.[[Value]].
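InvokeOrNoop and PromiseInvokeOrNoop correspond closely to ordinary JavaScript, as the sketch below suggests. The function names are camel-cased versions of the abstract operation names, and the assertions in the algorithms are left out for brevity.

```javascript
// Sketch of InvokeOrNoop: call O[P] with args if it exists, otherwise do nothing.
function invokeOrNoop(O, P, args) {
  const method = O[P];
  if (method === undefined) {
    return undefined;
  }
  // Throws a TypeError if `method` is not callable, like Call(method, O, args).
  return Reflect.apply(method, O, args);
}

// Sketch of PromiseInvokeOrNoop: same call, but any thrown exception becomes
// a rejected promise and any return value is wrapped in a resolved promise.
function promiseInvokeOrNoop(O, P, args) {
  try {
    return Promise.resolve(invokeOrNoop(O, P, args));
  } catch (e) {
    return Promise.reject(e);
  }
}

const sink = { write(chunk) { return chunk.length; } };
console.log(invokeOrNoop(sink, "write", ["abc"])); // 3
console.log(invokeOrNoop(sink, "close", []));      // undefined
```

The promise-returning variant is what lets the stream algorithms treat synchronous and asynchronous underlying sink methods uniformly.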
6.4.4. TransferArrayBuffer ( O ) nothrow
- Assert: Type(O) is Object.
- Assert: O has an [[ArrayBufferData]] internal slot.
- Assert: ! IsDetachedBuffer(O) is false.
- Let arrayBufferData be O.[[ArrayBufferData]].
- Let arrayBufferByteLength be O.[[ArrayBufferByteLength]].
- Perform ! DetachArrayBuffer(O).
- Return a new ArrayBuffer object (created in the current Realm Record) whose [[ArrayBufferData]] internal slot value is arrayBufferData and whose [[ArrayBufferByteLength]] internal slot value is arrayBufferByteLength.
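Script code cannot detach an ArrayBuffer directly, but the observable effect of TransferArrayBuffer can be approximated with the structuredClone() function and its transfer option, which is defined by HTML rather than by this standard. The sketch below assumes an environment where structuredClone is available (for example a recent browser or Node.js release); the function name transferArrayBuffer is hypothetical.

```javascript
// Approximation of TransferArrayBuffer using structured cloning with transfer.
// Assumes a global structuredClone(); this is not how the standard defines
// the operation, only a way to observe the same effect from script.
function transferArrayBuffer(buffer) {
  // The original buffer is detached and its contents move to the returned buffer.
  return structuredClone(buffer, { transfer: [buffer] });
}

const original = new Uint8Array([1, 2, 3, 4]).buffer;
const moved = transferArrayBuffer(original);
console.log(original.byteLength);       // 0 (detached)
console.log(moved.byteLength);          // 4
console.log(new Uint8Array(moved)[2]);  // 3
```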
6.4.5. ValidateAndNormalizeHighWaterMark ( highWaterMark ) throws
- Set highWaterMark to ? ToNumber(highWaterMark).
- If highWaterMark is NaN or highWaterMark < 0, throw a RangeError exception.
  +∞ is explicitly allowed as a valid high water mark. It causes backpressure to never be applied.
- Return highWaterMark.
6.4.6. ValidateAndNormalizeQueuingStrategy ( size , highWaterMark ) throws
- If size is not undefined and ! IsCallable(size) is false, throw a TypeError exception.
- Let highWaterMark be ? ValidateAndNormalizeHighWaterMark(highWaterMark).
- Return Record {[[size]]: size, [[highWaterMark]]: highWaterMark}.
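Taken together, the two validation operations can be sketched in JavaScript as below. The function names are assumptions, unary plus stands in for ToNumber, and a plain object stands in for the returned Record.

```javascript
// Sketch of ValidateAndNormalizeHighWaterMark(highWaterMark).
function validateAndNormalizeHighWaterMark(highWaterMark) {
  highWaterMark = +highWaterMark; // ToNumber
  if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
    throw new RangeError("highWaterMark must be a non-negative number");
  }
  return highWaterMark; // +Infinity passes through unchanged
}

// Sketch of ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
function validateAndNormalizeQueuingStrategy(size, highWaterMark) {
  // size is checked first, so a bad size wins over a bad high water mark.
  if (size !== undefined && typeof size !== "function") {
    throw new TypeError("size must be a function if provided");
  }
  highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
  return { size, highWaterMark };
}

const strategy = validateAndNormalizeQueuingStrategy(chunk => chunk.length, "16");
```

Because the high water mark goes through ToNumber, the string "16" above is normalized to the number 16.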
7. Global Properties
The following constructors must be exposed on the
The attributes of these properties must be { [[Writable]]:
The ReadableStreamDefaultReader, ReadableStreamBYOBReader, ReadableStreamDefaultController, ReadableByteStreamController, WritableStreamDefaultWriter, WritableStreamDefaultController, and TransformStreamDefaultController classes are specifically not exposed, as they are not independently useful.
8. Examples of Creating Streams
This section, and all its subsections, are non-normative.
The previous examples throughout the standard have focused on how to use streams. Here we show how to create a stream, using the ReadableStream or WritableStream constructors.
8.1. A readable stream with an underlying push source (no backpressure support)
The following function creates readable streams that wrap WebSocket instances [HTML], which are push sources that do not support backpressure signals. It illustrates how, when adapting a push source, usually most of the work happens in the start function.
function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      ws.onmessage = event => controller.enqueue(event.data);
      ws.onclose = () => controller.close();
      ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
    },

    cancel() {
      ws.close();
    }
  });
}
We can then use this function to create readable streams for a web socket, and pipe that stream to an arbitrary writable stream:
const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");

webSocketStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
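To try the same pattern without a network connection, the sketch below swaps the WebSocket for a plain EventTarget that fires "message" and "close" events. That stand-in, and the helper names makeReadablePushStream, emitMessage, and readAll, are assumptions made for illustration. As with the web socket version, everything happens in start, and messages that arrive before anyone reads are simply queued.

```javascript
// Wraps any EventTarget that fires "message" and "close" events.
// The EventTarget is a hypothetical stand-in for a WebSocket.
function makeReadablePushStream(source) {
  return new ReadableStream({
    start(controller) {
      source.addEventListener("message", event => controller.enqueue(event.data));
      source.addEventListener("close", () => controller.close());
    }
  });
}

// Fires a "message" event carrying `data`, loosely imitating a MessageEvent.
function emitMessage(source, data) {
  source.dispatchEvent(Object.assign(new Event("message"), { data }));
}

// Reads the stream to the end and collects its chunks.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}

const source = new EventTarget();
const stream = makeReadablePushStream(source);

// The source pushes whenever it likes; there is no way to slow it down.
emitMessage(source, "first");
emitMessage(source, "second");
source.dispatchEvent(new Event("close"));

const received = readAll(stream); // a promise for the queued chunks, in order
```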
8.2. A readable stream with an underlying push source and backpressure support
The following function returns readable streams that wrap "backpressure sockets," which are hypothetical objects that have the same API as web sockets, but also provide the ability to pause and resume the flow of data with their readStop and readStart methods. In doing so, this example shows how to apply backpressure to underlying sources that support it.
function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    }
  });
}
We can then use this function to create readable streams for such "backpressure sockets" in the same way we do for web sockets. This time, however, when we pipe to a destination that cannot accept data as fast as the socket is producing it, or if we leave the stream alone without reading from it for some time, a backpressure signal will be sent to the socket.
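Since backpressure sockets are hypothetical, the effect described above can only be observed with a stand-in. The sketch below is one such stand-in, under our own assumptions: createFakeBackpressureSocket is an in-memory object that starts paused, delivers its chunks synchronously while running, and counts how often readStop() is called. Reading slowly from the wrapping stream then shows the socket being paused.

```javascript
// Hypothetical in-memory "backpressure socket": it delivers `chunks` through
// ondata while running, and stops as soon as readStop() is called.
function createFakeBackpressureSocket(chunks) {
  const pending = [...chunks];
  let ended = false;
  return {
    paused: true, // starts paused; the stream's first pull() starts the flow
    stopCount: 0,
    ondata: null,
    onend: null,
    readStop() {
      this.paused = true;
      this.stopCount++;
    },
    readStart() {
      if (!this.paused) return;
      this.paused = false;
      while (!this.paused && pending.length > 0) {
        this.ondata({ data: pending.shift() });
      }
      if (!this.paused && !ended) {
        ended = true; // everything was delivered
        this.onend();
      }
    }
  };
}

// Same shape as the function above, but taking the socket as an argument.
function makeReadableFakeSocketStream(socket) {
  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);
        if (controller.desiredSize <= 0) socket.readStop();
      };
      socket.onend = () => controller.close();
    },
    pull() {
      socket.readStart();
    }
  });
}

async function readSlowly(stream) {
  const reader = stream.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
    await new Promise(resolve => setTimeout(resolve, 5)); // a slow consumer
  }
}

const socket = createFakeBackpressureSocket(["a", "b", "c", "d"]);
const result = readSlowly(makeReadableFakeSocketStream(socket));
```

Once the result promise has settled, socket.stopCount records how many times the stream paused the socket while the consumer was busy.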
8.3. A readable byte stream with an underlying push source (no backpressure support)
The following function returns readable byte streams that wrap a hypothetical UDP socket API, including a promise-returning select2() method that is meant to be evocative of the POSIX select(2) system call.
Since the UDP protocol does not have any built-in backpressure support, the backpressure signal given by desiredSize is ignored, and the stream ensures that when data is available from the socket but not yet requested by the developer, it is enqueued in the stream’s internal queue, to avoid overflow of the kernel-space queue and a consequent loss of data.
This has some interesting consequences for how consumers interact with the stream. If the consumer does not read data as fast as the socket produces it, the chunks will remain in the stream’s internal queue indefinitely. In this case, using a BYOB reader will cause an extra copy, to move the data from the stream’s internal queue to the developer-supplied buffer. However, if the consumer consumes the data quickly enough, a BYOB reader will allow zero-copy reading directly into developer-supplied buffers.
(You can imagine a more complex version of this example which uses desiredSize to inform an out-of-band backpressure signaling mechanism, for example by sending a message down the socket to adjust the rate of data being sent. That is left as an exercise for the reader.)
const DEFAULT_CHUNK_SIZE = 65536;

function makeUDPSocketStream(host, port) {
  const socket = createUDPSocket(host, port);

  return new ReadableStream({
    type: "bytes",

    start(controller) {
      readRepeatedly().catch(e => controller.error(e));

      function readRepeatedly() {
        return socket.select2().then(() => {
          // Since the socket can become readable even when there’s
          // no pending BYOB requests, we need to handle both cases.
          let bytesRead;
          if (controller.byobRequest) {
            const v = controller.byobRequest.view;
            bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
            if (bytesRead === 0) {
              // End of data: close first, then respond with zero bytes
              // so that the pending BYOB read settles.
              controller.close();
            }
            controller.byobRequest.respond(bytesRead);
          } else {
            const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
            bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
            if (bytesRead === 0) {
              controller.close();
            } else {
              controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
            }
          }

          if (bytesRead === 0) {
            return;
          }

          return readRepeatedly();
        });
      }
    },

    cancel() {
      socket.close();
    }
  });
}
ReadableStream instances returned from this function can now vend BYOB readers, with all of the aforementioned benefits and caveats.
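On the consumer side, using a BYOB reader looks roughly like the sketch below. Because the UDP socket API above is hypothetical, the sketch substitutes a small in-memory readable byte stream; that substitute and the helper name readIntoBuffers are assumptions for illustration. The consumer supplies one buffer and keeps reusing it, since each read hands back a view onto the (transferred) buffer.

```javascript
// A small in-memory byte source, standing in for the hypothetical socket.
const sourceBytes = new Uint8Array([1, 2, 3, 4, 5]);
let offset = 0;

const byteStream = new ReadableStream({
  type: "bytes",
  pull(controller) {
    const request = controller.byobRequest;
    if (offset >= sourceBytes.length) {
      controller.close();
      if (request) request.respond(0); // lets a pending BYOB read settle as done
      return;
    }
    if (request) {
      // Write straight into the consumer-supplied buffer.
      const view = request.view;
      const n = Math.min(view.byteLength, sourceBytes.length - offset);
      new Uint8Array(view.buffer, view.byteOffset, n)
        .set(sourceBytes.subarray(offset, offset + n));
      offset += n;
      request.respond(n);
    } else {
      // No BYOB request pending: fall back to enqueuing a copy.
      controller.enqueue(sourceBytes.slice(offset));
      offset = sourceBytes.length;
    }
  }
});

// Reads the whole stream through a single reusable buffer of `size` bytes.
async function readIntoBuffers(stream, size) {
  const reader = stream.getReader({ mode: "byob" });
  const bytes = [];
  let buffer = new ArrayBuffer(size);
  for (;;) {
    const { value, done } = await reader.read(new Uint8Array(buffer));
    if (done) return bytes;
    bytes.push(...value);
    buffer = value.buffer; // the buffer we passed in was transferred; reuse the new one
  }
}

const allBytes = readIntoBuffers(byteStream, 2);
```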
8.4. A readable stream with an underlying pull source
The following function returns readable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fread, and fclose trio). Files are a typical example of pull sources. Note how in contrast to the examples with push sources, most of the work here happens on-demand in the pull function, and not at startup time in the start function.
const fs = require("pr/fs"); // https://github.com/jden/pr
const CHUNK_SIZE = 1024;

function makeReadableFileStream(filename) {
  let fd;
  let position = 0;

  return new ReadableStream({
    start() {
      return fs.open(filename, "r").then(result => {
        fd = result;
      });
    },

    pull(controller) {
      const buffer = new ArrayBuffer(CHUNK_SIZE);

      return fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
        if (bytesRead === 0) {
          return fs.close(fd).then(() => controller.close());
        } else {
          position += bytesRead;
          controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
        }
      });
    },

    cancel() {
      return fs.close(fd);
    }
  });
}
We can then create and use readable streams for files just as we could before for sockets.
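The on-demand nature of a pull source can be made visible with a sketch that needs no file system. Here an in-memory string plays the role of the file; that substitution, the helper name makeReadableStringStream, and the choice of a high water mark of 0 (so that the stream never reads ahead) are all assumptions made for illustration.

```javascript
// A pull source over an in-memory string, counting how often pull() runs.
function makeReadableStringStream(text, chunkSize) {
  let position = 0;
  let pullCount = 0;

  const stream = new ReadableStream({
    pull(controller) {
      pullCount++;
      if (position >= text.length) {
        controller.close();
        return;
      }
      controller.enqueue(text.slice(position, position + chunkSize));
      position += chunkSize;
    }
  }, { highWaterMark: 0 }); // never read ahead: pull() only runs for a pending read

  return { stream, getPullCount: () => pullCount };
}

async function demo() {
  const { stream, getPullCount } = makeReadableStringStream("abcde", 2);

  // Give the stream time to start; nothing is read, so nothing is pulled.
  await new Promise(resolve => setTimeout(resolve, 0));
  const pullsBeforeReading = getPullCount();

  const reader = stream.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return { pullsBeforeReading, chunks };
}

const outcome = demo();
```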
8.5. A readable byte stream with an underlying pull source
The following function returns readable byte streams that allow efficient zero-copy reading of files, again using the Node.js file system API. Instead of using a predetermined chunk size of 1024, it attempts to fill the developer-supplied buffer, allowing full control.
const fs = require("pr/fs"); // https://github.com/jden/pr
const DEFAULT_CHUNK_SIZE = 1024;

function makeReadableByteFileStream(filename) {
  let fd;
  let position = 0;

  return new ReadableStream({
    type: "bytes",

    start() {
      return fs.open(filename, "r").then(result => {
        fd = result;
      });
    },

    pull(controller) {
      // Even when the consumer is using the default reader, the auto-allocation
      // feature allocates a buffer and passes it to us via byobRequest.
      const v = controller.byobRequest.view;

      return fs.read(fd, v.buffer, v.byteOffset, v.byteLength, position).then(bytesRead => {
        if (bytesRead === 0) {
          return fs.close(fd).then(() => {
            controller.close();
            // Respond with zero bytes so that a pending BYOB read settles
            // instead of waiting forever.
            controller.byobRequest.respond(0);
          });
        } else {
          position += bytesRead;
          controller.byobRequest.respond(bytesRead);
        }
      });
    },

    cancel() {
      return fs.close(fd);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
  });
}
With this in hand, we can create and use BYOB readers for the returned ReadableStream. But we can also create default readers, using them in the same simple and generic manner as usual. The adaptation between the low-level byte tracking of the underlying byte source shown here, and the higher-level chunk-based consumption of a default reader, is all taken care of automatically by the streams implementation. The auto-allocation feature, via the autoAllocateChunkSize option, even allows us to write less code, compared to the manual branching in §8.3 A readable byte stream with an underlying push source (no backpressure support).
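The default-reader case can be sketched without a file system by letting an in-memory Uint8Array play the role of the file. That substitution and the name makeInMemoryByteStream are assumptions for illustration. The underlying source only ever talks to byobRequest, yet a default reader receives ordinary Uint8Array chunks of at most autoAllocateChunkSize bytes.

```javascript
// A readable byte stream over an in-memory Uint8Array (standing in for a file).
function makeInMemoryByteStream(bytes, chunkSize) {
  let position = 0;

  return new ReadableStream({
    type: "bytes",
    autoAllocateChunkSize: chunkSize,

    pull(controller) {
      // With auto-allocation there is a byobRequest even for default readers.
      const v = controller.byobRequest.view;
      const n = Math.min(v.byteLength, bytes.length - position);

      if (n === 0) {
        controller.close();
        controller.byobRequest.respond(0);
        return;
      }

      new Uint8Array(v.buffer, v.byteOffset, n)
        .set(bytes.subarray(position, position + n));
      position += n;
      controller.byobRequest.respond(n);
    }
  });
}

// Consume with a default reader, recording the length of each chunk.
async function chunkLengths(stream) {
  const reader = stream.getReader();
  const lengths = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return lengths;
    lengths.push(value.byteLength);
  }
}

const lengths = chunkLengths(makeInMemoryByteStream(new Uint8Array([1, 2, 3, 4, 5]), 4));
```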
8.6. A writable stream with no backpressure or success signals
The following function returns a writable stream that wraps a WebSocket [HTML]. Web sockets do not provide any way to tell when a given chunk of data has been successfully sent (without awkward polling of bufferedAmount, which we leave as an exercise to the reader). As such, this writable stream has no ability to communicate accurate backpressure signals or write success/failure to its producers. That is, the promises returned by its writer’s write() method and ready getter will always fulfill immediately.
function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);

  return new WritableStream({
    start(controller) {
      ws.onerror = () => {
        controller.error(new Error("The WebSocket errored!"));
        ws.onclose = null;
      };
      ws.onclose = () => controller.error(new Error("The server closed the connection unexpectedly!"));
      return new Promise(resolve => ws.onopen = resolve);
    },

    write(chunk) {
      ws.send(chunk);
      // Return immediately, since the web socket gives us no easy way to tell
      // when the write completes.
    },

    close() {
      return closeWS(1000);
    },

    abort(reason) {
      return closeWS(4000, reason && reason.message);
    },
  });

  function closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("The connection was not closed cleanly"));
        }
      };
      ws.close(code, reasonString);
    });
  }
}
We can then use this function to create writable streams for a web socket, and pipe an arbitrary readable stream to it:
const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");

readableStream.pipeTo(webSocketStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
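For readers curious about the bufferedAmount polling mentioned above, one possible shape is sketched below. It is only a sketch under stated assumptions: the helper names waitForDrain and makePollingWebSocketSink, the threshold of 0 bytes, and the 10 ms polling interval are our own choices, and the fakeSocket object is a hypothetical stand-in for an open WebSocket so that the idea can be tried without a server.

```javascript
const OPEN = 1; // the value of WebSocket.OPEN

// Resolves once ws.bufferedAmount has dropped to `threshold` bytes or fewer.
function waitForDrain(ws, threshold = 0, intervalMs = 10) {
  return new Promise((resolve, reject) => {
    function check() {
      if (ws.bufferedAmount <= threshold) {
        resolve();
      } else if (ws.readyState !== OPEN) {
        reject(new Error("The WebSocket closed before its buffer drained"));
      } else {
        setTimeout(check, intervalMs);
      }
    }
    check();
  });
}

// An underlying sink whose write() only settles once the data has left the buffer.
function makePollingWebSocketSink(ws) {
  return {
    write(chunk) {
      ws.send(chunk);
      return waitForDrain(ws);
    }
  };
}

// Hypothetical stand-in for an open WebSocket whose buffer drains after 20 ms.
const fakeSocket = {
  readyState: OPEN,
  bufferedAmount: 0,
  send(chunk) {
    this.bufferedAmount += chunk.length;
    setTimeout(() => { this.bufferedAmount -= chunk.length; }, 20);
  }
};

const writer = new WritableStream(makePollingWebSocketSink(fakeSocket)).getWriter();
const bufferedWhenWriteSettled =
  writer.write("hello").then(() => fakeSocket.bufferedAmount);
```

With a sink like this, the promise returned by write() reflects actual progress, so the stream can communicate backpressure to its producers, at the cost of a timer running while data is buffered.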
8.7. A writable stream with backpressure and success signals
The following function returns writable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fwrite, and fclose trio). Since the API we are wrapping provides a way to tell when a given write succeeds, this stream will be able to communicate backpressure signals as well as whether an individual write succeeded or failed.
const fs = require("pr/fs"); // https://github.com/jden/pr

function makeWritableFileStream(filename) {
  let fd;

  return new WritableStream({
    start() {
      return fs.open(filename, "w").then(result => {
        fd = result;
      });
    },

    write(chunk) {
      return fs.write(fd, chunk, 0, chunk.length);
    },

    close() {
      return fs.close(fd);
    },

    abort() {
      return fs.close(fd);
    }
  });
}
We can then use this function to create a writable stream for a file, and write individual chunks of data to it:
const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
const writer = fileStream.getWriter();

writer.write("To stream, or not to stream\n");
writer.write("That is the question\n");

writer.close()
  .then(() => console.log("chunks written and stream closed successfully!"))
  .catch(e => console.error(e));
Note that if a particular call to fs.write takes a longer time, the returned promise will fulfill later. In the meantime, additional writes can be queued up, which are stored in the stream’s internal queue. The accumulation of chunks in this queue can change the stream to return a pending promise from the ready getter, which is a signal to producers that they would benefit from backing off and stopping writing, if possible.
The way in which the writable stream queues up writes is especially important in this case, since as stated in the documentation for fs.write, "it is unsafe to use fs.write multiple times on the same file without waiting for the [promise]." But we don’t have to worry about that when writing the makeWritableFileStream function, since the stream implementation guarantees that the underlying sink’s write method will not be called until any promises returned by previous calls have fulfilled!
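This guarantee can be observed with a deliberately slow sink. In the sketch below, a timer stands in for fs.write; that stand-in and the name makeSlowSink are assumptions for illustration. The sink records how many of its write() calls are in flight at once, which stays at one even though the producer issues three writes back to back.

```javascript
// An underlying sink whose write() takes `delayMs` to complete and which
// tracks the maximum number of write() calls in flight at the same time.
function makeSlowSink(delayMs) {
  let inFlight = 0;
  let maxInFlight = 0;
  const written = [];

  const stream = new WritableStream({
    write(chunk) {
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      return new Promise(resolve => {
        setTimeout(() => {
          written.push(chunk);
          inFlight--;
          resolve();
        }, delayMs);
      });
    }
  });

  return { stream, written, getMaxInFlight: () => maxInFlight };
}

const { stream, written, getMaxInFlight } = makeSlowSink(5);
const writer = stream.getWriter();

// Issue three writes without waiting; they pile up in the internal queue.
writer.write("a");
writer.write("b");
writer.write("c");

// A desired size at or below zero is the backpressure signal to producers.
const desiredSizeWhileQueued = writer.desiredSize;

const finished = writer.close(); // fulfills after all queued writes complete
```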
8.8. A { readable, writable } stream pair wrapping the same underlying resource
The following function returns an object of the form { readable, writable }, with the readable property containing a readable stream and the writable property containing a writable stream, where both streams wrap the same underlying web socket resource. In essence, this combines §8.1 A readable stream with an underlying push source (no backpressure support) and §8.6 A writable stream with no backpressure or success signals.
While doing so, it illustrates how you can use JavaScript classes to create reusable underlying sink and underlying source abstractions.
function streamifyWebSocket(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return {
    readable: new ReadableStream(new WebSocketSource(ws)),
    writable: new WritableStream(new WebSocketSink(ws))
  };
}

class WebSocketSource {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onmessage = event => controller.enqueue(event.data);
    this._ws.onclose = () => controller.close();

    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
    });
  }

  cancel() {
    this._ws.close();
  }
}

class WebSocketSink {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onclose = () => controller.error(new Error("The server closed the connection unexpectedly!"));
    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
      this._ws.onclose = null;
    });

    return new Promise(resolve => this._ws.onopen = resolve);
  }

  write(chunk) {
    this._ws.send(chunk);
  }

  close() {
    return this._closeWS(1000);
  }

  abort(reason) {
    return this._closeWS(4000, reason && reason.message);
  }

  _closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      this._ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("The connection was not closed cleanly"));
        }
      };
      this._ws.close(code, reasonString);
    });
  }
}
We can then use the objects created by this function to communicate with a remote web socket, using the standard stream APIs:
const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
const writer = streamyWS.writable.getWriter();
const reader = streamyWS.readable.getReader();

writer.write("Hello");
writer.write("web socket!");

reader.read().then(({ value, done }) => {
  console.log("The web socket says: ", value);
});
Note how in this setup canceling the readable side will implicitly close the writable side, and similarly, closing or aborting the writable side will implicitly close the readable side.
Conventions
This specification uses algorithm conventions very similar to those of [ECMASCRIPT], whose rules should be used to interpret it (apart from the exceptions enumerated below). In particular, the objects specified here should be treated as built-in objects. For example, their name and length properties are derived as described by that specification, as are the default property descriptor values and the treatment of missing,
We also depart from the [ECMASCRIPT] conventions in the following ways, mostly for brevity. It is hoped (and vaguely planned) that the conventions of ECMAScript itself will evolve in these ways.
- We prefix section headings with new to indicate they are defining constructors; when doing so, we assume that NewTarget will be checked before the algorithm starts.
- We use the default argument notation = {} in a couple of cases, meaning that before the algorithm starts, undefined (including the implicit undefined when no argument is provided) is instead treated as a new object created as if by ObjectCreate(%ObjectPrototype%). (This object may then be destructured, if combined with the below destructuring convention.)
- We use destructuring notation in function and method declarations, and assume that DestructuringAssignmentEvaluation was performed appropriately before the algorithm starts.
- We use "this" instead of "this value".
- We use the shorthand phrases from the [PROMISES-GUIDE] to operate on promises at a higher level than the ECMAScript spec does.
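The default argument and destructuring conventions above mirror what the same notation does in JavaScript source text. The sketch below uses a hypothetical function, describeStrategy, purely to show the behavior being assumed: undefined (explicit or implicit) is replaced by a fresh empty object before destructuring, while any other value is destructured as given.

```javascript
// A hypothetical function using the "= {}" and destructuring notations.
function describeStrategy({ size, highWaterMark } = {}) {
  return { hasSize: size !== undefined, highWaterMark };
}

const noArgument = describeStrategy();               // implicit undefined becomes {}
const explicitUndefined = describeStrategy(undefined); // explicit undefined becomes {} too
const withOptions = describeStrategy({ highWaterMark: 3 });

// Only undefined triggers the default; null is destructured directly and throws.
let nullThrows = false;
try {
  describeStrategy(null);
} catch (e) {
  nullThrows = e instanceof TypeError;
}
```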
It’s also worth noting that, as in [ECMASCRIPT], all numbers are represented as double-precision floating point values, and all arithmetic operations performed on them must be done in the standard way for such values.
Acknowledgments
The editors would like to thank Anne van Kesteren, Ben Kelly, Bert Belder, Brian di Palma, Calvin Metcalf, Dominic Tarr, Ed Hager, Forbes Lindesay, Forrest Norvell, Gorgi Kosev, 贺师俊 (hax), Isaac Schlueter, isonmad, Jake Archibald, Jake Verbaten, Janessa Det, Jens Nockert, Mangala Sadhu Sangeet Singh Khalsa, Marcos Caceres, Marvin Hagemeister, Michael Mior, Mihai Potra, Romain Bellessort, Simon Menke, Stephen Sugden, Tab Atkins, Tanguy Krotoff, Thorsten Lorenz, Till Schneidereit, Tim Caswell, Trevor Norris, tzik, Will Chan, Youenn Fablet, 平野裕 (Yutaka Hirano), and Xabier Rodríguez for their contributions to this specification. Community involvement in this specification has been above and beyond; we couldn’t have done it without you.
This standard is written by Adam Rice (Google, ricea@chromium.org), Domenic Denicola (Google, d@domenic.me), and 吉野剛史 (Takeshi Yoshino, Google, tyoshino@chromium.org).
Per CC0, to the extent possible under law, the editors have waived all copyright and related or neighboring rights to this work.