public void write()

in streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java [74:165]


  public void write(StreamsDatum entry) {

    Objects.nonNull(client);

    String id = null;
    String document;
    String bucket;
    String bucketType;
    String contentType;
    String charset;
    if( StringUtils.isNotBlank(entry.getId())) {
      id = entry.getId();
    }
    if( entry.getDocument() instanceof String) {
      document = (String)entry.getDocument();
    } else {
      try {
        document = MAPPER.writeValueAsString(entry.getDocument());
      } catch( Exception e ) {
        LOGGER.warn("Exception", e);
        return;
      }
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("bucket")
        && entry.getMetadata().get("bucket") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
      bucket = (String)entry.getMetadata().get("bucket");
    } else {
      bucket = configuration.getDefaultBucket();
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("bucketType")
        && entry.getMetadata().get("bucketType") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
      bucketType = (String)entry.getMetadata().get("bucketType");
    } else {
      bucketType = configuration.getDefaultBucketType();
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("charset")
        && entry.getMetadata().get("charset") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
      charset = (String)entry.getMetadata().get("charset");
    } else {
      charset = configuration.getDefaultCharset();
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("contentType")
        && entry.getMetadata().get("contentType") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
      contentType = (String)entry.getMetadata().get("contentType");
    } else {
      contentType = configuration.getDefaultContentType();
    }

    URIBuilder uriBuilder = new URIBuilder(client.baseURI);
    if( bucket != null && StringUtils.isNotBlank(bucket)) {
      uriBuilder.setPath("/riak/"+bucket);
    }
    if( id != null && StringUtils.isNotBlank(id)) {
      uriBuilder.setPath("/riak/"+bucket+"/"+id);
    }

    URI uri;
    try {
      uri = uriBuilder.build();
    } catch (URISyntaxException e) {
      LOGGER.warn("URISyntaxException", e);
      return;
    }

    HttpPost post = new HttpPost();
    post.setHeader("Content-Type", contentType + "; charset=" + charset);
    post.setURI(uri);
    HttpEntity entity;
    try {
      entity = new StringEntity(document);
      post.setEntity(entity);
    } catch (UnsupportedEncodingException e) {
      LOGGER.warn("UnsupportedEncodingException", e);
      return;
    }

    try {
      HttpResponse response = client.client().execute(post);
    } catch (IOException e) {
      LOGGER.warn("IOException", e);
      return;
    }

  }