in connectors/gts/connector/src/main/java/org/apache/manifoldcf/agents/output/gts/HttpPoster.java [701:1067]
/**
 * Posts the document to the GTS ingestion API over a raw HTTP/1.0 socket and records the outcome.
 *
 * <p>The request consists of a POST line, a Document-URI header, the credentials written by
 * {@code writeCredentials}, optional Base64-encoded Document-ACL and Document-Template headers
 * (each split into HEADER_CHUNK-sized pieces), one Document-Metadata header per collection name and
 * per document field value, a Content-length header, and then the document's binary stream.
 *
 * <p>Nothing is returned; the outcome is left in fields for the caller (this is a run() method,
 * presumably executed on its own thread):
 * <ul>
 * <li>{@code rval} is set to true on a 200 response, and to false on a 4xx response other than
 * 401, or on a "broken pipe" SocketException (presumed to be a rejection).</li>
 * <li>{@code activityStart}, {@code activityBytes}, {@code activityCode} and
 * {@code activityDetails} describe the attempt.</li>
 * <li>{@code exception} receives any other failure:
 *   <ul>
 *   <li>a ServiceInterruption for document read errors, for failures in {@code getResponse}, and
 *   for response codes not otherwise handled;</li>
 *   <li>a ManifoldCFException for a negative (protocol error) code or 401;</li>
 *   <li>the IOException itself for other communication errors.</li>
 *   </ul></li>
 * </ul>
 * A plain {@link InterruptedIOException} (assumed to mean thread shutdown) causes a silent return
 * with no exception recorded.
 */
public void run()
{
  long length = document.getBinaryLength();
  InputStream is = document.getBinaryStream();
  try
  {
    long fullStartTime = System.currentTimeMillis();
    // Open a socket to the ingestion API: the request is written to its output stream and the
    // post result is read back from its input stream.
    try
    {
      // Longer documents are allowed more response retries.
      long responseRetryCount = responseRetries + (long)((float)length * sizeCoefficient);
      Socket socket = createSocket(responseRetryCount);
      try
      {
        // try-with-resources closes out, then in, then isr (the original close order). Unlike
        // chained finally blocks, a failing close() cannot mask an exception from the body.
        try (InputStreamReader isr = new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII);
          BufferedReader in = new BufferedReader(isr);
          OutputStream out = socket.getOutputStream())
        {
          // Request line
          String uri = url.getFile();
          if (uri.length() == 0)
            uri = "/";
          out.write(("POST " + uri + " HTTP/1.0\r\n").getBytes(StandardCharsets.UTF_8));
          // Headers
          out.write(("Document-URI: " + documentURI + "\r\n").getBytes(StandardCharsets.UTF_8));
          writeCredentials(out);
          // Apply ACL if present
          if (aclXmlString != null && aclXmlString.length() > 0)
            writeChunkedHeader(out, "Document-ACL",
              new Base64().encodeByteArray(aclXmlString.getBytes(StandardCharsets.UTF_8)));
          // One metadata header per collection
          if (collections != null)
          {
            for (String collectionName : collections)
            {
              String encodedValue = metadataEncode(collectionName);
              out.write(("Document-Metadata: collection_name="+encodedValue+"\r\n").getBytes(StandardCharsets.UTF_8));
            }
          }
          // Document template, if any
          if (documentTemplate != null && documentTemplate.length() > 0)
            writeChunkedHeader(out, "Document-Template",
              new Base64().encodeByteArray(documentTemplate.getBytes(StandardCharsets.UTF_8)));
          // One metadata header per field value; only string values are handled.
          Iterator<String> iter = document.getFields();
          while (iter.hasNext())
          {
            String fieldName = iter.next();
            for (String value : document.getFieldAsStrings(fieldName))
            {
              String encodedValue = metadataEncode(value);
              out.write(("Document-Metadata: "+ fieldName+"="+encodedValue+"\r\n").getBytes(StandardCharsets.UTF_8));
            }
          }
          // NOTE(review): the header block ends with "\r\n\n" instead of the "\r\n\r\n" HTTP
          // specifies. Left unchanged because the GTS ingester may rely on this exact framing —
          // confirm before changing.
          out.write(("Content-length: " + Long.toString(length) + "\r\n\n").getBytes(StandardCharsets.UTF_8));

          // Copy the document stream to the socket.
          long total = 0;
          long now = System.currentTimeMillis();
          byte[] bytes = new byte[buffersize];
          while (true)
          {
            int count;
            // Errors from the document stream itself are handled here, so they stay distinct
            // from errors raised by the ingestion system.
            try
            {
              count = is.read(bytes);
            }
            catch (InterruptedIOException ioe)
            {
              // A plain InterruptedIOException may mean this thread is being shut down, so pass it
              // on. Subclasses (SocketTimeoutException, or third-party ones such as httpclient's)
              // are treated as ordinary read failures.
              if (ioe.getClass() == InterruptedIOException.class)
                throw ioe;
              throw documentReadInterruption(ioe, fullStartTime);
            }
            catch (IOException ioe)
            {
              throw documentReadInterruption(ioe, fullStartTime);
            }
            if (count == -1)
              break;
            readFromDocumentStreamYet = true;
            out.write(bytes, 0, count);
            total += (long)count;
          }
          long later = System.currentTimeMillis();
          if (Logging.ingest.isDebugEnabled())
            Logging.ingest.debug("Total bytes posted: " + Long.toString(total) + ", total time: " + (later - now));
          out.flush();

          // Now, process the response.
          String res;
          try
          {
            res = getResponse(in);
          }
          catch (ServiceInterruption si)
          {
            activityStart = Long.valueOf(now);
            activityCode = "-2";
            activityDetails = si.getMessage();
            throw si;
          }
          if (Logging.ingest.isDebugEnabled())
            Logging.ingest.debug("Response code from ingest: '" + res + "'");
          CodeDetails cd = new CodeDetails(res);
          activityStart = Long.valueOf(now);
          activityBytes = Long.valueOf(length);
          activityCode = cd.getCode();
          activityDetails = cd.getDetails();
          int codeValue = cd.getCodeValue();
          // A negative number means an HTTP error of some kind.
          if (codeValue < 0)
            throw new ManifoldCFException("Http protocol error");
          // 200 means everything went OK.
          if (codeValue == 200)
          {
            rval = true;
            return;
          }
          // Anything else means the document didn't ingest. There are three possibilities:
          // 1) The document will NEVER ingest (it's illegal): a 4xx comes back and we return false.
          // 2) Something is wrong with the setup, e.g. bad credentials (401): throw a
          //    ManifoldCFException, since this aborts the current activity entirely.
          // 3) A transient error: we can't retry here once the stream has been read, so throw a
          //    ServiceInterruption and let the caller reschedule the ingestion.
          if (codeValue == 401)
            throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);
          if (codeValue >= 400 && codeValue < 500)
          {
            rval = false;
            return;
          }
          // Retries should not go on indefinitely; the retry window ends 2 hours from now.
          long currentTime = System.currentTimeMillis();
          throw new ServiceInterruption("Error "+Integer.toString(codeValue)+" from ingestion request; ingestion will be retried again later",
            new ManifoldCFException("Ingestion HTTP error code "+Integer.toString(codeValue)),
            currentTime + interruptionRetryTime,
            currentTime + 2L * 60L * 60000L,
            -1,
            true);
        }
      }
      finally
      {
        try
        {
          socket.close();
        }
        catch (InterruptedIOException e)
        {
          throw e;
        }
        catch (IOException e)
        {
          // Best effort: a failure to close is only logged, never rethrown.
          Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
        }
      }
    }
    catch (java.net.SocketTimeoutException ioe)
    {
      // Like IO errors, but derived from InterruptedIOException, so it must be caught first.
      Logging.ingest.warn("Error connecting to ingestion API: "+ioe.getMessage(),ioe);
      activityStart = Long.valueOf(fullStartTime);
      activityCode = "-1";
      activityDetails = ioe.getMessage();
      throw ioe;
    }
    catch (InterruptedIOException e)
    {
      // Presumed thread interruption: exit quietly without recording an exception.
      return;
    }
    catch (IOException ioe)
    {
      activityStart = Long.valueOf(fullStartTime);
      // A "broken pipe" seems to be what we get when the ingestion API kills the socket right
      // after sending a 400. It is a race, so that result is always possible; treat it like a
      // 400, but log it under a distinct code because we can't be sure what happened.
      // The message may be null, so check before lower-casing it.
      String message = ioe.getMessage();
      if (ioe.getClass() == java.net.SocketException.class && message != null
        && message.toLowerCase(Locale.ROOT).contains("broken pipe"))
      {
        activityCode = "-103";
        activityDetails = "Presuming an ingestion rejection: "+message;
        rval = false;
        return;
      }
      activityCode = "-1";
      activityDetails = message;
      Logging.ingest.warn("Error communicating with Ingestion API: "+message,ioe);
      throw ioe;
    }
  }
  catch (Throwable e)
  {
    this.exception = e;
  }
}

