in odps-console-public/src/main/java/com/aliyun/openservices/odps/console/common/FileCache.java [47:116]
/**
 * Returns the content associated with the given task of an instance, using an
 * on-disk cache under {@code cacheDir} when the task is no longer running.
 *
 * <p>Tasks whose status is RUNNING or WAITING are never cached, because their
 * details are still changing; they are always served through
 * {@code fireOnNet}. For any other status the cache file named
 * {@code <project>-<instanceId>-<taskName>} is returned if present; otherwise
 * the content is obtained through {@code fireOnNet}, written to that file and
 * a stream over the file is returned. Cache access is guarded by a file lock
 * on {@code .lock} inside {@code cacheDir}; if the lock cannot be obtained the
 * cache is bypassed.
 *
 * @param inst the instance that owns the task
 * @param taskName name of the task to look up
 * @return a stream over the task content; the caller is responsible for
 *         closing it
 * @throws ODPSConsoleException if the instance has no such task, if the task
 *         status cannot be read, if populating the cache fails, or if the
 *         file lock cannot be released
 */
public InputStream get(Instance inst, String taskName) throws ODPSConsoleException {
  try {
    // Do not cache a running instance, since its details are dynamic.
    Map<String, TaskStatus> taskStatuses = inst.getTaskStatus();
    TaskStatus taskStatus = taskStatuses.get(taskName);
    if (taskStatus == null) {
      throw new ODPSConsoleException("Instance '" + inst.getId() + "' has no such task: " + taskName);
    }
    boolean stillChanging = taskStatus.getStatus().equals(TaskStatus.Status.RUNNING)
        || taskStatus.getStatus().equals(TaskStatus.Status.WAITING);
    if (stillChanging) {
      return fireOnNet(inst, taskName);
    }
  } catch (OdpsException e) {
    throw new ODPSConsoleException(e);
  }

  String key = inst.getProject() + "-" + inst.getId() + "-" + taskName;
  File file = new File(cacheDir, key);
  File lockFile = new File(cacheDir, ".lock");
  RandomAccessFile lockHandle = null;
  FileLock lock = null;
  try {
    lockFile.createNewFile();
    lockHandle = new RandomAccessFile(lockFile, "rw");
    lock = lockHandle.getChannel().lock();
  } catch (IOException ignored) {
    // Deliberately best effort: when the lock cannot be obtained, lock stays
    // null and the request below goes around the cache instead of failing.
  }

  try {
    if (lock == null) {
      // Failed to obtain lock, request directly online.
      return fireOnNet(inst, taskName);
    }
    try {
      return new FileInputStream(file);
    } catch (FileNotFoundException cacheMiss) {
      InputStream remote = null;
      // Set once we start writing the entry, so a failure before that point
      // never deletes anything.
      boolean writeStarted = false;
      try {
        remote = fireOnNet(inst, taskName);
        int contentSize = getContentSize();
        if (contentSize > capacity) {
          clearCache();
        }
        writeStarted = true;
        file.createNewFile();
        FileUtil.saveInputStreamToFile(remote, file.getPath());
        return new FileInputStream(file);
      } catch (Exception e) {
        // Remove an empty or truncated entry; otherwise the next call would
        // treat the partial file as a valid cache hit.
        if (writeStarted && file.isFile()) {
          file.delete();
        }
        if (e instanceof ODPSConsoleException) {
          // Already the declared type: rethrow as-is rather than nesting it.
          throw (ODPSConsoleException) e;
        }
        throw new ODPSConsoleException(e);
      } finally {
        // The content now lives in the cache file (or the attempt failed);
        // either way the source stream is no longer needed.
        IOUtils.closeQuietly(remote);
      }
    }
  } finally {
    try {
      if (lock != null) {
        lock.release();
      }
    } catch (IOException e) {
      throw new ODPSConsoleException(e);
    } finally {
      IOUtils.closeQuietly(lockHandle);
    }
  }
}