in connectors/documentum/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/DCTM/DCTM.java [1040:1340]
/**
 * Queues seed documents by running one DQL query per configured object type and
 * handing every returned chronicle id to {@code activities.addSeedDocument}.
 *
 * <p>The query selects objects whose {@code r_modify_date} lies between the previous
 * seed time (moved back by five minutes to guarantee overlap) and {@code seedTime}.
 * Deleted objects are always included; non-deleted objects are further restricted by
 * the optional maximum content size and content-type settings from the specification.
 * Location (folder) restrictions and per-object-type attribute filters apply to both.
 *
 * <p>If a {@code RemoteException} occurs while a session already existed before the
 * attempt, the session is discarded and the same query is retried.  If no session
 * existed before the attempt, a {@code ServiceInterruption} is thrown instead.
 *
 * @param activities receives the seed document identifiers and is polled to detect
 *        that the job is no longer active.
 * @param spec the document specification; location, object type (with nested filters),
 *        format and maximum length nodes are read from it.
 * @param lastSeedVersion the value returned by the previous invocation (a decimal
 *        millisecond timestamp), or null to start from time zero.
 * @param seedTime upper bound of the modification-date window, in milliseconds.
 * @param jobMode not consulted by this implementation.
 * @return {@code seedTime} rendered as a decimal string, to be passed back as
 *         {@code lastSeedVersion} on the next call.
 * @throws ManifoldCFException on an unrecognized filter operation, on interruption
 *         (type {@code INTERRUPTED}), or on a non-transient Documentum error.
 * @throws ServiceInterruption when the Documentum service is transiently unavailable.
 */
