...
| Code Block |
|---|
//JAVA CODE
import java.util.concurrent.*;
import java.lang.ref.*;
import groovy.lang.Closure;
import java.util.*;
/**
 * Exposes the values produced by a Groovy closure as a lazily evaluated
 * {@link Iterator}. The closure runs on its own daemon thread and is handed a
 * "yield" closure ({@link SaveClosure}); every value passed to that closure is
 * delivered to the consumer through {@link #next()}.
 *
 * <p>Producer and consumer are coordinated by two semaphores that together
 * form a one-slot hand-off buffer, so the producer runs at most one value
 * ahead of the consumer.
 *
 * <p>Exceptions thrown by the closure are not propagated to the consumer;
 * the iteration simply ends.
 *
 * @param <T> type of the generated values
 */
public class Generator<T> implements Iterator<T>, Iterable<T>{
    // Released by the producer once a value (or the end-of-data signal) is ready.
    Semaphore availSemaphore=new Semaphore(0);
    // Holds one permit while the hand-off slot is free for the producer.
    Semaphore emptySemaphore=new Semaphore(1);
    // The producer thread can push one value at a time into pushedValue.
    T pushedValue=null;
    // pullValue() moves the value from pushedValue to pulledValue, where it
    // stays until it is handed out by next().
    T pulledValue=null;
    boolean hasPulledValue=false;
    Thread internalThread;

    /**
     * Starts a daemon thread that runs {@code closure}, passing it a
     * {@link SaveClosure} through which it emits values.
     *
     * @param closure the generator body; it receives the yield closure as its
     *                single argument
     */
    Generator(Closure closure){
        internalThread=new GeneratorThread<T>(this,closure);
        internalThread.setDaemon(true);
        internalThread.start();
    }

    /**
     * Blocks until the producer has signalled availability, then takes the
     * pushed value and hands the slot back to the producer.
     */
    private void pullValue(){
        availSemaphore.acquireUninterruptibly();
        pulledValue=pushedValue;
        pushedValue=null;
        hasPulledValue=true;
        emptySemaphore.release();
    }

    /**
     * Reports whether another value is available, blocking until the producer
     * either supplies one or terminates.
     *
     * @return {@code true} if {@link #next()} will return a value
     */
    @Override
    public boolean hasNext(){
        if (!hasPulledValue){
            pullValue();
        }
        // While values are flowing emptySemaphore never exceeds one permit.
        // The terminating release() in GeneratorThread.run() lets pullValue()
        // run once more without a matching acquire by the producer, which
        // raises the count to two and marks the end of the data.
        return emptySemaphore.availablePermits() != 2;
    }

    /**
     * Returns the next generated value.
     *
     * @return the next value emitted by the closure
     * @throws NoSuchElementException if the closure has finished
     */
    @Override
    public T next(){
        if (!hasNext()){
            throw new NoSuchElementException("Closure has no more values");
        }
        T retval=pulledValue;
        hasPulledValue=false;
        return retval;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove(){
        throw new UnsupportedOperationException(
            "Remove is not supported on generators");
    }

    /**
     * Returns this generator itself; it can therefore be iterated only once.
     *
     * @return this generator
     */
    @Override
    public Iterator<T> iterator(){
        return this;
    }

    /**
     * Interrupts the producer thread when the generator is finalized, so a
     * producer blocked in {@link SaveClosure#doCall} can terminate.
     */
    @Override
    public void finalize(){
        internalThread.interrupt();
    }

    /**
     * Thread that runs the generator closure. It refers to the owning
     * generator only through a {@link WeakReference}.
     */
    static class GeneratorThread<T> extends Thread{
        WeakReference<Generator<T>> generatorRef;
        Closure closure;

        /**
         * @param generator the generator that consumes the produced values
         * @param cl        the generator body to run on this thread
         */
        public GeneratorThread(Generator<T> generator, Closure cl){
            generatorRef=new WeakReference<Generator<T>>(generator);
            closure=cl;
        }

        /**
         * Runs the closure and afterwards signals end-of-data to the consumer.
         */
        @Override
        public void run(){
            try{
                closure.call(new SaveClosure<T>(this));
            }finally{
                // NOTE: once the closure is done, pullValue() would block
                // forever waiting for more data. This release() lets it get in
                // one last time; that final pullValue() raises emptySemaphore
                // to two permits, which hasNext() reads as "producer has
                // died". Doing this in a finally block keeps the consumer from
                // hanging when the closure terminates with an exception.
                Generator<T> generator=generatorRef.get();
                if (generator!=null){
                    generator.availSemaphore.release();
                }
                // NOTE: if the generator has been garbage collected, nobody is
                // left to pull a termination condition.
            }
        }
    }

    /**
     * The "yield" closure handed to the generator body. Calling it with a
     * value blocks until the hand-off slot is free, stores the value there and
     * signals the consumer.
     */
    static class SaveClosure<T> extends Closure{
        WeakReference<Generator<T>> generatorRef;
        Semaphore emptySemaphore;
        Semaphore availSemaphore;

        /**
         * @param gt the producer thread this closure belongs to
         * @throws GeneratorDisposedException if the generator is already gone
         */
        public SaveClosure(GeneratorThread<T> gt){
            super(gt,null);
            generatorRef=gt.generatorRef;
            Generator<T> generator=generatorRef.get();
            if (generator!=null){
                emptySemaphore=generator.emptySemaphore;
                availSemaphore=generator.availSemaphore;
            }else{
                throw new GeneratorDisposedException();
            }
        }

        /**
         * Emits one value to the consumer. Invoked when the generator body
         * calls this closure with a single argument.
         *
         * @param value the value to hand to the consumer
         * @throws GeneratorDisposedException if the thread is interrupted
         *         while waiting for the slot, or the generator has been
         *         garbage collected
         */
        public void doCall(T value){
            try{
                emptySemaphore.acquire();
            }catch(InterruptedException e){
                // Keep the interrupt status visible to code further up the
                // stack instead of silently clearing it.
                Thread.currentThread().interrupt();
                throw new GeneratorDisposedException();
            }
            Generator<T> generator=generatorRef.get();
            if (generator!=null){
                generator.pushedValue=value;
            }else{
                throw new GeneratorDisposedException();
            }
            availSemaphore.release();
        }
    }

    /**
     * A GeneratorDisposedException is used to terminate the thread
     * that was generating values, once the Generator has been garbage
     * collected.
     */
    static public class GeneratorDisposedException extends RuntimeException{
    }
}
|
...
As the Groovy runtime implements this now, you would exhaust all available RAM when converting the MethodClosure to an iterator, before find() was ever called. With a Generator, values are only generated on demand.
NOTE — Disadvantages:
- Because the generator runs on a separate thread, it may generate one more value than is actually needed before the garbage collector disposes of the generator.
- Exceptions thrown from the generator method are not propagated back to the caller (again, because the function is called on a different thread).