in rx-central-ble/src/main/java/com/uber/rxcentralble/core/CoreConnectionManager.java [155:180]
/**
 * Builds the connection stream for the given device.
 *
 * <p>Calling this method immediately pushes {@code State.CONNECTING} to {@code stateRelay} and
 * asks {@code peripheralFactory} for a {@link Peripheral} wrapping the device. The actual
 * connection attempt is driven by subscribing to the returned Observable.
 *
 * <p>The returned Observable emits the peripheral each time its connection stream reports
 * {@code Peripheral.ConnectableState.CONNECTED}. If no {@code CONNECTED} state is seen within
 * {@code connectionTimeoutMs} milliseconds of subscription, it fails with a
 * {@code ConnectionError(CONNECT_TIMEOUT)}. If the connection stream completes before ever
 * reporting {@code CONNECTED}, it fails with the {@code NoSuchElementException} raised by
 * {@code firstOrError()}. Once the first {@code CONNECTED} has been observed the timeout no
 * longer applies.
 *
 * @param bluetoothDevice the device to connect to
 * @return an Observable emitting the connected {@link Peripheral}
 */
private Observable<Peripheral> connect(BluetoothDevice bluetoothDevice) {
  stateRelay.accept(State.CONNECTING);
  Peripheral peripheral = peripheralFactory.produce(bluetoothDevice, context);

  // Subscribe to peripheral.connect() exactly once and multicast it to both branches below.
  // Calling connect() twice (as before) only works if the Peripheral implementation happens to
  // return a shared stream; otherwise it starts two connection attempts. publish(selector) wires
  // up both consumers before subscribing upstream, so neither can miss an early state emission.
  return peripheral
      .connect()
      .publish(
          connectionStates -> {
            // Emits once on the first CONNECTED state, or errors if that takes too long.
            Observable<Peripheral.ConnectableState> connectedWithinTimeout =
                connectionStates
                    .filter(state -> state == Peripheral.ConnectableState.CONNECTED)
                    .firstOrError()
                    .timeout(
                        connectionTimeoutMs,
                        TimeUnit.MILLISECONDS,
                        Single.error(new ConnectionError(CONNECT_TIMEOUT)))
                    .toObservable();

            // Gate state updates on the timeout branch: nothing is emitted until the first
            // CONNECTED has been confirmed, and a timeout error terminates the whole stream.
            return Observable.combineLatest(
                connectionStates,
                connectedWithinTimeout,
                (state, firstConnected) -> state);
          })
      .filter(state -> state == Peripheral.ConnectableState.CONNECTED)
      .map(state -> peripheral);
}