protected void ingestFromLiveLink()

in connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkConnector.java [2985:3424]


  /**
  * Fetch one Livelink document and hand it to the output connector, recording
  * a fetch activity for the attempt.
  *
  * The method first runs a series of eligibility checks (view URI, URL, object
  * and version existence, mime type, length, date).  When any check fails it
  * calls {@code activities.noDocument(...)} and returns.  Otherwise it builds a
  * {@link RepositoryDocument} with general fields, category metadata, security
  * and the optional path attribute, and then streams the content either over
  * HTTP (when {@code ingestProtocol} is set) or through a
  * {@code DocumentReadingThread}.
  *
  * Whenever {@code resultCode} is non-null on exit, a {@code ACTIVITY_FETCH}
  * record is written in the outer {@code finally}.  An interrupted
  * {@link ManifoldCFException} clears the result code so that no record is written.
  *
  *@param llc is the Livelink context used to look up object, version and user information.
  *@param documentIdentifier is the document identifier.  Its first character is
  *  skipped; the remainder is either {@code <objID>} or {@code <vol>:<objID>}.
  *  When no volume is present, {@code LLENTWK_VOL} is used.
  *@param version is the version string passed through to the activities calls.
  *@param actualAcls are the allow tokens; security is only set when both this and
  *  {@code denyAcls} are non-null.
  *@param denyAcls are the deny tokens.
  *@param categoryPaths are the category paths passed to {@code desc.getItems(...)}.
  *@param activities is the process activity object used for indexability checks,
  *  ingestion, and activity recording.
  *@param desc supplies the metadata items (categories and attribute names) to add.
  *@param sDesc supplies the path attribute name and value.
  *@throws ManifoldCFException on unrecoverable errors or interruption.
  *@throws ServiceInterruption when the fetch should be retried later.
  *@throws NumberFormatException (unchecked) when the numeric parts of
  *  {@code documentIdentifier} cannot be parsed.
  */
  protected void ingestFromLiveLink(LivelinkContext llc,
    String documentIdentifier, String version,
    String[] actualAcls, String[] denyAcls,
    String[] categoryPaths,
    IProcessActivity activities,
    MetadataDescription desc, SystemMetadataDescription sDesc)
    throws ManifoldCFException, ServiceInterruption
  {

    String contextMsg = "for '"+documentIdentifier+"'";


    // Fetch logging
    long startTime = System.currentTimeMillis();
    String resultCode = null;
    String resultDescription = null;
    Long readSize = null;
    int objID;
    int vol;

    // Decode the identifier.  The search for ':' starts at index 1 because the
    // first character is not part of the numeric payload.
    int colonPos = documentIdentifier.indexOf(":",1);

    if (colonPos == -1)
    {
      // No explicit volume: whole remainder is the object ID
      objID = Integer.parseInt(documentIdentifier.substring(1));
      vol = LLENTWK_VOL;
    }
    else
    {
      // "<vol>:<objID>"
      objID = Integer.parseInt(documentIdentifier.substring(colonPos+1));
      vol = Integer.parseInt(documentIdentifier.substring(1,colonPos));
    }

    // Try/finally for fetch logging
    try
    {
      String viewHttpAddress = convertToViewURI(documentIdentifier);
      if (viewHttpAddress == null)
      {
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: No view URI "+contextMsg+" - not ingesting");
        resultCode = "NOVIEWURI";
        resultDescription = "Document had no view URI";
        activities.noDocument(documentIdentifier,version);
        return;
      }

      // Check URL first
      if (!activities.checkURLIndexable(viewHttpAddress))
      {
        // Document not ingestable due to URL
        resultCode = activities.EXCLUDED_URL;
        resultDescription = "URL ("+viewHttpAddress+") was rejected by output connector";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: Excluding document "+documentIdentifier+" because its URL ("+viewHttpAddress+") was rejected by output connector");
        activities.noDocument(documentIdentifier,version);
        return;
      }

      // Add general metadata
      ObjectInformation objInfo = llc.getObjectInformation(vol,objID);
      VersionInformation versInfo = llc.getVersionInformation(vol,objID,0);
      if (!objInfo.exists())
      {
        resultCode = "OBJECTNOTFOUND";
        resultDescription = "Object was not found in Livelink";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: No object "+contextMsg+": not ingesting");
        activities.noDocument(documentIdentifier,version);
        return;
      }
      if (!versInfo.exists())
      {
        resultCode = "VERSIONNOTFOUND";
        resultDescription = "Version was not found in Livelink";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: No version data "+contextMsg+": not ingesting");
        activities.noDocument(documentIdentifier,version);
        return;
      }

      String mimeType = versInfo.getMimeType();
      if (!activities.checkMimeTypeIndexable(mimeType))
      {
        // Document not indexable because of its mime type
        resultCode = activities.EXCLUDED_MIMETYPE;
        resultDescription = "Mime type ("+mimeType+") was rejected by output connector";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: Excluding document "+documentIdentifier+" because its mime type ("+mimeType+") was rejected by output connector");
        activities.noDocument(documentIdentifier,version);
        return;
      }

      Long dataSize = versInfo.getDataSize();
      if (dataSize == null)
      {
        // Document had no length
        resultCode = "DOCUMENTNOLENGTH";
        resultDescription = "Document had no length in Livelink";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: Excluding document "+documentIdentifier+" because it had no length");
        activities.noDocument(documentIdentifier,version);
        return;
      }

      if (!activities.checkLengthIndexable(dataSize.longValue()))
      {
        // Document not indexable because of its length
        resultCode = activities.EXCLUDED_LENGTH;
        resultDescription = "Document length ("+dataSize+") was rejected by output connector";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: Excluding document "+documentIdentifier+" because its length ("+dataSize+") was rejected by output connector");
        activities.noDocument(documentIdentifier,version);
        return;
      }

      Date modifyDate = versInfo.getModifyDate();
      if (!activities.checkDateIndexable(modifyDate))
      {
        // Document not indexable because of its date
        resultCode = activities.EXCLUDED_DATE;
        resultDescription = "Document date ("+modifyDate+") was rejected by output connector";
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("Livelink: Excluding document "+documentIdentifier+" because its date ("+modifyDate+") was rejected by output connector");
        activities.noDocument(documentIdentifier,version);
        return;
      }

      String fileName = versInfo.getFileName();
      Date creationDate = objInfo.getCreationDate();
      Integer parentID = objInfo.getParentId();


      RepositoryDocument rd = new RepositoryDocument();

      // Add general data we need for the output connector
      if (mimeType != null)
        rd.setMimeType(mimeType);
      if (fileName != null)
        rd.setFileName(fileName);
      if (creationDate != null)
        rd.setCreatedDate(creationDate);
      if (modifyDate != null)
        rd.setModifiedDate(modifyDate);

      rd.addField(GENERAL_NAME_FIELD,objInfo.getName());
      rd.addField(GENERAL_DESCRIPTION_FIELD,objInfo.getComments());
      if (creationDate != null)
        rd.addField(GENERAL_CREATIONDATE_FIELD,DateParser.formatISO8601Date(creationDate));
      if (modifyDate != null)
        rd.addField(GENERAL_MODIFYDATE_FIELD,DateParser.formatISO8601Date(modifyDate));
      if (parentID != null)
        rd.addField(GENERAL_PARENTID,parentID.toString());
      // NOTE(review): the owner/creator IDs are dereferenced without a null check;
      // this assumes they are always populated for an existing object - confirm.
      UserInformation owner = llc.getUserInformation(objInfo.getOwnerId().intValue());
      UserInformation creator = llc.getUserInformation(objInfo.getCreatorId().intValue());
      // The "modifier" is taken from the owner of the version, not of the object.
      UserInformation modifier = llc.getUserInformation(versInfo.getOwnerId().intValue());
      if (owner != null)
        rd.addField(GENERAL_OWNER,owner.getName());
      if (creator != null)
        rd.addField(GENERAL_CREATOR,creator.getName());
      if (modifier != null)
        rd.addField(GENERAL_MODIFIER,modifier.getName());

      // Iterate over the metadata items.  These are organized by category
      // for speed of lookup.

      Iterator<MetadataItem> catIter = desc.getItems(categoryPaths);
      while (catIter.hasNext())
      {
        MetadataItem item = catIter.next();
        MetadataPathItem pathItem = item.getPathItem();
        if (pathItem != null)
        {
          int catID = pathItem.getCatID();
          // grab the associated catversion
          LLValue catVersion = getCatVersion(objID,catID);
          if (catVersion != null)
          {
            // Go through attributes now
            Iterator<String> attrIter = item.getAttributeNames();
            while (attrIter.hasNext())
            {
              String attrName = attrIter.next();
              // Create a unique metadata name
              String metadataName = pathItem.getCatName()+":"+attrName;
              // Fetch the metadata and stuff it into the RepositoryData structure
              String[] metadataValue = getAttributeValue(catVersion,attrName);
              if (metadataValue != null)
                rd.addField(metadataName,metadataValue);
              else
                Logging.connectors.warn("Livelink: Metadata attribute '"+metadataName+"' does not seem to exist; please correct the job");
            }
          }

        }
      }

      // Security is only attached when both token arrays were supplied
      if (actualAcls != null && denyAcls != null)
        rd.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,actualAcls,denyAcls);

      // Add the path metadata item into the mix, if enabled
      String pathAttributeName = sDesc.getPathAttributeName();
      if (pathAttributeName != null && pathAttributeName.length() > 0)
      {
        String pathString = sDesc.getPathAttributeValue(documentIdentifier);
        if (pathString != null)
        {
          if (Logging.connectors.isDebugEnabled())
            Logging.connectors.debug("Livelink: Path attribute name is '"+pathAttributeName+"'"+contextMsg+", value is '"+pathString+"'");
          rd.addField(pathAttributeName,pathString);
        }
      }

      if (ingestProtocol != null)
      {
        // Use HTTP to fetch document!
        String ingestHttpAddress = convertToIngestURI(documentIdentifier);
        if (ingestHttpAddress == null)
        {
          if (Logging.connectors.isDebugEnabled())
            Logging.connectors.debug("Livelink: No fetch URI "+contextMsg+" - not ingesting");
          resultCode = "NOURI";
          resultDescription = "Document had no fetch URI";
          activities.noDocument(documentIdentifier,version);
          return;
        }

        // Set up connection
        HttpClient client = getInitializedClient(contextMsg);

        if (Logging.connectors.isInfoEnabled())
          Logging.connectors.info("Livelink: " + ingestHttpAddress);


        HttpGet method = new HttpGet(getHost().toURI() + ingestHttpAddress);
        method.setHeader(new BasicHeader("Accept","*/*"));

        // Set when the inner stream handling was interrupted; in that case the
        // cleanup below aborts the request but does not call finishUp().
        boolean wasInterrupted = false;
        ExecuteMethodThread methodThread = new ExecuteMethodThread(client,method);
        methodThread.start();
        try
        {
          int statusCode = methodThread.getResponseCode();
          switch (statusCode)
          {
          case 500:
          case 502:
            Logging.connectors.warn("Livelink: Service interruption during fetch "+contextMsg+" with Livelink HTTP Server, retrying...");
            resultCode = "FETCHFAILED";
            resultDescription = "HTTP error code "+statusCode+" fetching document";
            throw new ServiceInterruption("Service interruption during fetch",new ManifoldCFException(Integer.toString(statusCode)+" error while fetching"),System.currentTimeMillis()+60000L,
              System.currentTimeMillis()+600000L,-1,true);

          case HttpStatus.SC_UNAUTHORIZED:
            Logging.connectors.warn("Livelink: Document fetch unauthorized for "+ingestHttpAddress+" ("+contextMsg+")");
            // Since we logged in, we should fail here if the ingestion user doesn't have access to the
            // the document, but if we do, don't fail hard.
            resultCode = "UNAUTHORIZED";
            resultDescription = "Document fetch was unauthorized by IIS";
            activities.noDocument(documentIdentifier,version);
            return;

          case HttpStatus.SC_OK:
            if (Logging.connectors.isDebugEnabled())
              Logging.connectors.debug("Livelink: Created http document connection to Livelink "+contextMsg);
            // A non-existent content length will cause a value of -1 to be returned.  This seems to indicate that the session login did not work right.
            if (methodThread.getResponseContentLength() < 0)
            {
              resultCode = "SESSIONLOGINFAILED";
              resultDescription = "Response content length was -1, which usually means session login did not succeed";
              activities.noDocument(documentIdentifier,version);
              return;
            }

            try
            {
              InputStream is = methodThread.getSafeInputStream();
              try
              {
                rd.setBinary(is,dataSize);

                activities.ingestDocumentWithException(documentIdentifier,version,viewHttpAddress,rd);
                resultCode = "OK";
                readSize = dataSize;

                if (Logging.connectors.isDebugEnabled())
                  Logging.connectors.debug("Livelink: Ingesting done "+contextMsg);

              }
              finally
              {
                // Close stream via thread, since otherwise this can hang
                is.close();
              }
            }
            catch (InterruptedException e)
            {
              wasInterrupted = true;
              throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
            }
            catch (HttpException e)
            {
              resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
              resultDescription = e.getMessage();
              handleHttpException(contextMsg,e);
            }
            catch (IOException e)
            {
              resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
              resultDescription = e.getMessage();
              handleIOException(contextMsg,e);
            }
            break;
          case HttpStatus.SC_BAD_REQUEST:
          case HttpStatus.SC_USE_PROXY:
          case HttpStatus.SC_GONE:
            resultCode = "HTTPERROR";
            resultDescription = "Http request returned status "+Integer.toString(statusCode);
            throw new ManifoldCFException("Unrecoverable request failure; error = "+Integer.toString(statusCode));
          default:
            resultCode = "UNKNOWNHTTPCODE";
            resultDescription = "Http request returned status "+Integer.toString(statusCode);
            Logging.connectors.warn("Livelink: Attempt to retrieve document from '"+ingestHttpAddress+"' received a response of "+Integer.toString(statusCode)+"; retrying in one minute");
            // Retry after one minute, give up after ten
            long currentTime = System.currentTimeMillis();
            throw new ServiceInterruption("Fetch failed; retrying in 1 minute",new ManifoldCFException("Fetch failed with unknown code "+Integer.toString(statusCode)),
              currentTime+60000L,currentTime+600000L,-1,true);
          }
        }
        catch (InterruptedException e)
        {
          // Drop the connection on the floor
          methodThread.interrupt();
          // Null the reference so the finally block below skips abort/finishUp
          methodThread = null;
          throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
        }
        catch (HttpException e)
        {
          resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
          resultDescription = e.getMessage();
          handleHttpException(contextMsg,e);
        }
        catch (IOException e)
        {
          resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
          resultDescription = e.getMessage();
          handleIOException(contextMsg,e);
        }
        finally
        {
          if (methodThread != null)
          {
            methodThread.abort();
            try
            {
              if (!wasInterrupted)
                methodThread.finishUp();
            }
            catch (InterruptedException e)
            {
              throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
            }
          }
        }
      }
      else
      {
        // Use FetchVersion instead

        // Fire up the document reading thread
        DocumentReadingThread t = new DocumentReadingThread(vol,objID,0);
        // Set when an InterruptedIOException (other than a socket timeout) was
        // seen; in that case finishUp() is skipped.
        boolean wasInterrupted = false;
        t.start();
        try
        {
          try
          {
            InputStream is = t.getSafeInputStream();
            try
            {
              // Can only index while background thread is running!
              rd.setBinary(is, dataSize);
              activities.ingestDocumentWithException(documentIdentifier, version, viewHttpAddress, rd);
              resultCode = "OK";
              readSize = dataSize;
            }
            finally
            {
              is.close();
            }
          }
          catch (java.net.SocketTimeoutException e)
          {
            // SocketTimeoutException is a subclass of InterruptedIOException;
            // rethrow it here so it is not flagged as an interruption below.
            throw e;
          }
          catch (InterruptedIOException e)
          {
            wasInterrupted = true;
            throw e;
          }
          finally
          {
            if (!wasInterrupted)
              t.finishUp();
          }

          // No errors.  Record the fact that we made it.
        }
        catch (InterruptedException e)
        {
          t.interrupt();
          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
            ManifoldCFException.INTERRUPTED);
        }
        catch (IOException e)
        {
          resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
          resultDescription = e.getMessage();
          handleIOException(contextMsg,e);
        }
        catch (RuntimeException e)
        {
          resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
          resultDescription = e.getMessage();
          handleLivelinkRuntimeException(e,0,true);
        }
      }
    }
    catch (ManifoldCFException e)
    {
      // An interrupted fetch is not a real result; suppress the activity record
      if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
        resultCode = null;
      throw e;
    }
    finally
    {
      if (resultCode != null)
        activities.recordActivity(Long.valueOf(startTime),ACTIVITY_FETCH,readSize,vol+":"+objID,resultCode,resultDescription,null);
    }
  }