in google-cloud-bigquery-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryAvroStorage.scala [35:99]
/**
 * Reads a table through an Avro read session and exposes the result as one
 * flattened source of record batches.
 *
 * The session is obtained via `readAndMapTo`, handed to
 * `AvroSource.readRecordsMerged`, and the source that call returns is
 * flattened into the outer stream with `flatMapConcat`.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session
 * @param maxNumStreams stream limit forwarded to the read session; defaults to 0
 *                      (presumably "let the service decide" -- confirm against the
 *                      BigQuery Storage API)
 * @return a source of `Seq[BigQueryRecord]` materializing a `Future[NotUsed]`
 */
def readRecordsMerged(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[Seq[BigQueryRecord], Future[NotUsed]] = {
  // Each element emitted here is itself a source built from the read session.
  val nestedRecords = readAndMapTo(
    projectId = projectId,
    datasetId = datasetId,
    tableId = tableId,
    readOptions = readOptions,
    maxNumStreams = maxNumStreams,
    fx = (_, readClient, activeSession) => AvroSource.readRecordsMerged(readClient, activeSession))

  nestedRecords.flatMapConcat(inner => inner)
}
/**
 * Reads a table through an Avro read session and emits the sequence of record
 * sources produced by `AvroSource.readRecords` for that session.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session
 * @param maxNumStreams stream limit forwarded to the read session; defaults to 0
 *                      (presumably "let the service decide" -- confirm against the
 *                      BigQuery Storage API)
 * @return a source emitting a `Seq` of `Source[BigQueryRecord, NotUsed]`,
 *         materializing a `Future[NotUsed]`
 */
def readRecords(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[Seq[Source[BigQueryRecord, NotUsed]], Future[NotUsed]] = {
  // The schema is not needed here; only the client and the session are used.
  val toRecordSources: (AvroSchema, BigQueryReadClient, ReadSession) => Seq[Source[BigQueryRecord, NotUsed]] =
    (_, readClient, activeSession) => AvroSource.readRecords(readClient, activeSession)

  readAndMapTo(projectId, datasetId, tableId, readOptions, maxNumStreams, toRecordSources)
}
/**
 * Reads a table through an Avro read session and emits the session's Avro
 * schema paired with the row source returned by `AvroSource.readMerged`.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session
 * @param maxNumStreams stream limit forwarded to the read session; defaults to 0
 *                      (presumably "let the service decide" -- confirm against the
 *                      BigQuery Storage API)
 * @return a source emitting `(AvroSchema, Source[AvroRows, NotUsed])`,
 *         materializing a `Future[NotUsed]`
 */
def readMerged(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[(AvroSchema, Source[AvroRows, NotUsed]), Future[NotUsed]] =
  readAndMapTo(
    projectId = projectId,
    datasetId = datasetId,
    tableId = tableId,
    readOptions = readOptions,
    maxNumStreams = maxNumStreams,
    fx = (avroSchema, readClient, activeSession) => {
      val mergedRows = AvroSource.readMerged(readClient, activeSession)
      (avroSchema, mergedRows)
    })
/**
 * Reads a table through an Avro read session and emits the session's Avro
 * schema paired with the sequence of row sources returned by `AvroSource.read`.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session
 * @param maxNumStreams stream limit forwarded to the read session; defaults to 0
 *                      (presumably "let the service decide" -- confirm against the
 *                      BigQuery Storage API)
 * @return a source emitting `(AvroSchema, Seq[Source[AvroRows, NotUsed]])`,
 *         materializing a `Future[NotUsed]`
 */
def read(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[(AvroSchema, Seq[Source[AvroRows, NotUsed]]), Future[NotUsed]] = {
  // Keeps the schema next to the row sources so callers can decode the rows.
  val withSchema: (AvroSchema, BigQueryReadClient, ReadSession) => (AvroSchema, Seq[Source[AvroRows, NotUsed]]) =
    (avroSchema, readClient, activeSession) => (avroSchema, AvroSource.read(readClient, activeSession))

  readAndMapTo(projectId, datasetId, tableId, readOptions, maxNumStreams, withSchema)
}
/**
 * Shared implementation behind the public `read*` methods.
 *
 * Resolves the client from the materializer's actor system and attributes,
 * requests a read session in `DataFormat.AVRO`, and applies `fx` to the
 * session's Avro schema, the client and the session.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to `readSession`
 * @param maxNumStreams stream limit forwarded to `readSession`
 * @param fx            builds the emitted element from the Avro schema, the client
 *                      and the read session
 * @tparam T type of the element produced by `fx`
 * @return a source of the values produced by `fx`, materializing a `Future[NotUsed]`
 * @throws IllegalArgumentException inside the stream when the session's schema is
 *                                  not an Avro schema
 */
private def readAndMapTo[T](projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions],
    maxNumStreams: Int,
    fx: (AvroSchema, BigQueryReadClient, ReadSession) => T): Source[T, Future[NotUsed]] =
  Source.fromMaterializer { (mat, attr) =>
    val client = reader(mat.system, attr).client
    readSession(client, projectId, datasetId, tableId, DataFormat.AVRO, readOptions, maxNumStreams)
      .map { session =>
        session.schema match {
          case ReadSession.Schema.AvroSchema(schema) => fx(schema, client, session)
          // The session was requested as AVRO, so any other schema is unexpected.
          // The message names Avro (not Arrow) to match what this reader handles.
          case other =>
            throw new IllegalArgumentException(s"Only Avro format is supported, received: $other")
        }
      }
  }