Streams and Subscriptions
For every tool in the async/await toolbox, there is an equivalent in Effection. We were already introduced to the most important of these in the introduction.
Async | Effection |
---|---|
Promise | Operation |
await | yield* |
async function | function* |
Likewise, there are also equivalents for the Async iteration protocols. They are:
Async | Effection |
---|---|
AsyncIterator | Subscription |
AsyncIterable | Stream |
for await | for yield* |
If you're familiar with how to use these constructs, then you will be right at home working with sequences of data in Effection. In particular, streams are useful for modeling things like UI events and network connections, and in this guide we'll see how to create, transform, and consume them.
Subscription
The Effection equivalent of `AsyncIterator` is the `Subscription`. Like its async counterpart, a subscription acts like a queue that provides a sequence of values via a `next()` method. The only difference is that instead of returning a `Promise` that fulfills to an iterator result, it returns an `Operation` that yields an iterator result.
function* forEach(subscription) {
  let next = yield* subscription.next();
  while (!next.done) {
    console.log(`value: ${next.value}`);
    next = yield* subscription.next();
  }
  console.log(`done: ${next.value}`);
}
Subscriptions, like async iterators, are stateful objects, and calling the `next()` method on them alters their internal state. Crucially, taking the next value from the subscription not only retrieves that value, but also removes it, so that we don't iterate the same value twice.
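For comparison, the same manual loop can be written against a plain async iterator. The sketch below uses only standard JavaScript; `forEachAsync` and `countdown` are hypothetical names chosen for this illustration, not part of Effection.

```javascript
// Drain an AsyncIterator by calling next() by hand, mirroring forEach() above.
async function forEachAsync(iterator, visit) {
  let next = await iterator.next();
  while (!next.done) {
    visit(next.value);
    next = await iterator.next();
  }
  // The result with done: true carries the iterator's final value.
  return next.value;
}

// Hypothetical data source used only for this illustration.
async function* countdown() {
  yield 3;
  yield 2;
  yield 1;
  return "liftoff";
}

forEachAsync(countdown(), (value) => console.log(`value: ${value}`))
  .then((final) => console.log(`done: ${final}`));
```

The shape is the same in both versions: only `await` and `yield*` trade places, and each call to `next()` consumes one value from the underlying sequence.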
Just as with async iterators, subscriptions are low-level constructs that you don't interact with all that often, but it is good to know that they are there. Most of the time, however, instead of working with subscriptions directly, you'll work with streams.
Stream
Streams are the Effection equivalent of async iterables, and like them, are stateless objects that do not do anything by themselves. Whereas a `Subscription` represents hot, active state, a `Stream`, on the other hand, contains a recipe for how to create a subscription.
A `Stream` can have any number of subscriptions, each of which receives the same set of values. Effection ships with `Channel`, which is a very handy way to create streams and, by proxy, subscriptions. Let's use this to show the difference between them:
import { main, createChannel } from 'effection';

await main(function*() {
  let channel = createChannel();

  // the channel has no subscribers yet!
  yield* channel.send('too early');

  let subscription1 = yield* channel;
  let subscription2 = yield* channel;

  yield* channel.send('hello');
  yield* channel.send('world');

  console.log(yield* subscription1.next());
  //=> { done: false, value: "hello" }
  console.log(yield* subscription1.next());
  //=> { done: false, value: "world" }
  console.log(yield* subscription2.next());
  //=> { done: false, value: "hello" }
  console.log(yield* subscription2.next());
  //=> { done: false, value: "world" }
});
As you can see, the channel can have multiple subscribers and sending a message to the channel adds it to each active subscription. If the channel does not have any active subscriptions, then sending a message to it does nothing.
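One way to picture this behavior is as a set of per-subscriber queues. The following is a toy model in plain JavaScript, assuming nothing about how Effection actually implements channels; `ToyChannel`, `subscribe()`, and `take()` are hypothetical names used only for illustration.

```javascript
// Toy model only: ToyChannel is a hypothetical name, not part of Effection.
class ToyChannel {
  constructor() {
    this.queues = new Set();
  }

  // Each subscription gets its own private queue.
  subscribe() {
    const queue = [];
    this.queues.add(queue);
    return {
      // Taking a value also removes it from this subscription's queue.
      take: () => queue.shift(),
    };
  }

  // Sending copies the value into every active queue; with none, it is dropped.
  send(value) {
    for (const queue of this.queues) {
      queue.push(value);
    }
  }
}

const channel = new ToyChannel();
channel.send("too early"); // no subscribers yet, so nobody receives this

const first = channel.subscribe();
const second = channel.subscribe();
channel.send("hello");
channel.send("world");

console.log(first.take(), first.take());   // hello world
console.log(second.take(), second.take()); // hello world
```

Because every subscriber owns its queue, taking a value from one subscription never affects what another subscription sees.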
for yield* each
The entire point of having a stream is to consume values from it, and we have already seen how we can use `yield*` to subscribe to a `Stream` and turn a `Stream` into a `Subscription`. But there is an easier way!
When working with an async iterator, you would not normally traverse it by manually invoking the `next()` method repeatedly. Instead, you would use the `for await of` syntax to conveniently focus on the work to be done with each item instead of the mechanics of iteration. It is the same with Effection streams.
The simplest way to consume a stream in Effection is with the `for yield* each` loop. It is almost identical to the `for await` loop from `async/await`. The biggest difference is that you must yield to the `each.next()` operation as the last step of your loop.
import { main, createChannel, each, spawn, sleep } from 'effection';

await main(function*() {
  let channel = createChannel();

  yield* spawn(function*() {
    yield* sleep(1000);
    yield* channel.send('hello');
    yield* sleep(1000);
    yield* channel.send('world');
  });

  for (let value of yield* each(channel)) {
    console.log('got value:', value);
    yield* each.next();
  }
});
Why do we need to use `spawn()` here? We know that sending values to a stream does nothing unless someone is subscribed to the stream, so we cannot send any values before we begin our loop, but we also cannot send any values after we begin our loop because the loop will never complete until the stream closes (more about that later). So we need to run both the loop and the sending of values concurrently, and as we've already learned, when we need to do multiple things concurrently, that's when we use `spawn()`.
We could also flip this example around like this:
import { main, createChannel, each, spawn, sleep } from 'effection';

await main(function*() {
  let channel = createChannel();

  yield* spawn(function*() {
    for (let value of yield* each(channel)) {
      console.log('got value:', value);
      yield* each.next();
    }
  });

  yield* sleep(1000);
  yield* channel.send('hello');
  yield* sleep(1000);
  yield* channel.send('world');
});
Why Subscriptions?
Unlike listening for one-off events, a subscription guarantees that we never miss a message. In the example below, the subscription does not drop the second message even though it is delivered while the consumer is busy doing something else:
import { main, createChannel, spawn, sleep } from 'effection';

await main(function*() {
  let channel = createChannel();
  let subscription = yield* channel;

  yield* spawn(function*() {
    yield* sleep(1000);
    yield* channel.send('hello');
    yield* channel.send('world');
  });

  let { value: firstValue } = yield* subscription.next();
  console.log(firstValue); // logs 'hello'

  yield* sleep(1000);

  let { value: secondValue } = yield* subscription.next();
  console.log(secondValue); // logs 'world'
});
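To make the contrast with one-off listening concrete, here is a rough sketch in plain JavaScript using the standard `EventTarget`. It is not Effection code: `message`, `nextMessage`, `subscribe`, and `demo` are hypothetical helpers, and the queue-backed `subscribe` only approximates what a subscription does.

```javascript
// Hypothetical helper: build a "message" event carrying a data payload.
function message(data) {
  return Object.assign(new Event("message"), { data });
}

// One-off listening: hears only the first event dispatched after it attaches.
function nextMessage(target) {
  return new Promise((resolve) => {
    target.addEventListener("message", (event) => resolve(event.data), {
      once: true,
    });
  });
}

// Queue-backed listening: every event is buffered until the consumer takes it.
function subscribe(target) {
  const queue = [];
  target.addEventListener("message", (event) => queue.push(event.data));
  return { take: () => queue.shift() };
}

async function demo() {
  const target = new EventTarget();
  const subscription = subscribe(target);
  const first = nextMessage(target);

  // Both events arrive back to back, before the consumer can react.
  target.dispatchEvent(message("hello"));
  target.dispatchEvent(message("world")); // the one-off listener is already gone

  return {
    oneOff: [await first],
    buffered: [subscription.take(), subscription.take()],
  };
}

demo().then((result) => {
  console.log("one-off heard:", result.oneOff.join(", "));
  console.log("subscription buffered:", result.buffered.join(", "));
});
```

The one-off listener sees only the first event, because nothing is listening by the time the second one is dispatched. The queue keeps both until they are taken.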
Closing streams
All good things must come to an end, including a stream of data. When that happens, Effection streams and subscriptions optionally support passing a final piece of data once there are no more values to be received. Usually, if present, this piece of data represents a summary of the stream and something about its final end state. For example, when a websocket is closed it emits a `close` event that contains a numeric code and a description of why the socket was closed.
The value of this last piece of data is the value of the iterator result when the `done` attribute is `true`. Unsurprisingly, this mirrors the behavior of an async iterator exactly.
import { main, sleep, spawn, createChannel } from "effection";

await main(function*() {
  let channel = createChannel();
  let subscription = yield* channel;

  yield* spawn(function*() {
    yield* channel.send('hello');
    yield* sleep(1000);
    yield* channel.send('world');
    // the value passed to close() becomes the final iterator result
    yield* channel.close({ words: 2 });
  });

  let next = yield* subscription.next();
  while (!next.done) {
    console.log(next.value);
    next = yield* subscription.next();
  }
  console.log(`${next.value.words} words total`);
});
//=> hello
//=> world
//=> 2 words total
Just like with an async iterator, if you want to access the very last iterator result, you cannot use a `for...of` loop. You must use a `while` loop instead. This is because the body of a `for...of` loop only executes for iterator results where the `done` property is `false`. It does not execute for the last iterator result, where `done` is `true`.
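The async side of this comparison can be checked in plain JavaScript. In the sketch below, `words`, `collectWithForAwait`, and `collectWithWhile` are hypothetical names for illustration; the generator's return value plays the role of the closing value.

```javascript
// Hypothetical async generator whose return value summarizes the sequence.
async function* words() {
  yield "hello";
  yield "world";
  return { words: 2 };
}

// for await only runs its body for results where done is false,
// so the returned summary is never visible inside the loop.
async function collectWithForAwait() {
  const seen = [];
  for await (const word of words()) {
    seen.push(word);
  }
  return { seen };
}

// Calling next() by hand exposes the final result as well.
async function collectWithWhile() {
  const iterator = words();
  const seen = [];
  let next = await iterator.next();
  while (!next.done) {
    seen.push(next.value);
    next = await iterator.next();
  }
  return { seen, summary: next.value };
}

collectWithWhile().then(({ seen, summary }) => {
  console.log(`${seen.join(" ")}: ${summary.words} words total`);
  // hello world: 2 words total
});
```

Both functions see the same two words, but only the manual loop can read the summary carried by the final result.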
Creating your own streams
Most of the examples thus far have shown how to consume streams that originate from within an operation, and so have used the preferred mechanism for that, which is the `Channel`. To send a message, we run the channel's `send()` operation. But because `send()` is an operation, we must already be inside an operation in order to send a message to the stream. This poses a problem for events that originate from outside of an operation, such as an event listener. In this case, we need to be able to call a plain vanilla JavaScript function, which is not an operation, and still notify any subscribers to a stream. For these cases there is the `Signal` interface, which can be created with the `createSignal()` function.
In the following example, we use a signal as an event callback to log every click on a button:
import { createSignal, each } from "effection";

export function* logClicks(button) {
  let clicks = createSignal();
  try {
    button.addEventListener("click", clicks.send);

    for (let click of yield* each(clicks)) {
      console.dir({ click });
      yield* each.next();
    }
  } finally {
    button.removeEventListener("click", clicks.send);
  }
}
First, we create the signal and attach its `send()` function to the button as a click listener. We then loop over every click event received in this way and log its contents to the console. Notice that, like a well-behaved operation, we make sure to remove the listener from the button so that we leave it in the same state we found it.
This is a good demonstration of the concept, but it isn't as useful as it could be: any time we want to consume a stream of clicks like this, we first need to create a signal, attach the signal's callback, and then iterate over its contents. For example, if we wanted an operation that disabled a button by preventing the default action on every click, we would have to repeat the same boilerplate as in our click logger:
import { createSignal, each } from "effection";

export function* cancelClicks(button) {
  let clicks = createSignal();
  try {
    button.addEventListener("click", clicks.send);

    for (let click of yield* each(clicks)) {
      click.preventDefault();
      yield* each.next();
    }
  } finally {
    button.removeEventListener("click", clicks.send);
  }
}
In other words, the problem with this method is that you have to define the stream and consume it in the same place. What we'd like to do instead is define the stream in one place and consume it in another. That way, we could specify the setup and teardown logic once, but use it many times. If we had a way to do that, say a `clicksOn()` function that could take any HTML element and return a stream of clicks on that element, then we could write something like the following:
import { each, spawn } from "effection";

function* logAndCancel(button) {
  let clicks = clicksOn(button);

  yield* spawn(function*() {
    for (let click of yield* each(clicks)) {
      console.log(click);
      yield* each.next();
    }
  });

  yield* spawn(function*() {
    for (let click of yield* each(clicks)) {
      click.preventDefault();
      yield* each.next();
    }
  });
}
It turns out that resources are just what we need to make this happen. If you recall, a `Stream` is just an `Operation` that returns a `Subscription`. So the simplest way to implement such an operation is as a resource that provides a subscription.
💡Most of the time, you won't need to implement your own streams, but when you do, a resource is the easiest way to do it.
Armed with resources, we can now implement our hypothetical `clicksOn()` utility.
import { createSignal, resource } from "effection";

export function clicksOn(button) {
  return resource(function*(provide) {
    let clicks = createSignal();
    try {
      button.addEventListener("click", clicks.send);

      let subscription = yield* clicks;
      yield* provide(subscription);
    } finally {
      // runs when the resource is torn down, removing the same listener
      button.removeEventListener("click", clicks.send);
    }
  });
}
We have now encapsulated the setup and teardown of our listener in a single place, but the actual consuming of the stream of clicks can happen anywhere.
In fact, Effection provides a handy way to create a stream of events out of any `EventTarget` using the built-in `on()` function, and this is precisely the technique that it uses. You can read more about using event targets in the next section.