in connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java [736:1430]
public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
throws ManifoldCFException, ServiceInterruption
{
getSession();
// The connection limit is designed to permit this connector to coexist with potentially other connectors, such as the web connector.
// There is currently no good way to enforce connection limits across all installed connectors - this will require considerably more
// thought to set up properly.
int connectionLimit = 200;
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: In getDocumentVersions for "+Integer.toString(documentIdentifiers.length)+" documents");
Filter f = new Filter(spec,false);
String[] acls = f.getAcls();
// Sort the ACLs so the packed version string is deterministic.
java.util.Arrays.sort(acls);
// NOTE: There are two kinds of documents in here; documents that are RSS feeds (that presumably have a content-type
// of text/xml), and documents that need to be indexed.
//
// For the latter, the metadata etc is part of the version string. For the former, the only thing that is part of the version string is the
// document's checksum.
//
// The need to exclude documents from fetch based on whether they match an expression causes some difficulties, because we really
// DON'T want this to apply to the feeds themselves. Since the distinguishing characteristic of a feed is that it is in the seed list,
// and that its content-type is text/xml, we could use either of these characteristics to treat feeds differently from
// fetchable urls. But the latter approach requires a fetch, which is forbidden. So - the spec will be used to characterize the url.
// However, the spec might change, and the url might be dropped from the list - and then what??
//
// The final solution is to simply not queue what cannot be mapped.
int feedTimeout = f.getFeedTimeoutValue();
// The document specification has already been used to trim out documents that are not
// allowed to appear in the queue, so that filtering does not need to be repeated here.
for (String documentIdentifier : documentIdentifiers)
{
// If it is in this list, we presume that it has been vetted against the map etc., so we don't do that again. We just fetch it.
// And, if the content type is xml, we calculate the version as if it is a feed rather than a document.
// Get the url
String urlValue = documentIdentifier;
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Getting version string for '"+urlValue+"'");
String versionString;
String ingestURL = null;
String[] pubDates = null;
String[] sources = null;
String[] titles = null;
String[] authorNames = null;
String[] authorEmails = null;
String[] categories = null;
String[] descriptions = null;
try
{
// If there's a carrydown "data" value for this url, we use that value rather than actually fetching the document. This also means we don't need to
// do a robots check, because we aren't actually crawling anything. So, ALWAYS do this first...
CharacterInput[] dechromedData = activities.retrieveParentDataAsFiles(urlValue,"data");
try
{
if (dechromedData.length > 0)
{
// Data already available. The fetch cycle can be entirely avoided, as can the robots check.
ingestURL = f.mapDocumentURL(urlValue);
if (ingestURL != null)
{
// Open up an input stream corresponding to the carrydown data. The stream will be encoded as utf-8.
try
{
InputStream is = dechromedData[0].getUtf8Stream();
try
{
StringBuilder sb = new StringBuilder();
long checkSum = cache.addData(activities,urlValue,"text/html",is);
// Grab what we need from the passed-down data for the document. These will all become part
// of the version string.
pubDates = activities.retrieveParentData(urlValue,"pubdate");
sources = activities.retrieveParentData(urlValue,"source");
titles = activities.retrieveParentData(urlValue,"title");
authorNames = activities.retrieveParentData(urlValue,"authorname");
authorEmails = activities.retrieveParentData(urlValue,"authoremail");
categories = activities.retrieveParentData(urlValue,"category");
descriptions = activities.retrieveParentData(urlValue,"description");
java.util.Arrays.sort(pubDates);
java.util.Arrays.sort(sources);
java.util.Arrays.sort(titles);
java.util.Arrays.sort(authorNames);
java.util.Arrays.sort(authorEmails);
java.util.Arrays.sort(categories);
java.util.Arrays.sort(descriptions);
if (sources.length == 0)
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Warning; URL '"+ingestURL+"' doesn't seem to have any RSS feed source!");
}
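// Build the version string. Ingestable documents start with '+', followed by the packed ACLs,
// the mapped ingestion URL, and the carrydown metadata; the checksum is appended last and is
// never unpacked.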
sb.append('+');
packList(sb,acls,'+');
if (acls.length > 0)
{
sb.append('+');
pack(sb,defaultAuthorityDenyToken,'+');
}
else
sb.append('-');
// The ingestion URL
pack(sb,ingestURL,'+');
// The pub dates
packList(sb,pubDates,'+');
// The titles
packList(sb,titles,'+');
// The sources
packList(sb,sources,'+');
// The categories
packList(sb,categories,'+');
// The descriptions
packList(sb,descriptions,'+');
// The author names
packList(sb,authorNames,'+');
// The author emails
packList(sb,authorEmails,'+');
// Do the checksum part, which does not need to be parseable.
sb.append(checkSum);
versionString = sb.toString();
}
finally
{
is.close();
}
}
catch (java.net.SocketTimeoutException e)
{
throw new ManifoldCFException("IO exception reading data from string: "+e.getMessage(),e);
}
catch (InterruptedIOException e)
{
throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
}
catch (IOException e)
{
throw new ManifoldCFException("IO exception reading data from string: "+e.getMessage(),e);
}
}
else
{
// Document is a seed or is unmappable; just skip it
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Skipping carry-down document '"+urlValue+"' because it is unmappable or is a seed.");
}
}
else
{
// Get the old version string
String oldVersionString = statuses.getIndexedVersionString(documentIdentifier);
// Unpack the old version as much as possible.
// We are interested in what the ETag and Last-Modified headers were last time.
String lastETagValue = null;
String lastModifiedValue = null;
// Note well: Non-continuous jobs cannot use etag because the rss document MUST be fetched each time for such jobs,
// or the documents it points at would get deleted.
//
// NOTE: I disabled this code because we really need the feed's TTL value in order to reschedule properly. I can't get the
// TTL value without refetching the document - therefore ETag and Last-Modified cannot be used :-(
if (false && jobMode == JOBMODE_CONTINUOUS && oldVersionString != null && oldVersionString.startsWith("-"))
{
// It's a feed, so the last etag and last-modified fields should be encoded in this version string.
StringBuilder lastETagBuffer = new StringBuilder();
int unpackPos = unpack(lastETagBuffer,oldVersionString,1,'+');
StringBuilder lastModifiedBuffer = new StringBuilder();
unpackPos = unpack(lastModifiedBuffer,oldVersionString,unpackPos,'+');
if (lastETagBuffer.length() > 0)
lastETagValue = lastETagBuffer.toString();
if (lastModifiedBuffer.length() > 0)
lastModifiedValue = lastModifiedBuffer.toString();
}
if (Logging.connectors.isDebugEnabled() && (lastETagValue != null || lastModifiedValue != null))
Logging.connectors.debug("RSS: Document '"+urlValue+"' was found to have a previous ETag value of '"+((lastETagValue==null)?"null":lastETagValue)+
"' and a previous Last-Modified value of '"+((lastModifiedValue==null)?"null":lastModifiedValue)+"'");
// Robots check. First, we need to separate the url into its components
URL url;
try
{
url = new URL(urlValue);
}
catch (MalformedURLException e)
{
Logging.connectors.debug("RSS: URL '"+urlValue+"' is malformed; skipping",e);
activities.deleteDocument(documentIdentifier);
continue;
}
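// Decompose the URL; the pieces feed both the robots check and the throttled fetch below.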
String protocol = url.getProtocol();
int port = url.getPort();
String hostName = url.getHost();
String pathPart = url.getFile();
// Check with robots to see if it's allowed
if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(currentContext,throttleGroupName,
protocol,port,hostName,url.getPath(),
userAgent,from,
proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
activities, connectionLimit))
{
activities.recordActivity(null,ACTIVITY_FETCH,
null,urlValue,Integer.toString(-2),"Robots exclusion",null);
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Skipping url '"+urlValue+"' because robots.txt says to");
activities.deleteDocument(documentIdentifier);
continue;
}
// Now, use the fetcher, and get the file.
IThrottledConnection connection = fetcher.createConnection(currentContext,
throttleGroupName,
hostName,
connectionLimit,
feedTimeout,
proxyHost,
proxyPort,
proxyAuthDomain,
proxyAuthUsername,
proxyAuthPassword,
activities);
try
{
// Begin the fetch
connection.beginFetch("Data");
try
{
// Execute the request.
// Use the connect timeout from the document specification!
int status = connection.executeFetch(protocol,port,pathPart,userAgent,from,
lastETagValue,lastModifiedValue);
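// Interpret the fetch status: an unchanged document reuses the previous version string, a
// successful fetch is examined below, and site/page errors record an empty version so the
// document stays queued without being ingested.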
switch (status)
{
case IThrottledConnection.STATUS_NOCHANGE:
versionString = oldVersionString;
break;
case IThrottledConnection.STATUS_OK:
try
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Successfully fetched "+urlValue);
// Document successfully fetched!
// If its content is xml, presume it's a feed...
String contentType = connection.getResponseHeader("Content-Type");
// Some sites have multiple content types. We just look at the LAST one in that case.
if (contentType != null)
{
String[] contentTypes = contentType.split(",");
if (contentTypes.length > 0)
contentType = contentTypes[contentTypes.length-1].trim();
else
contentType = null;
}
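// Strip any parameters (e.g. charset) so only the bare mime type is compared against the
// set of recognized xml content types.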
String strippedContentType = contentType;
if (strippedContentType != null)
{
int pos = strippedContentType.indexOf(";");
if (pos != -1)
strippedContentType = strippedContentType.substring(0,pos).trim();
}
boolean isXML = (strippedContentType != null && xmlContentTypes.contains(strippedContentType));
ingestURL = null;
if (!isXML)
{
// If the chromed content mode is set to "skip", and we got here, it means
// we should not include the content.
if (f.getChromedContentMode() == CHROMED_SKIP)
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Removing url '"+urlValue+"' because it no longer has dechromed content available");
versionString = null;
break;
}
// Decide whether to exclude this document based on what we see here.
// Basically, we want to get rid of everything that we don't know what
// to do with in the ingestion system.
if (!activities.checkMimeTypeIndexable(contentType))
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Removing url '"+urlValue+"' because it had the wrong content type: "+((contentType==null)?"null":"'"+contentType+"'"));
versionString = null;
break;
}
ingestURL = f.mapDocumentURL(urlValue);
}
else
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: The url '"+urlValue+"' is a feed");
if (!f.isSeed(urlValue))
{
// Remove the feed from consideration, since it has left the list of seeds
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Removing feed url '"+urlValue+"' because it is not a seed.");
versionString = null;
break;
}
}
InputStream is = connection.getResponseBodyStream();
try
{
long checkSum = cache.addData(activities,urlValue,contentType,is);
StringBuilder sb = new StringBuilder();
if (ingestURL != null)
{
// We think it is ingestable. The version string accordingly starts with a "+".
// Grab what we need from the passed-down data for the document. These will all become part
// of the version string.
pubDates = activities.retrieveParentData(urlValue,"pubdate");
sources = activities.retrieveParentData(urlValue,"source");
titles = activities.retrieveParentData(urlValue,"title");
authorNames = activities.retrieveParentData(urlValue,"authorname");
authorEmails = activities.retrieveParentData(urlValue,"authoremail");
categories = activities.retrieveParentData(urlValue,"category");
descriptions = activities.retrieveParentData(urlValue,"description");
java.util.Arrays.sort(pubDates);
java.util.Arrays.sort(sources);
java.util.Arrays.sort(titles);
java.util.Arrays.sort(authorNames);
java.util.Arrays.sort(authorEmails);
java.util.Arrays.sort(categories);
java.util.Arrays.sort(descriptions);
if (sources.length == 0)
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Warning; URL '"+ingestURL+"' doesn't seem to have any RSS feed source!");
}
sb.append('+');
packList(sb,acls,'+');
if (acls.length > 0)
{
sb.append('+');
pack(sb,defaultAuthorityDenyToken,'+');
}
else
sb.append('-');
// The ingestion URL
pack(sb,ingestURL,'+');
// The pub dates
packList(sb,pubDates,'+');
// The titles
packList(sb,titles,'+');
// The sources
packList(sb,sources,'+');
// The categories
packList(sb,categories,'+');
// The descriptions
packList(sb,descriptions,'+');
// The author names
packList(sb,authorNames,'+');
// The author emails
packList(sb,authorEmails,'+');
}
else
{
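// No ingestion URL, so this will be handled as a feed. Feed version strings start with '-'
// and carry the ETag and Last-Modified headers ahead of the checksum.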
sb.append('-');
String etag = connection.getResponseHeader("ETag");
if (etag == null)
pack(sb,"",'+');
else
pack(sb,etag,'+');
String lastModified = connection.getResponseHeader("Last-Modified");
if (lastModified == null)
pack(sb,"",'+');
else
pack(sb,lastModified,'+');
}
// Do the checksum part, which does not need to be parseable.
sb.append(checkSum);
versionString = sb.toString();
}
finally
{
is.close();
}
}
catch (java.net.SocketTimeoutException e)
{
Logging.connectors.warn("RSS: Socket timeout exception fetching document contents '"+urlValue+"' - skipping: "+e.getMessage(), e);
versionString = null;
}
catch (ConnectTimeoutException e)
{
Logging.connectors.warn("RSS: Connecto timeout exception fetching document contents '"+urlValue+"' - skipping: "+e.getMessage(), e);
versionString = null;
}
catch (InterruptedIOException e)
{
throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
}
catch (IOException e)
{
Logging.connectors.warn("RSS: IO exception fetching document contents '"+urlValue+"' - skipping: "+e.getMessage(), e);
versionString = null;
}
break;
case IThrottledConnection.STATUS_SITEERROR:
case IThrottledConnection.STATUS_PAGEERROR:
default:
// Record an *empty* version.
// This signals the processing code below that we don't want to ingest this document, but we
// also don't want to drop it from the queue, since then we might wind up refetching it repeatedly.
versionString = "";
break;
}
}
finally
{
connection.doneFetch(activities);
}
}
finally
{
connection.close();
}
if (versionString == null)
{
activities.deleteDocument(documentIdentifier);
continue;
}
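// Process the document if its version string is empty (fetch error) or if the framework
// reports that the indexed version is out of date; otherwise skip it.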
if (!(versionString.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)))
continue;
// Process document!
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Processing '"+urlValue+"'");
// The only links we extract come from documents that we think are RSS feeds.
// When we think that's the case, we attempt to parse it as RSS XML.
if (ingestURL == null)
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Interpreting document '"+urlValue+"' as a feed");
// We think it is a feed.
// If this is a continuous job, AND scanonly is true, it means that the document was either identical to the
// previous fetch, or was not fetched at all. In that case, it may not even be there, and we *certainly* don't
// want to attempt to process it in any case.
//
// NOTE: I re-enabled the scan permanently because we need the TTL value to be set whatever the cost. If the
// TTL value is not set, we default to the specified job's feed-rescan time, which is not going to be current enough for some feeds.
if (true || jobMode != JOBMODE_CONTINUOUS)
{
handleRSSFeedSAX(urlValue,activities,f);
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Extraction of feed '"+urlValue+"' complete");
// Record the feed's version string, so we won't refetch unless needed.
// This functionality is required for the last ETag and Last-Modified fields to be sent to the rss server, and to
// keep track of the adaptive parameters.
activities.recordDocument(documentIdentifier,versionString);
}
else
{
// The problem here is that we really do need to set the rescan time to something reasonable.
// But we might not even have read the feed! So what to do??
// One answer is to build a connector-specific table that carries the last value of every feed around.
// Another answer is to change the version code to always read the feed (and the heck with ETag and Last-Modified).
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Feed '"+urlValue+"' does not appear to differ from previous fetch for a continuous job; not extracting!");
long currentTime = System.currentTimeMillis();
Long defaultRescanTime = f.getDefaultRescanTime(currentTime);
if (defaultRescanTime != null)
{
Long minimumTime = f.getMinimumRescanTime(currentTime);
if (minimumTime != null)
{
if (defaultRescanTime.longValue() < minimumTime.longValue())
defaultRescanTime = minimumTime;
}
}
activities.setDocumentScheduleBounds(urlValue,defaultRescanTime,defaultRescanTime,null,null);
}
}
else
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Interpreting '"+urlValue+"' as a document");
String errorCode = null;
String errorDesc = null;
long startTime = System.currentTimeMillis();
Long fileLengthLong = null;
try
{
long documentLength = cache.getDataLength(documentIdentifier);
if (!activities.checkLengthIndexable(documentLength))
{
activities.noDocument(documentIdentifier,versionString);
errorCode = activities.EXCLUDED_LENGTH;
errorDesc = "Document rejected because of length ("+documentLength+")";
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Skipping document '"+urlValue+"' because its length was rejected ("+documentLength+")");
continue;
}
if (!activities.checkURLIndexable(documentIdentifier))
{
activities.noDocument(documentIdentifier,versionString);
errorCode = activities.EXCLUDED_URL;
errorDesc = "Document rejected because of URL ('"+documentIdentifier+"')";
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Skipping document '"+urlValue+"' because its URL was rejected ('"+documentIdentifier+"')");
continue;
}
// Check if it's a recognized content type
String contentType = cache.getContentType(documentIdentifier);
// Some sites have multiple content types. We just look at the LAST one in that case.
if (contentType != null)
{
String[] contentTypes = contentType.split(",");
if (contentTypes.length > 0)
contentType = contentTypes[contentTypes.length-1].trim();
else
contentType = null;
}
if (!activities.checkMimeTypeIndexable(contentType))
{
activities.noDocument(documentIdentifier,versionString);
errorCode = activities.EXCLUDED_MIMETYPE;
errorDesc = "Document rejected because of mime type ("+contentType+")";
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Skipping document '"+urlValue+"' because its mime type was rejected ('"+contentType+"')");
continue;
}
// Treat it as an ingestable document.
long dataSize = cache.getDataLength(urlValue);
RepositoryDocument rd = new RepositoryDocument();
// Set content type
if (contentType != null)
rd.setMimeType(contentType);
// Build the deny ACL list that pairs with the allow ACLs, and attach both as document security
String[] denyAcls;
if (acls == null)
denyAcls = null;
else if (acls.length == 0)
denyAcls = new String[0];
else
denyAcls = new String[]{defaultAuthorityDenyToken};
if (acls != null && denyAcls != null)
rd.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acls,denyAcls);
if (titles != null && titles.length > 0)
rd.addField("title",titles);
if (authorNames != null && authorNames.length > 0)
rd.addField("authorname",authorNames);
if (authorEmails != null && authorEmails.length > 0)
rd.addField("authoremail",authorEmails);
if (descriptions != null && descriptions.length > 0)
rd.addField("summary",descriptions);
if (sources != null && sources.length > 0)
rd.addField("source",sources);
if (categories != null && categories.length > 0)
rd.addField("category",categories);
// The pubdates are ms-since-epoch values; we want the minimum one as the origination time.
Long minimumOrigTime = null;
if (pubDates != null && pubDates.length > 0)
{
String[] pubDateValuesISO = new String[pubDates.length];
TimeZone tz = TimeZone.getTimeZone("UTC");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'", Locale.ROOT);
df.setTimeZone(tz);
for (int k = 0; k < pubDates.length; k++)
{
String pubDate = pubDates[k];
try
{
Long pubDateLong = Long.valueOf(pubDate);
if (minimumOrigTime == null || pubDateLong.longValue() < minimumOrigTime.longValue())
minimumOrigTime = pubDateLong;
pubDateValuesISO[k] = df.format(new Date(pubDateLong.longValue()));
}
catch (NumberFormatException e)
{
// The pubdate value is not a parseable millisecond timestamp; leave the ISO value empty
pubDateValuesISO[k] = "";
}
}
rd.addField("pubdate",pubDates);
rd.addField("pubdateiso",pubDateValuesISO);
}
if (minimumOrigTime != null)
activities.setDocumentOriginationTime(urlValue,minimumOrigTime);
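// Stream the cached content into the repository document and hand it to the framework;
// the mapped ingestURL is passed as the document URI.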
InputStream is = cache.getData(urlValue);
if (is != null)
{
try
{
rd.setBinary(is,dataSize);
try
{
activities.ingestDocumentWithException(documentIdentifier,versionString,ingestURL,rd);
errorCode = "OK";
fileLengthLong = Long.valueOf(dataSize);
}
catch (IOException e)
{
errorCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
errorDesc = e.getMessage();
handleIOException(e,"reading data");
}
}
finally
{
try
{
is.close();
}
catch (IOException e)
{
errorCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
errorDesc = e.getMessage();
handleIOException(e,"closing stream");
}
}
}
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
errorCode = null;
throw e;
}
finally
{
if (errorCode != null)
activities.recordActivity(Long.valueOf(startTime),ACTIVITY_PROCESS,
null,urlValue,errorCode,errorDesc,null);
}
}
}
}
finally
{
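// Release any temporary resources backing the carrydown data.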
for (CharacterInput ci : dechromedData)
{
if (ci != null)
ci.discard();
}
}
}
finally
{
// Remove any fetched documents.
cache.deleteData(documentIdentifier);
}
}
}