Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
import fj.*
import fj.control.parallel.Strategy
import static fj.Function.curry as fcurry
import static fj.P1.curry as pcurry
import static fj.P1.fmap
import static fj.control.parallel.Actor.actor
import static fj.control.parallel.Promise.*
import static fj.data.List.range
import static java.util.concurrent.Executors.*

// Tuning knobs for the demo.
CUTOFF  = 12   // not worth parallelizing for small n
START   = 8    // first n passed to range() below
END     = 16   // last n to compute; range() is called with END + 1
THREADS = 4    // size of the fixed thread pool

// Executor shared by everything below; the print effect shuts it down after output.
pool = newFixedThreadPool(THREADS)
// Both strategies are built from the same pool.
su   = Strategy.executorStrategy(pool)
spi  = Strategy.executorStrategy(pool)
// Two-argument addition closure, coerced to F2 and curried; calc uses it to combine two results.
add  = fcurry({ a, b -> a + b } as F2)
// Inputs to compute. NOTE(review): END + 1 suggests range()'s upper bound is exclusive — confirm against the fj docs.
nums = range(START, END + 1)

println "Calculating Fibonacci sequence in parallel..."

// Plain recursive Fibonacci: values below 2 are returned unchanged,
// everything else is the sum of the two preceding terms.
serialFib = { n ->
  if (n < 2) {
    return n
  }
  serialFib(n - 1) + serialFib(n - 2)
}

// Effect that writes one line per result, labelling them START, START + 1, ...
// in iteration order, and then shuts the thread pool down.
print = { results ->
  results.eachWithIndex { value, offset ->
    println "n=${START + offset} => $value"
  }
  pool.shutdown()
} as Effect

// Promise-returning Fibonacci.
// Below CUTOFF the value is computed with serialFib and wrapped in a promise;
// otherwise the promise for n - 1 is bound with the joined calc of n - 2 and
// the two results are combined with add.
calc = { n ->
  if (n < CUTOFF) {
    return promise(su, P.p(serialFib(n)))
  }
  def previous = calc.f(n - 1)
  def beforePrevious = join(su, pcurry(calc).f(n - 2))
  previous.bind(beforePrevious, add)
} as F

// Actor that hands the finished results to the print effect.
out = actor(su, print)

// Same pipeline as a single nested expression, split into named steps:
// lift sequence(su), map calc over nums via spi, sequence the outcome,
// join it and send the result to the actor.
def liftedSequence = fmap(sequence(su))
def pendingResults = spi.parMapList(calc).f(nums)
join(su, liftedSequence.f(pendingResults)).to(out)

Using java.util.concurrent.Exchanger

Code Block

import java.util.concurrent.Exchanger
// Two threads walk their own range in order. Each keeps the values of "its"
// parity and swaps every other value with the peer thread via the Exchanger.
def exchanger = new Exchanger()
def lowerHalf = 1..20
def upperHalf = 21..40
def evens = []
def odds = []

// Walks 1..20: even values are traded away, odd values are kept.
def oddCollector = Thread.start {
  odds = lowerHalf.collect { value ->
    value % 2 == 0 ? exchanger.exchange(value) : value
  }
}

// Walks 21..40: odd values are traded away, even values are kept.
def evenCollector = Thread.start {
  evens = upperHalf.collect { value ->
    value % 2 != 0 ? exchanger.exchange(value) : value
  }
}

// Wait for both workers before reading the collected lists.
oddCollector.join()
evenCollector.join()

println "evens: $evens"
println "odds: $odds"

Catching Exceptions with an Exception Handler

...