def readRecordsMerged()

in google-cloud-bigquery-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryAvroStorage.scala [35:99]


  def readRecordsMerged(projectId: String,
      datasetId: String,
      tableId: String,
      readOptions: Option[TableReadOptions] = None,
      maxNumStreams: Int = 0): Source[Seq[BigQueryRecord], Future[NotUsed]] =
    readAndMapTo(projectId,
      datasetId,
      tableId,
      readOptions,
      maxNumStreams,
      (_, client, session) => AvroSource.readRecordsMerged(client, session))
      .flatMapConcat(a => a)

  def readRecords(projectId: String,
      datasetId: String,
      tableId: String,
      readOptions: Option[TableReadOptions] = None,
      maxNumStreams: Int = 0): Source[Seq[Source[BigQueryRecord, NotUsed]], Future[NotUsed]] =
    readAndMapTo(projectId,
      datasetId,
      tableId,
      readOptions,
      maxNumStreams,
      (_, client, session) => AvroSource.readRecords(client, session))

  def readMerged(projectId: String,
      datasetId: String,
      tableId: String,
      readOptions: Option[TableReadOptions] = None,
      maxNumStreams: Int = 0): Source[(AvroSchema, Source[AvroRows, NotUsed]), Future[NotUsed]] =
    readAndMapTo(projectId,
      datasetId,
      tableId,
      readOptions,
      maxNumStreams,
      (schema, client, session) => (schema, AvroSource.readMerged(client, session)))

  def read(projectId: String,
      datasetId: String,
      tableId: String,
      readOptions: Option[TableReadOptions] = None,
      maxNumStreams: Int = 0): Source[(AvroSchema, Seq[Source[AvroRows, NotUsed]]), Future[NotUsed]] =
    readAndMapTo(projectId,
      datasetId,
      tableId,
      readOptions,
      maxNumStreams,
      (schema, client, session) => (schema, AvroSource.read(client, session)))

  private def readAndMapTo[T](projectId: String,
      datasetId: String,
      tableId: String,
      readOptions: Option[TableReadOptions],
      maxNumStreams: Int,
      fx: (AvroSchema, BigQueryReadClient, ReadSession) => T): Source[T, Future[NotUsed]] =
    Source.fromMaterializer { (mat, attr) =>
      val client = reader(mat.system, attr).client
      readSession(client, projectId, datasetId, tableId, DataFormat.AVRO, readOptions, maxNumStreams)
        .map { session =>
          session.schema match {
            case ReadSession.Schema.AvroSchema(schema) => fx(schema, client, session)
            case other                                 => throw new IllegalArgumentException(s"Only Arrow format is supported, received: $other")
          }
        }
    }