public void run()

in connectors/gts/connector/src/main/java/org/apache/manifoldcf/agents/output/gts/HttpPoster.java [701:1067]


    /**
     * Posts the document to the GTS ingestion API as a raw HTTP/1.0 request and records the outcome.
     *
     * <p>The request carries these headers, followed by the binary document body:
     * <ul>
     * <li>the document URI and the credentials;</li>
     * <li>the optional ACL and document template, Base64-encoded and split into
     * {@code HEADER_CHUNK}-sized header lines;</li>
     * <li>the collection names and the document's metadata fields.</li>
     * </ul>
     *
     * <p>Results are reported through fields rather than a return value:
     * <ul>
     * <li>{@code rval} is set to {@code true} on a 200 response.</li>
     * <li>{@code rval} is set to {@code false} when the document is rejected. That means a 4xx response
     * other than 401, or a "broken pipe" socket error that is presumed to be a rejection.</li>
     * <li>Any other failure (including {@code ServiceInterruption} and {@code ManifoldCFException}) is
     * stored in {@code exception}.</li>
     * <li>The {@code activity*} fields describe what happened.</li>
     * </ul>
     * An {@link InterruptedIOException} other than a socket timeout reaching the outer handler (presumably
     * the thread is being shut down) makes the method return without setting either field.
     */
    public void run()
    {
      long length = document.getBinaryLength();
      InputStream is = document.getBinaryStream();

      try
      {
        // Do the operation!
        long fullStartTime = System.currentTimeMillis();

        // Open a socket to ingest, and to the response stream to get the post result
        try
        {
          // Set up the socket, and the (optional) secure socket.  Larger documents get proportionally
          // more response retries.
          long responseRetryCount = responseRetries + (long)((float)length * sizeCoefficient);
          Socket socket = createSocket(responseRetryCount);

          try
          {
            // try-with-resources closes out, then in (same order as before), and a failing close no
            // longer masks an exception thrown while posting.
            try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
              OutputStream out = socket.getOutputStream())
            {
              // Request line
              String uri = url.getFile();
              if (uri.length() == 0)
                uri = "/";
              writeRequestText(out, "POST " + uri + " HTTP/1.0\r\n");

              // Set all the headers
              writeRequestText(out, "Document-URI: " + documentURI + "\r\n");

              writeCredentials(out);

              // Apply ACL if present
              if (aclXmlString.length() > 0)
                writeChunkedRequestHeader(out, "Document-ACL", new Base64().encodeByteArray(aclXmlString.getBytes(StandardCharsets.UTF_8)));

              // Do the collections
              if (collections != null)
              {
                for (String collectionName : collections)
                  writeRequestText(out, "Document-Metadata: collection_name=" + metadataEncode(collectionName) + "\r\n");
              }

              // Do the document template
              if (documentTemplate != null && documentTemplate.length() > 0)
                writeChunkedRequestHeader(out, "Document-Template", new Base64().encodeByteArray(documentTemplate.getBytes(StandardCharsets.UTF_8)));

              // Write all the metadata, if any.  Only string values are handled.
              Iterator<String> iter = document.getFields();
              while (iter.hasNext())
              {
                String fieldName = iter.next();
                for (String value : document.getFieldAsStrings(fieldName))
                  writeRequestText(out, "Document-Metadata: " + fieldName + "=" + metadataEncode(value) + "\r\n");
              }

              // The header block must be terminated by an empty CRLF line (previously "\r\n\n",
              // which only lenient HTTP parsers accept).
              writeRequestText(out, "Content-length: " + Long.toString(length) + "\r\n\r\n");

              long total = 0L;
              long now = System.currentTimeMillis();

              byte[] bytes = new byte[buffersize];

              // Write out the contents of the inputstream to the socket
              while (true)
              {
                int count;
                // Errors reading the document stream itself are reported separately from errors
                // talking to the ingestion system.
                try
                {
                  count = is.read(bytes);
                }
                catch (InterruptedIOException ioe)
                {
                  // A plain InterruptedIOException may mean this thread is being shut down, so pass it on.
                  // Subclasses (SocketTimeoutException, or third-party ones such as httpclient's) are
                  // treated as ordinary read failures.
                  if (ioe.getClass() == InterruptedIOException.class)
                    throw ioe;
                  throw documentReadInterruption(ioe, fullStartTime);
                }
                catch (IOException ioe)
                {
                  throw documentReadInterruption(ioe, fullStartTime);
                }

                if (count == -1)
                  break;
                readFromDocumentStreamYet = true;
                out.write(bytes, 0, count);
                total += count;
              }

              long later = System.currentTimeMillis();
              if (Logging.ingest.isDebugEnabled())
                Logging.ingest.debug("Total bytes posted: " + Long.toString(total) + ", total time: " + (later - now));

              out.flush();

              // Now, process response
              String res;
              try
              {
                res = getResponse(in);
              }
              catch (ServiceInterruption si)
              {
                activityStart = Long.valueOf(now);
                activityCode = "-2";
                activityDetails = si.getMessage();
                throw si;
              }

              if (Logging.ingest.isDebugEnabled())
                Logging.ingest.debug("Response code from ingest: '" + res + "'");

              CodeDetails cd = new CodeDetails(res);

              activityStart = Long.valueOf(now);
              activityBytes = Long.valueOf(length);
              activityCode = cd.getCode();
              activityDetails = cd.getDetails();

              int codeValue = cd.getCodeValue();

              // A negative number means http error of some kind.
              if (codeValue < 0)
                throw new ManifoldCFException("Http protocol error");

              // 200 means everything went OK
              if (codeValue == 200)
              {
                rval = true;
                return;
              }

              // Anything else means the document didn't ingest.  There are three possibilities:
              // 1) The document will NEVER ingest (it's illegal): a 4xx is returned, and we report rval = false.
              // 2) There is a transient error.  We can't retry here once part of the stream has been read,
              //    so we throw a ServiceInterruption and let the caller reschedule the ingestion.
              // 3) Something is wrong with the setup, e.g. bad credentials (401).  We throw a
              //    ManifoldCFException, since that aborts the current activity entirely.

              if (codeValue == 401)
                throw new ManifoldCFException("Bad credentials for ingestion",ManifoldCFException.SETUP_ERROR);

              if (codeValue >= 400 && codeValue < 500)
              {
                rval = false;
                return;
              }

              // Retries should not go on indefinitely; 2 hours is plenty.
              long currentTime = System.currentTimeMillis();
              throw new ServiceInterruption("Error "+Integer.toString(codeValue)+" from ingestion request; ingestion will be retried again later",
                new ManifoldCFException("Ingestion HTTP error code "+Integer.toString(codeValue)),
                currentTime + interruptionRetryTime,
                currentTime + 2L * 60L * 60000L,
                -1,
                true);
            }
          }
          finally
          {
            try
            {
              socket.close();
            }
            catch (InterruptedIOException e)
            {
              throw e;
            }
            catch (IOException e)
            {
              Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
              // Do NOT rethrow
            }
          }
        }
        catch (java.net.SocketTimeoutException ioe)
        {
          // These are just like IO errors, but since they are derived from InterruptedIOException, they have to be caught first.
          Logging.ingest.warn("Error connecting to ingestion API: "+ioe.getMessage(),ioe);

          activityStart = Long.valueOf(fullStartTime);
          activityCode = "-1";
          activityDetails = ioe.getMessage();

          throw ioe;
        }
        catch (InterruptedIOException e)
        {
          // Presumably the thread is being shut down; leave rval and exception untouched.
          return;
        }
        catch (IOException ioe)
        {
          activityStart = Long.valueOf(fullStartTime);

          // Intercept "broken pipe": the ingestion API appears to kill the socket right after a 400 goes out, so this
          // race is always possible and is treated like a 400 (but logged with a distinct code).
          // getMessage() may be null, so guard it; otherwise the NPE would hide the real I/O error.
          String message = ioe.getMessage();
          if (ioe.getClass() == java.net.SocketException.class && message != null
            && message.toLowerCase(Locale.ROOT).contains("broken pipe"))
          {
            activityCode = "-103";
            activityDetails = "Presuming an ingestion rejection: "+ioe.getMessage();
            rval = false;
            return;
          }

          // Record the attempt
          activityCode = "-1";
          activityDetails = ioe.getMessage();

          // Log the error
          Logging.ingest.warn("Error communicating with Ingestion API: "+ioe.getMessage(),ioe);

          throw ioe;
        }
      }
      catch (Throwable e)
      {
        this.exception = e;
      }
    }

    /**
     * Writes {@code text} to the request stream, encoded as UTF-8.
     *
     * @param out the socket output stream
     * @param text the request text, including any trailing CRLF
     * @throws IOException if the write fails
     */
    private void writeRequestText(OutputStream out, String text)
      throws IOException
    {
      byte[] encoded = text.getBytes(StandardCharsets.UTF_8);
      out.write(encoded, 0, encoded.length);
    }

    /**
     * Writes {@code value} as one or more {@code "headerName: chunk"} header lines.
     *
     * <p>Each chunk holds at most {@code HEADER_CHUNK} characters, because a single very long header
     * line blows up the ingester.
     *
     * @param out the socket output stream
     * @param headerName header name, without the trailing colon
     * @param value the header value; callers pass a non-empty Base64 string
     * @throws IOException if the write fails
     */
    private void writeChunkedRequestHeader(OutputStream out, String headerName, String value)
      throws IOException
    {
      int index = 0;
      while (true)
      {
        if (index + HEADER_CHUNK >= value.length())
        {
          writeRequestText(out, headerName + ": " + value.substring(index) + "\r\n");
          return;
        }
        writeRequestText(out, headerName + ": " + value.substring(index, index + HEADER_CHUNK) + "\r\n");
        index += HEADER_CHUNK;
      }
    }

    /**
     * Logs a failure to read the document's own content stream, records it in the activity fields,
     * and builds the ServiceInterruption that asks for the ingestion to be retried later.
     *
     * <p>The retry deadline is two hours from now.
     *
     * @param ioe the read failure
     * @param fullStartTime when the whole ingestion attempt started (ms since epoch)
     * @return the exception for the caller to throw
     */
    private ServiceInterruption documentReadInterruption(IOException ioe, long fullStartTime)
    {
      Logging.ingest.warn("Error reading data for transmission to Ingestion API: "+ioe.getMessage(),ioe);

      activityStart = Long.valueOf(fullStartTime);
      activityCode = "-1";
      activityDetails = "Couldn't read document: "+ioe.getMessage();

      // Retries should not go on indefinitely; 2 hours is plenty.
      long currentTime = System.currentTimeMillis();
      return new ServiceInterruption("IO error reading document for ingestion: "+ioe.getMessage()+"; read will be retried again later",
        ioe,
        currentTime + interruptionRetryTime,
        currentTime + 2L * 60L * 60000L,
        -1,
        true);
    }