in rx-central-ble/src/main/java/com/uber/rxcentralble/core/operations/AbstractWrite.java [80:104]
/**
 * Writes {@code data} to the given characteristic as a sequence of chunk writes.
 *
 * <p>The first present peripheral observed on {@code peripheralRelay} is used for every chunk;
 * once it has been taken, the relay is reset to {@code Optional.empty()}. The number of chunks is
 * the buffer's remaining byte count divided by the peripheral's max write length, rounded up.
 * Chunk writes are paired one-to-one with emissions of {@code chunkIndexRelay}, and each finished
 * write pushes its index into that relay, so the relay acts as the signal for the next chunk.
 *
 * <p>NOTE(review): when {@code data} is empty the chunk count is 0, so {@code lastOrError()} has
 * no item to emit and signals an error instead of the peripheral - confirm this is intended.
 *
 * @param svc UUID of the service passed through to the peripheral's write.
 * @param chr UUID of the characteristic passed through to the peripheral's write.
 * @param data bytes to write; wrapped in a buffer that is rewound on every subscription.
 * @return a Single emitting the peripheral from the last completed chunk write.
 */
protected Single<Peripheral> write(UUID svc, UUID chr, byte[] data) {
  final ByteBuffer byteBuffer = ByteBuffer.wrap(data);
  return peripheralRelay
      .filter(Optional::isPresent)
      .map(Optional::get)
      .firstOrError()
      // Clear the relay once a peripheral has been claimed for this write.
      .doOnSuccess(ignored -> peripheralRelay.accept(Optional.empty()))
      .flatMapObservable(target -> {
        final double pendingBytes = (double) byteBuffer.remaining();
        final double maxChunkLength = (double) target.getMaxWriteLength();
        final int totalChunks = (int) Math.ceil(pendingBytes / maxChunkLength);
        return Observable.range(0, totalChunks)
            .map(chunkNumber -> new Pair<>(target, chunkNumber));
      })
      // Each relay emission releases exactly one pending chunk; the emitted value is unused.
      .zipWith(chunkIndexRelay, (pendingChunk, releaseSignal) -> pendingChunk)
      .flatMapSingle(pendingChunk -> {
        return pendingChunk.first
            .write(svc, chr, chunk(byteBuffer, pendingChunk.first.getMaxWriteLength()))
            // Publishing the finished index is what lets the next chunk through the zip.
            .doOnComplete(() -> chunkIndexRelay.accept(pendingChunk.second))
            .andThen(Single.just(pendingChunk.first));
      })
      .lastOrError()
      .doOnSubscribe(disposable -> {
        // Reset shared state so a (re)subscription starts from the first byte and chunk.
        byteBuffer.rewind();
        chunkIndexRelay.accept(0);
      });
}