in src/core-common/src/main/java/org/apache/kylin/common/util/SSHClient.java [170:266]
/**
 * Downloads {@code remoteFile} from the remote host by executing {@code scp -p -f} over an
 * SSH "exec" channel and writing the received content to the local file system.
 *
 * <p>If {@code localTargetDirectory} is an existing directory, each received file is stored
 * inside it under the file name announced by the remote side. Otherwise the content is written
 * to the path given by {@code remoteFile} (behavior kept from the previous implementation).
 *
 * <p>The SSH channel and session are always disconnected, whether the transfer succeeds or
 * fails.
 *
 * @param remoteFile           path of the file on the remote host
 * @param localTargetDirectory local directory that receives the file
 * @throws Exception if the session cannot be established, the stream ends prematurely, the
 *                   announced file name does not fit into the transfer buffer, or the remote
 *                   side does not acknowledge a transferred file
 */
public void scpRemoteFileToLocal(String remoteFile, String localTargetDirectory) throws Exception {
    Session session = null;
    Channel channel = null;
    try {
        logger.info("SCP file {} to {}", remoteFile, localTargetDirectory);
        session = newJSchSession();
        session.connect();

        String prefix = null;
        if (new File(localTargetDirectory).isDirectory()) {
            prefix = localTargetDirectory + File.separator;
        }

        // exec 'scp -f rfile' remotely
        String command = "scp -p -f " + remoteFile;
        channel = session.openChannel("exec");
        ((ChannelExec) channel).setCommand(command);

        // get I/O streams for remote scp
        OutputStream out = channel.getOutputStream();
        InputStream in = channel.getInputStream();
        channel.connect();

        byte[] buf = new byte[1024];

        // send '\0' to start the transfer
        buf[0] = 0;
        out.write(buf, 0, 1);
        out.flush();

        while (true) {
            // every file is expected to start with a 'T' record; anything else ends the loop
            int c = checkAck(in);
            if (c != 'T') {
                break;
            }
            long modTime = getModifyTime(out, in, buf);

            // skip forward to the 'C' record that introduces the file itself
            // NOTE(review): this spins forever if checkAck keeps returning a non-'C' value
            // (e.g. on end of stream) - confirm checkAck's contract.
            do {
                c = checkAck(in);
            } while (c != 'C');

            long fileSize = getFilesize(out, in, buf, 0L);

            // the file name follows, terminated by a line feed
            String file = null;
            for (int i = 0; i < buf.length; i++) {
                if (in.read(buf, i, 1) < 0) {
                    throw new java.io.IOException(
                            "Unexpected end of stream while reading the file name for " + remoteFile);
                }
                if (buf[i] == (byte) 0x0a) {
                    file = new String(buf, 0, i, Charset.defaultCharset());
                    break;
                }
            }
            if (file == null) {
                throw new java.io.IOException(
                        "File name announced for " + remoteFile + " exceeds " + buf.length + " bytes");
            }
            logger.info("file-size={}, file={}", fileSize, file);

            // NOTE(review): when the target is not a directory the content is written to the
            // path given by remoteFile, as before - confirm this is intended rather than
            // localTargetDirectory.
            String localPath = prefix == null ? remoteFile : prefix + file;

            // read the content of the file
            try (FileOutputStream fos = new FileOutputStream(localPath)) {
                long remaining = fileSize;
                while (remaining > 0L) {
                    int chunk = (int) Math.min(buf.length, remaining);
                    int read = in.read(buf, 0, chunk);
                    if (read < 0) {
                        throw new java.io.IOException("Unexpected end of stream while receiving " + file
                                + ", " + remaining + " bytes missing");
                    }
                    fos.write(buf, 0, read);
                    remaining -= read;
                }
            }

            if (checkAck(in) != 0) {
                // fail the call instead of terminating the whole JVM
                throw new java.io.IOException("Remote scp did not acknowledge the transfer of " + file);
            }

            // apply the modification time to the file that was actually written, after the
            // output stream has been closed
            if (!new File(localPath).setLastModified(modTime * 1000)) {
                logger.warn("update {} modify time failed", file);
            }

            // send '\0' to acknowledge the file
            buf[0] = 0;
            out.write(buf, 0, 1);
            out.flush();
        }
    } finally {
        // release the SSH resources on success and on failure alike
        if (channel != null) {
            channel.disconnect();
        }
        if (session != null) {
            session.disconnect();
        }
    }
}