Solving Search Problems with Parallel Depth-First Search

Introduction

In my previous post Constraint Satisfaction Problems and Functional Backtracking Search I presented a functional solution for solving CSPs with backtracking search, which is a variant of depth-first search. As I wrote the post, I thought it would be trivial to write a parallel depth-first search once that I have a functional solution, because the functional programming style is a natural fit for leveraging the fork-join model. In Clojure, it is even possible to run certain data transformations in parallel just by using the parallel running versions of common functions like reduce, filter or map of the reducers library. But soon I realized, that the functional backtracking code cannot be parallelized by the reducers library for various reasons. First, the reducers library does not parallelize lazy sequences and second and foremost, the way we construct our lazy sequence does not fit well with a fork-join approach. Let us have a look at our sequential backtracking search and understand, why that is the case.

Setup

{:deps
 {org.clojure/clojure {:mvn/version "1.10.1"}
  org.clojure/tools.deps.alpha
  {:git/url "https://github.com/clojure/tools.deps.alpha.git"
   :sha "f6c080bd0049211021ea59e516d1785b08302515"}
  compliment {:mvn/version "0.3.9"}
  org.clojure/core.async {:mvn/version "1.1.587"}}}
deps.edn
Extensible Data Notation

Refresher: Sequential Backtracking Search

Let us take the code from the previous post and rewrite it against a protocol so that the same problem can be solved sequentially or parallel without any code changes. We call that protocol Searchable, because we are searching for a solution to a problem. We also write a default implementation for nil to support nil-punning.

(defprotocol Searchable
  (children [this])
  (xform [this]))
(extend-type nil
  Searchable
  (children [_] nil)
  (xform [_] (map identity)))
0.1s
Clojure
nil

Our backtracking search is performing depth-first search on a tree of (partial) assignments. Connected nodes of trees form a parent-child relationship, so we have a children function on a Searchable type, that calculates the child-nodes of a searchable node in the search tree. We also have a xform function, that returns a transducer, e.g. to remove all children of a node that are not consistent when searching for a solution to a CSP.

(defn sequential-backtracking-seq [xform csps]
  (lazy-seq (if-let [[$first & $rest]
                     (sequence xform csps)]
              (cons $first
                    (sequential-backtracking-seq xform
                                                 (concat (children $first)
                                                         $rest))))))
(defn sequential-backtracking [csp]
  (first (filter (comp empty? children)
                 (sequential-backtracking-seq (xform csp)
                                              (children csp)))))
0.1s
Clojure
user/sequential-backtracking

Solving the Map Coloring Problem sequentially

We want to make sure that our sequential backtracking search still works with the new abstraction. For that we want to solve the map coloring problem again. We can reuse most of the code from the previous post again.

(defn constraint [state-a state-b]
  (fn [csp]
    (if-let [a (get-in csp [:assignment state-a])]
      (= a (get-in csp [:assignment state-b]))
      false)))
(defn init [domains variables constraints]
  {:domains                   domains
   :variables                 variables
   :any-constraints-violated? (apply some-fn constraints)
   :assignment                (zipmap variables (repeat nil))})
(defn select-unassigned-variable [csp]
  (first (filter (comp nil?
                       (partial get-in csp)
                       (partial conj [:assignment]))
                 (:variables csp))))
(defn assign-next-var [csp d]
  (if-let [$unassigned-variable (select-unassigned-variable csp)]
    (assoc-in csp
              [:assignment $unassigned-variable]
              d)
    nil))
(defn next-csps [csp]
  (filter some?
          (map (partial assign-next-var csp)
               (:domains csp))))
(defn complete? [csp]
  (and csp (every? some? (vals (:assignment csp)))))
(defn consistent? [{:keys [any-constraints-violated?]
                    :as   csp}]
  (not (any-constraints-violated? csp)))
0.2s
Clojure
user/consistent?

Additionally, we have to wire it all together by implementing the Searchable protocol:

(defrecord MapColoringCsp []
  Searchable
  (children [this] (next-csps this))
  (xform [_] (filter consistent?)))