/**
 * Writes {@code value} as one or more {@code "headerName: chunk\r\n"} lines.
 *
 * <p>Each chunk holds at most HEADER_CHUNK characters, so that no single header line grows large
 * enough to break the ingester. The chunk boundaries match the original inline loops.
 *
 * @param out the socket output stream
 * @param headerName header name, without the trailing colon
 * @param value the (already encoded) header value to split
 * @throws IOException if writing to {@code out} fails
 */
private void writeChunkedHeader(OutputStream out, String headerName, String value)
  throws IOException
{
  int index = 0;
  while (index + HEADER_CHUNK < value.length())
  {
    out.write((headerName + ": " + value.substring(index, index + HEADER_CHUNK) + "\r\n").getBytes(StandardCharsets.UTF_8));
    index += HEADER_CHUNK;
  }
  out.write((headerName + ": " + value.substring(index) + "\r\n").getBytes(StandardCharsets.UTF_8));
}

/**
 * Handles a failure to read the document's own stream.
 *
 * <p>Logs the error, records it as a "-1" activity starting at {@code fullStartTime}, and builds
 * the ServiceInterruption to throw. The interruption asks for a retry after interruptionRetryTime;
 * its retry window ends two hours from now.
 *
 * @param ioe the read failure
 * @param fullStartTime when the whole ingestion attempt started
 * @return the exception for the caller to throw
 */
private ServiceInterruption documentReadInterruption(IOException ioe, long fullStartTime)
{
  Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);
  activityStart = Long.valueOf(fullStartTime);
  activityCode = "-1";
  activityDetails = "Couldn't read document: "+ioe.getMessage();
  long currentTime = System.currentTimeMillis();
  return new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
    ioe,
    currentTime + interruptionRetryTime,
    currentTime + 2L * 60L * 60000L,
    -1,
    true);
}