public Instance create()

in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Instances.java [418:562]


  /**
   * Submits the given tasks as one job and returns a handle to the created instance.
   *
   * <p>The target project is taken from {@code option}; when the option carries no project name
   * the default project is used. Every task receives the same randomly generated {@code uuid}
   * property and has its system and global settings loaded before the job is marshalled to XML
   * and POSTed to the project's instances resource. When the option carries an MCQA connection
   * header, the resource is prefixed with {@code /mcqa} and the header is forwarded.
   *
   * <p>A failure with status 409 that carries a retry-after value is retried until 180 seconds
   * have elapsed since the first attempt; if no response was obtained by then, one final attempt
   * is made whose failure propagates to the caller.
   *
   * @param tasks tasks to run; must contain at least one element, and each must have a name
   * @param option creation options (project, priority, running cluster, job name, unique id,
   *     MCQA connection header, tryWait flag)
   * @return the created instance, pre-populated with task results when the response has status
   *     200 and a non-empty body
   * @throws IllegalArgumentException if {@code tasks} is null or empty
   * @throws OdpsException if the priority is negative, a task has no name, the job cannot be
   *     marshalled, the request fails, the response has no Location header, or the response
   *     body cannot be parsed
   */
  public Instance create(List<Task> tasks, CreateInstanceOption option) throws OdpsException {
    String targetProject = option.getProjectName();
    if (StringUtils.isNullOrEmpty(targetProject)) {
      targetProject = getDefaultProjectName();
    }
    if (tasks == null || tasks.isEmpty()) {
      throw new IllegalArgumentException("Tasks required.");
    }

    // Assemble the job description from the tasks and the optional settings.
    Job job = new Job();
    for (Task task : tasks) {
      job.addTask(task);
    }
    if (option.getPriority() != null) {
      // Negative priorities are rejected before anything is sent.
      if (option.getPriority() < 0) {
        throw new OdpsException("Priority must more than or equal to zero.");
      }
      job.setPriority(option.getPriority());
    }
    job.setRunningCluster(option.getRunningCluster());
    if (option.getJobName() != null) {
      job.setName(option.getJobName());
    }
    if (option.getUniqueIdentifyID() != null) {
      job.setUniqueIdentifyID(option.getUniqueIdentifyID());
    }

    // All tasks of this submission share one uuid property.
    String sharedUuid = UUID.randomUUID().toString();
    for (Task task : job.getTasks()) {
      task.setProperty("uuid", sharedUuid);
      task.loadSystemSettings();
      task.loadGlobalSettings();
      if (task.getName() == null) {
        throw new OdpsException("Task name required.");
      }
    }

    OdpsHooks hooks = OdpsHooks.isEnabled() ? new OdpsHooks() : null;
    if (hooks != null) {
      hooks.before(job, odps);
    }

    // Serialize the job into the XML request body.
    AnonymousInstance payload = new AnonymousInstance();
    payload.job = job.model;
    final String requestBody;
    try {
      requestBody = SimpleXmlUtils.marshal(payload);
    } catch (Exception marshalFailure) {
      throw new OdpsException(marshalFailure.getMessage(), marshalFailure);
    }

    HashMap<String, String> headers = new HashMap<>();
    headers.put(Headers.CONTENT_TYPE, "application/xml");

    HashMap<String, String> params = new HashMap<>();

    String resource = ResourceBuilder.buildInstancesResource(targetProject);
    if (StringUtils.isNotBlank(option.getMcqaConnHeader())) {
      resource = "/mcqa" + resource;
      headers.put(Headers.ODPS_MCQA_CONN, option.getMcqaConnHeader());
    }
    if (option.isTryWait()) {
      params.put("tryWait", null);
    }

    // Keep retrying 409 responses that carry a retry-after value for up to 180 seconds.
    final long retryDeadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(180);
    Response response = null;
    while (System.currentTimeMillis() < retryDeadline) {
      try {
        response = client.stringRequest(resource, "POST", params, headers, requestBody);
        break;
      } catch (OdpsException failure) {
        boolean retryable =
            failure.getStatus() != null
                && failure.getStatus() == 409
                && failure.existRetryAfter();
        if (!retryable) {
          throw failure;
        }
        try {
          Long retryAfterSeconds = failure.getRetryAfter();
          if (retryAfterSeconds > 0) {
            Thread.sleep(failure.getRetryAfter() * 1000);
          }
        } catch (InterruptedException interrupted) {
          // Restore the interrupt flag and surface the original request failure.
          Thread.currentThread().interrupt();
          throw failure;
        }
      }
    }
    // The retry window closed without a response: try once more and let any failure propagate.
    if (response == null) {
      response = client.stringRequest(resource, "POST", params, headers, requestBody);
    }

    String location = response.getHeaders().get(Headers.LOCATION);
    if (location == null || location.trim().isEmpty()) {
      throw new OdpsException("Invalid response, Location header required.");
    }

    // The instance id is the last path segment of the Location header.
    String instanceId = location.substring(location.lastIndexOf('/') + 1);

    Map<String, TaskResult> resultsByTask = new HashMap<>();

    TaskStatusModel statusModel = new TaskStatusModel();
    statusModel.name = instanceId;

    boolean hasInlineResults =
        response.getStatus() == 200
            && response.getBody() != null
            && response.getBody().length > 0;
    if (hasInlineResults) {
      try {
        InstanceResultModel parsed =
            SimpleXmlUtils.unmarshal(response, InstanceResultModel.class);
        for (TaskResult taskResult : parsed.taskResults) {
          statusModel.tasks.add(createInstanceTaskModel(taskResult));
          resultsByTask.put(taskResult.name, taskResult);
        }
      } catch (Exception parseFailure) {
        throw new OdpsException("Invalid create instance response.", parseFailure);
      }
    }

    Instance instance = new Instance(targetProject, statusModel, resultsByTask, odps);

    // set mcqa 2.0 header
    if (response.getStatus() == 201
        && response.getHeader(Headers.ODPS_MCQA_QUERY_COOKIE) != null) {
      instance.addUserDefinedHeaders(
          ImmutableMap.of(
              Headers.ODPS_MCQA_QUERY_COOKIE,
              response.getHeader(Headers.ODPS_MCQA_QUERY_COOKIE)));
    }
    if (StringUtils.isNotBlank(option.getMcqaConnHeader())) {
      instance.addUserDefinedHeaders(
          ImmutableMap.of(Headers.ODPS_MCQA_CONN, option.getMcqaConnHeader()));
      instance.setMcqaV2(true);
    }

    instance.setOdpsHooks(hooks);

    if (OdpsHooks.isEnabled()) {
      // Use the hooks created before submission if there are any, otherwise a fresh set.
      OdpsHooks createdNotifier = (hooks != null) ? hooks : new OdpsHooks();
      createdNotifier.onInstanceCreated(instance, odps);
    }

    return instance;
  }