in connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java [303:953]
/**
 * Processes a batch of document identifiers against the configured JDBC queries.
 *
 * <p>The work is done in five stages:
 * <ol>
 *   <li>Run the optional version query to find a version string per identifier; identifiers that
 *       do not come back are deleted.</li>
 *   <li>When security is on, gather access tokens, either from the ACL query or from the
 *       statically specified token set.</li>
 *   <li>Build the full version string for each document and ask the framework whether it needs
 *       reindexing.</li>
 *   <li>Run each attribute query for the documents that will be fetched.</li>
 *   <li>Run the data query, then ingest or explicitly reject each returned row, and finally
 *       clean up identifiers the data query did not return.</li>
 * </ol>
 *
 * <p>Every external query and every fetch outcome is reported through
 * {@code activities.recordActivity}, except when the failure is an interruption.
 *
 * @param documentIdentifiers the identifiers of the documents to process.
 * @param statuses not consulted by this implementation.
 * @param spec the job specification; supplies the queries and the attribute-query child nodes.
 * @param activities the callback object used to ingest, reject, delete and log documents.
 * @param jobMode not consulted by this implementation.
 * @param usesDefaultAuthority not consulted by this implementation.
 * @throws ManifoldCFException if a query is malformed (e.g. does not return the ID column) or a
 *         lower-level call fails.
 * @throws ServiceInterruption propagated from the lower-level calls that declare it.
 */
