in kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java [64:213]
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
LOG.info("kuduwriter began to write!");
Record record;
LongAdder counter = new LongAdder();
try {
while ((record = lineReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != columns.size()) {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PARAMETER_NUM_ERROR, " number of record fields:" + record.getColumnNumber() + " number of configuration fields:" + columns.size());
}
boolean isDirtyRecord = false;
for (int i = 0; i < primaryKeyIndexUntil && !isDirtyRecord; i++) {
Column column = record.getColumn(i);
isDirtyRecord = StringUtils.isBlank(column.asString());
}
if (isDirtyRecord) {
taskPluginCollector.collectDirtyRecord(record, "primarykey field is null");
continue;
}
CountDownLatch countDownLatch = new CountDownLatch(columnLists.size());
Upsert upsert = table.newUpsert();
Insert insert = table.newInsert();
PartialRow row;
if (isUpsert) {
//覆盖更新
row = upsert.getRow();
} else {
//增量更新
row = insert.getRow();
}
List<Future<?>> futures = new ArrayList<>();
for (List<Configuration> columnList : columnLists) {
Record finalRecord = record;
Future<?> future = pool.submit(() -> {
try {
for (Configuration col : columnList) {
String name = col.getString(Key.NAME);
ColumnType type = ColumnType.getByTypeName(col.getString(Key.TYPE, "string"));
Column column = finalRecord.getColumn(col.getInt(Key.INDEX));
String rawData = column.asString();
if (rawData == null) {
synchronized (lock) {
row.setNull(name);
}
continue;
}
switch (type) {
case INT:
synchronized (lock) {
row.addInt(name, Integer.parseInt(rawData));
}
break;
case LONG:
case BIGINT:
synchronized (lock) {
row.addLong(name, Long.parseLong(rawData));
}
break;
case FLOAT:
synchronized (lock) {
row.addFloat(name, Float.parseFloat(rawData));
}
break;
case DOUBLE:
synchronized (lock) {
row.addDouble(name, Double.parseDouble(rawData));
}
break;
case BOOLEAN:
synchronized (lock) {
row.addBoolean(name, Boolean.parseBoolean(rawData));
}
break;
case STRING:
default:
synchronized (lock) {
row.addString(name, rawData);
}
}
}
} finally {
countDownLatch.countDown();
}
});
futures.add(future);
}
countDownLatch.await();
for (Future<?> future : futures) {
future.get();
}
try {
RetryUtil.executeWithRetry(() -> {
if (isUpsert) {
//覆盖更新
session.apply(upsert);
} else {
//增量更新
session.apply(insert);
}
//flush
if (counter.longValue() > (batchSize * 0.8)) {
session.flush();
counter.reset();
}
counter.increment();
return true;
}, 5, 500L, true);
} catch (Exception e) {
LOG.error("Record Write Failure!", e);
if (isSkipFail) {
LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !");
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
} else {
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
}
}
}
} catch (Exception e) {
LOG.error("write failure! the task will exit!");
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
}
AtomicInteger i = new AtomicInteger(10);
try {
while (i.get() > 0) {
if (session.hasPendingOperations()) {
session.flush();
break;
}
Thread.sleep(20L);
i.decrementAndGet();
}
} catch (Exception e) {
LOG.info("Waiting for data to be written to kudu...... " + i + "s");
} finally {
try {
pool.shutdown();
//强制刷写
session.flush();
} catch (KuduException e) {
LOG.error("kuduwriter flush error! The results may be incomplete!");
throw DataXException.asDataXException(Kudu11xWriterErrorcode.PUT_KUDU_ERROR, e.getMessage());
}
}
}