static Record buildRecord()

in cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderHelper.java [263:414]


  /**
   * Fills {@code record} with one column per selected field of a Cassandra row.
   *
   * <p>Columns are read by index from {@code 0} to {@code columnNumber - 1}. A null cell is added
   * as an empty {@link StringColumn} regardless of its CQL type. List, map, set, tuple and UDT
   * values are passed through {@code toJSonString} and stored as string columns.
   *
   * <p>Any exception raised during conversion is reported to {@code taskPluginCollector} as a
   * dirty record. If that exception is a {@link DataXException} it is then rethrown; for any
   * other exception the method returns {@code null}.
   *
   * @param record record to populate; the same instance is returned on success
   * @param rs row to read values from
   * @param metaData column definitions used to look up each column's type and name
   * @param columnNumber number of leading columns to convert
   * @param taskPluginCollector collector that receives the record when conversion fails
   * @return {@code record} on success, or {@code null} when a non-DataX exception occurred
   * @throws DataXException if a column type is not supported, or if any other
   *     {@code DataXException} is raised while converting
   */
  static Record buildRecord(Record record, Row rs, ColumnDefinitions metaData, int columnNumber,
      TaskPluginCollector taskPluginCollector) {

    try {
      for (int i = 0; i < columnNumber; i++) {
        try {
          if (rs.isNull(i)) {
            record.addColumn(new StringColumn());
            continue;
          }
          switch (metaData.getType(i).getName()) {

          case ASCII:
          case TEXT:
          case VARCHAR:
            record.addColumn(new StringColumn(rs.getString(i)));
            break;

          case BLOB: {
            // ByteBuffer.array() exposes the whole backing array: it ignores position, limit and
            // arrayOffset, and fails for read-only or direct buffers. Copy exactly the readable
            // bytes instead, working on a duplicate so the original buffer's position is untouched.
            java.nio.ByteBuffer blob = rs.getBytes(i).duplicate();
            byte[] bytes = new byte[blob.remaining()];
            blob.get(bytes);
            record.addColumn(new BytesColumn(bytes));
            break;
          }

          case BOOLEAN:
            record.addColumn(new BoolColumn(rs.getBool(i)));
            break;

          case SMALLINT:
            record.addColumn(new LongColumn((int) rs.getShort(i)));
            break;

          case TINYINT:
            record.addColumn(new LongColumn((int) rs.getByte(i)));
            break;

          case INT:
            record.addColumn(new LongColumn(rs.getInt(i)));
            break;

          case COUNTER:
          case BIGINT:
            record.addColumn(new LongColumn(rs.getLong(i)));
            break;

          case VARINT:
            record.addColumn(new LongColumn(rs.getVarint(i)));
            break;

          case FLOAT:
            record.addColumn(new DoubleColumn(rs.getFloat(i)));
            break;

          case DOUBLE:
            record.addColumn(new DoubleColumn(rs.getDouble(i)));
            break;

          case DECIMAL:
            record.addColumn(new DoubleColumn(rs.getDecimal(i)));
            break;

          case DATE:
            record.addColumn(new DateColumn(rs.getDate(i).getMillisSinceEpoch()));
            break;

          case TIME:
            record.addColumn(new LongColumn(rs.getTime(i)));
            break;

          case TIMESTAMP:
            record.addColumn(new DateColumn(rs.getTimestamp(i)));
            break;

          case UUID:
          case TIMEUUID:
            record.addColumn(new StringColumn(rs.getUUID(i).toString()));
            break;

          case INET:
            record.addColumn(new StringColumn(rs.getInet(i).getHostAddress()));
            break;

          case DURATION:
            record.addColumn(new StringColumn(rs.get(i, Duration.class).toString()));
            break;

          case LIST: {
            // Element Java type is resolved from the codec registered for the list's element type.
            TypeToken<?> listEltClass =
                registry.codecFor(metaData.getType(i).getTypeArguments().get(0)).getJavaType();
            List<?> l = rs.getList(i, listEltClass);
            record.addColumn(new StringColumn(toJSonString(l, metaData.getType(i))));
            break;
          }

          case MAP: {
            DataType keyType = metaData.getType(i).getTypeArguments().get(0);
            DataType valType = metaData.getType(i).getTypeArguments().get(1);
            TypeToken<?> keyEltClass = registry.codecFor(keyType).getJavaType();
            TypeToken<?> valEltClass = registry.codecFor(valType).getJavaType();
            Map<?, ?> m = rs.getMap(i, keyEltClass, valEltClass);
            record.addColumn(new StringColumn(toJSonString(m, metaData.getType(i))));
            break;
          }

          case SET: {
            TypeToken<?> setEltClass =
                registry.codecFor(metaData.getType(i).getTypeArguments().get(0)).getJavaType();
            Set<?> set = rs.getSet(i, setEltClass);
            record.addColumn(new StringColumn(toJSonString(set, metaData.getType(i))));
            break;
          }

          case TUPLE: {
            TupleValue t = rs.getTupleValue(i);
            record.addColumn(new StringColumn(toJSonString(t, metaData.getType(i))));
            break;
          }

          case UDT: {
            UDTValue t = rs.getUDTValue(i);
            record.addColumn(new StringColumn(toJSonString(t, metaData.getType(i))));
            break;
          }

          default:
            throw unsupportedColumnTypeError(metaData, i);
          }
        } catch (TypeNotSupported unsupported) {
          // NOTE(review): presumably raised by toJSonString for a nested element type it cannot
          // serialize - confirm. Reported with the same configuration error as the default case.
          throw unsupportedColumnTypeError(metaData, i);
        }
      }
    } catch (Exception e) {
      // TODO: is it reliable to treat this as dirty data?
      taskPluginCollector.collectDirtyRecord(record, e);
      if (e instanceof DataXException) {
        throw (DataXException) e;
      }
      return null;
    }
    return record;
  }

  /** Builds the configuration error reported when column {@code i} has an unsupported type. */
  private static DataXException unsupportedColumnTypeError(ColumnDefinitions metaData, int i) {
    return DataXException
        .asDataXException(
            CassandraReaderErrorCode.CONF_ERROR,
            String.format(
                "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], "
                    + "字段类型:[%s]. ",
                metaData.getName(i),
                metaData.getType(i)));
  }