public Future write()

in gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriter.java [169:252]


  public Future<WriteResponse> write(final D record, final WriteCallback callback) {
    assertRecordWritable(record);
    if (record instanceof TupleDocument) {
      ((TupleDocument) record).content().value1().retain();
    }
    Observable<D> observable;
    try {
      observable = _bucket.async().upsert(setDocumentTTL(record));
    } catch (DataRecordException e) {
      throw new RuntimeException("Caught exception trying to set TTL of the document", e);
    }

    if (callback == null) {
      return new WriteResponseFuture<>(
          observable.timeout(_operationTimeout, _operationTimeunit).toBlocking().toFuture(),
          _defaultWriteResponseMapper);
    } else {

      final AtomicBoolean callbackFired = new AtomicBoolean(false);
      final BlockingQueue<Pair<WriteResponse, Throwable>> writeResponseQueue = new ArrayBlockingQueue<>(1);

      final Future<WriteResponse> writeResponseFuture = new Future<WriteResponse>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
          return false;
        }

        @Override
        public boolean isCancelled() {
          return false;
        }

        @Override
        public boolean isDone() {
          return callbackFired.get();
        }

        @Override
        public WriteResponse get() throws InterruptedException, ExecutionException {
          Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.take();
          return getWriteResponseOrThrow(writeResponseThrowablePair);
        }

        @Override
        public WriteResponse get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
          Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.poll(timeout, unit);
          if (writeResponseThrowablePair == null) {
            throw new TimeoutException("Timeout exceeded while waiting for future to be done");
          } else {
            return getWriteResponseOrThrow(writeResponseThrowablePair);
          }
        }
      };

      observable.timeout(_operationTimeout, _operationTimeunit).subscribe(new Subscriber<D>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
          callbackFired.set(true);
          writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, e));
          callback.onFailure(e);
        }

        @Override
        public void onNext(D doc) {
          try {
            callbackFired.set(true);
            WriteResponse writeResponse = new GenericWriteResponse<D>(doc);
            writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null));
            callback.onSuccess(writeResponse);
          } finally {
            if (doc instanceof TupleDocument) {
              ((TupleDocument) doc).content().value1().release();
            }
          }
        }
      });
      return writeResponseFuture;
    }
  }