in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [196:233]
/**
 * Rebinds this writer to the given {@code Odps} client and rebuilds the tunnel objects
 * that were created from the previous client.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Stores {@code odps} and creates a new {@code TableTunnel} from it, applying the
 *       configured tunnel endpoint when one is set.</li>
 *   <li>If an upload session exists, looks it up again through the new tunnel using its
 *       current session id.</li>
 *   <li>If a stream upload session exists, builds a new one for the same project, table and
 *       partition spec, calls {@code flushStreamPackWithRetry(retryTimes)}, and then replaces
 *       the stream pack with the result of {@code recreateRecordPack()}.</li>
 * </ol>
 *
 * <p>NOTE(review): the log messages suggest this is invoked when an STS token is refreshed;
 * confirm against the caller.
 *
 * @param odps the client to use from now on
 * @throws RuntimeException wrapping the underlying failure if the upload session cannot be
 *     re-fetched, or if rebuilding the stream session / stream pack fails
 */
public void refresh(Odps odps) {
  LOGGER.info("Enter refresh.");
  this.odps = odps;
  this.tunnel = new TableTunnel(odps);

  // Override the tunnel endpoint only when one is actually configured. A null endpoint is
  // treated the same as an empty one so that null is never handed to setEndpoint().
  if (this.tunnelEndpoint != null && !this.tunnelEndpoint.isEmpty()) {
    this.tunnel.setEndpoint(this.tunnelEndpoint);
  }

  // Upload session may not exist
  if (session != null) {
    String sessionId = session.getId();
    try {
      // Re-fetch the same session (by id) through the newly created tunnel.
      this.session = tunnel.getUploadSession(project, table, partitionSpec, sessionId);
    } catch (Exception e) {
      LOGGER.error("Set session failed!!!", e);
      throw new RuntimeException(e);
    }
  }

  if (streamSession != null) {
    try {
      // Uses the builder API in place of the older
      // tunnel.createStreamUploadSession(project, table, partitionSpec, true) call.
      streamSession =
          tunnel.buildStreamUploadSession(project, table)
              .setPartitionSpec(partitionSpec)
              .setCreatePartition(true)
              .build();
      // Order matters: flush the current pack first, then replace it with a fresh one.
      flushStreamPackWithRetry(retryTimes);
      streamPack = recreateRecordPack();
    } catch (TunnelException e) {
      LOGGER.error("Refresh sts token failed: cannot recreate stream session", e);
      throw new RuntimeException(e);
    } catch (IOException e) {
      LOGGER.error("Refresh sts token failed: cannot recreate stream pack", e);
      throw new RuntimeException(e);
    }
  }
}