public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
  IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
  throws ManifoldCFException, ServiceInterruption
{
  TableSpec ts = new TableSpec(spec);
  Set<String> acls = ts.getAcls();

  // ---- Stage 1: document versions ----
  // If there is no version query, every document gets the empty string as its version, which
  // means every one of them is handed to the data query below.  Deletions are then decided
  // from the data query results.
  Map<String,String> documentVersions = new HashMap<String,String>();
  if (ts.versionQuery != null && ts.versionQuery.length() > 0)
  {
    // If there IS a versions query, do it.  First set up the variables, then do the substitution.
    VariableMap vm = new VariableMap();
    addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
    addConstant(vm,JDBCConstants.versionReturnVariable,JDBCConstants.versionReturnColumnName);
    if (addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,null))
    {
      // Do the substitution
      ArrayList paramList = new ArrayList();
      StringBuilder sb = new StringBuilder();
      substituteQuery(ts.versionQuery,vm,sb,paramList);

      // We presume that if the row is missing, the document is gone.
      IDynamicResultSet result = executeRecordedQuery(activities,sb.toString(),paramList);
      try
      {
        while (true)
        {
          IDynamicResultRow row = result.getNextRow();
          if (row == null)
            break;
          try
          {
            String idValue = readRequiredIdColumn(row,"version");
            // Null version is OK; make it a ""
            String versionValue = readStringColumnOrEmpty(row,JDBCConstants.versionReturnColumnName);
            documentVersions.put(idValue,versionValue);
          }
          finally
          {
            row.close();
          }
        }
      }
      finally
      {
        // A dynamic resultset MUST be closed, or a connection will leak.
        result.close();
      }
    }
  }
  else
  {
    for (String documentIdentifier : documentIdentifiers)
    {
      documentVersions.put(documentIdentifier,"");
    }
  }

  // Delete the documents that had no version, and work only on ones that did
  final Set<String> fetchDocuments = documentVersions.keySet();
  for (String documentIdentifier : documentIdentifiers)
  {
    if (documentVersions.get(documentIdentifier) == null)
    {
      activities.deleteDocument(documentIdentifier);
    }
  }

  // ---- Stage 2: document acls ----
  Map<String,Set<String>> documentAcls = new HashMap<String,Set<String>>();
  if (ts.securityOn)
  {
    if (acls.size() == 0 && ts.aclQuery != null && ts.aclQuery.length() > 0)
    {
      // If there IS an acls query, do it.  First set up the variables, then do the substitution.
      VariableMap vm = new VariableMap();
      addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
      addConstant(vm,JDBCConstants.tokenReturnVariable,JDBCConstants.tokenReturnColumnName);
      if (addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,fetchDocuments))
      {
        // Do the substitution
        ArrayList paramList = new ArrayList();
        StringBuilder sb = new StringBuilder();
        substituteQuery(ts.aclQuery,vm,sb,paramList);

        IDynamicResultSet result = executeRecordedQuery(activities,sb.toString(),paramList);
        try
        {
          while (true)
          {
            IDynamicResultRow row = result.getNextRow();
            if (row == null)
              break;
            try
            {
              String idValue = readRequiredIdColumn(row,"acl");
              String tokenValue = readStringColumnOrEmpty(row,JDBCConstants.tokenReturnColumnName);
              // One row per (document, token) pair; accumulate the tokens per document.
              Set<String> dcs = documentAcls.get(idValue);
              if (dcs == null)
              {
                dcs = new HashSet<String>();
                documentAcls.put(idValue,dcs);
              }
              dcs.add(tokenValue);
            }
            finally
            {
              row.close();
            }
          }
        }
        finally
        {
          result.close();
        }
      }
    }
    else
    {
      // No acl query applies: every document shares the statically specified token set.
      for (String documentIdentifier : fetchDocuments)
      {
        documentAcls.put(documentIdentifier,acls);
      }
    }
  }

  // ---- Stage 3: decide which documents need fetching ----
  // Map from identifier to version string, for the documents we intend to fetch
  final Map<String,String> map = new HashMap<>();
  // This is the set of documents actually seen in the data query result
  final Set<String> seenDocuments = new HashSet<>();

  for (String documentIdentifier : fetchDocuments)
  {
    String documentVersion = documentVersions.get(documentIdentifier);
    if (documentVersion.length() == 0)
    {
      // Empty version: always fetch, without consulting the framework.
      map.put(documentIdentifier,documentVersion);
    }
    else
    {
      // Compute a full version string: acl marker + packed sorted acls + version + "=" + data query.
      StringBuilder sb = new StringBuilder();
      Set<String> dAcls = documentAcls.get(documentIdentifier);
      if (dAcls == null)
        sb.append('-');
      else
      {
        sb.append('+');
        String[] aclValues = dAcls.toArray(new String[dAcls.size()]);
        // Sort so that the version string does not depend on set iteration order.
        java.util.Arrays.sort(aclValues);
        packList(sb,aclValues,'+');
      }
      sb.append(documentVersion).append("=").append(ts.dataQuery);
      String versionValue = sb.toString();
      if (activities.checkDocumentNeedsReindexing(documentIdentifier,versionValue))
      {
        map.put(documentIdentifier,versionValue);
      }
    }
  }

  // ---- Stage 4: attribute queries ----
  // We have a primary query and a number of attribute queries to execute.
  // We execute the attribute queries first because those do not include binary data.
  final Map<String, Map<String, Set<String>>> attributeValues = new HashMap<String, Map<String, Set<String>>>();
  for (int index = 0; index < spec.getChildCount(); index++)
  {
    SpecificationNode sn = spec.getChild(index);
    if (!sn.getType().equals(JDBCConstants.attributeQueryNode))
      continue;

    final String attributeName = sn.getAttributeValue(JDBCConstants.attributeName);
    final String attributeQuery = sn.getValue();

    VariableMap attrVm = new VariableMap();
    addConstant(attrVm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
    addConstant(attrVm,JDBCConstants.dataReturnVariable,JDBCConstants.dataReturnColumnName);
    if (!addIDList(attrVm,JDBCConstants.idListVariable,documentIdentifiers,map.keySet()))
      continue;

    // Do the substitution
    ArrayList paramList = new ArrayList();
    StringBuilder sb = new StringBuilder();
    substituteQuery(attributeQuery,attrVm,sb,paramList);

    IDynamicResultSet result = executeRecordedQuery(activities,sb.toString(),paramList);
    try
    {
      while (true)
      {
        IDynamicResultRow row = result.getNextRow();
        if (row == null)
          break;
        try
        {
          String idValue = readRequiredIdColumn(row,"attribute");
          String dataValue = readStringColumnOrEmpty(row,JDBCConstants.dataReturnColumnName);
          // attributeValues: document id -> attribute name -> set of values
          Map<String, Set<String>> avs = attributeValues.get(idValue);
          if (avs == null)
          {
            avs = new HashMap<String, Set<String>>();
            attributeValues.put(idValue,avs);
          }
          Set<String> dataValues = avs.get(attributeName);
          if (dataValues == null)
          {
            dataValues = new HashSet<String>();
            avs.put(attributeName, dataValues);
          }
          dataValues.add(dataValue);
        }
        finally
        {
          row.close();
        }
      }
    }
    finally
    {
      result.close();
    }
  }

  // ---- Stage 5: data query and ingestion ----
  // For all the documents not marked "scan only", form a query and pick up the contents.
  // If the contents is not found, then explicitly call the delete action method.
  VariableMap vm = new VariableMap();
  addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
  addConstant(vm,JDBCConstants.urlReturnVariable,JDBCConstants.urlReturnColumnName);
  addConstant(vm,JDBCConstants.dataReturnVariable,JDBCConstants.dataReturnColumnName);
  addConstant(vm,JDBCConstants.contentTypeReturnVariable,JDBCConstants.contentTypeReturnColumnName);
  if (!addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,map.keySet()))
    return;

  // Do the substitution
  ArrayList paramList = new ArrayList();
  StringBuilder sb = new StringBuilder();
  substituteQuery(ts.dataQuery,vm,sb,paramList);

  // Like the other three queries, a failure here is recorded unless it is an interruption.
  IDynamicResultSet result = executeRecordedQuery(activities,sb.toString(),paramList);
  try
  {
    while (true)
    {
      IDynamicResultRow row = result.getNextRow();
      if (row == null)
        break;
      try
      {
        final String id = readRequiredIdColumn(row,"document");
        seenDocuments.add(id);

        // errorCode doubles as the "should we log a fetch activity" flag: null means log nothing.
        String errorCode = null;
        String errorDesc = null;
        Long fileLengthLong = null;
        long fetchStartTime = System.currentTimeMillis();

        try
        {
          String version = map.get(id);
          if (version == null)
            // Does not need refetching
            continue;

          // This document was marked as "not scan only", so we expect to find it.
          if (Logging.connectors.isDebugEnabled())
            Logging.connectors.debug("JDBC: Document data result found for '"+id+"'");

          // We will determine the disposition of the document here, so remove this id from the map in order that we know what we still
          // need to delete when all done.
          map.remove(id);

          Object urlObject = row.getValue(JDBCConstants.urlReturnColumnName);
          if (urlObject == null)
          {
            Logging.connectors.debug("JDBC: Document '"+id+"' has a null url - skipping");
            errorCode = activities.NULL_URL;
            errorDesc = "Excluded because document had a null URL";
            activities.noDocument(id,version);
            continue;
          }

          // This is not right - url can apparently be a BinaryInput
          String url = JDBCConnection.readAsString(urlObject);
          boolean validURL;
          try
          {
            // Check to be sure url is valid
            new java.net.URI(url);
            validURL = true;
          }
          catch (java.net.URISyntaxException e)
          {
            validURL = false;
          }

          if (!validURL)
          {
            Logging.connectors.debug("JDBC: Document '"+id+"' has an illegal url: '"+url+"' - skipping");
            errorCode = activities.BAD_URL;
            errorDesc = "Excluded because document had illegal URL ('"+url+"')";
            activities.noDocument(id,version);
            continue;
          }

          // Process the document itself
          Object contents = row.getValue(JDBCConstants.dataReturnColumnName);
          // Null data is allowed; we just ignore these
          if (contents == null)
          {
            Logging.connectors.debug("JDBC: Document '"+id+"' seems to have null data - skipping");
            errorCode = "NULLDATA";
            errorDesc = "Excluded because document had null data";
            activities.noDocument(id,version);
            continue;
          }

          // Use the content type column when present; otherwise derive one from the data's Java type.
          String contentType;
          Object contentTypeObject = row.getValue(JDBCConstants.contentTypeReturnColumnName);
          if (contentTypeObject != null)
            contentType = JDBCConnection.readAsString(contentTypeObject);
          else if (contents instanceof BinaryInput)
            contentType = "application/octet-stream";
          else if (contents instanceof CharacterInput)
            contentType = "text/plain; charset=utf-8";
          else
            contentType = "text/plain";

          if (!activities.checkMimeTypeIndexable(contentType))
          {
            Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of mime type - skipping");
            errorCode = activities.EXCLUDED_MIMETYPE;
            errorDesc = "Excluded because of mime type ("+contentType+")";
            activities.noDocument(id,version);
            continue;
          }

          if (!activities.checkURLIndexable(url))
          {
            Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of url - skipping");
            errorCode = activities.EXCLUDED_URL;
            errorDesc = "Excluded because of URL ('"+url+"')";
            activities.noDocument(id,version);
            continue;
          }

          // An ingestion will take place for this document.
          RepositoryDocument rd = new RepositoryDocument();
          rd.setMimeType(contentType);

          applyMultiAttributeValues(rd,attributeValues.get(id));
          applyAccessTokens(rd,documentAcls.get(id));
          applyMetadata(rd,row);

          // Work out the length first, so the length filter runs before any stream is opened.
          // Anything that is neither binary nor character input is stringified and UTF-8 encoded.
          byte[] inlineBytes = null;
          final long fileLength;
          if (contents instanceof BinaryInput)
          {
            fileLength = ((BinaryInput)contents).getLength();
          }
          else if (contents instanceof CharacterInput)
          {
            fileLength = ((CharacterInput)contents).getUtf8StreamLength();
          }
          else
          {
            inlineBytes = contents.toString().getBytes(StandardCharsets.UTF_8);
            fileLength = inlineBytes.length;
          }

          if (!activities.checkLengthIndexable(fileLength))
          {
            Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of length - skipping");
            errorCode = activities.EXCLUDED_LENGTH;
            errorDesc = "Excluded because of length ("+fileLength+")";
            activities.noDocument(id, version);
            continue;
          }

          try
          {
            // Open the stream matching the content's type
            InputStream is;
            if (contents instanceof BinaryInput)
              is = ((BinaryInput)contents).getStream();
            else if (contents instanceof CharacterInput)
              is = ((CharacterInput)contents).getUtf8Stream();
            else
              is = new ByteArrayInputStream(inlineBytes);
            try
            {
              rd.setBinary(is,fileLength);
              activities.ingestDocumentWithException(id, version, url, rd);
              errorCode = "OK";
              fileLengthLong = Long.valueOf(fileLength);
            }
            finally
            {
              is.close();
            }
          }
          catch (IOException e)
          {
            errorCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
            errorDesc = e.getMessage();
            handleIOException(id,e);
          }
        }
        catch (ManifoldCFException e)
        {
          // Interruptions are not logged as fetch activity.
          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
            errorCode = null;
          throw e;
        }
        finally
        {
          if (errorCode != null)
            activities.recordActivity(Long.valueOf(fetchStartTime), ACTIVITY_FETCH,
              fileLengthLong, id, errorCode, errorDesc, null);
        }
      }
      finally
      {
        row.close();
      }
    }
  }
  finally
  {
    result.close();
  }

  // Now, go through the original id's, and see which ones were never returned by the data query.
  // These are presumed to be gone from the database, and thus must be deleted.
  // NOTE(review): identifiers for which checkDocumentNeedsReindexing() returned false are not in
  // 'map'.  If addIDList() restricts the data query to map.keySet(), those rows never reach
  // 'seenDocuments' and would be deleted here - confirm against addIDList().
  // NOTE(review): every seen id that was in 'map' is removed from it above, so the
  // "NOTFETCHED" branch below looks unreachable - confirm before relying on it.
  for (final String documentIdentifier : fetchDocuments)
  {
    if (!seenDocuments.contains(documentIdentifier))
    {
      // Never saw it in the fetch attempt
      activities.deleteDocument(documentIdentifier);
    }
    else
    {
      // Saw it in the fetch attempt, and we might have fetched it
      final String documentVersion = map.get(documentIdentifier);
      if (documentVersion != null)
      {
        // Still in the map, so no disposition was decided for it during the fetch loop.
        activities.noDocument(documentIdentifier,documentVersion);
        activities.recordActivity(null, ACTIVITY_FETCH,
          null, documentIdentifier, "NOTFETCHED", "Document was not seen by processing query", null);
      }
    }
  }
}

