in core/src/main/java/org/apache/airavata/mft/core/DoubleStreamingBuffer.java [147:225]
public int read() throws IOException {
if (!barrierPassed) {
try {
if (readBuffer1) {
buffer1Lock.lock();
} else {
buffer2Lock.lock();
}
// wait for writer to enter into read block for the first time
barrier.await();
barrierPassed = true;
} catch (Exception e) {
throw new IOException(e);
}
}
//System.out.println("Read " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
if (readBuffer1) {
if (buf1Remain > 0) {
buf1Remain --;
//System.out.println("Readval " + (buffer1[readPoint] & 0xff));
processedBytes++;
return buffer1[readPoint++] & 0xff;
} else {
if (doneWrite && buf2Remain <= 0) {
//System.out.println("Return -1");
return -1;
}
buffer2Lock.lock();
readBuffer1 = false;
buffer1Lock.unlock();
readPoint = 0;
try {
// Wait for writer to move into next buffer
barrier.await();
} catch (Exception e) {
throw new IOException();
}
//return read();
buf2Remain --;
processedBytes++;
return buffer2[readPoint++] & 0xff;
}
} else {
if (buf2Remain > 0) {
buf2Remain --;
//System.out.println("Readval " + (buffer2[readPoint] & 0xff));
processedBytes++;
return buffer2[readPoint++] & 0xff;
} else {
if (doneWrite && buf1Remain <= 0) {
//System.out.println("Return -1");
return -1;
}
buffer1Lock.lock();
readBuffer1 = true;
buffer2Lock.unlock();
readPoint = 0;
try {
// Wait for writer to move into next buffer
barrier.await();
} catch (Exception e) {
throw new IOException();
}
//return read();
buf1Remain --;
processedBytes++;
return buffer1[readPoint++] & 0xff;
}
}
}