public static Configure reader()

in ogg-plugin/src/main/java/com/aliyun/odps/ogg/handler/datahub/ConfigureReader.java [47:348]


    public static Configure reader(String configueFileName) throws DocumentException {
        logger.info("Begin read configure[{}]", configueFileName);

        Configure configure = new Configure();
        SAXReader reader = new SAXReader();
        File file = new File(configueFileName);

        Document document = reader.read(file);
        Element root = document.getRootElement();

        /* for root element */
        String elementText = root.elementTextTrim("batchSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBatchSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("batchTimeoutMs");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBatchTimeoutMs(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("buildBatchSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBuildBatchSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("buildBatchTimeoutMs");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBuildBatchTimeoutMs(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("dirtyDataContinue");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setDirtyDataContinue(Boolean.parseBoolean(elementText));
        }

        elementText = root.elementTextTrim("dirtyDataFile");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setDirtyDataFile(elementText);
        }

        elementText = root.elementTextTrim("dirtyDataFileMaxSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setDirtyDataFileMaxSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("retryTimes");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setRetryTimes(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("retryInterval");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setRetryIntervalMs(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("buildRecordQueueSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBuildRecordQueueSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("buildRecordQueueTimeoutMs");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBuildRecordQueueTimeoutMs(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("writeRecordQueueSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setWriteRecordQueueSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("writeRecordQueueTimeoutMs");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setWriteRecordQueueTimeoutMs(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("recordAccess");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setRecordAccess(Boolean.parseBoolean(elementText));
        }

        elementText = root.elementTextTrim("reportMetric");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setReportMetric(Boolean.parseBoolean(elementText));
        }

        elementText = root.elementTextTrim("reportMetricIntervalMs");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setReportMetricIntervalMs(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("buildCorePollSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBuildRecordCorePoolSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("buildMaximumPoolSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setBuildRecordMaximumPoolSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("writeCorePollSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setWriteRecordCorePoolSize(Integer.parseInt(elementText));
        }

        elementText = root.elementTextTrim("writeMaximumPoolSize");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setWriteRecordMaximumPoolSize(Integer.parseInt(elementText));
        }

        /* for oracle default config */
        Element element = root.element("defaultOracleConfigure");
        if (element == null) {
            throw new RuntimeException("defaultOracleConfigure is null");
        }

        elementText = element.elementTextTrim("sid");
        if (StringUtils.isNotBlank(elementText)) {
            configure.setOracleSid(elementText);
        }

        String defaultOracleSchema = element.elementTextTrim("schema");

        /* for DataHub default config */
        element = root.element("defaultDatahubConfigure");
        if (element == null) {
            throw new RuntimeException("defaultDatahubConfigure is null");
        }

        String endPoint = element.elementText("endPoint");
        if (StringUtils.isBlank(endPoint)) {
            throw new RuntimeException("defaultDatahubConfigure.endPoint is null");
        }
        configure.setDatahubEndpoint(endPoint);

        String datahubAccessID = element.elementText("accessId");
        if (StringUtils.isBlank(datahubAccessID)) {
            throw new RuntimeException("defaultDatahubConfigure.accessId is null");
        }
        configure.setDatahubAccessId(datahubAccessID);

        String datahubAccessKey = element.elementText("accessKey");
        if (StringUtils.isBlank(datahubAccessKey)) {
            throw new RuntimeException("defaultDatahubConfigure.accessKey is null");
        }
        configure.setDatahubAccessKey(datahubAccessKey);

        String defaultDatahubProject = element.elementText("project");
        String defaultCTypeColumn = element.elementText("ctypeColumn");
        String defaultCTimeColumn = element.elementText("ctimeColumn");
        String defaultCidColumn = element.elementText("cidColumn");

        String defaultConstColumnMapStr = element.elementText("constColumnMap");
        Map<String, String> defalutConstColumnMappings = Maps.newHashMap();
        parseConstColumnMap(defaultConstColumnMapStr, defalutConstColumnMappings);

        String compressType = element.elementText("compressType");
        if (StringUtils.isNotBlank(compressType)) {
            configure.setCompressType(compressType);
        }

        String enablePb = element.elementText("enablePb");
        if (StringUtils.isNotBlank(enablePb)) {
            configure.setEnablePb(Boolean.parseBoolean(enablePb));
        }

        /** for table mappings **/
        element = root.element("mappings");
        if (element == null) {
            throw new RuntimeException("mappings is null");
        }

        List<Element> mappingElements = element.elements("mapping");
        if (mappingElements == null || mappingElements.size() == 0) {
            throw new RuntimeException("mappings.mapping is null");
        }

        /** for table mapping **/
        for (Element e : mappingElements) {
            String oracleSchema = e.elementTextTrim("oracleSchema");
            if (StringUtils.isNotBlank(oracleSchema)) {
                //nothing
            } else if (StringUtils.isNotBlank(defaultOracleSchema)) {
                oracleSchema = defaultOracleSchema;
            } else {
                throw new RuntimeException(
                        "both mappings.mapping.oracleSchema and defaultOracleConfigure.schema is null");
            }

            String oracleTable = e.elementTextTrim("oracleTable");
            if (StringUtils.isBlank(oracleTable)) {
                throw new RuntimeException("mappings.mapping.oracleTable is null");
            }

            String datahubProject = e.elementTextTrim("datahubProject");
            if (StringUtils.isNotBlank(datahubProject)) {
                //nothing
            } else if (StringUtils.isNotBlank(defaultDatahubProject)) {
                datahubProject = defaultDatahubProject;
            } else {
                throw new RuntimeException(
                        "both mappings.mapping.datahubProject and defaultDatahubConfigure.project is null");
            }

            String topicName = e.elementTextTrim("datahubTopic");
            if (StringUtils.isBlank(topicName)) {
                throw new RuntimeException("mappings.mapping.datahubTopic is null");
            }

            String rowIdColumn = e.elementText("rowIdColumn");

            String cTypeColumn = e.elementText("ctypeColumn");
            cTypeColumn = StringUtils.isNotBlank(cTypeColumn) ? cTypeColumn : defaultCTypeColumn;

            String cTimeColumn = e.elementText("ctimeColumn");
            cTimeColumn = StringUtils.isNotBlank(cTimeColumn) ? cTimeColumn : defaultCTimeColumn;

            String cIdColumn = e.elementText("cidColumn");
            cIdColumn = StringUtils.isNotBlank(cIdColumn) ? cIdColumn : defaultCidColumn;

            String constColumnMapStr = e.elementText("constColumnMap");
            Map<String, String> constColumnMappings = Maps.newHashMap();
            parseConstColumnMap(constColumnMapStr, constColumnMappings);
            constColumnMappings = constColumnMappings.isEmpty() ? defalutConstColumnMappings : constColumnMappings;

            TableMapping tableMapping = new TableMapping();

            tableMapping.setOracleSchema(oracleSchema.toLowerCase());
            tableMapping.setOracleTableName(oracleTable.toLowerCase());
            tableMapping.setOracleFullTableName(
                    tableMapping.getOracleSchema() + "." + tableMapping.getOracleTableName());
            tableMapping.setProjectName(datahubProject);
            tableMapping.setTopicName(topicName);
            tableMapping.setRowIdColumn(rowIdColumn);
            tableMapping.setcTypeColumn(cTypeColumn);
            tableMapping.setcTimeColumn(cTimeColumn);
            tableMapping.setcIdColumn(cIdColumn);
            tableMapping.setConstColumnMappings(constColumnMappings);

            configure.addTableMapping(tableMapping);
            Map<String, ColumnMapping> columnMappings = Maps.newHashMap();
            tableMapping.setColumnMappings(columnMappings);


            /** for column mapping **/
            Element columnMappingElement = e.element("columnMapping");
            List<Element> columns = columnMappingElement.elements("column");
            for (Element columnElement : columns) {
                String oracleColumnName = columnElement.attributeValue("src");
                if (StringUtils.isBlank(oracleColumnName)) {
                    throw new RuntimeException("Topic[" + topicName + "] src attribute is null");
                }

                oracleColumnName = oracleColumnName.toLowerCase();
                ColumnMapping columnMapping = new ColumnMapping();
                columnMappings.put(oracleColumnName, columnMapping);
                columnMapping.setSrc(oracleColumnName);

                String dest = columnElement.attributeValue("dest");
                if (StringUtils.isBlank(dest)) {
                    throw new RuntimeException("Topic[" + topicName + "] dest attribute is null");
                }
                columnMapping.setDest(dest);

                String destOld = columnElement.attributeValue("destOld");
                if (StringUtils.isNotBlank(destOld)) {
                    columnMapping.setDestOld(destOld);
                }

                String isShardColumn = columnElement.attributeValue("isShardColumn");
                if (StringUtils.isNotBlank(isShardColumn) && Boolean.TRUE
                        .equals(Boolean.valueOf(isShardColumn))) {
                    tableMapping.setShardHash(true);
                    columnMapping.setIsShardColumn(true);
                } else {
                    columnMapping.setIsShardColumn(false);
                }

                String isKeyColumn = columnElement.attributeValue("isKeyColumn");
                if (StringUtils.isNotBlank(isKeyColumn) && Boolean.TRUE
                        .equals(Boolean.valueOf(isKeyColumn))) {
                    columnMapping.setIsKeyColumn(true);
                } else {
                    columnMapping.setIsKeyColumn(false);
                }

                String isDateFormat = columnElement.attributeValue("isDateFormat");
                if (StringUtils.isNotBlank(isDateFormat)) {
                    columnMapping.setIsDateFormat(Boolean.parseBoolean(isDateFormat));
                }

                String dateFormat = columnElement.attributeValue("dateFormat");
                if (StringUtils.isNotBlank(dateFormat)) {
                    columnMapping.setDateFormat(dateFormat);
                }
            }
        }

        logger.info("Read configure success: {} ", JsonHelper.beanToJson(configure));
        return configure;
    }