func testMultiBroker()

in AWSIoTTests/AWSIoTDataManagerTests.swift [431:643]


    /// Connects to two MQTT brokers through separate `AWSIoTDataManager` instances, subscribes to one
    /// topic on each, publishes the same number of messages to both, checks how many messages each
    /// subscription received and finally disconnects from both brokers.
    ///
    /// - Parameters:
    ///   - useWebSocket: When `true` the connections are opened with `connectUsingWebSocket`; otherwise
    ///     with the certificate based `connect`, using the certificate ids stored in `UserDefaults`
    ///     under the keys `TestCertBroker1` and `TestCertBroker2`.
    ///   - burst: When `true`, an additional burst of 200 messages per broker is published up front.
    ///   - totalMessages: Number of rate-limited messages to publish to each broker.
    ///   - messagesPerSecond: Number of messages published per batch; every batch is followed by a
    ///     one second pause. Must be greater than zero.
    func testMultiBroker(useWebSocket: Bool, burst: Bool, totalMessages: Int, messagesPerSecond: Int) {

        // A non-positive rate cannot be batched (and used to trap on the division by zero).
        guard messagesPerSecond > 0 else {
            XCTFail("messagesPerSecond must be greater than zero, got \(messagesPerSecond)")
            return
        }

        //Variables and Expectations for Broker 1
        var messageCountBroker1 = 0
        var broker1Disconnected = false
        var broker1Connected = false
        let broker1SubConfirmed  = self.expectation(description: "Subscription to broker 1 has been established")
        let hasConnectedBroker1 = self.expectation(description: "MQTT connection has been established with Broker 1 Broker")
        let hasDisconnectedBroker1 = self.expectation(description: "Disconnected from Broker1 Broker")
        var disconnectForBroker1Issued = false

        //Variables and Expectations for Broker 2
        var messageCountBroker2 = 0
        var broker2Disconnected = false
        var broker2Connected = false
        let broker2SubConfirmed  = self.expectation(description: "Subscription to broker 2 has been established")
        let hasConnectedBroker2 = self.expectation(description: "MQTT connection has been established with Broker 2 Broker")
        let hasDisconnectedBroker2 = self.expectation(description: "Disconnected from Broker2 Broker")
        var disconnectForBroker2Issued = false

        // Status callback for the Broker 1 connection: fulfils the connect / disconnect expectations.
        func mqttEventCallbackBroker1( _ status: AWSIoTMQTTStatus ) {
            print("Broker1 connection status = \(status.rawValue)")
            switch status {
            case .connecting:
                print ("Connecting...")

            case .connected:
                print("Connected")
                // Fulfil only once; the status may be reported again later.
                if !broker1Connected {
                    broker1Connected = true
                    hasConnectedBroker1.fulfill()
                }

            case .disconnected:
                print("Disconnected")
                // Only count a disconnect this test asked for, and only the first such event,
                // so the expectation is never fulfilled twice.
                if disconnectForBroker1Issued && !broker1Disconnected {
                    broker1Disconnected = true
                    hasDisconnectedBroker1.fulfill()
                }

            case .connectionRefused:
                print("Connection Refused")

            case .connectionError:
                print("Connection Error")

            case .protocolError:
                print("Protocol Error")

            default:
                print("Unknown state: \(status.rawValue)")
            }
        }

        // Status callback for the Broker 2 connection: fulfils the connect / disconnect expectations.
        func mqttEventCallbackBroker2( _ status: AWSIoTMQTTStatus ) {
            print("Broker 2 connection status = \(status.rawValue)")
            switch status {
            case .connecting:
                print ("Connecting...")

            case .connected:
                print("Connected")
                // Fulfil only once; the status may be reported again later.
                if !broker2Connected {
                    broker2Connected = true
                    hasConnectedBroker2.fulfill()
                }

            case .disconnected:
                print("Disconnected")
                // Only count a disconnect this test asked for, and only the first such event,
                // so the expectation is never fulfilled twice.
                if disconnectForBroker2Issued && !broker2Disconnected {
                    broker2Disconnected = true
                    hasDisconnectedBroker2.fulfill()
                }

            case .connectionRefused:
                print("Connection Refused")

            case .connectionError:
                print("Connection Error")

            case .protocolError:
                print("Protocol Error")

            default:
                print("Unknown state: \(status.rawValue)")
            }
        }

        //Connect to Broker 1
        let iotDataManagerBroker1:AWSIoTDataManager = AWSIoTDataManager(forKey: "iot-data-manager-broker1")
        let uuidBroker1 = UUID().uuidString
        print("Calling Connect to Broker1")
        let defaults = UserDefaults.standard

        if useWebSocket {
            iotDataManagerBroker1.connectUsingWebSocket(withClientId: uuidBroker1, cleanSession: true, statusCallback: mqttEventCallbackBroker1)
            print("Connect call with Broker1 completed")
        }
        else {
            // A missing certificate id is a test setup problem: fail this test instead of crashing the run.
            guard let certificateIdBroker1 = defaults.string(forKey: "TestCertBroker1") else {
                XCTFail("No certificate id stored in UserDefaults under key TestCertBroker1")
                return
            }
            iotDataManagerBroker1.connect( withClientId: uuidBroker1, cleanSession:true, certificateId:certificateIdBroker1, statusCallback: mqttEventCallbackBroker1)
            print("Connect call with Broker1 completed")
        }

        wait(for:[hasConnectedBroker1], timeout: 30)
        XCTAssertTrue(broker1Connected, "Successfully established MQTT Connection with Broker1")
        if !broker1Connected {
            return
        }

        //Connect to Broker 2
        let iotDataManagerBroker2:AWSIoTDataManager = AWSIoTDataManager(forKey: "iot-data-manager-broker2")
        let uuidBroker2 = UUID().uuidString
        print("Calling Connect to Broker 2")

        if useWebSocket {
            iotDataManagerBroker2.connectUsingWebSocket(withClientId: uuidBroker2, cleanSession: true, statusCallback: mqttEventCallbackBroker2)
            print("Connect call with Broker2 completed")
        }
        else {
            // A missing certificate id is a test setup problem: fail this test instead of crashing the run.
            guard let certificateIdBroker2 = defaults.string(forKey: "TestCertBroker2") else {
                XCTFail("No certificate id stored in UserDefaults under key TestCertBroker2")
                return
            }
            iotDataManagerBroker2.connect( withClientId: uuidBroker2, cleanSession:true,certificateId:certificateIdBroker2, statusCallback: mqttEventCallbackBroker2)
            print("Connect call with Broker2 completed")
        }

        wait(for:[hasConnectedBroker2], timeout: 30)
        XCTAssertTrue(broker2Connected, "Successfully established MQTT Connection with Broker 2")
        if !broker2Connected {
            return
        }

        let testMessageBroker1 = "Test Message Broker1"
        let testMessageBroker2 = "Test Message Broker2"
        let testTopicBroker1 = "TestTopicBroker1"
        let testTopicBroker2 = "TestTopicBroker2"

        // Publishes `count` messages to each broker, alternating between the two.
        func publishToBothBrokers(count: Int) {
            for _ in 0..<count {
                iotDataManagerBroker1.publishString(testMessageBroker1, onTopic:testTopicBroker1, qoS:.messageDeliveryAttemptedAtLeastOnce)
                iotDataManagerBroker2.publishString(testMessageBroker2, onTopic:testTopicBroker2, qoS:.messageDeliveryAttemptedAtLeastOnce)
            }
        }

        //Subscribe to TestTopic on Broker1
        iotDataManagerBroker1.subscribe(toTopic: testTopicBroker1, qoS: .messageDeliveryAttemptedAtLeastOnce, messageCallback: {
            (payload) ->Void in
            // A payload that is not valid UTF-8 decodes to nil and fails the assertion instead of crashing.
            XCTAssertEqual(testMessageBroker1, String(data: payload, encoding: .utf8))
            messageCountBroker1 += 1
            print("Broker1 received: ", messageCountBroker1)
        },  ackCallback: {
            broker1SubConfirmed.fulfill()
        })
        wait(for:[broker1SubConfirmed], timeout: 30)

        //Subscribe to TestTopic on Broker 2
        iotDataManagerBroker2.subscribe(toTopic: testTopicBroker2, qoS: .messageDeliveryAttemptedAtLeastOnce, messageCallback: {
            (payload) ->Void in
            // A payload that is not valid UTF-8 decodes to nil and fails the assertion instead of crashing.
            XCTAssertEqual(testMessageBroker2, String(data: payload, encoding: .utf8))
            messageCountBroker2 += 1
        },  ackCallback: {
            broker2SubConfirmed.fulfill()
        })
        wait(for:[broker2SubConfirmed], timeout: 30)

        //Publish to TestTopic n times each at x messages per second
        let burstSize = 200
        let messagesToSend = burst ? totalMessages + burstSize : totalMessages

        if burst {
            publishToBothBrokers(count: burstSize)
            sleep(1)
        }

        // Publish exactly `totalMessages` in batches of at most `messagesPerSecond`. The last batch
        // carries the remainder, so the number published always matches what the assertions expect,
        // and totals smaller than one batch no longer build an invalid `1...0` range.
        var remainingMessages = totalMessages
        while remainingMessages > 0 {
            let batchSize = min(messagesPerSecond, remainingMessages)
            publishToBothBrokers(count: batchSize)
            remainingMessages -= batchSize

            sleep(1)
            print("Published batch of \(batchSize) to each broker")

            print("Received \(messageCountBroker1) so far from Broker1")
            print("Received \(messageCountBroker2) so far from Broker2")
        }

        print("Published \(messagesToSend) Messages to each broker")

        print("sleeping for 30 seconds for the client retry to happen if necessary")
        sleep(30)

        print("Total message count from Broker1:", messageCountBroker1)
        print("Total message count from Broker2:", messageCountBroker2)

        XCTAssertGreaterThanOrEqual(messageCountBroker1, messagesToSend, "Received \(messagesToSend) plus messages on Broker1")
        // allows for some leeway since some messages could be delayed
        XCTAssertGreaterThanOrEqual(messageCountBroker2, messagesToSend - 3, "Received \(messagesToSend) plus messages on Broker2")

        //Disconnect
        // Raise the flag before calling disconnect() so that a status callback delivered before
        // disconnect() returns is still recognised as the requested disconnect.
        disconnectForBroker1Issued = true
        iotDataManagerBroker1.disconnect()
        wait(for:[hasDisconnectedBroker1], timeout: 30)
        XCTAssertTrue(broker1Disconnected)

        disconnectForBroker2Issued = true
        iotDataManagerBroker2.disconnect()
        wait(for:[hasDisconnectedBroker2], timeout: 30)
        XCTAssertTrue(broker2Disconnected)

    }