def withConnectionFactory()

in jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsConsumerSettings.scala [43:161]


  def withConnectionFactory(value: javax.jms.ConnectionFactory): JmsConsumerSettings = copy(connectionFactory = value)

  /** Configure connection retrying. */
  def withConnectionRetrySettings(value: ConnectionRetrySettings): JmsConsumerSettings =
    copy(connectionRetrySettings = value)

  /** Set a queue name to read from. */
  def withQueue(name: String): JmsConsumerSettings = copy(destination = Some(Queue(name)))

  /** Set a topic name to listen to. */
  def withTopic(name: String): JmsConsumerSettings = copy(destination = Some(Topic(name)))

  /** Set a durable topic name to listen to, with a unique subscriber name. */
  def withDurableTopic(name: String, subscriberName: String): JmsConsumerSettings =
    copy(destination = Some(DurableTopic(name, subscriberName)))

  /** Set a JMS to subscribe to. Allows for custom handling with [[pekko.stream.connectors.jms.CustomDestination CustomDestination]]. */
  def withDestination(value: Destination): JmsConsumerSettings = copy(destination = Option(value))

  /** Set JMS broker credentials. */
  def withCredentials(value: Credentials): JmsConsumerSettings = copy(credentials = Option(value))

  /**
   * Number of parallel sessions to use for receiving JMS messages.
   */
  def withSessionCount(value: Int): JmsConsumerSettings = copy(sessionCount = value)

  /** Buffer size for maximum number for messages read from JMS when there is no demand. */
  def withBufferSize(value: Int): JmsConsumerSettings = copy(bufferSize = value)

  /**
   * JMS selector expression.
   *
   * @see https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
   */
  def withSelector(value: String): JmsConsumerSettings = copy(selector = Option(value))

  /** Set an explicit acknowledge mode. (Consumers have specific defaults.) */
  def withAcknowledgeMode(value: AcknowledgeMode): JmsConsumerSettings = copy(acknowledgeMode = Option(value))

  /** Timeout for acknowledge. (Used by TX consumers.) */
  def withAckTimeout(value: scala.concurrent.duration.Duration): JmsConsumerSettings = copy(ackTimeout = value)

  /** Java API: Timeout for acknowledge. (Used by TX consumers.) */
  def withAckTimeout(value: java.time.Duration): JmsConsumerSettings = copy(ackTimeout = value.asScala)

  /** Max interval before sending queued acknowledges back to the broker. (Used by AckSources.) */
  def withMaxAckInterval(value: scala.concurrent.duration.FiniteDuration): JmsConsumerSettings =
    copy(maxAckInterval = Option(value))

  /** Java API: Max interval before sending queued acknowledges back to the broker. (Used by AckSources.) */
  def withMaxAckInterval(value: java.time.Duration): JmsConsumerSettings =
    copy(maxAckInterval = Option(value.asScala))

  /** Max number of acks queued by AckSource before they are sent to broker. (Unless MaxAckInterval is specified) */
  def withMaxPendingAcks(value: Int): JmsConsumerSettings = copy(maxPendingAcks = value)

  /**
   * For use with transactions, if true the stream fails if Pekko Connectors rolls back the transaction when `ackTimeout` is hit.
   */
  def withFailStreamOnAckTimeout(value: Boolean): JmsConsumerSettings =
    if (failStreamOnAckTimeout == value) this else copy(failStreamOnAckTimeout = value)

  /** Timeout for connection status subscriber */
  def withConnectionStatusSubscriptionTimeout(value: FiniteDuration): JmsConsumerSettings =
    copy(connectionStatusSubscriptionTimeout = value)

  /** Java API: Timeout for connection status subscriber */
  def withConnectionStatusSubscriptionTimeout(value: java.time.Duration): JmsConsumerSettings =
    copy(connectionStatusSubscriptionTimeout = value.asScala)

  private def copy(
      connectionFactory: javax.jms.ConnectionFactory = connectionFactory,
      connectionRetrySettings: ConnectionRetrySettings = connectionRetrySettings,
      destination: Option[Destination] = destination,
      credentials: Option[Credentials] = credentials,
      sessionCount: Int = sessionCount,
      bufferSize: Int = bufferSize,
      selector: Option[String] = selector,
      acknowledgeMode: Option[AcknowledgeMode] = acknowledgeMode,
      ackTimeout: scala.concurrent.duration.Duration = ackTimeout,
      maxAckInterval: Option[scala.concurrent.duration.FiniteDuration] = maxAckInterval,
      maxPendingAcks: Int = maxPendingAcks,
      failStreamOnAckTimeout: Boolean = failStreamOnAckTimeout,
      connectionStatusSubscriptionTimeout: scala.concurrent.duration.FiniteDuration =
        connectionStatusSubscriptionTimeout): JmsConsumerSettings = new JmsConsumerSettings(
    connectionFactory = connectionFactory,
    connectionRetrySettings = connectionRetrySettings,
    destination = destination,
    credentials = credentials,
    sessionCount = sessionCount,
    bufferSize = bufferSize,
    selector = selector,
    acknowledgeMode = acknowledgeMode,
    ackTimeout = ackTimeout,
    maxAckInterval = maxAckInterval,
    maxPendingAcks = maxPendingAcks,
    failStreamOnAckTimeout = failStreamOnAckTimeout,
    connectionStatusSubscriptionTimeout = connectionStatusSubscriptionTimeout)

  override def toString =
    "JmsConsumerSettings(" +
    s"connectionFactory=$connectionFactory," +
    s"connectionRetrySettings=$connectionRetrySettings," +
    s"destination=$destination," +
    s"credentials=$credentials," +
    s"sessionCount=$sessionCount," +
    s"bufferSize=$bufferSize," +
    s"selector=$selector," +
    s"acknowledgeMode=${acknowledgeMode.map(m => AcknowledgeMode.asString(m))}," +
    s"ackTimeout=${ackTimeout.toCoarsest}," +
    s"maxAckInterval=${maxAckInterval.map(_.toCoarsest)}," +
    s"maxPendingAcks=$maxPendingAcks," +
    s"failStreamOnAckTimeout=$failStreamOnAckTimeout," +
    s"connectionStatusSubscriptionTimeout=${connectionStatusSubscriptionTimeout.toCoarsest}" +
    ")"
}

object JmsConsumerSettings {