in connectors/solr/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/solr/SolrIngesterConnector.java [468:806]
/**
 * Synchronizes a batch of document identifiers with the configured Solr collection.
 *
 * <p>The method:
 * <ol>
 * <li>reads the job specification (collection, id/date/content/security fields, field mapping, page size);</li>
 * <li>looks up the requested identifiers and their version field in Solr, using cursor-mark paging sorted by the id field;</li>
 * <li>re-ingests every found document whose version string is empty or that {@code activities.checkDocumentNeedsReindexing} reports as changed;</li>
 * <li>deletes the identifiers Solr did not return.</li>
 * </ol>
 *
 * <p>Deletions are only issued when the existence lookup of step 2 completed. A failed Solr request or an
 * unreadable cursor mark therefore records an error instead of removing the whole batch from the index.
 *
 * <p>NOTE(review): Solr cursor-mark paging requires the sort to include the collection's uniqueKey. This code
 * assumes the configured id field is that uniqueKey; confirm for each collection.
 *
 * @param documentIdentifiers identifiers to process; they are matched against the configured id field
 * @param statuses existing version information (not used by this implementation)
 * @param spec job specification
 * @param activities activity sink used to ingest, delete and record documents
 * @param jobMode job mode (not used by this implementation)
 * @param usesDefaultAuthority whether the default authority is used (not used by this implementation)
 * @throws ManifoldCFException if the configured rows number is missing, not an integer or not positive, or if a
 *     framework call fails
 * @throws ServiceInterruption propagated from the framework calls made by this method
 */
