...
main of IOSession shows how to use it:
| Code Block |
|---|
class OutputSession implements java.io.Closeable { OutputStream outputStream String charactersetName Thread thread Long slow final Object sendLock = new Object() boolean aborted boolean closed = false OutputSession(OutputStream outputStream) { setOutputStream outputStream } OutputSession(OutputStream outputStream, String charactersetName) { setCharactersetName(charactersetName) setOutputStream outputStream } void send(String outStr) throws Exception { send outStr, 0 } void send(String outStr, long timeout) throws Exception { send charactersetName != null ? outStr.getBytes(charactersetName) : outStr.getBytes(), timeout } void send(byte[] outBytes, long timeout) { send outBytes, 0, outBytes.length, timeout } void send(final byte[] outBytes, final int outPtr, final int outLen, final long timeout) { synchronized (sendLock) { aborted = false if (closed) { throw new Exception("OutputSession is closed, cannot send out string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'") } if (outputStream == null) { throw new Exception("OutputStream is null, cannot send out string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'") } if (outLen > 0) { thread = Thread.startDaemon( { if (slow == null) { outputStream.write(outBytes, outPtr, outLen) outputStream.flush() outBytes = null outLen = 0 } else { int ptr = outPtr int len = outLen while (len > 0) { outputStream.write(outBytes, ptr, 1) outputStream.flush() ptr++ len-- if (len != 0 && slow != null) { sleep slow } } } } ) } try { if (timeout == 0) { thread.join() } else { thread.join(timeout) } } catch (InterruptedException ex) { } if (closed) { throw new IllegalStateException("OutputSession was closed while sending string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'") } if (aborted) { return; } if (outBytes != null) { throw new Exception("Timeout while sending string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'") } } } void abort() { aborted = true thread?.interrupt() } void close() { closed = true abort() } } class InputSession implements java.io.Closeable { static final int buffersize = 2048 byte[] inputData = new byte[buffersize] Thread thread StringBuffer stringBuffer = new StringBuffer() int caret = 0 String charactersetName InputStream inputStream Set<OutputSession> pipes = new HashSet<OutputSession>() boolean aborted boolean closed = false boolean timedOut long expirationTime String group String receivedUntil Object receivedObject Object expectations final Object lock = new Object() InputSession(InputStream inputStream, String charactersetName) { setCharactersetName charactersetName setInputStream inputStream } InputSession(InputStream inputStream) { setInputStream inputStream } void setExpectations(Object expectations) { if (expectations instanceof Collection || expectations instanceof Map) { this.expectations = expectations } else { this.expectations = new ArrayList() ((ArrayList) this.expectations).add(expectations) } } String expect(Object expectations, long timeout) { expirationTime = System.currentTimeMillis() + timeout synchronized (lock) { if (closed) { throw new IllegalStateException("Cannot expect on a closed InputSession") } setExpectations(expectations) timedOut = false aborted = false receivedObject = null receivedUntil = null if (stringBuffer.length() > caret) { if (check(null)) { return group } } while (this.expectations != null) { if(isClosed()){ throw new IllegalStateException("InputSession was closed while expecting: $expectations") } if (isAborted()) { return null } long timeToWait = expirationTime - System.currentTimeMillis() if (timeToWait > 0L) { println "waiting $timeToWait ms" lock.wait(timeToWait) } else { println "timed out" this.expectations=null timedOut = true return null } } return group } } void setInputStream(final InputStream inputStream) { if (inputStream != this.inputStream) { if (inputStream == null) { throw new IllegalArgumentException("Cannot set InputStream to null") } thread?.interrupt() this.inputStream = inputStream thread = Thread.startDaemon( { int total = 0 while (total >= 0) { total = inputStream.read(inputData, 0, buffersize) if (total > 0) { add(inputData, 0, total) } } } ) } } void add(byte[] data, int start, int total) { if (total > 0) { if (charactersetName != null) { add(new String(data, start, total, charactersetName)) } add(new String(data, start, total)) } } void add(String string) { if (string != null && string.length() > 0) { System.out.println("inputStream: " + string) for (OutputSession pipe: pipes) { pipe.send string } check string } } protected boolean check(String string) { synchronized (lock) { if (string != null) { stringBuffer.append(string) } if (expectations != null) { Integer pos = null Object received = null expectations.each {Object expectation -> if (pos != 0) { def o = expectation instanceof Map.Entry ? ((Map.Entry) expectation).key : expectation int p = -1 String g = null if (o instanceof Pattern) { Matcher m = ((Pattern) o).matcher(stringBuffer) if (m.find(caret)) { p = m.start() g = m.group() } } else { g = o.toString() p = stringBuffer.indexOf(g, caret) //println "looking for '$g' in '$stringBuffer' from: $caret index:$p" } if (p >= 0 && (pos == null || p < pos)) { received = expectation group = g pos = p } } } if (pos != null) { //println "Received: $received" receivedUntil = stringBuffer.substring(caret, pos) caret += pos + group.length() receivedObject = received expectations = null lock.notify() return true } } } return false } void abort() { aborted = true thread?.interrupt() } void close() { synchronized (lock) { closed = true abort(); } } } class IOSession implements java.io.Closeable { static final int buffersize = 2048 Map<Object, InputSession> inputSessions = new HashMap<Object, InputSession>() Map<Object, OutputSession> outputSessions = new HashMap<Object, OutputSession>() Map pipes = new HashMap() InputSession inputSession OutputSession outputSession Thread outputThread Thread inputThread String charactersetName long timeout = 10000 boolean closed = false final sendLock = new Object() IOSession(Object object) { add object } IOSession(Object object, String charactersetName) { add object, charactersetName } void setSlow(Long slow) { if (outputSession == null) { throw new IllegalStateException("Cannot set slow") } outputSession.setSlow slow } void add(Object object, String charactersetName) { if (object == null) { throw new IllegalArgumentException("Cannot add null to Session") } if (!addOutputStream(object, charactersetName) && !addInputStream(object, charactersetName) && !addProcess(object, charactersetName)) { throw new IllegalArgumentException("Cannot add '$object' to Session") } } void add(Object object) { if (object == null) { throw new IllegalArgumentException("Cannot add null to Session") } if (!addOutputStream(object) && !addInputStream(object) && !addProcess(object)) { throw new IllegalArgumentException("Cannot add '$object' to Session") } } boolean addOutputStream(Object object) { if (object == null) { return false } if (object instanceof File) { return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((File) object))) } if (object instanceof String) { return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((String) object))) } if (object instanceof OutputStream) { return addOutputStreamEntry(new MapEntry(object, (OutputStream) object)) } if (object instanceof Map.Entry) { return addOutputStreamEntry((Map.Entry) object) } return false } boolean addOutputStream(Object object, String charactersetName) { if (object == null) { return false } if (object instanceof File) { return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((File) object)), charactersetName) } if (object instanceof String) { return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((String) object)), charactersetName) } if (object instanceof OutputStream) { return addOutputStreamEntry(new MapEntry(object, (OutputStream) object), charactersetName) } if (object instanceof Map.Entry) { return addOutputStreamEntry((Map.Entry) object, charactersetName) } return false } boolean addOutputStreamEntry(Map.Entry entry) { if (entry.value instanceof OutputStream) { OutputSession old = outputSessions.get(entry.key) if (old != null) { old.setOutputStream((OutputStream) entry.value) } else { old = new OutputSession((OutputStream) entry.value) outputSessions.put entry.key, old } if (outputSession == null) { outputSession = old } return true } return false } boolean addOutputStreamEntry(Map.Entry entry, String charactersetName) { if (entry.value instanceof OutputStream) { OutputSession old = outputSessions.get(entry.key) if (old != null) { old.setCharactersetName charactersetName old.setOutputStream((OutputStream) entry.value) } else { old = new OutputSession((OutputStream) entry.value, charactersetName) outputSessions.put entry.key, old } if (outputSession == null) { outputSession = old } return true } return false } boolean addProcess(Object object) { boolean added = false if (object.metaClass.respondsTo(object, "getOutputStream")) { added |= addOutputStream(object.getOutputStream()) } if (object.metaClass.respondsTo(object, "getInputStream")) { added |= addInputStream(object.getInputStream()) } if (object.metaClass.respondsTo(object, "getErrorStream")) { added |= addInputStream(object.getErrorStream()) } return added } boolean addProcess(Object object, String charactersetName) { boolean added = false if (object.metaClass.respondsTo(object, "getOutputStream")) { added |= addOutputStream(object.getOutputStream(), charactersetName) } if (object.metaClass.respondsTo(object, "getInputStream")) { added |= addInputStream(object.getInputStream(), charactersetName) } if (object.metaClass.respondsTo(object, "getErrorStream")) { added |= addInputStream(object.getErrorStream(), charactersetName) } return added } boolean addInputStream(Object object) { if (object == null) { return false } if (object instanceof File) { return addInputStreamEntry(new MapEntry(object, new FileInputStream((File) object))) } if (object instanceof String) { return addInputStreamEntry(new MapEntry(object, new FileInputStream((String) object))) } if (object instanceof InputStream) { return addInputStreamEntry(new MapEntry(object, (InputStream) object)) } if (object instanceof Map.Entry) { return addInputStreamEntry((Map.Entry) object) } return false } boolean addInputStream(Object object, String charactersetName) { if (object == null) { return false } if (object instanceof File) { return addInputStreamEntry(new MapEntry(object, new FileInputStream((File) object)), charactersetName) } if (object instanceof String) { return addInputStreamEntry(new MapEntry(object, new FileInputStream((String) object)), charactersetName) } if (object instanceof InputStream) { return addInputStreamEntry(new MapEntry(object, (InputStream) object), charactersetName) } if (object instanceof Map.Entry) { return addInputStreamEntry((Map.Entry) object, charactersetName) } return false } boolean addInputStreamEntry(Map.Entry entry) { if (entry.value instanceof InputStream) { InputSession old = inputSessions.get(entry.key) if (old != null) { old.setInputStream((InputStream) entry.value) } else { old = new InputSession((InputStream) entry.value) inputSessions.put(entry.key, old) } if (inputSession == null) { inputSession = old } return true } return false } boolean addInputStreamEntry(Map.Entry entry, String charactersetName) { if (entry.value instanceof InputStream) { InputSession old = inputSessions.get(entry.key) if (old != null) { old.setCharactersetName charactersetName old.setInputStream((InputStream) entry.value) } else { old = new InputSession((InputStream) entry.value, charactersetName) inputSessions.put(entry.key, old) } if (inputSession == null) { inputSession = old } return true } return false } void send(Closure outStr) throws Exception { send(outStr.call(this).toString()) } void send(String str) throws Exception { if (outputSession == null) { throw new IllegalStateException("Cannot send, because outputSession is not set") } else { outputSession.send(str, timeout) } } void send(byte[] outBytes) { if (outputSession == null) { throw new IllegalStateException("Cannot send, because outputSession is not set") } else { outputSession.send(outBytes, timeout) } } void send(byte[] outBytes, int outPtr, int outLen) { if (outputSession == null) { p throw new IllegalStateException("Cannot send, because outputSession is not set") } else { outputSession.send(outBytes, outPtr, outLen, timeout) } } void fail(String failureMessage) { throw new IllegalStateException(failureMessage) } void fail(Closure failureMessage) { fail(failureMessage.call(this).toString()) } String expect(Object expectations, long timeout, Closure onTimeout) { if (inputSession == null) { throw new IllegalStateException("Cannot expect, because inputSession is not set") } inputSession.expect(expectations, timeout) if (inputSession.isClosed()) { throw new IllegalStateException("InputSession was closed while expecting: $expectations") } if (inputSession.isAborted()) { return null } if (inputSession.isTimedOut()) { return onTimeout?.call(this) } Object o = inputSession.getReceivedObject() if (o == null) { throw new IllegalStateException("InputSession ended without receiving: $expectations") } if (o instanceof Map.Entry && ((Map.Entry) o).value instanceof Closure) { return ((Closure) ((Map.Entry) o).value).call(this) } return inputSession.getGroup() } String expect(Object expectations, Closure onTimeout) { return expect(expectations, timeout, onTimeout) } String expect(Object expectations, long timeout) { return expect(expectations, timeout, {IOSession it -> it.fail "Timeout while expecting: $expectations"}) } String expect(Object expectations) { return expect(expectations, timeout) } void abort() { outputSessions.each {OutputSession it -> it.abort()} inputSessions.each {InputSession it -> it.abort()} } void close() throws java.io.IOException { closed = true outputSessions.each {OutputSession it -> it.close()} inputSessions.each {InputSession it -> it.close()} } public static void main(String[] args) { println "main..." ServerSocket serverSocket = new ServerSocket(8995) Thread.start { IOSession client = new IOSession(new Socket('127.0.0.1', 8995)) println 'client sending GET...' client.send 'GET' client.expect 'HTML' println Double.parseDouble(client.expect(~/[-+]?[0-9]*\.?[0-9]+\s/).toString().trim()) client.send 'PUT' println "Client OK" } Thread.start { IOSession server = new IOSession(serverSocket.accept()) println 'server expecting...' println "expect on server returned: " + server.expect([(~/G?T/): {IOSession session -> println 'server has received a GET request'; session.send 'HTML2.34\n'}, 'PUT': {println 'server has received a PUT request'}]) println "expect on server returned: " + server.expect([new MapEntry('GET', {println 'server has received a GET'}), 'PUT']) println "Server OK" } } public static void main(String[] args) { println "main..." ServerSocket serverSocket = new ServerSocket(8995) Thread.start { IOSession client = new IOSession(new Socket('127.0.0.1', 8995)) println 'client sending GET...' client.send 'GET' client.expect 'HTML' println Double.parseDouble(client.expect(~/[-+]?[0-9]*\.?[0-9]+\s/).toString().trim()) client.send 'PUT' println "Client OK" } Thread.start { IOSession server = new IOSession(serverSocket.accept()) println 'server expecting...' println "expect on server returned: " + server.expect([(~/G?T/): {IOSession session -> println 'server has received a GET request'; session.send 'HTML2.34\n'}, 'PUT': {println 'server has received a PUT request'}]) println "expect on server returned: " + server.expect([new MapEntry('GET', {println 'server has received a GET'}), 'PUT']) println "Server OK" } } } |