(def csp (map->MapColoringCsp
           (init [:red :green :blue]
                 [:WA :NT :Q :NSW :V :SA :T]
                 #{(constraint :WA :NT)
                   (constraint :WA :SA)
                   (constraint :NT :SA)
                   (constraint :NT :Q)
                   (constraint :SA :Q)
                   (constraint :SA :NSW)
                   (constraint :SA :V)
                   (constraint :Q :NSW)
                   (constraint :V :T)})))
(-> csp
  (sequential-backtracking)
  (time)
  :assignment)
0.5s
Clojure
Map {:WA: :red, :NT: :green, :Q: :red, :NSW: :green, :V: :red, :SA: :blue, :T: :green}

Parallelizing Depth-First Search

Looking at the sequential solution, the essential parts happen within the next-csps and consistent? functions, which have already been called when taking elements from the lazy sequence. That means that there is no point in forking and joining the resulting sequence.

In the following, we develop two variations to speed up the solution of Constraint Satisfaction Problems through the use of parallel computing. The first approach makes use of a recursive fork–join algorithm, the second one is build on top of abstractions inspired by Communicating Sequential Processes.

Recursive Fork-Join Parallel Search

For the recursive fork-join solution we make use of Java's Fork-Join-Framework:

(import '[java.util.concurrent ForkJoinPool
                               ForkJoinTask
                               RecursiveTask])
0.1s
Clojure
java.util.concurrent.RecursiveTask

Additionally, we wrap it with a nicer API [source]:

