in AWSIoTTests/AWSIoTDataManagerTests.swift [431:643]
/// Connects to two MQTT brokers at the same time, subscribes to one topic on each, publishes
/// messages to both, checks how many were received, and finally disconnects from both.
///
/// - Parameters:
///   - useWebSocket: When `true`, connects with `connectUsingWebSocket`; otherwise connects with the
///     certificate ids read from `UserDefaults` under `TestCertBroker1` and `TestCertBroker2`.
///   - burst: When `true`, publishes an extra 200 messages per broker back-to-back before the
///     paced batches, and expects them to be received as well.
///   - totalMessages: Number of paced messages to publish to each broker. Negative values are
///     treated as 0.
///   - messagesPerSecond: Size of each paced batch; the test sleeps one second after each batch.
///     Values below 1 are treated as 1.
func testMultiBroker(useWebSocket: Bool, burst: Bool, totalMessages: Int, messagesPerSecond: Int) {
    // State and expectations for broker 1.
    var messageCountBroker1 = 0
    var broker1Disconnected = false
    var broker1Connected = false
    let broker1SubConfirmed = expectation(description: "Subscription to broker 1 has been established")
    let hasConnectedBroker1 = expectation(description: "MQTT connection has been established with Broker 1 Broker")
    let hasDisconnectedBroker1 = expectation(description: "Disconnected from Broker1 Broker")
    var disconnectForBroker1Issued = false

    // State and expectations for broker 2.
    var messageCountBroker2 = 0
    var broker2Disconnected = false
    var broker2Connected = false
    let broker2SubConfirmed = expectation(description: "Subscription to broker 2 has been established")
    let hasConnectedBroker2 = expectation(description: "MQTT connection has been established with Broker 2 Broker")
    let hasDisconnectedBroker2 = expectation(description: "Disconnected from Broker2 Broker")
    var disconnectForBroker2Issued = false

    // Connection status handler for broker 1.
    func mqttEventCallbackBroker1(_ status: AWSIoTMQTTStatus) {
        print("Broker1 connection status = \(status.rawValue)")
        switch status {
        case .connecting:
            print("Connecting...")
        case .connected:
            print("Connected")
            // An expectation may only be fulfilled once, so ignore repeated `.connected` statuses.
            if !broker1Connected {
                broker1Connected = true
                hasConnectedBroker1.fulfill()
            }
        case .disconnected:
            print("Disconnected")
            // Only a disconnect requested by this test counts, and only the first one.
            if disconnectForBroker1Issued && !broker1Disconnected {
                broker1Disconnected = true
                hasDisconnectedBroker1.fulfill()
            }
        case .connectionRefused:
            print("Connection Refused")
        case .connectionError:
            print("Connection Error")
        case .protocolError:
            print("Protocol Error")
        default:
            print("Unknown state: \(status.rawValue)")
        }
    }

    // Connection status handler for broker 2.
    func mqttEventCallbackBroker2(_ status: AWSIoTMQTTStatus) {
        print("Broker 2 connection status = \(status.rawValue)")
        switch status {
        case .connecting:
            print("Connecting...")
        case .connected:
            print("Connected")
            // An expectation may only be fulfilled once, so ignore repeated `.connected` statuses.
            if !broker2Connected {
                broker2Connected = true
                hasConnectedBroker2.fulfill()
            }
        case .disconnected:
            print("Disconnected")
            // Only a disconnect requested by this test counts, and only the first one.
            if disconnectForBroker2Issued && !broker2Disconnected {
                broker2Disconnected = true
                hasDisconnectedBroker2.fulfill()
            }
        case .connectionRefused:
            print("Connection Refused")
        case .connectionError:
            print("Connection Error")
        case .protocolError:
            print("Protocol Error")
        default:
            print("Unknown state: \(status.rawValue)")
        }
    }

    // Connect to broker 1.
    let iotDataManagerBroker1: AWSIoTDataManager = AWSIoTDataManager(forKey: "iot-data-manager-broker1")
    let uuidBroker1 = UUID().uuidString
    print("Calling Connect to Broker1")
    let defaults = UserDefaults.standard
    if useWebSocket {
        iotDataManagerBroker1.connectUsingWebSocket(withClientId: uuidBroker1,
                                                    cleanSession: true,
                                                    statusCallback: mqttEventCallbackBroker1)
    } else {
        // Record a test failure instead of crashing when the certificate id is missing.
        guard let certificateIdBroker1 = defaults.string(forKey: "TestCertBroker1") else {
            XCTFail("No certificate id found in UserDefaults for key TestCertBroker1")
            return
        }
        iotDataManagerBroker1.connect(withClientId: uuidBroker1,
                                      cleanSession: true,
                                      certificateId: certificateIdBroker1,
                                      statusCallback: mqttEventCallbackBroker1)
    }
    print("Connect call with Broker1 completed")
    wait(for: [hasConnectedBroker1], timeout: 30)
    XCTAssertTrue(broker1Connected, "Failed to establish MQTT connection with Broker1")
    if !broker1Connected {
        return
    }

    // Connect to broker 2.
    let iotDataManagerBroker2: AWSIoTDataManager = AWSIoTDataManager(forKey: "iot-data-manager-broker2")
    let uuidBroker2 = UUID().uuidString
    print("Calling Connect to Broker 2")
    if useWebSocket {
        iotDataManagerBroker2.connectUsingWebSocket(withClientId: uuidBroker2,
                                                    cleanSession: true,
                                                    statusCallback: mqttEventCallbackBroker2)
    } else {
        guard let certificateIdBroker2 = defaults.string(forKey: "TestCertBroker2") else {
            XCTFail("No certificate id found in UserDefaults for key TestCertBroker2")
            // Do not leave the broker 1 connection open when bailing out.
            iotDataManagerBroker1.disconnect()
            return
        }
        iotDataManagerBroker2.connect(withClientId: uuidBroker2,
                                      cleanSession: true,
                                      certificateId: certificateIdBroker2,
                                      statusCallback: mqttEventCallbackBroker2)
    }
    print("Connect call with Broker2 completed")
    wait(for: [hasConnectedBroker2], timeout: 30)
    XCTAssertTrue(broker2Connected, "Failed to establish MQTT connection with Broker 2")
    if !broker2Connected {
        // Do not leave the broker 1 connection open when bailing out.
        iotDataManagerBroker1.disconnect()
        return
    }

    let testMessageBroker1 = "Test Message Broker1"
    let testMessageBroker2 = "Test Message Broker2"
    let testTopicBroker1 = "TestTopicBroker1"
    let testTopicBroker2 = "TestTopicBroker2"

    // Subscribe to the test topic on broker 1 and count every message received.
    iotDataManagerBroker1.subscribe(toTopic: testTopicBroker1, qoS: .messageDeliveryAttemptedAtLeastOnce, messageCallback: { payload in
        // A payload that is not valid UTF-8 decodes to nil and fails the comparison instead of crashing.
        XCTAssertEqual(testMessageBroker1, String(data: payload, encoding: .utf8))
        messageCountBroker1 += 1
        print("Broker1 received: ", messageCountBroker1)
    }, ackCallback: {
        broker1SubConfirmed.fulfill()
    })
    wait(for: [broker1SubConfirmed], timeout: 30)

    // Subscribe to the test topic on broker 2 and count every message received.
    iotDataManagerBroker2.subscribe(toTopic: testTopicBroker2, qoS: .messageDeliveryAttemptedAtLeastOnce, messageCallback: { payload in
        XCTAssertEqual(testMessageBroker2, String(data: payload, encoding: .utf8))
        messageCountBroker2 += 1
    }, ackCallback: {
        broker2SubConfirmed.fulfill()
    })
    wait(for: [broker2SubConfirmed], timeout: 30)

    // Publishes one test message to each broker.
    func publishToBothBrokers() {
        iotDataManagerBroker1.publishString(testMessageBroker1, onTopic: testTopicBroker1, qoS: .messageDeliveryAttemptedAtLeastOnce)
        iotDataManagerBroker2.publishString(testMessageBroker2, onTopic: testTopicBroker2, qoS: .messageDeliveryAttemptedAtLeastOnce)
    }

    // Clamp the inputs so that a zero or negative rate, or a total smaller than the rate,
    // cannot trap on a division by zero or on an invalid range.
    let pacedMessages = max(totalMessages, 0)
    let batchSize = max(messagesPerSecond, 1)
    let burstSize = 200
    var messagesToSend = pacedMessages

    if burst {
        messagesToSend += burstSize
        for _ in 0..<burstSize {
            publishToBothBrokers()
        }
        sleep(1)
    }

    // Publish in batches of `batchSize`, one batch per second. The last batch may be smaller so
    // that exactly `pacedMessages` messages are published, matching what the assertions expect.
    var remaining = pacedMessages
    while remaining > 0 {
        let currentBatch = min(batchSize, remaining)
        for _ in 0..<currentBatch {
            publishToBothBrokers()
        }
        remaining -= currentBatch
        sleep(1)
        print("Published batch of \(currentBatch) to each broker")
        print("Received \(messageCountBroker1) so far from Broker1")
        print("Received \(messageCountBroker2) so far from Broker2")
    }

    print("Published \(messagesToSend) Messages to each broker")
    print("sleeping for 30 seconds for the client retry to happen if necessary")
    sleep(30)
    print("Total message count from Broker1:", messageCountBroker1)
    print("Total message count from Broker2:", messageCountBroker2)
    XCTAssertGreaterThanOrEqual(messageCountBroker1, messagesToSend, "Received \(messagesToSend) plus messages on Broker1")
    // Allows for some leeway on broker 2 since some messages could be delayed.
    XCTAssertGreaterThanOrEqual(messageCountBroker2, messagesToSend - 3, "Received \(messagesToSend) plus messages on Broker2")

    // Disconnect. The "issued" flag is set before calling disconnect() so that a status callback
    // delivered immediately is not ignored.
    disconnectForBroker1Issued = true
    iotDataManagerBroker1.disconnect()
    wait(for: [hasDisconnectedBroker1], timeout: 30)
    XCTAssertTrue(broker1Disconnected)

    disconnectForBroker2Issued = true
    iotDataManagerBroker2.disconnect()
    wait(for: [hasDisconnectedBroker2], timeout: 30)
    XCTAssertTrue(broker2Disconnected)
}