in loghubreader/src/main/java/com/alibaba/datax/plugin/reader/loghubreader/LogHubReader.java [315:472]
/**
 * Reads the logs of this task's shard between a begin cursor and an end cursor and
 * sends one record per log to {@code recordSender}.
 *
 * <p>The begin/end cursors are resolved from {@code beginTimestampMillis} /
 * {@code endTimestampMillis} when those are not {@code -1}; otherwise
 * {@link CursorMode#BEGIN} / {@link CursorMode#END} are used. If both cursors are equal the
 * method returns without reading anything. Batches are fetched until the cursor returned
 * by the service stops advancing.
 *
 * <p>A {@link LogException} whose error code is {@code LogStoreNotExist} is logged and
 * otherwise ignored.
 *
 * @param recordSender used to create records and hand them to the writer
 * @throws DataXException with {@code LogHubReaderErrorCode.LOG_HUB_ERROR} for any other
 *         {@link LogException} or any other exception raised while reading
 */
public void startRead(RecordSender recordSender) {
    LOG.info("read start");
    try {
        GetCursorResponse cursorRes;
        if (this.beginTimestampMillis != -1) {
            cursorRes = getCursorWithRetry(client, project, logstore, this.shard, beginTimestampMillis);
        } else {
            cursorRes = getCursorWithRetry(client, project, logstore, this.shard, CursorMode.BEGIN);
        }
        String beginCursor = cursorRes.GetCursor();
        LOG.info("the begin cursor, loghub requestId: {} cursor: {}", cursorRes.GetRequestId(), cursorRes.GetCursor());

        if (this.endTimestampMillis != -1) {
            cursorRes = getCursorWithRetry(client, project, logstore, this.shard, endTimestampMillis);
        } else {
            cursorRes = getCursorWithRetry(client, project, logstore, this.shard, CursorMode.END);
        }
        String endCursor = cursorRes.GetCursor();
        LOG.info("the end cursor, loghub requestId: {} cursor: {}", cursorRes.GetRequestId(), cursorRes.GetCursor());

        if (StringUtils.equals(beginCursor, endCursor)) {
            LOG.info("beginCursor:{} equals endCursor:{}, end directly!", beginCursor, endCursor);
            return;
        }

        // These containers are reused across groups/logs and cleared by the helpers.
        HashMap<String, String> metaMap = new HashMap<String, String>();
        HashMap<String, String> dataMap = new HashMap<String, String>();
        JSONObject allMetaJson = new JSONObject();

        String currentCursor = null;
        String nextCursor = beginCursor;
        // Stop once a fetch returns the same cursor it was given.
        while (!StringUtils.equals(currentCursor, nextCursor)) {
            currentCursor = nextCursor;
            BatchGetLogResponse logDataRes = batchGetLogWithRetry(client, project, logstore, this.shard, this.batchSize, currentCursor, endCursor);
            List<LogGroupData> logGroups = logDataRes.GetLogGroups();
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                collectLogGroupMeta(flg, metaMap, allMetaJson);
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    collectLogContent(flg.getLogs(lIdx), dataMap, metaMap, allMetaJson);
                    Record record = recordSender.createRecord();
                    fillRecordColumns(record, dataMap, metaMap, allMetaJson);
                    recordSender.sendToWriter(record);
                }
            }
            nextCursor = logDataRes.GetNextCursor();
        }
    } catch (LogException e) {
        // Constant-first comparison: a null error code must not raise an NPE that would
        // hide the original LogException.
        if ("LogStoreNotExist".equals(e.GetErrorCode())) {
            LOG.info("logStore[{}] Not Exits! detail error messsage: {}", logstore, e.toString());
        } else {
            LOG.error("read LogStore[{}] error, please check ! detail error messsage: {}", logstore, e.toString());
            throw DataXException.asDataXException(LogHubReaderErrorCode.LOG_HUB_ERROR, e);
        }
    } catch (Exception e) {
        LOG.error("read LogStore[{}] error, please check ! detail error messsage: {}", logstore, e.toString());
        throw DataXException.asDataXException(LogHubReaderErrorCode.LOG_HUB_ERROR, e);
    }
    LOG.info("end read loghub shard...");
}

/**
 * Resets {@code metaMap} and {@code allMetaJson} and fills them with the metadata of one
 * log group: category, source, topic, machine UUID and every log tag.
 *
 * <p>Each fixed attribute is stored in {@code metaMap} under both a literal {@code C_*} key
 * and the matching {@code Constant.META_COL_*} key; {@code allMetaJson} only receives the
 * {@code Constant.META_COL_*} key. Tags are stored under their own key in both containers,
 * and the host-name / path tags additionally under {@code C_HostName} / {@code C_Path}.
 */