/**
 * Establishes the session, runs one external query, and records the outcome as an
 * ACTIVITY_EXTERNAL_QUERY activity ("OK" or "ERROR").  Interruptions are rethrown without
 * being recorded.
 *
 * <p>The returned resultset MUST be closed by the caller, or a connection will leak.  If
 * recording the success itself fails, the resultset is closed here before the exception
 * propagates.
 */
private IDynamicResultSet executeRecordedQuery(IProcessActivity activities, String queryText, ArrayList paramList)
  throws ManifoldCFException, ServiceInterruption
{
  getSession();
  long startTime = System.currentTimeMillis();
  IDynamicResultSet result;
  try
  {
    result = connection.executeUncachedQuery(queryText,paramList,-1);
  }
  catch (ManifoldCFException e)
  {
    // If failure, record the failure.
    if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
      activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
        createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
    throw e;
  }

  boolean handedOff = false;
  try
  {
    // If success, record that too.
    activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
      createQueryString(queryText,paramList), "OK", null, null);
    handedOff = true;
    return result;
  }
  finally
  {
    if (!handedOff)
      result.close();
  }
}

/**
 * Reads the mandatory ID column of a result row as a string.  queryKind ("version", "acl",
 * "attribute" or "document") only selects the wording of the error thrown when the column is null.
 */
private static String readRequiredIdColumn(IDynamicResultRow row, String queryKind)
  throws ManifoldCFException, ServiceInterruption
{
  Object o = row.getValue(JDBCConstants.idReturnColumnName);
  if (o == null)
    throw new ManifoldCFException("Bad "+queryKind+" query; doesn't return $(IDCOLUMN) column. Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\", or, for MySQL, select \"by label\" in your repository connection.");
  return JDBCConnection.readAsString(o);
}

/**
 * Reads an optional column of a result row as a string, mapping a null value to "".
 */
private static String readStringColumnOrEmpty(IDynamicResultRow row, String columnName)
  throws ManifoldCFException, ServiceInterruption
{
  Object o = row.getValue(columnName);
  if (o == null)
    return "";
  return JDBCConnection.readAsString(o);
}