public void processDocuments(final String[] documentIdentifiers, final IExistingVersions statuses, final Specification spec, final IProcessActivity activities, final int jobMode,
    final boolean usesDefaultAuthority) throws ManifoldCFException, ServiceInterruption {
  getSession();
  if (Logging.connectors.isDebugEnabled()) {
    Logging.connectors.debug("SolrIngester: ProcessDocuments method");
  }
  final long startFetchTime = System.currentTimeMillis();

  // Read the job specification. Node types are Strings and must be compared with equals(): nodes
  // rebuilt from a stored specification are not the same instances as the SolrIngesterConfig constants.
  String collection = null;
  String idFieldName = null;
  boolean securityActivated = false;
  String dateField = null;
  String contentField = null;
  String securityField = null;
  String securityField2 = null;
  String rowsNumberString = null;
  final Map<String, String> mapFields = new HashMap<String, String>();
  for (int l = 0; l < spec.getChildCount(); l++) {
    final SpecificationNode sn = spec.getChild(l);
    final String type = sn.getType();
    if (SolrIngesterConfig.COLLECTION_NAME.equals(type)) {
      collection = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
    if (SolrIngesterConfig.SECURITY_ACTIVATED.equals(type)) {
      securityActivated = Boolean.parseBoolean(sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE));
    }
    if (SolrIngesterConfig.ID_FIELD.equals(type)) {
      idFieldName = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
    if (SolrIngesterConfig.DATE_FIELD.equals(type)) {
      dateField = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
    if (SolrIngesterConfig.CONTENT_FIELD.equals(type)) {
      contentField = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
    if (SolrIngesterConfig.SECURITY_FIELD.equals(type)) {
      securityField = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
    if (SolrIngesterConfig.SECURITY_FIELD2.equals(type)) {
      securityField2 = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
    if (SolrIngesterConfig.NODE_FIELDMAP.equals(type)) {
      final String source = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_SOURCE);
      final String target = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_TARGET);
      mapFields.put(source, target);
    }
    if (SolrIngesterConfig.ROWS_NUMBER.equals(type)) {
      rowsNumberString = sn.getAttributeValue(SolrIngesterConfig.ATTRIBUTE_VALUE);
    }
  }

  if (Logging.connectors.isDebugEnabled()) {
    Logging.connectors.debug("SolrIngester: docidentifiers size '" + documentIdentifiers.length);
  }
  if (documentIdentifiers.length == 0) {
    return;
  }
  final int rowsNumber = parseRowsNumber(rowsNumberString);

  final Map<String, String> existingIds = new HashMap<String, String>();
  String errorCode = null;
  String description = "";
  // Only a complete lookup proves that an identifier is absent from Solr and may be deleted.
  boolean lookupComplete = false;
  try {
    // Step 1: fetch the id and version of every requested identifier that still exists in Solr.
    lookupComplete = fetchExistingVersions(collection, idFieldName, rowsNumber, documentIdentifiers, existingIds);

    // Step 2: keep the ids without a version, or whose version differs from the one MCF has stored.
    final Set<String> toProcess = new HashSet<String>();
    for (final Map.Entry<String, String> entry : existingIds.entrySet()) {
      final String id = entry.getKey();
      final String version = entry.getValue();
      if (version.length() == 0 || activities.checkDocumentNeedsReindexing(id, version)) {
        toProcess.add(id);
      }
    }

    // Step 3: fetch the full documents to (re)index and ingest them, one RepositoryDocument per Solr document.
    if (!toProcess.isEmpty()) {
      final SolrQuery query = new SolrQuery("*:*").setRows(rowsNumber).setSort(idFieldName, SolrQuery.ORDER.asc);
      query.setFields("*");
      query.addFilterQuery(idFieldName + ":(" + toQuotedSolrIdList(toProcess) + ")");
      String cursorMark = CursorMarkParams.CURSOR_MARK_START;
      boolean done = false;
      while (!done) {
        query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
        final QueryResponse response = httpSolrClient.query(collection, query);
        for (final SolrDocument document : response.getResults()) {
          final String id = String.valueOf(document.getFieldValue(idFieldName));
          final RepositoryDocument doc = new RepositoryDocument();
          copyMappedSolrFields(document, mapFields, doc);
          // TODO: the source date field is configurable, but the target field is fixed (date_target_field).
          final Object dateValue = document.getFirstValue(dateField);
          if (dateValue instanceof Date) {
            doc.addField(date_target_field, (Date) dateValue);
          } else if (dateValue != null) {
            Logging.connectors.warn("SolrIngester: the type of the field " + dateField + " is not recognized");
          }
          doc.setFileName(id);
          if (Logging.connectors.isDebugEnabled()) {
            Logging.connectors.debug("doc '" + id);
          }
          final byte[] content = extractSolrContent(document, contentField).getBytes(StandardCharsets.UTF_8);
          if (securityActivated) {
            applySolrSecurity(document, securityField, securityField2, doc);
          }
          // Can only index while background thread is running!
          try (final InputStream is = new ByteArrayInputStream(content)) {
            doc.setBinary(is, content.length);
            activities.ingestDocumentWithException(id, String.valueOf(document.getFieldValue(versionField)), id, doc);
            activities.recordActivity(Long.valueOf(startFetchTime), ACTIVITY_GET, doc.getBinaryLength(), id, "OK", "", null);
          } catch (final IOException e) {
            errorCode = "ERROR";
            description = "Unable to perform Solr requests";
            Logging.connectors.error("Unable to perform Solr requests", e);
          }
        }
        final String nextCursorMark = readNextCursorMark(response);
        // Solr signals the last page by returning the same cursor mark it was given.
        done = nextCursorMark == null || nextCursorMark.equals(cursorMark);
        cursorMark = nextCursorMark;
      }
    }
  } catch (final SolrServerException | IOException e) {
    errorCode = "ERROR";
    description = "Unable to perform Solr requests";
    Logging.connectors.error("Unable to perform Solr requests", e);
  }
  if (!lookupComplete && errorCode == null) {
    errorCode = "ERROR";
    description = "Unable to read the Solr cursor mark";
  }

  // Step 4: delete the identifiers Solr no longer holds. This runs only when the lookup was complete,
  // so a partial or failed lookup never deletes documents.
  for (final String documentIdentifier : documentIdentifiers) {
    if (lookupComplete && !existingIds.containsKey(documentIdentifier)) {
      activities.deleteDocument(documentIdentifier);
      activities.recordActivity(Long.valueOf(startFetchTime), ACTIVITY_GET, 0L, documentIdentifier, "NOTFOUND", "Document not found in Solr", null);
    } else if (errorCode != null) {
      activities.recordActivity(Long.valueOf(startFetchTime), ACTIVITY_GET, 0L, documentIdentifier, errorCode, description, null);
    }
  }
}

/**
 * Parses the configured page size.
 *
 * @param rowsNumberString raw value of the rows-number specification node; may be null
 * @return the page size, always strictly positive
 * @throws ManifoldCFException if the value is missing, not an integer, or not positive. A page size of
 *     zero would make every lookup come back empty, and every document would then be deleted.
 */
private static int parseRowsNumber(final String rowsNumberString) throws ManifoldCFException {
  final int rowsNumber;
  try {
    rowsNumber = Integer.parseInt(rowsNumberString == null ? null : rowsNumberString.trim());
  } catch (final NumberFormatException e) {
    throw new ManifoldCFException("SolrIngester: invalid rows number '" + rowsNumberString + "'", e);
  }
  if (rowsNumber <= 0) {
    throw new ManifoldCFException("SolrIngester: rows number must be positive, got " + rowsNumber);
  }
  return rowsNumber;
}

/**
 * Queries Solr for the id and version field of the given identifiers, paging with cursor marks.
 *
 * @param collection Solr collection to query
 * @param idFieldName field holding the document identifier
 * @param rowsNumber page size
 * @param documentIdentifiers identifiers to look up
 * @param existingIds receives {@code id -> String.valueOf(version)} for every document found
 * @return true if every page was read, false if paging stopped early because a cursor mark was unreadable
 * @throws SolrServerException if Solr reports an error
 * @throws IOException if communication with Solr fails
 */
private boolean fetchExistingVersions(final String collection, final String idFieldName, final int rowsNumber, final String[] documentIdentifiers,
    final Map<String, String> existingIds) throws SolrServerException, IOException {
  final SolrQuery query = new SolrQuery("*:*").setRows(rowsNumber).setSort(idFieldName, SolrQuery.ORDER.asc);
  query.setFields(idFieldName, versionField);
  query.addFilterQuery(idFieldName + ":(" + toQuotedSolrIdList(java.util.Arrays.asList(documentIdentifiers)) + ")");
  String cursorMark = CursorMarkParams.CURSOR_MARK_START;
  while (true) {
    query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
    final QueryResponse response = httpSolrClient.query(collection, query);
    for (final SolrDocument document : response.getResults()) {
      existingIds.put(String.valueOf(document.getFieldValue(idFieldName)), String.valueOf(document.getFieldValue(versionField)));
    }
    final String nextCursorMark = readNextCursorMark(response);
    if (nextCursorMark == null) {
      return false;
    }
    if (nextCursorMark.equals(cursorMark)) {
      return true;
    }
    cursorMark = nextCursorMark;
  }
}

/**
 * Returns the next cursor mark of a Solr response.
 *
 * @param response the Solr response
 * @return the next cursor mark, or null (after logging a warning) when it is missing or cannot be read as a String
 */
private static String readNextCursorMark(final QueryResponse response) {
  final String nextCursorMark;
  try {
    nextCursorMark = response.getNextCursorMark();
  } catch (final RuntimeException e) {
    Logging.connectors.warn("SolrIngester: the type of the reponse getnextcursormark is not recognized as a String");
    return null;
  }
  if (nextCursorMark == null) {
    Logging.connectors.warn("SolrIngester: the Solr response does not contain a next cursor mark");
  }
  return nextCursorMark;
}

/**
 * Builds a Solr boolean clause {@code "id1" OR "id2" ...}, with every identifier quoted and escaped.
 *
 * @param ids identifiers to include
 * @return the clause, or an empty string when {@code ids} is empty
 */
private static String toQuotedSolrIdList(final Iterable<String> ids) {
  final StringBuilder clause = new StringBuilder();
  for (final String id : ids) {
    if (clause.length() > 0) {
      clause.append(" OR ");
    }
    clause.append('"').append(escapeSolrPhrase(id)).append('"');
  }
  return clause.toString();
}

/**
 * Escapes backslashes and double quotes so that {@code value} can be embedded in a double-quoted Solr phrase.
 *
 * @param value raw value
 * @return the escaped value
 */
private static String escapeSolrPhrase(final String value) {
  return value.replace("\\", "\\\\").replace("\"", "\\\"");
}

/**
 * Copies every mapped source field of {@code document} into the target field of {@code doc}.
 *
 * <p>TODO: only String, Long, Integer and Date values are supported. The type is decided by the first
 * value of the field; other types are skipped with a warning.
 *
 * @param document source Solr document
 * @param mapFields mapping of Solr source field to repository target field
 * @param doc repository document to fill
 * @throws ManifoldCFException if adding a field fails
 */
private static void copyMappedSolrFields(final SolrDocument document, final Map<String, String> mapFields, final RepositoryDocument doc)
    throws ManifoldCFException {
  for (final Map.Entry<String, String> entry : mapFields.entrySet()) {
    final java.util.Collection<Object> values = document.getFieldValues(entry.getKey());
    if (values == null || values.isEmpty()) {
      continue;
    }
    final Object first = values.iterator().next();
    if (first instanceof String || first instanceof Long || first instanceof Integer) {
      final String[] stringValues = new String[values.size()];
      int i = 0;
      for (final Object value : values) {
        stringValues[i++] = String.valueOf(value);
      }
      doc.addField(entry.getValue(), stringValues);
    } else if (first instanceof Date) {
      doc.addField(entry.getValue(), values.toArray(new Date[0]));
    } else {
      Logging.connectors.warn("SolrIngester: the type of the field " + entry.getKey() + " is not recognized");
    }
  }
}

/**
 * Concatenates the values of the content field.
 *
 * @param document source Solr document
 * @param contentField name of the content field; may be null
 * @return each value prefixed by a single space, or an empty string when the field is absent
 */
private static String extractSolrContent(final SolrDocument document, final String contentField) {
  final java.util.Collection<Object> values = document.getFieldValues(contentField);
  if (values == null) {
    return "";
  }
  final StringBuilder content = new StringBuilder();
  for (final Object value : values) {
    content.append(' ').append(value);
  }
  return content.toString();
}

/**
 * Sets document-level security on {@code doc} from the union of the values of both security fields.
 * The values are converted to Strings and de-duplicated. Nothing is set when neither field is present.
 *
 * @param document source Solr document
 * @param securityField first security field name; may be null
 * @param securityField2 second security field name; may be null
 * @param doc repository document to secure
 * @throws ManifoldCFException declared for consistency with the other RepositoryDocument mutators
 */
private void applySolrSecurity(final SolrDocument document, final String securityField, final String securityField2, final RepositoryDocument doc)
    throws ManifoldCFException {
  if (Logging.connectors.isDebugEnabled()) {
    Logging.connectors.debug("Security part");
  }
  final java.util.Collection<Object> values1 = document.getFieldValues(securityField);
  final java.util.Collection<Object> values2 = document.getFieldValues(securityField2);
  if (values1 == null && values2 == null) {
    return;
  }
  if (values1 != null && values2 != null && Logging.connectors.isDebugEnabled()) {
    Logging.connectors.debug("Security field 1 : " + values1.toString());
    Logging.connectors.debug("Security field 2 : " + values2.toString());
  }
  final Set<String> tokens = new HashSet<String>();
  if (values1 != null) {
    for (final Object value : values1) {
      tokens.add(String.valueOf(value));
    }
  }
  if (values2 != null) {
    for (final Object value : values2) {
      tokens.add(String.valueOf(value));
    }
  }
  doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT, tokens.toArray(new String[0]), new String[] { GLOBAL_DENY_TOKEN });
}