in src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs [185:255]
/// <summary>
/// Builds the <see cref="Source"/> for this consumer from the settings held in <c>info</c>.
/// </summary>
/// <remarks>
/// The address is resolved from the consumer's destination and the session's connection, and
/// the accepted, released, rejected and modified outcomes are always listed. Capabilities and
/// the filter set are only assigned when at least one entry applies; otherwise those
/// properties are left at whatever <see cref="Source"/> initialises them to.
/// </remarks>
/// <returns>The populated <see cref="Source"/>.</returns>
private Source CreateSource()
{
    var result = new Source
    {
        Address = AmqpDestinationHelper.GetDestinationAddress(info.Destination, session.Connection),
        Outcomes = new[]
        {
            SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
            SymbolUtil.ATTACH_OUTCOME_RELEASED,
            SymbolUtil.ATTACH_OUTCOME_REJECTED,
            SymbolUtil.ATTACH_OUTCOME_MODIFIED
        },
        DefaultOutcome = MessageSupport.MODIFIED_FAILED_INSTANCE
    };

    // Durability decides the expiry policy and the terminus durability level.
    if (!info.IsDurable)
    {
        result.Durable = (int) TerminusDurability.NONE;
        result.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
    }
    else
    {
        result.Durable = (int) TerminusDurability.UNSETTLED_STATE;
        result.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_NEVER;
        result.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
    }

    // Browsers use the copy distribution mode regardless of durability.
    if (info.IsBrowser)
    {
        result.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
    }

    // Collect the capabilities: the destination-type symbol (when one exists) followed by
    // the shared/global markers for shared consumers.
    var requestedCapabilities = new List<Symbol>();

    Symbol destinationCapability = SymbolUtil.GetTerminusCapabilitiesForDestination(info.Destination);
    if (destinationCapability != null)
    {
        requestedCapabilities.Add(destinationCapability);
    }

    if (info.IsShared)
    {
        requestedCapabilities.Add(SymbolUtil.SHARED);

        // GLOBAL is added only when the client id was not set explicitly.
        if (!info.IsExplicitClientId)
        {
            requestedCapabilities.Add(SymbolUtil.GLOBAL);
        }
    }

    if (requestedCapabilities.Count > 0)
    {
        result.Capabilities = requestedCapabilities.ToArray();
    }

    // Collect the optional no-local and selector filters.
    var filterSet = new Map();

    if (info.NoLocal)
    {
        filterSet.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, AmqpNmsNoLocalType.NO_LOCAL);
    }

    if (info.HasSelector())
    {
        filterSet.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, new AmqpNmsSelectorType(info.Selector));
    }

    if (filterSet.Count > 0)
    {
        result.FilterSet = filterSet;
    }

    return result;
}