The Fortress programming language has some very powerful features. Guy Steele's slides here (slide 52 onwards) illustrate several of them, including how to split a sentence into words in parallel.

Here we look at some ways to use Groovy and GPars to solve that problem.

First we can write an imperative sequential version:

```groovy
def swords = { s ->
    def result = []
    def word = ''
    s.each { ch ->
        if (ch == ' ') {
            if (word) result += word
            word = ''
        } else word += ch
    }
    if (word) result += word
    result
}
```

And test it as follows:

```groovy
assert swords("Here is a sesquipedalian string of words") == ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words']
```

Now, we can ask ourselves a question: is this solution in its current form easy to turn into a parallel one? The answer is no. Each step in our loop relies on the result from the previous step. So, if I split the sentence into, say, two pieces, and I am processing the start of the second piece, I don't know whether a character or a space finished the previous piece. In other words, the order in which I process information matters in the current algorithm, and to make it work easily in parallel we need to refactor it into an equivalent algorithm where the order no longer matters. Technically, we require that the algorithm is associative. The figure below shows what we mean.

If we have three things to calculate, it doesn't matter if we do the first two first or the last two first.
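As a concrete illustration (my own, not from the slides), string concatenation is associative, so the grouping of operations doesn't change the result:

```groovy
// Associativity: grouping of "+" doesn't change the result.
def (a, b, c) = ['Here', 'is', 'a']
assert ((a + b) + c) == (a + (b + c))  // both are 'Hereisa'
```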

Another useful property for our algorithm to have is left and right identity ("zero") elements.

This property allows us to break our data up into pieces and (if needed) substitute a ZERO element at one or the other end, i.e. our algorithm will play nicely at the boundary points. Basically we can turn this:

into this:
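Continuing the string analogy (again my own illustration, not from the slides), the empty string acts as the "zero" element for concatenation:

```groovy
// Identity: adding the zero element on either side changes nothing.
def zero = ''
def word = 'sesquipedalian'
assert zero + word == word  // left identity
assert word + zero == word  // right identity
```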

Technically, combining associativity with left and right identity gives us a monoid. The details aren't important here but basically algorithms with such properties can be easily made parallel. This is exactly what we want when using techniques such as map/reduce or divide and conquer as part of our solution.

So getting back to word splitting we need to change the algorithm to remember the boundary conditions. To do that we will create a little domain for remembering "chunks" and "segments" of partial results. Here is one way to code that solution (and credit goes to Guy Steele's article for devising this domain):

```groovy
import groovy.transform.Immutable

class Util {
    static maybeWord(s) { s ? [s] : [] }
}

import static Util.*

@Immutable class Chunk {
    String s
    public static final ZERO = new Chunk('')
    def plus(Chunk other) { new Chunk(s + other.s) }
    def plus(Segment other) { new Segment(s + other.l, other.m, other.r) }
    def flatten() { maybeWord(s) }
}

@Immutable class Segment {
    String l; List m; String r
    public static final ZERO = new Segment('', [], '')
    def plus(Chunk other) { new Segment(l, m, r + other.s) }
    def plus(Segment other) { new Segment(l, m + maybeWord(r + other.l) + other.m, other.r) }
    def flatten() { maybeWord(l) + m + maybeWord(r) }
}
```

We'll have more to say about these classes later but basically we have plus ("+") methods for adding `Segment`s and `Chunk`s, a `ZERO` identity element, and a `flatten` method which takes our `Chunk` or `Segment` and converts it into a list of words.

Guy Steele's slides show using this domain in more detail. Here is a summary snapshot to illustrate the main idea.
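As a rough sketch of the idea (assuming the `Chunk` and `Segment` classes above), here is how partial results combine:

```groovy
// Two chunks join into a bigger chunk.
assert new Chunk('He') + new Chunk('re') == new Chunk('Here')
// Chunk + Segment absorbs the chunk into the segment's left boundary.
assert new Chunk('Here') + new Segment('', [], 'is') == new Segment('Here', [], 'is')
// Segment + Segment completes the word spanning the shared boundary.
assert new Segment('Here', [], 'i') + new Segment('s', [], 'a') == new Segment('Here', ['is'], 'a')
```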

Now we are ready to write a slightly more functional flavored sequential solution:

```groovy
def processChar(ch) {
    ch == ' ' ? new Segment('', [], '') : new Chunk(ch)
}

def swords(s) {
    s.inject(Chunk.ZERO) { result, ch -> result + processChar(ch) }
}
```

And test it as follows:

```groovy
assert swords("Here is a sesquipedalian string of words").flatten() == ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words']
```

More importantly, because of the properties of the classes in our domain, we are ready to write some parallel solutions. Before doing that, let's take a slight diversion to illustrate how we might further check some of our algorithm's properties. We'll use the Java port of quickcheck:

```groovy
@GrabResolver('http://download.java.net/maven/2')
@Grab('net.java:quickcheck:0.5')
import static net.java.quickcheck.generator.CombinedGeneratorsIterables.someNonEmptyLists
import static net.java.quickcheck.generator.PrimitiveGenerators.nonEmptyStrings

def total = 0
someNonEmptyLists(nonEmptyStrings()).each { words ->
    def someSegment = words.size() > 2 ?
        new Segment(words[0], words[1..-2], words[-1]) :
        new Segment('', words, '')
    def expected = someSegment.flatten()
    [Chunk.ZERO, Segment.ZERO].each { zero ->
        assert expected == (someSegment + zero).flatten()
        assert expected == (zero + someSegment).flatten()
    }
    total++
}
println "$total tests"
```

This produces the output:

```
200 tests
```

and gives me confidence that my algorithm is a monoid. The above showed that `Segment`s have left and right identity elements for "+" (in fact both `Chunk.ZERO` and `Segment.ZERO` work). If we wanted to, we could have additional checks for the associativity piece or for properties of `Chunk`.
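For example, a hand-rolled associativity check (not using quickcheck, and assuming the `Chunk` and `Segment` classes defined earlier) might look like:

```groovy
def x = new Chunk('foo')
def y = new Segment('a', ['b'], 'c')
def z = new Chunk('bar')
// Grouping must not matter for "+" ...
assert (x + y) + z == x + (y + z)
// ... and hence not for the flattened word list either.
assert ((x + y) + z).flatten() == (x + (y + z)).flatten()
```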

What have we achieved? Well, now we are in a position to write some parallel versions of our algorithm. Many share a common strategy: divide the input into sections. Typically the division stops once the sections reach a certain granularity of size. As a general rule, if we divide past a certain level of granularity, the overheads associated with setting up the parallelism outweigh the parallelism gains.

Here's one version which uses an old school concurrent hash map. This version divides the input sentence into 4 pieces and solves each piece in a separate thread, storing the results into a concurrent hash map.

```groovy
import java.util.concurrent.ConcurrentHashMap

THREADS = 4

def pwords(s) {
    int n = (s.size() + THREADS - 1) / THREADS
    def map = new ConcurrentHashMap()
    (0..<THREADS).collect { i ->
        Thread.start {
            def (min, max) = [[s.size(), i * n].min(), [s.size(), (i + 1) * n].min()]
            map[i] = swords(s[min..<max])
        }
    }*.join()
    (0..<THREADS).collect { i -> map[i] }.sum().flatten()
}
```

And test it using this code:

```groovy
assert pwords("Here is a sesquipedalian string of words") == ['Here', 'is', 'a', 'sesquipedalian', 'string', 'of', 'words']
```

Alternatively, I can use GPars and not have to deal with as much explicit synchronization.

Firstly, we'll look at a map/reduce version. It consists of a partition part which splits the data up - in our case into 4 pieces. Then the map part runs our serial solution for each piece - notice that the mapping is independent and can be run completely in parallel. For the reduce part we will use our "+" monoid. Here is the code:

```groovy
@Grab('org.codehaus.gpars:gpars:0.11')
import static groovyx.gpars.GParsPool.withPool

THRESHHOLD = 10

def partition(piece) {
    piece.size() <= THRESHHOLD ? piece :
        [piece[0..<THRESHHOLD]] + partition(piece.substring(THRESHHOLD))
}

def pwords = { input ->
    withPool(THREADS) {
        partition(input).parallel.map(swords).reduce { a, b -> a + b }.flatten()
    }
}
```

The previous example used an explicit reduce to make that part of our solution obvious but we can also simplify a bit more using the built-in sum reduction:

```groovy
partition(input).parallel.map(swords).sum().flatten()
```

As an alternative, we can use the fork/join capabilities which are available in Java 7 (or the JSR166y library - this library is typically bundled with GPars):

```groovy
def pwords(input) {
    withPool(THREADS) {
        runForkJoin(0, input.size(), input) { first, last, s ->
            def size = last - first
            if (size <= THRESHHOLD) {
                swords(s[first..<last])
            } else { // divide and conquer
                int pivot = first + size / 2
                forkOffChild(first, pivot, s)
                forkOffChild(pivot, last, s)
                childrenResults.sum()
            }
        }.flatten()
    }
}
```

Alternatively, we can use GPars' dataflow capabilities. With dataflow we write expressions which declaratively express the relationships in our data. In our case the pieces must be summed (using our "+" monoid) once they have been calculated:

```groovy
import groovyx.gpars.dataflow.DataFlows
import static groovyx.gpars.dataflow.DataFlow.task

def partition(s, n, i) {
    s[[s.size(), i * n].min()..<[s.size(), (i + 1) * n].min()]
}

def pwords(s) {
    int n = (s.size() + THREADS - 1) / THREADS
    new DataFlows().with {
        task { a = swords(partition(s, n, 0)) }
        task { b = swords(partition(s, n, 1)) }
        task { c = swords(partition(s, n, 2)) }
        task { d = swords(partition(s, n, 3)) }
        task { sum1 = a + b }
        task { sum2 = c + d }
        task { sum = sum1 + sum2 }
        // task { sum = a + b + c + d } // alternative sum
        sum // reading the dataflow variable blocks until the tasks have bound it
    }.flatten()
}
```

Finally, we can create a parallel array version. This version (as currently written) doesn't limit the amount of parallelism to some granularity level; instead it follows the same approach as our functional sequential version but replaces inject with collectParallel and sumParallel, which automatically perform their steps in parallel. Here is the code:

```groovy
def pwords(input) {
    withPool(THREADS) {
        input.collectParallel { processChar(it) }.sumParallel().flatten()
    }
}
```

As one final example, here is a variation of our original ConcurrentHashMap version, this time making use of GPars agents to guard a non-concurrent HashMap.

```groovy
import groovyx.gpars.agent.Agent

def pwords(s) {
    int n = (s.size() + THREADS - 1) / THREADS
    def agent = new Agent<Map>([:], { it?.clone() })
    (0..<THREADS).collect { i ->
        Thread.start {
            def (min, max) = [[s.size(), i * n].min(), [s.size(), (i + 1) * n].min()]
            def result = swords(s[min..<max])
            agent << { it[i] = result; updateValue it }
        }
    }*.join()
    assert agent.val instanceof LinkedHashMap
    (0..<THREADS).collect { i -> agent.val[i] }.sum().flatten()
}
```

Here, the agent acts as a synchronization layer between our code and the standard LinkedHashMap (the default map for Groovy). This offers no particular advantage over ConcurrentHashMap but illustrates how agents work in general. We supply the agent with code to run and it runs that code after performing the necessary synchronization.