public void write()

in streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java [71:156]


  public void write(StreamsDatum entry) {

    Objects.nonNull(client);

    String id = null;
    String document;
    String bucket;
    String bucketType;
    String contentType;
    String charset;
    if( StringUtils.isNotBlank(entry.getId())) {
      id = entry.getId();
    }
    if( entry.getDocument() instanceof String) {
      document = (String)entry.getDocument();
    } else {
      try {
        document = MAPPER.writeValueAsString(entry.getDocument());
      } catch( Exception e ) {
        LOGGER.warn("Exception", e);
        return;
      }
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("bucket")
        && entry.getMetadata().get("bucket") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
      bucket = (String)entry.getMetadata().get("bucket");
    } else {
      bucket = configuration.getDefaultBucket();
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("bucketType")
        && entry.getMetadata().get("bucketType") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
      bucketType = (String)entry.getMetadata().get("bucketType");
    } else {
      bucketType = configuration.getDefaultBucketType();
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("charset")
        && entry.getMetadata().get("charset") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
      charset = (String)entry.getMetadata().get("charset");
    } else {
      charset = configuration.getDefaultCharset();
    }
    if( entry.getMetadata() != null
        && entry.getMetadata().containsKey("contentType")
        && entry.getMetadata().get("contentType") instanceof String
        && StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
      contentType = (String)entry.getMetadata().get("contentType");
    } else {
      contentType = configuration.getDefaultContentType();
    }

    try {

      RiakObject riakObject = new RiakObject();
      riakObject.setContentType(contentType);
      riakObject.setCharset(charset);
      riakObject.setValue(BinaryValue.create(document));

      Namespace ns = new Namespace(bucketType, bucket);
      StoreValue.Builder storeValueBuilder = new StoreValue.Builder(riakObject);

      if( id != null && StringUtils.isNotBlank(id)) {
        Location location = new Location(ns, id);
        storeValueBuilder = storeValueBuilder.withLocation(location);
      } else {
        storeValueBuilder = storeValueBuilder.withNamespace(ns);
      }

      StoreValue store = storeValueBuilder.build();

      StoreValue.Response storeResponse = client.client().execute(store);

      LOGGER.debug("storeResponse", storeResponse);

    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

  }