(defmacro fork [& body]
  `(let [^ForkJoinTask task# (proxy [RecursiveTask] []
                               (compute []
                                 (do ~@body)))]
     (if (ForkJoinTask/inForkJoinPool)
       (.fork task#)
       (.execute (ForkJoinPool/commonPool) task#))
     task#))
(defn join [^ForkJoinTask task]
  (.join task))
(defn invoke-all [tasks]
  (ForkJoinTask/invokeAll (into-array tasks)))
0.1s
Clojure
user/invoke-all

The main part of the algorithm is implemented in the solve-csp function and at first glance looks like a regular recursive backtracking search. The main difference is, that for each assignment to a CSP variable, all the alternative assignments are checked in parallel.

(defn join-consistent-xf [csp]
  (comp (mapcat join)
        (xform csp)))
(defn solve-csp [[csp :as stack]]
  (fork
    (if (or (nil? csp)
            (empty? (children csp)))
      stack
      (let [$next-csps (map (comp solve-csp list)
                            (children csp))]
        (invoke-all $next-csps)
        (join (solve-csp (into (rest stack)
                               (join-consistent-xf csp)
                               $next-csps)))))))
(defn fork-join-backtracking [csp]
  (first (join (solve-csp (list csp)))))
0.1s
Clojure
user/fork-join-backtracking

Solving the Map Coloring Problem via Fork-Join

We test the recursive fork-join solution again with our map colouring problem from the previous post:

(-> csp
  (fork-join-backtracking)
  (time)
  :assignment)
1.1s
Clojure
Map {:WA: :blue, :NT: :red, :Q: :blue, :NSW: :red, :V: :blue, :SA: :green, :T: :green}

Drawbacks of the Recursive Fork-Join Solution

For this small problem, the fork-join solution is rather inefficient. Usually a task should perform up until a certain threshold of work before it should spawn new tasks.

But there are two more severe problems with this algorithm. First, for deep CSPs, i.e. for CSPs with many variables, the recursive solution would run in an StackOverflowError:

(def deep-csp
  (map->MapColoringCsp (init [:red]
                             (range 10000)
                             #{(constantly false)})))
(try
	(fork-join-backtracking deep-csp)
  (catch java.lang.StackOverflowError e
    (type e)))
1.4s
Clojure
java.lang.StackOverflowError

The second problem lies in the eagerness of the recursion. The algorithm would amass as many tasks as there are states to explore without applying back pressure against the exhaustion of resources, and no prioritization of depth over breadth.

Hence, we try a different approach that makes use of a library called core.async, which is inspired by the paper "Communicating sequential processes" by Tony Hoare, to implement a parallel depth-first search that addresses both of these problems.

Parallel Depth-First Search

The implementation of sequential depth-first search uses a stack to store the nodes of the graph to traverse. Since there is only one producer of elements for the stack, it is guaranteed that the last added element of the stack is always the deepest known node. If there are multiple parallel workers searching the graph depth-first, the last added item to the stack is no longer guaranteed to be the deepest known node. To solve this problem, it is helpful to interpret a stack as a priority queue that prioritizes on time added. For parallel depth-first search, we need a priority queue that prioritizes on depth in the graph instead. We can use a java.util.TreeSet to implement such a generic priority queue, but this class is not synchronized, i.e. "If multiple threads access a tree set concurrently, and at least one of the threads modifies the set, it must be synchronized externally." [source] In order to enable concurrent write and reads on our priority queue, we implement the Buffer-Protocol of the core.async-library that also adds the ability to back pressure the allocation of resources.

(require '[clojure.core.async.impl.protocols :as async-buffer])
(deftype OrderedSetBuffer [^long n ^java.util.TreeSet buf]
  async-buffer/Buffer
  (full? [this]
    (>= (.size buf) n))
  (remove! [this]
    (.pollLast buf))
  (add!* [this itm]
    (.add buf itm)
    this)
  (close-buf! [this])
  clojure.lang.Counted
  (count [this]
    (.size buf)))
0.7s
Clojure
user.OrderedSetBuffer

Now that we have an ordered set buffer, we can implement a stack buffer, by requiring the elements of the buffer to be depth-first searchable:

(defprotocol DepthFirstSearchable
  (depth [this]))
(defn depth-first-comparator [depth-first-searchable-0
                              depth-first-searchable-1]
  (let [result (- (depth depth-first-searchable-0)
                  (depth depth-first-searchable-1))]
    (if (= result 0) -1 result)))
(defn stack-buffer [^long n]
  (new OrderedSetBuffer n
       (new java.util.TreeSet
            ^java.util.Comparator depth-first-comparator)))
0.2s
Clojure
user/stack-buffer

We can then use the library core.async to synchronize the stack buffer for asynchronous reads and writes by wrapping it in a channel:

(require '[clojure.core.async :as async])
;; allow numbers to be put on the stack buffer by
;; making them implement DepthFirstSearchable
(extend-type java.lang.Number DepthFirstSearchable (depth [this] this))
(let [ch (async/chan (stack-buffer 4))]
  ;; go blocks are lightweight processes not bound to threads
  ;; async/>! writes to a channel in a go block
  (async/go
   (async/>! ch 1) 
   (async/>! ch 3))
  (async/go
   (async/>! ch 2)
   (async/>! ch 4))
  
  ;; async/<!! reads from a channel outside a go block
  (async/<!! (async/timeout 100)) ;; wait for 100 ms
  (async/close! ch)
  (repeatedly 4 (partial async/<!! ch))) ;; should return (4 3 2 1)
9.0s
Clojure
List(4) (4, 3, 2, 1)

While the DepthFirstSearchable protocol specifies the direction of the search, we want to design a generic parallel depth-first search for trees that can be customized in two additional ways.

The first way is whether we want to combine the inspected nodes with each other, for example to count how many nodes we needed to visit in order to find a solution for a problem. Therefore, we define the Combinable protocol.

(defprotocol Combinable
  (combine [this other]))
0.1s
Clojure
Combinable

We also provide a default implementation of combine that takes the two arguments this and other and always returns other.

(extend-protocol Combinable
  Object (combine [this other] other)
  nil    (combine [this other] other))
(combine :a :b)
0.1s
Clojure
:b

The second way to customize our generic parallel depth-first search is to specify whether the search stops as soon as the first solution has been found or if the search continues until all possible nodes have been visited. For the latter case, we define a marker protocol named ExhaustiveSearch:

(defprotocol ExhaustiveSearch)
(defn return-first-solution? [problem]
  (not (satisfies? ExhaustiveSearch problem)))
0.1s
Clojure
user/return-first-solution?

The general idea of how to work with these abstractions in the form of protocols is that we differentiate between the main thread, which reads elements from a channel and acts as an interface to our stack buffer, and the workers, which do the actual traversing of the tree and write to the channel. On each level of the tree, the workers try to offload a node from their stack on the channel, so that the main thread can start a new worker with this node as a starting point. The main thread will stop spawning new workers as soon as the specified degree of parallelism has been reached. Thus, we achieve back pressure by limiting the worker pool to a certain size via the parallelism parameter.

Let us start with the code for the asynchronous workers:

(defmacro combine->offer->recur [$ stack chan]
  `(let [[$first# & [spawn# & $nnext# :as $next#]] ~stack]
     (recur
       (combine ~$ $first#)
       (if (and spawn# (async/offer! ~chan spawn#))
         $nnext#
         $next#))))
(defn add-async-worker-to [worker-pool problem xform chan]
  (->> (async/go-loop [$ problem
                       stack (list)]
         (if-let [$children (seq (children $))]
           (if-let [next-stack (seq (into stack xform $children))]
             (combine->offer->recur $ next-stack chan)
             $)
           (cond
             (return-first-solution? $) (reduced $)
             (empty? stack) $
             :else (combine->offer->recur $ stack chan))))
       (conj worker-pool)))
0.2s
Clojure
user/add-async-worker-to

Asynchronous worker added to the worker pool perform a regular depth-first-search except that if there is more than one element on the stack, a worker tries to put the second element of the stack on the channel. If the channel is full, this does not succeed, but it never blocks the worker.

Now we have a look at the main thread. The main thread assumes the following tasks:

  • Starting new workers if the worker pool is not full

  • Combining the results of different workers into a single result

  • Stopping the search when a solution has been found and the search should not be exhaustive

  • Returning nil if no solution has been found

(defn transduce-1
  "apply a transducer on a single value"
  [xform x]
  (let [f (xform combine)]
    (f nil x)))
(defn remove-worker-from [worker-pool worker]
  (filterv #(not= % worker) worker-pool))
;; given a vector of channels (our async workers are also channels),
;; async/alts!! will return the first element that is ready for consumption
;; and the channel it originates from
(defn next-channel-value [worker-pool chan parallelism]
  (if (<= parallelism (count worker-pool))
    (async/alts!! worker-pool)
    (if (seq worker-pool)
      (async/alts!! (conj worker-pool chan))
      (async/alts!! [chan] :default nil))))
(defn rec:parallel-depth-first-search [root-problem xform chan parallelism]
  (loop [problem root-problem
         worker-pool []]
    (let [[$val ch] (next-channel-value worker-pool chan parallelism)]
      (cond
        (reduced? $val) @$val ; solution found and no exhaustive search
        (= ch chan) (recur problem
                           (add-async-worker-to worker-pool $val xform chan))
        (= ch :default) problem ; search completed
        :else (recur (combine problem $val)
                     (remove-worker-from worker-pool ch))))))
(defn parallel-depth-first-search
  ([init chan-size parallelism]
   (let [xform (xform init)
         root-problem (transduce-1 xform init)
         chan (async/chan (stack-buffer chan-size))]
     (async/>!! chan root-problem)
     (let [result (rec:parallel-depth-first-search root-problem
                                                   xform
                                                   chan
                                                   parallelism)]
       (async/close! chan)
       result))))
0.1s
Clojure
user/parallel-depth-first-search

Solving the Map Coloring Problem via Parallel Depth-First Search

To use of our parallel depth-first search, we must make the map coloring problem parallelizable by implementing Searchable as well as the DepthFirstSearchable protocol.

(defrecord ParallelMapColoringCsp []
  Searchable
  (children [this] (next-csps this))
  (xform [_] (filter consistent?))
  DepthFirstSearchable
  (depth [this] (count (remove nil? (vals (:assignment this))))))
(def p-csp (map->ParallelMapColoringCsp
            (init [:red :green :blue]
                  [:WA :NT :Q :NSW :V :SA :T]
                  #{(constraint :WA :NT)
                    (constraint :WA :SA)
                    (constraint :NT :SA)
                    (constraint :NT :Q)
                    (constraint :SA :Q)
                    (constraint :SA :NSW)
                    (constraint :SA :V)
                    (constraint :Q :NSW)
                    (constraint :V :T)})))
(-> p-csp
  (parallel-depth-first-search 4 4)
  (time)
  :assignment)
0.5s
Clojure
Map {:WA: :blue, :NT: :green, :Q: :blue, :NSW: :green, :V: :blue, :SA: :red, :T: :green}

The map coloring problem is too small to expect any performance improvements from a parallel solution. Therefore, we need to approach a different problem with a larger search space. We also have not sufficient tested, whether our implementation always returns a solution if one exists. Since there are many combinations of coloring possible, we would not have noticed if we only explored a subset of all search nodes in the search tree. To be sure that our implementation always returns a solution if one exists, we have to test that our search explores the full search space. We can do that by solving a problem that has to perform an exhaustive search. The problem of calculating partial sums in a brute force way is such a problem.

Calculating Partial Sums

The partial sums of the series 1 + 2 + 3 + 4 + 5 + 6 + ⋯ are 1, 3, 6, 10, 15, etc. The nth partial sum is given by a simple formula: inline_formula not implemented [source].

Suppose we would like to prove the Pythagoreans right by calculating the nth partial sum iteratively, we could formulate this as a search problem, and then we could either solve it with sequential or parallel depth-first search.

For that, we distribute all numbers up to n uniformly in our search tree with the largest node of value max-size and a branch factor of branch-factor.

(defn psp:parent [{:keys [value branch] :as problem}]
  (when (pos? value)
    (assoc problem :value (int (/ (dec value) branch)))))
(defn psp:ancestors [problem]
  (take-while some?
              (rest (iterate psp:parent problem))))
(defrecord PartialSumsProblem []
  Searchable
  (children [{:keys [value branch max-size] :as problem}]
    (map (partial assoc problem :value)
         (range (inc (* branch value))
                (inc (min (* branch (inc value))
                          max-size)))))
  (xform [_]
    (map #(assoc % :count (:value %))))
  DepthFirstSearchable
  (depth [this] (count (psp:ancestors this)))
  ExhaustiveSearch
  Combinable
  (combine [this other]
    (update other :count + (:count this))))
(defn psp:root [branch-factor max-size]
  (map->PartialSumsProblem {:value    0
                            :branch   branch-factor
                            :max-size max-size}))
0.1s
Clojure
user/psp:root

We can very easily check if our solution is correct for arbitrary large n with the proven formula:

(defn partial-sum [n]
	(/ (* n (inc n)) 2))
(def n 1000000)
(partial-sum n)
0.1s
Clojure
500000500000

To calculate a partial sum via depth-first search, we define a specific partial sum search problem:

;; the branch factor of 11 is arbitrary
(def partial-sum-problem (psp:root 11 n))
0.1s
Clojure
user/partial-sum-problem

Then we can calculate the partial sum either sequentially like this:

(defn sequential-partial-sum [problem]
  (transduce (comp (take-while some?)
                   (map :value))
             +
             (sequential-backtracking-seq (xform problem)
                                          (children problem))))
(time (sequential-partial-sum partial-sum-problem))
7.8s
Clojure
500000500000

Or we can calculate the partial sum in parallel:

(-> partial-sum-problem
  (parallel-depth-first-search 2 2)
  :count
  (time))
1.7s
Clojure
500000500000

The result is the same, but we could reduce the runtime with parallel depth-first search significantly compared to sequential depth-first search.

Conclusion and Outlook

We were able to express different search problems with the same abstractions that could be run either sequentially or in parallel and our implementation of parallel depth-first search could solve large search problems much quicker than the sequential counterpart.

The way we developed the parallel depth-first search makes it useful for all kind of tree shaped search problems that have a finite depth. For example, we could write an asynchronous web crawler by formulating it as search problem so that the crawling of websites would happen in parallel without writing any code for coordinating the crawling workers or preventing the exhaustion of resources.

Runtimes (1)