missionary flow tutorial – Peter Nagy

{:deps {compliment/compliment {:mvn/version "0.3.9"}
        org.clojure/clojure {:mvn/version "1.11.1"}
        com.hyperfiddle/rcf {:mvn/version "20220902-130636"}
        missionary/missionary {:mvn/version "b.27-SNAPSHOT"}}}
(require '[hyperfiddle.rcf :as rcf :refer [tests ! % with]])
(require '[missionary.core :as m])
(import '[clojure.lang IFn IDeref])
(hyperfiddle.rcf/enable!)

Let's create a minimal flow by hand. We will create it first and explain it afterwards.

(defn hello [n t]
  (n)
  (reify
    IDeref 
    (deref [_] (t) "Hello world!")))

We can test it like this:

(tests
  (m/? (m/reduce conj hello))
  := ["Hello world!"])

This is not a fully compliant implementation, but it can already run for a simple hello world example. Let's read it now.

A flow is a function taking 2 arguments, named n and t. Upon calling hello we immediately call n as a 0-argument function and return an object implementing IDeref, i.e. we can call deref on it, or use the shorthand @ reader macro.

At some point m/reduce must have called deref on this, so we called t as a 0-argument function and returned "Hello world!". It seems deref was only called once, since we only see the "Hello world!" string once in the result.

Let's give a real name to some of these things:

  • n stands for notifier. It is a thunk (0-argument function) that notifies the consumer (the one that wants to take values from the flow) that it can transfer a new value.

  • t is for terminator. It is a thunk that notifies the consumer the flow won't have more values.

  • the deref implementation we call transfer. The consumer calls it after a notification to transfer the new value.

  • a flow is a function. When we call it we say we spawned a process. So a flow is only a blueprint. In that sense it is similar to a Java Iterable vs. Iterator, i.e. an Iterable carries no state, only the Iterator does. Same with flow vs. process.

  • the returned process is also called an iterator.

With this knowledge we might be able to read our hello flow like this:

  • upon spawning the process we immediately notify we have a value ready

  • m/reduce receives the notification and can transfer a value

  • during transfer we send a termination notice and return "Hello world!"

  • m/reduce receives the value. Since it also received a termination notice, it knows it has finished its job.
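
The steps above can be replayed by hand, without m/reduce. This is only a sketch: the events atom is a hypothetical log introduced for illustration, and hello is repeated so the snippet runs on its own.

```clojure
(import '[clojure.lang IDeref])

;; The hello flow from above, repeated so this snippet is self-contained.
(defn hello [n t]
  (n)
  (reify
    IDeref
    (deref [_] (t) "Hello world!")))

;; Hypothetical event log, used only to make the call order visible.
(def events (atom []))

;; Spawning the process: we play the consumer and pass our own
;; notifier and terminator thunks.
(def process
  (hello #(swap! events conj :notified)
         #(swap! events conj :terminated)))

;; The notifier already fired during spawning, so @events is [:notified] here.
(def after-spawn @events)

;; Transfer: deref the process. hello calls the terminator during transfer.
(def value @process)
(swap! events conj [:transferred value])

@events
;; => [:notified :terminated [:transferred "Hello world!"]]
```

Note that the termination notice arrives before deref returns, which is why a consumer has to check for termination after each transfer.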

Maybe we could even go as far as reimplementing m/reduce?

(defn myreduce [rf flow]
  (let [notified? (atom false)
        terminate? (atom false)
        process (flow #(reset! notified? true) #(reset! terminate? true))]
    (loop [ret (rf)]
      (if @terminate?
        ret 
        (if @notified?
          (do (reset! notified? false)
              (recur (rf ret @process)))
          (throw (ex-info "bad" {})))))))

Let's see (fingers crossed...)

(tests
  (myreduce conj hello)
  := ["Hello world!"])

We did it! Will it work with m/seed? It turns a sequence into a flow.

(tests
  (myreduce conj (m/seed [1 2 3]))
  := [1 2 3])

Cool! Let's walk through the code a bit:

  • we track a pending notification in the form of a boolean

  • we track termination in the form of a boolean

  • we spawn a process with the notifier setting notified? to true and terminator setting terminate? to true

  • until we get a termination notice we keep looking for more notifications. Once terminated we'll return what we managed to collect

  • if there is a notification we transfer the value and recurse

  • else we throw

We simplified things in many ways here, although m/seed also worked. What our flow and m/seed have in common is that they are synchronous. Missionary can model any asynchronous flow as well, one which calls the notifier and terminator at some unknown point in the future. Our simplistic reduce implementation only supports synchronous flows. That's fine though, at least we can see that flows can model a Java Iterable or Clojure's IReduce. Just bear in mind the real power comes from modelling asynchronous data with it.
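
To see the synchronous limitation concretely, here is a small sketch. The silent flow is hypothetical: it stands in for an asynchronous flow by not calling its notifier during spawning. myreduce is repeated so the snippet runs on its own.

```clojure
(import '[clojure.lang IDeref IFn])

;; myreduce from above, repeated so this snippet is self-contained.
(defn myreduce [rf flow]
  (let [notified? (atom false)
        terminate? (atom false)
        process (flow #(reset! notified? true) #(reset! terminate? true))]
    (loop [ret (rf)]
      (if @terminate?
        ret
        (if @notified?
          (do (reset! notified? false)
              (recur (rf ret @process)))
          (throw (ex-info "bad" {})))))))

;; Hypothetical flow: it does not notify while being spawned, the way an
;; asynchronous flow would still be waiting for its first value.
(defn silent [n t]
  (reify
    IDeref (deref [_] (t) :late)
    IFn    (invoke [_])))

;; myreduce finds neither a notification nor a termination and gives up.
(def result
  (try
    (myreduce conj silent)
    (catch clojure.lang.ExceptionInfo e
      (ex-message e))))

result
;; => "bad"
```

A consumer that supports asynchronous flows would have to wait for the notifier to be called instead of throwing.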

There's one more concept we skipped: cancellation. It's great that a flow can send a termination notice, but we might also want to stop early, e.g. take only 5 values from a flow even though it could produce more. For this case one can call the process (the object returned by the flow) as a thunk. Our hello world flow would then be

(defn hello [n t]
  (n)
  (reify
    IDeref (deref [_] (t) "Hello world!")
    IFn    (invoke [_])))

We don't have any cleanup work to do upon cancellation, but now the returned process implements the full flow protocol.
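
From the consumer's side, cancelling is just a 0-argument call on the process. A minimal sketch, repeating the cancellable hello so it runs on its own; the no-op notifier and terminator are placeholders for illustration.

```clojure
(import '[clojure.lang IDeref IFn])

;; The cancellable hello flow from above.
(defn hello [n t]
  (n)
  (reify
    IDeref (deref [_] (t) "Hello world!")
    IFn    (invoke [_])))

;; Spawn with no-op notifier and terminator (placeholders).
(def process (hello (fn []) (fn [])))

;; Cancel: call the process as a thunk. Our invoke body is empty,
;; so nothing happens and the call returns nil.
(def cancel-result (process))

;; Because this hello ignores cancellation, the value can still be transferred.
(def value @process)
```

A flow holding real resources would release them in invoke.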

To recap, there are two sides to the coin here:

  • internally a flow calls the notifier and terminator thunks it receives as arguments

  • externally we can call transfer and cancel on a process (running flow)

We showed how myreduce supplies its own notifier and terminator for the flow it wishes to spawn. All flow operators like latest or eduction use the same concept.

Can you guess what reduce should do when it receives a reduced value?
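
One plausible answer, sketched under assumptions: stop transferring, cancel the process and return the unwrapped value. This is not necessarily how m/reduce behaves in every detail. The names myreduce2, naturals, cancelled? and take-3 are hypothetical helpers introduced for illustration.

```clojure
(import '[clojure.lang IDeref IFn])

;; Hypothetical flag so we can observe that cancellation happened.
(def cancelled? (atom false))

;; Hypothetical synchronous flow producing 0, 1, 2, ... without end.
(defn naturals [n t]
  (let [i (atom 0)]
    (n)
    (reify
      IDeref (deref [_] (let [v @i] (swap! i inc) (n) v))
      IFn    (invoke [_] (reset! cancelled? true)))))

;; Variant of myreduce that checks for a reduced value first.
(defn myreduce2 [rf flow]
  (let [notified? (atom false)
        terminate? (atom false)
        process (flow #(reset! notified? true) #(reset! terminate? true))]
    (loop [ret (rf)]
      (cond
        (reduced? ret) (do (process) @ret) ; cancel, then unwrap
        @terminate?    ret
        @notified?     (do (reset! notified? false)
                           (recur (rf ret @process)))
        :else          (throw (ex-info "bad" {}))))))

;; Reducing function that stops after collecting three values.
(defn take-3
  ([] [])
  ([acc x]
   (let [acc (conj acc x)]
     (if (= 3 (count acc)) (reduced acc) acc))))

(def result (myreduce2 take-3 naturals))

result
;; => [0 1 2]
```

A more complete consumer would presumably also wait for the termination notice after cancelling; this sketch returns right away.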
