core/src/main/java/com/datastax/dse/driver/internal/core/cql/reactive/ReactiveResultSetSubscription.java [132:236]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  @Override
  public void request(long n) {
    // As per 3.6: after the Subscription is cancelled, additional
    // calls to request() MUST be NOPs.
    if (!cancelled) {
      if (n < 1) {
        // Validate request as per rule 3.9
        doOnError(
            new IllegalArgumentException(
                mainSubscriber
                    + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
      } else {
        // As per rule 3.17, when demand overflows Long.MAX_VALUE
        // it can be treated as "effectively unbounded"
        ReactiveOperators.addCap(requested, n);
        // Set the first future to true if not done yet.
        // This will make the first page of results ready for consumption,
        // see start().
        // As per 2.7 it is the subscriber's responsibility to provide
        // external synchronization when calling request(),
        // so the check-then-act idiom below is good enough
        // (and besides, complete() is idempotent).
        if (!firstSubscriberRequestArrived.isDone()) {
          firstSubscriberRequestArrived.complete(null);
        }
        drain();
      }
    }
  }

  @Override
  public void cancel() {
    // As per 3.5: Subscription.cancel() MUST respect the responsiveness of
    // its caller by returning in a timely manner, MUST be idempotent and
    // MUST be thread-safe.
    if (!cancelled) {
      cancelled = true;
      if (draining.getAndIncrement() == 0) {
        // If nobody is draining, clear now;
        // otherwise, the draining thread will notice
        // that the cancelled flag was set
        // and will clear for us.
        clear();
      }
    }
  }

  /**
   * Attempts to drain available items, i.e. emit them to the subscriber.
   *
   * <p>Access to this method is serialized by the field {@link #draining}: only one thread at a
   * time can drain, but threads that attempt to drain while other thread is already draining
   * increment that field; the draining thread, before finishing its work, checks for such failed
   * attempts and triggers another round of draining if that was the case.
   *
   * <p>The loop is interrupted when 1) the requested amount has been met or 2) when there are no
   * more items readily available or 3) the subscription has been cancelled.
   *
   * <p>The loop also checks for stream exhaustion and emits a terminal {@code onComplete} signal in
   * this case.
   *
   * <p>This method may run on a driver IO thread when invoked from {@link
   * #fetchNextPageAndEnqueue(Page, boolean)}, or on a subscriber thread, when invoked from {@link
   * #request(long)}.
   */
  @SuppressWarnings("ConditionalBreakInInfiniteLoop")
  private void drain() {
    // As per 3.4: this method SHOULD respect the responsiveness
    // of its caller by returning in a timely manner.
    // We accomplish this by a wait-free implementation.
    if (draining.getAndIncrement() != 0) {
      // Someone else is already draining, so do nothing,
      // the other thread will notice that we attempted to drain.
      // This also allows to abide by rule 3.3 and avoid
      // cycles such as request() -> onNext() -> request() etc.
      return;
    }
    int missed = 1;
    // Note: when termination is detected inside this loop,
    // we MUST call clear() manually.
    for (; ; ) {
      // The requested number of items at this point
      long r = requested.get();
      // The number of items emitted thus far
      long emitted = 0L;
      while (emitted != r) {
        if (cancelled) {
          clear();
          return;
        }
        Object result;
        try {
          result = tryNext();
        } catch (Throwable t) {
          doOnError(t);
          clear();
          return;
        }
        if (result == null) {
          break;
        }
        if (result instanceof Throwable) {
          doOnError((Throwable) result);
          clear();
          return;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



core/src/main/java/com/datastax/dse/driver/internal/core/graph/reactive/ReactiveGraphResultSetSubscription.java [125:229]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  @Override
  public void request(long n) {
    // As per 3.6: after the Subscription is cancelled, additional
    // calls to request() MUST be NOPs.
    if (!cancelled) {
      if (n < 1) {
        // Validate request as per rule 3.9
        doOnError(
            new IllegalArgumentException(
                mainSubscriber
                    + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
      } else {
        // As per rule 3.17, when demand overflows Long.MAX_VALUE
        // it can be treated as "effectively unbounded"
        ReactiveOperators.addCap(requested, n);
        // Set the first future to true if not done yet.
        // This will make the first page of results ready for consumption,
        // see start().
        // As per 2.7 it is the subscriber's responsibility to provide
        // external synchronization when calling request(),
        // so the check-then-act idiom below is good enough
        // (and besides, complete() is idempotent).
        if (!firstSubscriberRequestArrived.isDone()) {
          firstSubscriberRequestArrived.complete(null);
        }
        drain();
      }
    }
  }

  @Override
  public void cancel() {
    // As per 3.5: Subscription.cancel() MUST respect the responsiveness of
    // its caller by returning in a timely manner, MUST be idempotent and
    // MUST be thread-safe.
    if (!cancelled) {
      cancelled = true;
      if (draining.getAndIncrement() == 0) {
        // If nobody is draining, clear now;
        // otherwise, the draining thread will notice
        // that the cancelled flag was set
        // and will clear for us.
        clear();
      }
    }
  }

  /**
   * Attempts to drain available items, i.e. emit them to the subscriber.
   *
   * <p>Access to this method is serialized by the field {@link #draining}: only one thread at a
   * time can drain, but threads that attempt to drain while other thread is already draining
   * increment that field; the draining thread, before finishing its work, checks for such failed
   * attempts and triggers another round of draining if that was the case.
   *
   * <p>The loop is interrupted when 1) the requested amount has been met or 2) when there are no
   * more items readily available or 3) the subscription has been cancelled.
   *
   * <p>The loop also checks for stream exhaustion and emits a terminal {@code onComplete} signal in
   * this case.
   *
   * <p>This method may run on a driver IO thread when invoked from {@link
   * #fetchNextPageAndEnqueue(Page)}, or on a subscriber thread, when invoked from {@link
   * #request(long)}.
   */
  @SuppressWarnings("ConditionalBreakInInfiniteLoop")
  private void drain() {
    // As per 3.4: this method SHOULD respect the responsiveness
    // of its caller by returning in a timely manner.
    // We accomplish this by a wait-free implementation.
    if (draining.getAndIncrement() != 0) {
      // Someone else is already draining, so do nothing,
      // the other thread will notice that we attempted to drain.
      // This also allows to abide by rule 3.3 and avoid
      // cycles such as request() -> onNext() -> request() etc.
      return;
    }
    int missed = 1;
    // Note: when termination is detected inside this loop,
    // we MUST call clear() manually.
    for (; ; ) {
      // The requested number of items at this point
      long r = requested.get();
      // The number of items emitted thus far
      long emitted = 0L;
      while (emitted != r) {
        if (cancelled) {
          clear();
          return;
        }
        Object result;
        try {
          result = tryNext();
        } catch (Throwable t) {
          doOnError(t);
          clear();
          return;
        }
        if (result == null) {
          break;
        }
        if (result instanceof Throwable) {
          doOnError((Throwable) result);
          clear();
          return;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



