in src/eventHubManager.ts [21:61]
public async startMonitorCustomEventHubEndpoint(eventHubItem: EventHubItem) {
if (this._isMonitoring) {
this._outputChannel.show();
this.outputLine(Constants.IoTHubMonitorLabel, "There is a running job to monitor custom Event Hub endpoint. Please stop it first.");
return;
}
try {
TelemetryClient.sendEvent(Constants.IoTHubAIEHStartMonitorEvent);
this._outputChannel.show();
this.outputLine(Constants.EventHubMonitorLabel, `Start monitoring message arrived in custom Event Hub endpoint [${eventHubItem.eventHubProperty.name}] ...`);
const eventHubClient = createAzureClient({
credentials: eventHubItem.azureSubscription.session.credentials2,
environment: eventHubItem.azureSubscription.session.environment,
subscriptionId: eventHubItem.azureSubscription.subscription.subscriptionId
}, EventHubManagementClient);
const connectionString = (await eventHubClient.namespaces.listKeys(eventHubItem.eventHubProperty.resourceGroup,
this.getNamespacefromConnectionString(eventHubItem.eventHubProperty.connectionString), "RootManageSharedAccessKey")).primaryConnectionString;
this._eventHubClient = new EventHubConsumerClient("$Default", connectionString, this.getEntityPathfromConnectionString(eventHubItem.eventHubProperty.connectionString));
const partitionIds = await this._eventHubClient.getPartitionIds();
this.updateMonitorStatus(true);
partitionIds.forEach((partitionId) => {
this.outputLine(Constants.EventHubMonitorLabel, `Created partition receiver [${partitionId}]`);
this._eventHubClient.subscribe(partitionId,
{
processEvents: this.onMessage,
processError: this.onError
},
{
startPosition: {enqueuedOn: Date.now()}
}
);
});
} catch (error) {
this.updateMonitorStatus(false);
this.outputLine(Constants.EventHubMonitorLabel, error);
TelemetryClient.sendEvent(Constants.IoTHubAIEHStartMonitorEvent, { Result: "Exception", [Constants.errorProperties.Message]: error });
}
}