in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Instances.java [418:562]
public Instance create(List<Task> tasks, CreateInstanceOption option) throws OdpsException {
String project = option.getProjectName();
if (StringUtils.isNullOrEmpty(project)) {
project = getDefaultProjectName();
}
if (tasks == null || tasks.size() == 0) {
throw new IllegalArgumentException("Tasks required.");
}
Job job = new Job();
for (Task t : tasks) {
job.addTask(t);
}
if (option.getPriority() != null) {
// check priority not negative
if (option.getPriority() < 0) {
throw new OdpsException("Priority must more than or equal to zero.");
}
job.setPriority(option.getPriority());
}
job.setRunningCluster(option.getRunningCluster());
if (option.getJobName() != null) {
job.setName(option.getJobName());
}
if (option.getUniqueIdentifyID() != null) {
job.setUniqueIdentifyID(option.getUniqueIdentifyID());
}
String guid = UUID.randomUUID().toString();
for (Task t : job.getTasks()) {
t.setProperty("uuid", guid);
t.loadSystemSettings();
t.loadGlobalSettings();
if (t.getName() == null) {
throw new OdpsException("Task name required.");
}
}
OdpsHooks hooks = null;
if (OdpsHooks.isEnabled()) {
hooks = new OdpsHooks();
hooks.before(job, odps);
}
AnonymousInstance i = new AnonymousInstance();
i.job = job.model;
String xml = null;
try {
xml = SimpleXmlUtils.marshal(i);
} catch (Exception e) {
throw new OdpsException(e.getMessage(), e);
}
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Headers.CONTENT_TYPE, "application/xml");
HashMap<String, String> params = new HashMap<String, String>();
String resource = ResourceBuilder.buildInstancesResource(project);
if (StringUtils.isNotBlank(option.getMcqaConnHeader())) {
resource = "/mcqa" + resource;
headers.put(Headers.ODPS_MCQA_CONN, option.getMcqaConnHeader());
}
if (option.isTryWait()) {
params.put("tryWait", null);
}
long startTime = System.currentTimeMillis();
Response resp = null;
// at least wait 180s when get 409
while (System.currentTimeMillis() - startTime < TimeUnit.SECONDS.toMillis(180)) {
try {
resp = client.stringRequest(resource, "POST", params, headers, xml);
break;
} catch (OdpsException e) {
if (e.getStatus() != null && e.getStatus() == 409 && e.existRetryAfter()) {
try {
Long retryAfter = e.getRetryAfter();
if (retryAfter > 0) {
Thread.sleep(e.getRetryAfter() * 1000);
}
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
throw e;
}
} else {
throw e;
}
}
}
// If we exit the loop without a successful response, make one last attempt
if (resp == null) {
resp = client.stringRequest(resource, "POST", params, headers, xml);
}
String location = resp.getHeaders().get(Headers.LOCATION);
if (location == null || location.trim().length() == 0) {
throw new OdpsException("Invalid response, Location header required.");
}
String instanceId = location.substring(location.lastIndexOf("/") + 1);
Map<String, TaskResult> results = new HashMap<>();
TaskStatusModel model = new TaskStatusModel();
model.name = instanceId;
if (resp.getStatus() == 200 && resp.getBody() != null
&& resp.getBody().length > 0) {
try {
InstanceResultModel result = SimpleXmlUtils.unmarshal(resp,
InstanceResultModel.class);
for (TaskResult taskResult : result.taskResults) {
model.tasks.add(createInstanceTaskModel(taskResult));
results.put(taskResult.name, taskResult);
}
} catch (Exception e) {
throw new OdpsException("Invalid create instance response.", e);
}
}
Instance instance = new Instance(project, model, results, odps);
// set mcqa 2.0 header
if (resp.getStatus() == 201 && (resp.getHeader(Headers.ODPS_MCQA_QUERY_COOKIE) != null)) {
instance.addUserDefinedHeaders(ImmutableMap.of(Headers.ODPS_MCQA_QUERY_COOKIE,
resp.getHeader(
Headers.ODPS_MCQA_QUERY_COOKIE)));
}
if (StringUtils.isNotBlank(option.getMcqaConnHeader())) {
instance.addUserDefinedHeaders(ImmutableMap.of(Headers.ODPS_MCQA_CONN, option.getMcqaConnHeader()));
instance.setMcqaV2(true);
}
instance.setOdpsHooks(hooks);
if (OdpsHooks.isEnabled()) {
if (hooks == null) {
hooks = new OdpsHooks();
}
hooks.onInstanceCreated(instance, odps);
}
return instance;
}