public void startRead()

in loghubreader/src/main/java/com/alibaba/datax/plugin/reader/loghubreader/LogHubReader.java [315:472]


        /**
         * Reads the logs of this task's shard between a begin and an end cursor and
         * sends one record per log to the writer.
         *
         * <p>The begin cursor is looked up from {@code beginTimestampMillis}, or from
         * {@code CursorMode.BEGIN} when that field is {@code -1}; the end cursor is
         * looked up from {@code endTimestampMillis}, or from {@code CursorMode.END}
         * when that field is {@code -1}. If both cursors are equal the method returns
         * without reading. Otherwise batches are fetched until the cursor returned by
         * a batch equals the cursor that was used to request it.
         *
         * <p>Column handling per log:
         * <ul>
         *   <li>a single column {@code *} (or {@code "*"} including the quotes) emits
         *       one {@code key:value} column per log content, sorted by key;</li>
         *   <li>otherwise exactly one column is emitted per configured column, see
         *       {@link #resolveColumn}.</li>
         * </ul>
         *
         * @param recordSender used to create records and to send them to the writer
         * @throws DataXException with {@code LogHubReaderErrorCode.LOG_HUB_ERROR} on any
         *         failure, except a {@code LogException} whose error code is
         *         {@code LogStoreNotExist}, which is only logged
         */
        public void startRead(RecordSender recordSender) {
            LOG.info("read start");

            try {
                GetCursorResponse cursorRes;
                if (this.beginTimestampMillis != -1) {
                    cursorRes = getCursorWithRetry(client, project, logstore, this.shard, beginTimestampMillis);
                } else {
                    cursorRes = getCursorWithRetry(client, project, logstore, this.shard, CursorMode.BEGIN);
                }
                String beginCursor = cursorRes.GetCursor();

                LOG.info("the begin cursor, loghub requestId: {} cursor: {}", cursorRes.GetRequestId(), cursorRes.GetCursor());

                if (this.endTimestampMillis != -1) {
                    cursorRes = getCursorWithRetry(client, project, logstore, this.shard, endTimestampMillis);
                } else {
                    cursorRes = getCursorWithRetry(client, project, logstore, this.shard, CursorMode.END);
                }
                String endCursor = cursorRes.GetCursor();
                LOG.info("the end cursor, loghub requestId: {} cursor: {}", cursorRes.GetRequestId(), cursorRes.GetCursor());

                if (StringUtils.equals(beginCursor, endCursor)) {
                    LOG.info("beginCursor:{} equals endCursor:{}, end directly!", beginCursor, endCursor);
                    return;
                }

                // The column configuration does not change while reading, so these
                // checks are evaluated once instead of once per log.
                boolean wildcard = isWildcardColumns(this.columns);
                boolean wantsOthers = this.columns != null && this.columns.contains(Constant.COL_EXTRACT_OTHERS);

                String currentCursor = null;
                String nextCursor = beginCursor;

                // Reused across groups/logs; cleared before each reuse.
                HashMap<String, String> metaMap = new HashMap<String, String>();
                HashMap<String, String> dataMap = new HashMap<String, String>();
                JSONObject allMetaJson = new JSONObject();

                // Stop once a batch hands back the cursor it was requested with.
                while (!StringUtils.equals(currentCursor, nextCursor)) {
                    currentCursor = nextCursor;
                    BatchGetLogResponse logDataRes = batchGetLogWithRetry(client, project, logstore, this.shard, this.batchSize, currentCursor, endCursor);

                    List<LogGroupData> logGroups = logDataRes.GetLogGroups();

                    for (LogGroupData logGroup : logGroups) {
                        FastLogGroup flg = logGroup.GetFastLogGroup();
                        collectGroupMeta(flg, metaMap, allMetaJson);

                        for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                            dataMap.clear();
                            FastLog log = flg.getLogs(lIdx);

                            // The log time is per-log metadata; it overwrites the value
                            // left by the previous log of the same group.
                            String logTime = String.valueOf(log.getTime());
                            metaMap.put("C_LogTime", logTime);
                            metaMap.put(Constant.META_COL_LOGTIME, logTime);
                            allMetaJson.put(Constant.META_COL_LOGTIME, logTime);

                            for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                                FastLogContent content = log.getContents(cIdx);
                                dataMap.put(content.getKey(), content.getValue());
                            }

                            Record record = recordSender.createRecord();

                            if (wildcard) {
                                String[] keys = dataMap.keySet().toArray(new String[0]);
                                Arrays.sort(keys);
                                for (String key : keys) {
                                    record.addColumn(new StringColumn(key + ":" + dataMap.get(key)));
                                }
                            } else {
                                // Built only when the "extract others" column is configured.
                                String othersJson = wantsOthers ? buildOthersJson(dataMap) : null;
                                // Single- and multi-column configurations share one code
                                // path so that every configured column always yields
                                // exactly one output column.
                                for (String col : this.columns) {
                                    record.addColumn(new StringColumn(
                                            resolveColumn(col, dataMap, metaMap, allMetaJson, othersJson)));
                                }
                            }

                            recordSender.sendToWriter(record);
                        }
                    }

                    nextCursor = logDataRes.GetNextCursor();
                }
            } catch (LogException e) {
                // Constant-first comparison: a null error code must not hide the
                // original exception behind a NullPointerException.
                if ("LogStoreNotExist".equals(e.GetErrorCode())) {
                    LOG.info("logStore[{}] Not Exits! detail error messsage: {}", logstore, e.toString());
                } else {
                    // The trailing throwable makes the logger print the stack trace.
                    LOG.error("read LogStore[{}] error, please check ! detail error messsage: {}", logstore, e.toString(), e);
                    throw DataXException.asDataXException(LogHubReaderErrorCode.LOG_HUB_ERROR, e);
                }

            } catch (Exception e) {
                LOG.error("read LogStore[{}] error, please check ! detail error messsage: {}", logstore, e.toString(), e);
                throw DataXException.asDataXException(LogHubReaderErrorCode.LOG_HUB_ERROR, e);
            }

            LOG.info("end read loghub shard...");
        }

        /**
         * Tells whether the configuration is the single wildcard column, written
         * either as {@code *} or as {@code "*"} including the double quotes.
         */
        private boolean isWildcardColumns(List<String> cols) {
            if (cols == null || cols.size() != 1) {
                return false;
            }
            String only = cols.get(0);
            return "\"*\"".equals(only) || "*".equals(only);
        }

        /**
         * Clears both containers and refills them with the group-level metadata of
         * {@code flg}: category, source, topic, machine UUID and every log tag.
         * Most values are stored in {@code metaMap} under two keys: a fixed
         * {@code C_*} alias and the corresponding {@code Constant} / tag key.
         */
        private void collectGroupMeta(FastLogGroup flg, HashMap<String, String> metaMap, JSONObject allMetaJson) {
            metaMap.clear();
            allMetaJson.clear();

            metaMap.put("C_Category", flg.getCategory());
            metaMap.put(Constant.META_COL_CATEGORY, flg.getCategory());
            allMetaJson.put(Constant.META_COL_CATEGORY, flg.getCategory());

            metaMap.put("C_Source", flg.getSource());
            metaMap.put(Constant.META_COL_SOURCE, flg.getSource());
            allMetaJson.put(Constant.META_COL_SOURCE, flg.getSource());

            metaMap.put("C_Topic", flg.getTopic());
            metaMap.put(Constant.META_COL_TOPIC, flg.getTopic());
            allMetaJson.put(Constant.META_COL_TOPIC, flg.getTopic());

            metaMap.put("C_MachineUUID", flg.getMachineUUID());
            metaMap.put(Constant.META_COL_MACHINEUUID, flg.getMachineUUID());
            allMetaJson.put(Constant.META_COL_MACHINEUUID, flg.getMachineUUID());

            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                String tagKey = logtag.getKey();
                String tagValue = logtag.getValue();
                if (tagKey.equals(Constant.META_COL_HOSTNAME)) {
                    metaMap.put("C_HostName", tagValue);
                } else if (tagKey.equals(Constant.META_COL_PATH)) {
                    metaMap.put("C_Path", tagValue);
                }
                metaMap.put(tagKey, tagValue);
                allMetaJson.put(tagKey, tagValue);
            }
        }

        /**
         * Serializes to JSON every log content whose key is not itself one of the
         * configured columns.
         */
        private String buildOthersJson(HashMap<String, String> dataMap) {
            JSONObject extractOthers = new JSONObject();
            for (String otherKey : dataMap.keySet()) {
                if (!this.columns.contains(otherKey)) {
                    extractOthers.put(otherKey, dataMap.get(otherKey));
                }
            }
            return extractOthers.toJSONString();
        }

        /**
         * Resolves the value of one configured column for the current log.
         * Lookup order: log content, metadata, single-quoted constant, the
         * "extract others" column, the "extract all meta" column.
         *
         * @param othersJson pre-built JSON for the "extract others" column
         * @return the column value, or {@code null} when nothing matches
         */
        private String resolveColumn(String col, HashMap<String, String> dataMap, HashMap<String, String> metaMap,
                                     JSONObject allMetaJson, String othersJson) {
            if (dataMap.containsKey(col)) {
                return dataMap.get(col);
            }
            if (metaMap.containsKey(col)) {
                return metaMap.get(col);
            }
            // Length guard: a lone "'" both starts and ends with a quote, and
            // substring(1, 0) would throw.
            if (col != null && col.length() >= 2 && col.startsWith("'") && col.endsWith("'")) {
                return col.substring(1, col.length() - 1);
            }
            if (Constant.COL_EXTRACT_OTHERS.equals(col)) {
                return othersJson;
            }
            if (Constant.COL_EXTRACT_ALL_META.equals(col)) {
                return allMetaJson.toJSONString();
            }
            return null;
        }