Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

All To create a simple groovy version of the "expect" concept, all you need are the three Groovy classes below.

The main of IOSession shows how to use it:

Code Block
titleOutputSession.groovy
languagegroovy
collapsetrue
class OutputSession implements java.io.Closeable {
    OutputStream outputStream
    String charactersetName
    Thread thread
    Long slow
    final Object sendLock = new Object()
    boolean aborted
    boolean closed = false

    OutputSession(OutputStream outputStream) {
        setOutputStream outputStream
    }

    OutputSession(OutputStream outputStream, String charactersetName) {
        setCharactersetName(charactersetName)
        setOutputStream outputStream
    }

    void send(String outStr) throws Exception {
        send outStr, 0
    }

    void send(String outStr, long timeout) throws Exception {
        send charactersetName != null ? outStr.getBytes(charactersetName) : outStr.getBytes(), timeout
    }

    void send(byte[] outBytes, long timeout) {
        send outBytes, 0, outBytes.length, timeout
    }

    void send(final byte[] outBytes, final int outPtr, final int outLen, final long timeout) {
        synchronized (sendLock) {
            aborted = false
            if (closed) {
                throw new Exception("OutputSession is closed, cannot send out string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'")
            }
            if (outputStream == null) {
                throw new Exception("OutputStream is null, cannot send out string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'")
            }
            if (outLen > 0) {
                thread = Thread.startDaemon(
                        {
                            if (slow == null) {
                                outputStream.write(outBytes, outPtr, outLen)
                                outputStream.flush()
                                outBytes = null
                                outLen = 0
                            } else {
                                int ptr = outPtr
                                int len = outLen
                                while (len > 0) {
                                    outputStream.write(outBytes, ptr, 1)
                                    outputStream.flush()
                                    ptr++
                                    len--
                                    if (len != 0 && slow != null) {
                                        sleep slow
                                    }
                                }
                            }
                        }
                )
            }
            try {
                if (timeout == 0) {
                    thread.join()
                } else {
                    thread.join(timeout)
                }
            } catch (InterruptedException ex) {
            }
            if (closed) {
                throw new IllegalStateException("OutputSession was closed while sending string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'")
            }
            if (aborted) {
                return;
            }
            if (outBytes != null) {
                throw new Exception("Timeout while sending string: '$charactersetName==null?new String(outBytes,outPtr,outLen):new String(outBytes,outPtr,outLen,charactersetName)'")
            }
        }
    }

    void abort() {
        aborted = true
        thread?.interrupt()
    }

    void close() {
        closed = true
        abort()
    }
}

Code Block
titleInputSession.groovy
languagegroovy
collapsetrue
class InputSession implements java.io.Closeable {
    static final int buffersize = 2048
    byte[] inputData = new byte[buffersize]
    Thread thread
    StringBuffer stringBuffer = new StringBuffer()
    int caret = 0
    String charactersetName
    InputStream inputStream
    Set<OutputSession> pipes = new HashSet<OutputSession>()
    boolean aborted
    boolean closed = false
    boolean timedOut
    long expirationTime
    String group

    String receivedUntil
    Object receivedObject
    Object expectations

    final Object lock = new Object()

    InputSession(InputStream inputStream, String charactersetName) {
        setCharactersetName charactersetName
        setInputStream inputStream
    }

    InputSession(InputStream inputStream) {
        setInputStream inputStream
    }

    void setExpectations(Object expectations) {
        if (expectations instanceof Collection || expectations instanceof Map) {
            this.expectations = expectations
        } else {
            this.expectations = new ArrayList()
            ((ArrayList) this.expectations).add(expectations)
        }
    }

