missionary flow tutorial – Peter Nagy
```clojure
{:deps {compliment/compliment {:mvn/version "0.3.9"}
        org.clojure/clojure   {:mvn/version "1.11.1"}
        com.hyperfiddle/rcf   {:mvn/version "20220902-130636"}
        missionary/missionary {:mvn/version "b.27-SNAPSHOT"}}}
```
```clojure
;; require is a function, so the libspec vectors must be quoted
(require '[hyperfiddle.rcf :as rcf :refer [tests ! % with]])
(require '[missionary.core :as m])
(import '[clojure.lang IFn IDeref])
(hyperfiddle.rcf/enable!)
```
Let's create a minimal flow by hand. We will create it first and explain it afterwards.
```clojure
(defn hello [n t]
  (n)
  (reify
    IDeref
    (deref [_] (t) "Hello world!")))
```
We can test this as
```clojure
(tests
  (m/? (m/reduce conj hello))
  := ["Hello world!"])
```
This is not a fully compliant implementation, but it can already run for a simple hello world example. Let's read it now.
A flow is a function taking 2 arguments, named n and t. Upon calling hello we immediately call n as a 0-argument function and return an object implementing IDeref. This means we can call deref on it, or use its shorthand, the @ reader macro.
At some point m/reduce must have called deref on this object, so we called t as a 0-argument function and returned "Hello world!". deref seems to have been called only once, since the "Hello world!" string appears only once in the result.
Let's give a real name to some of these things:
- n stands for notifier. It is a thunk (a 0-argument function) that notifies the consumer (the one who wants to take values from the flow) that it can transfer a new value.
- t stands for terminator. It is a thunk that notifies the consumer that the flow won't have more values.
- The deref implementation is called transfer. The consumer calls it after a notification to transfer the new value.
- A flow is a function, and when we call it we say we spawned a process. So a flow is only a blueprint. In that sense it is similar to a Java Iterable vs. an Iterator: the Iterable carries no state, only the Iterator does. The same goes for a flow vs. a process.
- The returned process is also called an iterator.
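The blueprint idea can be checked with a small hand-written flow. count-up is a hypothetical flow factory made up for this sketch, not part of missionary. The flow it returns holds no state, and every spawned process creates its own counter. Like the first hello, it skips cancellation. Each transfer either announces the next value through the notifier or calls the terminator after the last one.

```clojure
(import '[clojure.lang IDeref])

;; Hypothetical flow factory: the returned flow emits 0, 1, ..., (dec limit).
(defn count-up [limit]
  (fn [n t]
    ;; per-process state: each spawn gets a fresh counter
    (let [i (atom 0)]
      ;; announce the first value, or terminate at once if there is none
      (if (pos? limit) (n) (t))
      (reify IDeref
        (deref [_]
          (let [x @i]
            ;; announce the next value, or terminate after the last one
            (if (< (swap! i inc) limit) (n) (t))
            x))))))

;; Spawn the same flow twice with do-nothing notifier and terminator.
(let [flow (count-up 3)
      noop (fn [])
      p1   (flow noop noop)
      p2   (flow noop noop)]
  ;; p1 has advanced twice, while p2 still starts from 0
  [@p1 @p1 @p2])
;; => [0 1 0]
```

Two processes spawned from the same flow advance independently, just like two Iterators obtained from one Iterable.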
With this knowledge we can read our hello flow like this:
- upon spawning the process we immediately notify that a value is ready
- m/reduce receives the notification and can transfer a value
- during the transfer we send a termination notice and return "Hello world!"
- m/reduce receives the value. Since it has also received a termination notice, it knows it has finished its job.
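To watch this sequence happen, we can drive the flow by hand instead of through m/reduce and log every protocol event. The trace-hello helper below is our own illustration, and hello is repeated so the block runs on its own.

```clojure
(import '[clojure.lang IDeref])

;; Same hello flow as above, repeated so this block is self-contained.
(defn hello [n t]
  (n)
  (reify IDeref
    (deref [_] (t) "Hello world!")))

;; Hypothetical helper: spawn hello, transfer once, and record events in order.
(defn trace-hello []
  (let [log     (atom [])
        process (hello #(swap! log conj :notified)
                       #(swap! log conj :terminated))]
    (swap! log conj :spawned)
    ;; @process runs the transfer, which calls the terminator before returning
    (swap! log conj [:transferred @process])
    @log))

(trace-hello)
;; => [:notified :spawned :terminated [:transferred "Hello world!"]]
```

The notification arrives before hello has even returned the process, and the termination notice fires inside the single transfer.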
Could we go as far as reimplementing m/reduce?
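```clojure
(defn myreduce [rf flow]
  (let [notified? (atom false)
        terminate? (atom false)
        ;; notifier and terminator must be thunks, so each reset! is wrapped in #(...)
        process (flow #(reset! notified? true) #(reset! terminate? true))]
    (loop [ret (rf)]
      (if @terminate?
        ret
        (if @notified?
          (do (reset! notified? false)
              ;; transfer: deref the process to obtain the next value
              (recur (rf ret @process)))
          (throw (ex-info "bad" {})))))))
```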
Let's see (fingers crossed...)
```clojure
(tests
  (myreduce conj hello)
  := ["Hello world!"])
```
We did it! Will it work with m/seed? It turns a collection into a flow.
```clojure
(tests
  (myreduce conj (m/seed [1 2 3]))
  := [1 2 3])
```
Cool! Let's walk through the code a bit:
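- notified? records, as a boolean, whether a new notification has arrived
- terminate? records, as a boolean, whether the termination notice has arrived
- we spawn a process, passing a notifier that sets notified? to true and a terminator that sets terminate? to true
- until we get a termination notice we keep looking for more notifications. Once terminated, we return what we managed to collect
- if there is a notification, we reset the flag, transfer the value by dereferencing the process, and recur
- otherwise we throw, because the flow is neither terminated nor ready and this loop has no way to wait for it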
We simplified in many ways here, yet m/seed still worked. Our flow and m/seed have one thing in common: both are synchronous. Missionary can model asynchronous flows as well, which call the notifier and terminator at some unknown point in the future. Our simplistic reduce implementation only supports synchronous flows. That's fine, though. At least it shows that flows can model a Java Iterable or Clojure's IReduce. Just bear in mind that the real power comes from modelling asynchronous data.
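To make the synchronous limitation concrete, here is a sketch of one way a consumer could cope with a flow that notifies later from another thread. Both delayed-hello and blocking-reduce are hypothetical names. Parking the calling thread on a java.util.concurrent.Semaphore is our own choice and is not how missionary works. Missionary's m/reduce returns a task instead of blocking inside the reducer.

```clojure
(import '[clojure.lang IDeref]
        '[java.util.concurrent Semaphore])

;; Hypothetical asynchronous flow: the notifier fires from another thread
;; about 50 ms after spawning, instead of during the spawn call.
(defn delayed-hello [n t]
  (doto (Thread. ^Runnable (fn [] (Thread/sleep 50) (n))) (.start))
  (reify IDeref
    (deref [_] (t) "Hello world!")))

;; Hypothetical reducer: where myreduce would throw, this one parks the
;; calling thread until the notifier or terminator releases a permit.
(defn blocking-reduce [rf flow]
  (let [signal      (Semaphore. 0)
        notified?   (atom false)
        terminated? (atom false)
        process     (flow #(do (reset! notified? true) (.release signal))
                          #(do (reset! terminated? true) (.release signal)))]
    (loop [ret (rf)]
      (cond
        @terminated? ret
        @notified?   (do (reset! notified? false)
                         (recur (rf ret @process)))
        ;; nothing ready yet: wait for the next signal, then re-check the flags
        :else        (do (.acquire signal)
                         (recur ret))))))

(blocking-reduce conj delayed-hello)
;; => ["Hello world!"]
```

Leftover permits only cause an extra pass through the loop that re-checks the flags, so synchronous flows such as hello work with it too.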
There's one more concept we skipped: cancellation. It's great that a flow can send a termination notice, but we might also want to stop early, e.g. take only 5 values from a flow even though it could produce more. For this case the consumer can call the process (not the flow) as a thunk. Our hello world flow would then be
```clojure
(defn hello [n t]
  (n)
  (reify
    IDeref (deref [_] (t) "Hello world!")
    ;; calling the process as a thunk cancels it; nothing to clean up here
    IFn (invoke [_])))
```
We don't have any cleanup work to do upon cancellation, but the process now implements both parts of the protocol: IFn for cancellation and IDeref for transfer.
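Cancellation is easier to see with a process that really has something to stop. naturals (a hypothetical infinite flow) follows a simplified rule of our own. Once cancelled, it still hands over the value it has already announced, then calls the terminator instead of the notifier. take-values (also hypothetical, and synchronous like myreduce) cancels after k values and keeps transferring until the terminator fires. Missionary's own operators may treat values transferred after cancellation differently, for example by throwing on transfer.

```clojure
(import '[clojure.lang IFn IDeref])

;; Hypothetical infinite flow emitting 0, 1, 2, ... until its process is cancelled.
(defn naturals [n t]
  (let [i          (atom 0)
        cancelled? (atom false)]
    (n)
    (reify
      ;; cancel: only record the request, the next transfer acts on it
      IFn (invoke [_] (reset! cancelled? true))
      IDeref
      (deref [_]
        (let [x (first (swap-vals! i inc))]  ; old counter value
          (if @cancelled? (t) (n))           ; terminate once cancelled
          x)))))

;; Hypothetical synchronous consumer: take k values, then cancel and drain.
(defn take-values [k flow]
  (let [notified?   (atom false)
        terminated? (atom false)
        process     (flow #(reset! notified? true) #(reset! terminated? true))]
    (loop [ret []]
      (cond
        @terminated?      ret
        (= k (count ret)) (do (process)                   ; cancel the process
                              (while (not @terminated?)   ; drain until terminated
                                (reset! notified? false)
                                @process)                 ; discard leftovers
                              ret)
        @notified?        (do (reset! notified? false)
                              (recur (conj ret @process)))
        :else             (throw (ex-info "not a synchronous flow" {}))))))

(take-values 3 naturals)
;; => [0 1 2]
```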
To recap, there are 2 sides of the coin here:
- internally, a flow calls the notifier and terminator thunks it receives as arguments
- externally, we can call transfer (deref) and cancel (invoke as a thunk) on a process (a running flow)
We showed how myreduce supplies its own notifier and terminator for the flow it wishes to spawn. All flow operators, like latest or eduction, use the same concept.
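As a rough sketch of an operator built on these two sides, here is a hypothetical map-like operator called my-map. It is not missionary's m/eduction. Mapping never drops a value, so my-map can hand the downstream notifier and terminator straight to the upstream flow and only wrap transfer and cancellation. Operators that drop or buffer values presumably have to supply their own notifier and terminator instead, as myreduce does. forty-one is a throwaway one-value flow used only to try my-map out.

```clojure
(import '[clojure.lang IFn IDeref])

;; Hypothetical operator: spawn the upstream flow with the notifier and
;; terminator we received, and apply f to every transferred value.
(defn my-map [f flow]
  (fn [n t]
    (let [process (flow n t)]
      (reify
        IFn (invoke [_] (process))           ; forward cancellation upstream
        IDeref (deref [_] (f @process))))))  ; transfer upstream, then transform

;; A one-value flow to try it on.
(defn forty-one [n t]
  (n)
  (reify
    IFn (invoke [_])
    IDeref (deref [_] (t) 41)))

(let [events  (atom [])
      process ((my-map inc forty-one)
               #(swap! events conj :notified)
               #(swap! events conj :terminated))]
  [@process @events])
;; => [42 [:notified :terminated]]
```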
Can you guess what reduce should do when it receives a reduced value?