in src/main/java/com/microsoft/azure/datalake/store/ADLFileOutputStream.java [140:201]
/**
 * Pushes the locally buffered bytes ({@code buffer[0..cursor)}) to the server with a single
 * {@code Core.append} call at offset {@code remoteCursor}, then advances {@code remoteCursor}
 * and resets {@code cursor} to zero.
 *
 * <p>Short-circuits (no append is sent) when:
 * <ul>
 *   <li>the stream is already closed;</li>
 *   <li>the buffer is empty and the flag is {@code DATA};</li>
 *   <li>the buffer is empty, the flag is {@code METADATA}, and the previous flush already
 *       updated metadata;</li>
 *   <li>the buffer is empty and the flag is {@code CLOSE} - in that case only
 *       {@code closehandle()} is invoked.</li>
 * </ul>
 *
 * <p>A {@code CLOSE} request with pending data is sent to the server as a {@code DATA} append,
 * followed by a separate {@code closehandle()} call once the append has been accounted for.
 *
 * @param syncFlag the kind of flush requested by the caller
 * @throws IOException if the append fails and the failure cannot be explained away as a
 *         retried request that had in fact already succeeded on the server
 */
private void flush(SyncFlag syncFlag) throws IOException {
    if (log.isTraceEnabled()) {
        log.trace("flush() with data size {} at offset {} for client {} for file {}", cursor, remoteCursor, client.getClientId(), filename);
    }

    // HBase is known to call flush() after close(); silently tolerate that.
    if (streamClosed) {
        return;
    }

    // Empty-buffer cases: decide whether there is anything worth sending at all.
    if (cursor == 0) {
        if (syncFlag == SyncFlag.DATA) {
            return; // nothing to flush
        }
        if (syncFlag == SyncFlag.METADATA && lastFlushUpdatedMetadata) {
            return; // metadata was already updated by the last flush and no data arrived since
        }
        if (syncFlag == SyncFlag.CLOSE) {
            closehandle(); // no data to push: just close
            return;
        }
    }

    if (buffer == null) {
        buffer = new byte[blocksize];
    }

    // CLOSE is never sent with the append itself; the data goes out as a DATA append and the
    // close is performed separately afterwards.
    final boolean closeRequested = (syncFlag == SyncFlag.CLOSE);
    final SyncFlag appendFlag = closeRequested ? SyncFlag.DATA : syncFlag;

    RequestOptions opts = new RequestOptions();
    opts.retryPolicy = client.makeExponentialBackoffPolicy();
    // Stated intent (from the original comment): 1 second grace per MB to upload.
    // NOTE(review): as written this adds 1000 plus one unit per (decimal) MB of buffer
    // capacity, not 1000 per MB - confirm the units of client.timeout and the intended formula.
    opts.timeout = client.timeout + (1000 + (buffer.length / 1000 / 1000));

    OperationResponse resp = new OperationResponse();
    Core.append(filename, remoteCursor, buffer, 0, cursor, leaseId,
            leaseId, appendFlag, client, opts, resp);

    boolean metadataUpdated = (appendFlag != SyncFlag.DATA);

    if (!resp.successful) {
        // A BadOffsetException on a *retried* request may mean the first attempt actually
        // succeeded on the back-end even though the client saw a transient failure; the retry
        // then fails because the server-side offset has already moved.
        final boolean badOffsetAfterRetry = resp.numRetries > 0
                && resp.httpResponseCode == 400
                && "BadOffsetException".equals(resp.remoteExceptionName);
        if (!badOffsetAfterRetry) {
            throw client.getExceptionFromResponse(resp, "Error appending to file " + filename);
        }

        // Probe with a zero-length append at the offset we would expect had the data landed.
        // Success means the file length is what we expect, so the error can be swallowed;
        // failure means the server is at some other offset, so the error is surfaced.
        long expectedRemoteLength = remoteCursor + cursor;
        if (!doZeroLengthAppend(expectedRemoteLength)) {
            log.debug("Append failed at expected offset(" + expectedRemoteLength +
                    "). Bubbling exception up for session: " + leaseId + ", file: " + filename);
            throw client.getExceptionFromResponse(resp, "Error appending to file " + filename);
        }
        log.debug("zero-length append succeeded at expected offset (" + expectedRemoteLength + "), " +
                " ignoring BadOffsetException for session: " + leaseId + ", file: " + filename);
        // On this recovery path the flush is recorded as not having updated metadata.
        metadataUpdated = false;
    }

    // The buffered bytes are now accounted for on the server: advance and reset.
    remoteCursor += cursor;
    cursor = 0;
    lastFlushUpdatedMetadata = metadataUpdated;

    if (closeRequested) {
        closehandle();
    }
}