    String expect(Object expectations, long timeout) {
        expirationTime = System.currentTimeMillis() + timeout
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("Cannot expect on a closed InputSession")
            }
            setExpectations(expectations)
            timedOut = false
            aborted = false
            receivedObject = null
            receivedUntil = null
            if (stringBuffer.length() > caret) {
                if (check(null)) {
                    return group
                }
            }
            while (this.expectations != null) {
                if(isClosed()){
                    throw new IllegalStateException("InputSession was closed while expecting: $expectations")
                }
                if (isAborted()) {
                    return null
                }
                long timeToWait = expirationTime - System.currentTimeMillis()
                if (timeToWait > 0L) {
                    println "waiting $timeToWait ms"
                    lock.wait(timeToWait)
                } else {
                    println "timed out"
                    this.expectations=null
                    timedOut = true
                    return null
                }
            }
            return group
        }
    }

    void setInputStream(final InputStream inputStream) {
        if (inputStream != this.inputStream) {
            if (inputStream == null) {
                throw new IllegalArgumentException("Cannot set InputStream to null")
            }
            thread?.interrupt()
            this.inputStream = inputStream
            thread = Thread.startDaemon(
                    {
                        int total = 0
                        while (total >= 0) {
                            total = inputStream.read(inputData, 0, buffersize)
                            if (total > 0) {
                                add(inputData, 0, total)
                            }
                        }
                    }
            )
        }
    }

    void add(byte[] data, int start, int total) {
        if (total > 0) {
            if (charactersetName != null) {
                add(new String(data, start, total, charactersetName))
            }
            add(new String(data, start, total))
        }
    }

    void add(String string) {
        if (string != null && string.length() > 0) {
            System.out.println("inputStream: " + string)
            for (OutputSession pipe: pipes) {
                pipe.send string
            }
            check string
        }
    }

    protected boolean check(String string) {
        synchronized (lock) {
            if (string != null) {
                stringBuffer.append(string)
            }
            if (expectations != null) {
                Integer pos = null
                Object received = null
                expectations.each {Object expectation ->
                    if (pos != 0) {
                        def o = expectation instanceof Map.Entry ? ((Map.Entry) expectation).key : expectation
                        int p = -1
                        String g = null
                        if (o instanceof Pattern) {
                            Matcher m = ((Pattern) o).matcher(stringBuffer)
                            if (m.find(caret)) {
                                p = m.start()
                                g = m.group()
                            }
                        } else {
                            g = o.toString()
                            p = stringBuffer.indexOf(g, caret)
                            //println "looking for '$g' in '$stringBuffer' from: $caret index:$p"
                        }
                        if (p >= 0 && (pos == null || p < pos)) {
                            received = expectation
                            group = g
                            pos = p
                        }
                    }
                }
                if (pos != null) {
                    //println "Received: $received"
                    receivedUntil = stringBuffer.substring(caret, pos)
                    caret += pos + group.length()
                    receivedObject = received
                    expectations = null
                    lock.notify()
                    return true
                }
            }
        }
        return false
    }

    void abort() {
        aborted = true
        thread?.interrupt()
    }

    void close() {
        synchronized (lock) {
            closed = true
            abort();
        }
    }
}


Code Block
titleIOSession.groovy
languagegroovy
collapsetrue
class IOSession implements java.io.Closeable {
    static final int buffersize = 2048
    Map<Object, InputSession> inputSessions = new HashMap<Object, InputSession>()
    Map<Object, OutputSession> outputSessions = new HashMap<Object, OutputSession>()

    Map pipes = new HashMap()

    InputSession inputSession
    OutputSession outputSession
    Thread outputThread
    Thread inputThread
    String charactersetName
    long timeout = 10000

    boolean closed = false

    final sendLock = new Object()

    IOSession(Object object) {
        add object
    }

    IOSession(Object object, String charactersetName) {
        add object, charactersetName
    }

    void setSlow(Long slow) {
        if (outputSession == null) {
            throw new IllegalStateException("Cannot set slow")
        }
        outputSession.setSlow slow
    }

    void add(Object object, String charactersetName) {
        if (object == null) {
            throw new IllegalArgumentException("Cannot add null to Session")
        }
        if (!addOutputStream(object, charactersetName) && !addInputStream(object, charactersetName) && !addProcess(object, charactersetName)) {
            throw new IllegalArgumentException("Cannot add '$object' to Session")
        }
    }

    void add(Object object) {
        if (object == null) {
            throw new IllegalArgumentException("Cannot add null to Session")
        }
        if (!addOutputStream(object) && !addInputStream(object) && !addProcess(object)) {
            throw new IllegalArgumentException("Cannot add '$object' to Session")
        }
    }

