...
| Code Block |
|---|
import fj.*
import fj.control.parallel.Strategy
import static fj.Function.curry as fcurry
import static fj.P1.curry as pcurry
import static fj.P1.fmap
import static fj.control.parallel.Actor.actor
import static fj.control.parallel.Promise.*
import static fj.data.List.range
import static java.util.concurrent.Executors.*
// --- Tuning constants -------------------------------------------------
CUTOFF = 12   // not worth parallelizing for small n
START = 8     // lower bound handed to range(...) below
END = 16      // range(...) is given END + 1 (presumably an exclusive upper bound -- confirm against fj docs)
THREADS = 4   // size of the fixed thread pool

// --- Execution infrastructure -----------------------------------------
// One fixed-size pool; both strategies are created from that same pool.
pool = newFixedThreadPool(THREADS)
su = Strategy.executorStrategy(pool)
spi = Strategy.executorStrategy(pool)

// Two-argument addition, passed through fj.Function.curry (imported as fcurry).
add = fcurry({ lhs, rhs -> lhs + rhs } as F2)

// The inputs n for which a Fibonacci number is requested.
nums = range(START, END + 1)

println "Calculating Fibonacci sequence in parallel..."
// Plain recursive Fibonacci: values below 2 are returned unchanged,
// anything else is the sum of the two preceding Fibonacci numbers.
serialFib = { n ->
    if (n < 2) {
        return n
    }
    serialFib(n - 1) + serialFib(n - 2)
}
// Effect that prints every result labelled with its n (counting up from
// START) and then shuts the thread pool down.
print = { results ->
    results.eachWithIndex { value, offset ->
        println "n=${START + offset} => $value"
    }
    pool.shutdown()
} as Effect
// Builds the promise for fib(n).
// At or above CUTOFF the work is split: calc is applied to n - 1 and to
// n - 2, and the two results are combined with `add` via bind.
// Below CUTOFF the value comes from serialFib and is wrapped with
// promise(su, P.p(...)).
calc = { n ->
    if (n >= CUTOFF) {
        // Same evaluation order as a single chained expression:
        // first the n - 1 branch, then the joined n - 2 branch.
        def previous = calc.f(n - 1)
        def beforePrevious = join(su, pcurry(calc).f(n - 2))
        return previous.bind(beforePrevious, add)
    }
    promise(su, P.p(serialFib(n)))
} as F
// Actor built from the `print` effect; it is the final destination of the results.
out = actor(su, print)

// Apply calc to every element of nums through spi.parMapList, run the
// outcome through fmap(sequence(su)), join it on su and send it to `out`.
def gatherAll = fmap(sequence(su))
def pending = spi.parMapList(calc).f(nums)
join(su, gatherAll.f(pending)).to(out)
|
Using java.util.concurrent.Exchanger
| Code Block |
|---|
import java.util.concurrent.Exchanger
// Two threads walk one range each and trade the values they do not want
// through a shared Exchanger.
def exchanger = new Exchanger()
def lowerRange = 1..20
def upperRange = 21..40
def evens = []
def odds = []

// Walks 1..20: odd values are kept, each even value is handed to the
// exchanger and replaced by whatever the other thread offers.
def oddCollector = Thread.start {
    odds = lowerRange.collect { value ->
        value % 2 == 0 ? exchanger.exchange(value) : value
    }
}

// Walks 21..40: even values are kept, each odd value is handed to the
// exchanger and replaced by whatever the other thread offers.
def evenCollector = Thread.start {
    evens = upperRange.collect { value ->
        value % 2 != 0 ? exchanger.exchange(value) : value
    }
}

// Wait for both threads before reading the lists they assigned.
[oddCollector, evenCollector]*.join()
println "evens: $evens"
println "odds: $odds"
|
Catching Exceptions with an Exception Handler
...