public String addSeedDocuments(ISeedingActivity activities, Specification spec,
  String lastSeedVersion, long seedTime, int jobMode)
  throws ManifoldCFException, ServiceInterruption
{
  // Extract startTime; a null seed version means "crawl from the beginning"
  long startTime;
  if (lastSeedVersion == null)
    startTime = 0L;
  else
  {
    // Unpack seed time from seed version string
    startTime = Long.parseLong(lastSeedVersion);
  }

  // First, gather everything we need from the specification
  StringBuilder strLocationsClause = new StringBuilder();
  // object type -> attribute name -> operation -> set of values
  Map<String,Map<String,Map<String,Set<String>>>> tokenList = new HashMap<String,Map<String,Map<String,Set<String>>>>();
  List<String> contentList = null;
  boolean seenAllMimeTypes = false;
  boolean allMimeTypes = false;
  String maxSize = null;
  for (int i = 0; i < spec.getChildCount(); i++)
  {
    SpecificationNode n = spec.getChild(i);
    String nodeType = n.getType();
    if (nodeType.equals(CONFIG_PARAM_LOCATION))
    {
      String strLocation = n.getAttributeValue("path");
      if (strLocation != null && strLocation.length() > 0)
      {
        // Multiple locations are OR'ed together
        if (strLocationsClause.length() > 0)
          strLocationsClause.append(" OR ");
        // Quote the path the same way every other string literal in this query is quoted,
        // so that a folder name containing an apostrophe cannot break or alter the DQL.
        strLocationsClause.append("Folder(").append(quoteDQLString(strLocation)).append(", DESCEND)");
      }
    }
    else if (nodeType.equals(CONFIG_PARAM_OBJECTTYPE))
    {
      String objType = n.getAttributeValue("token");
      Map<String,Map<String,Set<String>>> filters = tokenList.get(objType);
      if (filters == null)
      {
        filters = new HashMap<String,Map<String,Set<String>>>();
        tokenList.put(objType,filters);
      }
      // Go through children and pick out filters
      for (int j = 0; j < n.getChildCount(); j++)
      {
        SpecificationNode sn = n.getChild(j);
        if (sn.getType().equals(CONFIG_PARAM_FILTER))
        {
          String attributeName = sn.getAttributeValue("name");
          String operation = sn.getAttributeValue("op");
          String value = sn.getAttributeValue("value");
          Map<String,Set<String>> operations = filters.get(attributeName);
          if (operations == null)
          {
            operations = new HashMap<String,Set<String>>();
            filters.put(attributeName,operations);
          }
          Set<String> values = operations.get(operation);
          if (values == null)
          {
            values = new HashSet<String>();
            operations.put(operation,values);
          }
          values.add(value);
        }
      }
    }
    else if (nodeType.equals(CONFIG_PARAM_FORMAT_ALL))
    {
      seenAllMimeTypes = true;
      // Constant-first comparison: a node without a "value" attribute means "not all"
      // rather than a NullPointerException.
      String all = n.getAttributeValue("value");
      if ("true".equals(all))
      {
        allMimeTypes = true;
      }
    }
    else if (nodeType.equals(CONFIG_PARAM_FORMAT))
    {
      seenAllMimeTypes = true;
      String docType = n.getAttributeValue("value");
      if (contentList == null)
        contentList = new ArrayList<String>();
      contentList.add(docType);
    }
    else if (nodeType.equals(CONFIG_PARAM_MAXLENGTH))
    {
      maxSize = n.getAttributeValue("value");
    }
  }

  if (tokenList.size() == 0)
  {
    Logging.connectors.debug("DCTM: No ObjectType found in Document Spec. Setting it to dm_document");
    tokenList.put("dm_document",new HashMap<String,Map<String,Set<String>>>());
  }

  if (strLocationsClause.length() < 1)
  {
    Logging.connectors.debug("DCTM: No location found in document specification. Search will be across entire docbase");
  }

  try
  {
    String strDQLstart = "select for READ distinct i_chronicle_id from ";
    // There seems to be some unexplained slop in the latest DCTM version. It misses documents depending on how close to the r_modify_date you happen to be.
    // So, I've decreased the start time by a full five minutes, to ensure overlap.
    if (startTime > 300000L)
      startTime = startTime - 300000L;
    else
      startTime = 0L;

    StringBuilder strDQLend = new StringBuilder(" where r_modify_date >= " + buildDateString(startTime) +
      " and r_modify_date<=" + buildDateString(seedTime) +
      " AND (i_is_deleted=TRUE Or (i_is_deleted=FALSE AND a_full_text=TRUE AND r_content_size>=0");

    // append maxsize if set
    // NOTE(review): the value is appended verbatim; presumably the UI only stores digits here - confirm.
    if (maxSize != null && maxSize.length() > 0)
    {
      strDQLend.append(" AND r_content_size<=").append(maxSize);
    }

    // If we don't even see the allmimetypes record, we emit no restriction
    if (seenAllMimeTypes && !allMimeTypes)
    {
      String[] dctmTypes = convertToDCTMTypes(contentList);
      if (dctmTypes == null || dctmTypes.length == 0)
        // No acceptable content types: emit a clause that can never be true
        strDQLend.append(" AND 1<0");
      else
      {
        strDQLend.append(" AND a_content_type IN (");
        boolean commaNeeded = false;
        for (String cType : dctmTypes)
        {
          if (commaNeeded)
            strDQLend.append(",");
          else
            commaNeeded = true;
          strDQLend.append(quoteDQLString(cType));
        }
        strDQLend.append(")");
      }
    }

    // End the clause for non-deleted documents
    strDQLend.append("))");

    // append location on if it is provided.  This will apply to both deleted and non-deleted documents.
    if (strLocationsClause.length() > 0)
    {
      strDQLend.append(" AND ( ").append(strLocationsClause).append(" )");
    }

    // Now, loop through the object types, run one query for each, and queue the results.
    for (Map.Entry<String,Map<String,Map<String,Set<String>>>> tokenEntry : tokenList.entrySet())
    {
      activities.checkJobStillActive();
      String tokenValue = tokenEntry.getKey();
      // Construct the filter part of the DQL query
      Map<String,Map<String,Set<String>>> filters = tokenEntry.getValue();
      StringBuilder filterPart = new StringBuilder();
      // For each attribute, go through the operations and emit an AND clause
      for (Map.Entry<String,Map<String,Set<String>>> filterEntry : filters.entrySet())
      {
        String attributeName = filterEntry.getKey();
        for (Map.Entry<String,Set<String>> operationEntry : filterEntry.getValue().entrySet())
        {
          String operation = operationEntry.getKey();
          Set<String> values = operationEntry.getValue();
          if (operation.equals("="))
          {
            // Equality against several values collapses into a single IN list
            filterPart.append(" AND \"").append(attributeName).append("\"").append(" IN (");
            boolean commaNeeded = false;
            for (String value : values)
            {
              if (commaNeeded)
                filterPart.append(",");
              else
                commaNeeded = true;
              filterPart.append(quoteDQLString(value));
            }
            filterPart.append(")");
          }
          else if (operation.equals("<>"))
          {
            // Inequality must hold for every listed value, so the terms are AND'ed
            filterPart.append(" AND (");
            boolean andNeeded = false;
            for (String value : values)
            {
              if (andNeeded)
                filterPart.append(" AND ");
              else
                andNeeded = true;
              filterPart.append("\"").append(attributeName).append("\"").append("<>").append(quoteDQLString(value));
            }
            filterPart.append(")");
          }
          else
            throw new ManifoldCFException("Unrecognized operation: "+operation);
        }
      }

      String strDQL = strDQLstart + tokenValue + strDQLend + filterPart;
      if (Logging.connectors.isDebugEnabled())
        Logging.connectors.debug("DCTM: About to execute query= (" + strDQL + ")");

      // Retry loop: exits via break on success or via an exception.
      while (true)
      {
        // Remember whether a session existed before this attempt; that decides below
        // whether a RemoteException is retried or reported as a service interruption.
        boolean noSession = (session==null);
        getSession();
        try
        {
          StringQueue stringQueue = new StringQueue();
          GetDocumentsFromQueryThread t = new GetDocumentsFromQueryThread(strDQL,stringQueue);
          t.start();
          try
          {
            int checkIndex = 0;
            // Loop through return values and add them until done is signalled
            while (true)
            {
              // Poll for job shutdown once every ten results
              if (checkIndex == 10)
              {
                activities.checkJobStillActive();
                checkIndex = 0;
              }
              checkIndex++;
              String next = stringQueue.getNext();
              if (next == null)
                break;
              activities.addSeedDocument(next);
            }
            t.finishUp();
            // Go on to next document type and repeat
            break;
          }
          catch (InterruptedException e)
          {
            t.abort();
            // This is just a courtesy; the thread will be killed regardless on process exit
            t.interrupt();
            // It's ok to leave the thread still active; we'll be shutting down anyway.
            throw e;
          }
          catch (ManifoldCFException e)
          {
            t.abort();
            // We need the join, because we really don't want this documentum session to be
            // still busy when we leave.
            t.join();
            throw e;
          }
          catch (ServiceInterruption e)
          {
            t.abort();
            // We need the join, because we really don't want this documentum session to be
            // still busy when we leave.
            t.join();
            throw e;
          }
        }
        catch (InterruptedException e)
        {
          throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
        }
        catch (RemoteException e)
        {
          Throwable e2 = e.getCause();
          if (e2 instanceof InterruptedException || e2 instanceof InterruptedIOException)
            throw new ManifoldCFException(e2.getMessage(),e2,ManifoldCFException.INTERRUPTED);
          if (noSession)
          {
            // No session existed before this attempt, so do not loop; ask for a retry in a minute.
            long currentTime = System.currentTimeMillis();
            throw new ServiceInterruption("Transient error connecting to documentum service: "+e.getMessage(),currentTime + 60000L);
          }
          // A session existed before this attempt; drop it so the next pass sees none.
          session = null;
          lastSessionFetch = -1L;
          // Go back around again
        }
      }
    }
  }
  catch (DocumentumException e)
  {
    // Base our treatment on the kind of error it is.
    if (e.getType() == DocumentumException.TYPE_SERVICEINTERRUPTION)
    {
      long currentTime = System.currentTimeMillis();
      Logging.connectors.warn("DCTM: Remote service interruption getting versions: "+e.getMessage(),e);
      throw new ServiceInterruption(e.getMessage(),e,currentTime + 300000L, currentTime + 12 * 60 * 60000L,-1,true);
    }
    throw new ManifoldCFException(e.getMessage(),e);
  }

  // The seed time becomes the next invocation's lastSeedVersion
  return Long.toString(seedTime);
}