private void collectLogGroupMeta(FastLogGroup flg, HashMap<String, String> metaMap, JSONObject allMetaJson) {
    metaMap.clear();
    allMetaJson.clear();

    metaMap.put("C_Category", flg.getCategory());
    metaMap.put(Constant.META_COL_CATEGORY, flg.getCategory());
    allMetaJson.put(Constant.META_COL_CATEGORY, flg.getCategory());

    metaMap.put("C_Source", flg.getSource());
    metaMap.put(Constant.META_COL_SOURCE, flg.getSource());
    allMetaJson.put(Constant.META_COL_SOURCE, flg.getSource());

    metaMap.put("C_Topic", flg.getTopic());
    metaMap.put(Constant.META_COL_TOPIC, flg.getTopic());
    allMetaJson.put(Constant.META_COL_TOPIC, flg.getTopic());

    metaMap.put("C_MachineUUID", flg.getMachineUUID());
    metaMap.put(Constant.META_COL_MACHINEUUID, flg.getMachineUUID());
    allMetaJson.put(Constant.META_COL_MACHINEUUID, flg.getMachineUUID());

    for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
        FastLogTag logtag = flg.getLogTags(tagIdx);
        String tagKey = logtag.getKey();
        String tagValue = logtag.getValue();
        if (Constant.META_COL_HOSTNAME.equals(tagKey)) {
            metaMap.put("C_HostName", tagValue);
        } else if (Constant.META_COL_PATH.equals(tagKey)) {
            metaMap.put("C_Path", tagValue);
        }
        metaMap.put(tagKey, tagValue);
        allMetaJson.put(tagKey, tagValue);
    }
}

/**
 * Resets {@code dataMap} and fills it with the key/value contents of {@code log}; also
 * records the log's time (as a string) in {@code metaMap} and {@code allMetaJson}.
 */
private void collectLogContent(FastLog log, HashMap<String, String> dataMap,
                               HashMap<String, String> metaMap, JSONObject allMetaJson) {
    dataMap.clear();
    String logTime = String.valueOf(log.getTime());
    metaMap.put("C_LogTime", logTime);
    metaMap.put(Constant.META_COL_LOGTIME, logTime);
    allMetaJson.put(Constant.META_COL_LOGTIME, logTime);
    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
        FastLogContent content = log.getContents(cIdx);
        dataMap.put(content.getKey(), content.getValue());
    }
}

/**
 * Adds the configured columns of one log to {@code record}.
 *
 * <p>If the only configured column is {@code *} (optionally wrapped in double quotes), one
 * {@code key:value} column is added per data field, sorted by key. Otherwise exactly one
 * column is added per configured column, resolved by {@link #resolveColumnValue}. This
 * also holds for a single configured column, so a column that cannot be resolved yields a
 * null-valued column rather than a record with no columns.
 */
private void fillRecordColumns(Record record, HashMap<String, String> dataMap,
                               HashMap<String, String> metaMap, JSONObject allMetaJson) {
    if (null != this.columns && 1 == this.columns.size()) {
        String onlyColumn = this.columns.get(0);
        if ("\"*\"".equals(onlyColumn) || "*".equals(onlyColumn)) {
            List<String> keyList = Arrays.asList(dataMap.keySet().toArray(new String[0]));
            Collections.sort(keyList);
            for (String key : keyList) {
                record.addColumn(new StringColumn(key + ":" + dataMap.get(key)));
            }
            return;
        }
    }

    // Data fields that are not themselves configured as columns, gathered only when the
    // "extract others" column is requested.
    JSONObject extractOthers = new JSONObject();
    if (null != this.columns && this.columns.contains(Constant.COL_EXTRACT_OTHERS)) {
        for (String otherKey : dataMap.keySet()) {
            if (!this.columns.contains(otherKey)) {
                extractOthers.put(otherKey, dataMap.get(otherKey));
            }
        }
    }

    for (String col : this.columns) {
        record.addColumn(new StringColumn(resolveColumnValue(col, dataMap, metaMap, extractOthers, allMetaJson)));
    }
}

/**
 * Resolves the value of one configured column. Lookup order: data field, metadata entry,
 * single-quoted constant (quotes stripped), the "extract others" JSON, the "all meta" JSON.
 *
 * @return the resolved value, or {@code null} when nothing matches
 */
private String resolveColumnValue(String col, HashMap<String, String> dataMap,
                                  HashMap<String, String> metaMap,
                                  JSONObject extractOthers, JSONObject allMetaJson) {
    if (dataMap.containsKey(col)) {
        return dataMap.get(col);
    }
    if (metaMap.containsKey(col)) {
        return metaMap.get(col);
    }
    // Require at least two characters so a lone "'" is not treated as an (invalid)
    // quoted constant; substring(1, 0) would throw.
    if (col != null && col.length() >= 2 && col.startsWith("'") && col.endsWith("'")) {
        return col.substring(1, col.length() - 1);
    }
    if (Constant.COL_EXTRACT_OTHERS.equals(col)) {
        return extractOthers.toJSONString();
    }
    if (Constant.COL_EXTRACT_ALL_META.equals(col)) {
        return allMetaJson.toJSONString();
    }
    return null;
}