private Observable connect()

in rx-central-ble/src/main/java/com/uber/rxcentralble/core/CoreConnectionManager.java [155:180]


  /**
   * Builds the connection stream for the supplied Bluetooth device.
   *
   * <p>Calling this method immediately pushes {@code State.CONNECTING} into {@code stateRelay}
   * (at call time, not on subscription) and asks {@code peripheralFactory} to produce a
   * {@link Peripheral} for the device.
   *
   * <p>The returned stream combines two views of {@code peripheral.connect()}:
   * <ul>
   *   <li>every connectable state, paired with the peripheral it belongs to;</li>
   *   <li>a gate that emits the first {@code CONNECTED} state, or fails with
   *       {@code ConnectionError(CONNECT_TIMEOUT)} when none is seen within
   *       {@code connectionTimeoutMs} milliseconds.</li>
   * </ul>
   * Nothing is emitted downstream until the gate has produced its value; from then on the
   * peripheral is emitted each time the latest paired state is {@code CONNECTED}.
   *
   * @param bluetoothDevice device handed to the peripheral factory.
   * @return stream of the connected peripheral; errors if the timeout gate fails.
   */
  private Observable<Peripheral> connect(BluetoothDevice bluetoothDevice) {
    stateRelay.accept(State.CONNECTING);

    final Peripheral target = peripheralFactory.produce(bluetoothDevice, context);

    // NOTE(review): connect() is invoked twice below; presumably it hands back a shared
    // stream so both subscriptions observe the same connection attempt - confirm.

    // Every state change, tagged with the peripheral so it can be recovered after filtering.
    final Observable<Pair<Peripheral.ConnectableState, Peripheral>> taggedStates =
        target.connect().withLatestFrom(Observable.just(target), Pair::new);

    // First CONNECTED state seen on the second connect() stream.
    final Single<Peripheral.ConnectableState> firstConnected =
        target
            .connect()
            .filter(state -> state == Peripheral.ConnectableState.CONNECTED)
            .firstOrError();

    // The same, but failing with CONNECT_TIMEOUT if it takes longer than connectionTimeoutMs.
    final Observable<Peripheral.ConnectableState> connectedGate =
        firstConnected
            .timeout(
                connectionTimeoutMs,
                TimeUnit.MILLISECONDS,
                Single.error(new ConnectionError(CONNECT_TIMEOUT)))
            .toObservable();

    // The gate's value is discarded; it only holds back emissions until it has fired and
    // forwards its timeout error.
    return Observable.combineLatest(
            taggedStates, connectedGate, (tagged, gateSignal) -> tagged)
        .filter(tagged -> tagged.first == Peripheral.ConnectableState.CONNECTED)
        .map(tagged -> tagged.second);
  }