    boolean addOutputStream(Object object) {
        if (object == null) {
            return false
        }
        if (object instanceof File) {
            return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((File) object)))
        }
        if (object instanceof String) {
            return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((String) object)))
        }
        if (object instanceof OutputStream) {
            return addOutputStreamEntry(new MapEntry(object, (OutputStream) object))
        }
        if (object instanceof Map.Entry) {
            return addOutputStreamEntry((Map.Entry) object)
        }
        return false
    }

    boolean addOutputStream(Object object, String charactersetName) {
        if (object == null) {
            return false
        }
        if (object instanceof File) {
            return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((File) object)), charactersetName)
        }
        if (object instanceof String) {
            return addOutputStreamEntry(new MapEntry(object, new FileOutputStream((String) object)), charactersetName)
        }
        if (object instanceof OutputStream) {
            return addOutputStreamEntry(new MapEntry(object, (OutputStream) object), charactersetName)
        }
        if (object instanceof Map.Entry) {
            return addOutputStreamEntry((Map.Entry) object, charactersetName)
        }
        return false
    }

    boolean addOutputStreamEntry(Map.Entry entry) {
        if (entry.value instanceof OutputStream) {
            OutputSession old = outputSessions.get(entry.key)
            if (old != null) {
                old.setOutputStream((OutputStream) entry.value)
            } else {
                old = new OutputSession((OutputStream) entry.value)
                outputSessions.put entry.key, old
            }
            if (outputSession == null) {
                outputSession = old
            }
            return true
        }
        return false
    }

    boolean addOutputStreamEntry(Map.Entry entry, String charactersetName) {
        if (entry.value instanceof OutputStream) {
            OutputSession old = outputSessions.get(entry.key)
            if (old != null) {
                old.setCharactersetName charactersetName
                old.setOutputStream((OutputStream) entry.value)
            } else {
                old = new OutputSession((OutputStream) entry.value, charactersetName)
                outputSessions.put entry.key, old
            }
            if (outputSession == null) {
                outputSession = old
            }
            return true
        }
        return false
    }

    boolean addProcess(Object object) {
        boolean added = false
        if (object.metaClass.respondsTo(object, "getOutputStream")) {
            added |= addOutputStream(object.getOutputStream())
        }
        if (object.metaClass.respondsTo(object, "getInputStream")) {
            added |= addInputStream(object.getInputStream())
        }
        if (object.metaClass.respondsTo(object, "getErrorStream")) {
            added |= addInputStream(object.getErrorStream())
        }
        return added
    }

    boolean addProcess(Object object, String charactersetName) {
        boolean added = false
        if (object.metaClass.respondsTo(object, "getOutputStream")) {
            added |= addOutputStream(object.getOutputStream(), charactersetName)
        }
        if (object.metaClass.respondsTo(object, "getInputStream")) {
            added |= addInputStream(object.getInputStream(), charactersetName)
        }
        if (object.metaClass.respondsTo(object, "getErrorStream")) {
            added |= addInputStream(object.getErrorStream(), charactersetName)
        }
        return added
    }

    boolean addInputStream(Object object) {
        if (object == null) {
            return false
        }
        if (object instanceof File) {
            return addInputStreamEntry(new MapEntry(object, new FileInputStream((File) object)))
        }
        if (object instanceof String) {
            return addInputStreamEntry(new MapEntry(object, new FileInputStream((String) object)))
        }
        if (object instanceof InputStream) {
            return addInputStreamEntry(new MapEntry(object, (InputStream) object))
        }
        if (object instanceof Map.Entry) {
            return addInputStreamEntry((Map.Entry) object)
        }
        return false
    }

    boolean addInputStream(Object object, String charactersetName) {
        if (object == null) {
            return false
        }
        if (object instanceof File) {
            return addInputStreamEntry(new MapEntry(object, new FileInputStream((File) object)), charactersetName)
        }
        if (object instanceof String) {
            return addInputStreamEntry(new MapEntry(object, new FileInputStream((String) object)), charactersetName)
        }
        if (object instanceof InputStream) {
            return addInputStreamEntry(new MapEntry(object, (InputStream) object), charactersetName)
        }
        if (object instanceof Map.Entry) {
            return addInputStreamEntry((Map.Entry) object, charactersetName)
        }
        return false
    }

    boolean addInputStreamEntry(Map.Entry entry) {
        if (entry.value instanceof InputStream) {
            InputSession old = inputSessions.get(entry.key)
            if (old != null) {
                old.setInputStream((InputStream) entry.value)
            } else {
                old = new InputSession((InputStream) entry.value)
                inputSessions.put(entry.key, old)
            }
            if (inputSession == null) {
                inputSession = old
            }
            return true

        }
        return false
    }

    boolean addInputStreamEntry(Map.Entry entry, String charactersetName) {
        if (entry.value instanceof InputStream) {
            InputSession old = inputSessions.get(entry.key)
            if (old != null) {
                old.setCharactersetName charactersetName
                old.setInputStream((InputStream) entry.value)
            } else {
                old = new InputSession((InputStream) entry.value, charactersetName)
                inputSessions.put(entry.key, old)
            }
            if (inputSession == null) {
                inputSession = old
            }
            return true

        }
        return false
    }

    void send(Closure outStr) throws Exception {
        send(outStr.call(this).toString())
    }

    void send(String str) throws Exception {
        if (outputSession == null) {
            throw new IllegalStateException("Cannot send, because outputSession is not set")
        } else {
            outputSession.send(str, timeout)
        }
    }

    void send(byte[] outBytes) {
        if (outputSession == null) {
            throw new IllegalStateException("Cannot send, because outputSession is not set")
        } else {
            outputSession.send(outBytes, timeout)
        }
    }

    void send(byte[] outBytes, int outPtr, int outLen) {
        if (outputSession == null) {
            p
            throw new IllegalStateException("Cannot send, because outputSession is not set")
        } else {
            outputSession.send(outBytes, outPtr, outLen, timeout)
        }
    }

    void fail(String failureMessage) {
        throw new IllegalStateException(failureMessage)
    }

    void fail(Closure failureMessage) {
        fail(failureMessage.call(this).toString())
    }

    String expect(Object expectations, long timeout, Closure onTimeout) {
        if (inputSession == null) {
            throw new IllegalStateException("Cannot expect, because inputSession is not set")
        }
        inputSession.expect(expectations, timeout)
        if (inputSession.isClosed()) {
            throw new IllegalStateException("InputSession was closed while expecting: $expectations")
        }
        if (inputSession.isAborted()) {
            return null
        }
        if (inputSession.isTimedOut()) {
            return onTimeout?.call(this)
        }
        Object o = inputSession.getReceivedObject()
        if (o == null) {
            throw new IllegalStateException("InputSession ended without receiving: $expectations")
        }
        if (o instanceof Map.Entry && ((Map.Entry) o).value instanceof Closure) {
            return ((Closure) ((Map.Entry) o).value).call(this)
        }
        return inputSession.getGroup()
    }


    String expect(Object expectations, Closure onTimeout) {
        return expect(expectations, timeout, onTimeout)
    }

    String expect(Object expectations, long timeout) {
        return expect(expectations, timeout, {IOSession it -> it.fail "Timeout while expecting: $expectations"})
    }

    String expect(Object expectations) {
        return expect(expectations, timeout)
    }

    void abort() {
        outputSessions.each {OutputSession it -> it.abort()}
        inputSessions.each {InputSession it -> it.abort()}
    }

    void close() throws java.io.IOException {
        closed = true
        outputSessions.each {OutputSession it -> it.close()}
        inputSessions.each {InputSession it -> it.close()}
    }

    public static void main(String[] args) {
        println "main..."
        ServerSocket serverSocket = new ServerSocket(8995)
        Thread.start {
            IOSession client = new IOSession(new Socket('127.0.0.1', 8995))
            println 'client sending GET...'
            client.send 'GET'
            client.expect 'HTML'
            println Double.parseDouble(client.expect(~/[-+]?[0-9]*\.?[0-9]+\s/).toString().trim())
            client.send 'PUT'
            println "Client OK"
        }
        Thread.start {
            IOSession server = new IOSession(serverSocket.accept())
            println 'server expecting...'
            println "expect on server returned: " + server.expect([(~/G?T/): {IOSession session -> println 'server has received a GET request'; session.send 'HTML2.34\n'}, 'PUT': {println 'server has received a PUT request'}])
            println "expect on server returned: " + server.expect([new MapEntry('GET', {println 'server has received a GET'}), 'PUT'])
            println "Server OK"
        }
    }
}