in google-cloud-bigquery-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryArrowStorage.scala [35:99]
/**
 * Reads a BigQuery table in Arrow format as record sequences.
 * The per-session source is built with `ArrowSource.readRecordsMerged`.
 *
 * The per-session source is flattened with `flatMapConcat`, so callers see a single source of
 * record sequences.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session (default: none)
 * @param maxNumStreams stream count forwarded to the read session request (default: 0)
 * @return a source of record sequences materializing a `Future[NotUsed]`; the stream fails with an
 *         `IllegalArgumentException` if the session schema is not Arrow
 */
def readRecordsMerged(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[Seq[BigQueryRecord], Future[NotUsed]] = {
  val perSession =
    readAndMapTo(projectId,
      datasetId,
      tableId,
      readOptions,
      maxNumStreams,
      (_, client, session) => ArrowSource.readRecordsMerged(client, session))
  // Each emitted element is itself a source; concatenate them into one flat stream.
  perSession.flatMapConcat(recordsSource => recordsSource)
}
/**
 * Reads a BigQuery table in Arrow format.
 * Emits one sequence of per-stream record sources, built with `ArrowSource.readRecords`.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session (default: none)
 * @param maxNumStreams stream count forwarded to the read session request (default: 0)
 * @return a source of sequences of record sources materializing a `Future[NotUsed]`; the stream
 *         fails with an `IllegalArgumentException` if the session schema is not Arrow
 */
def readRecords(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[Seq[Source[BigQueryRecord, NotUsed]], Future[NotUsed]] =
  readAndMapTo(
    projectId = projectId,
    datasetId = datasetId,
    tableId = tableId,
    readOptions = readOptions,
    maxNumStreams = maxNumStreams,
    fx = (_, client, session) => ArrowSource.readRecords(client, session))
/**
 * Reads a BigQuery table in Arrow format.
 * Emits the Arrow schema paired with a source of record batches, built with `ArrowSource.readMerged`.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session (default: none)
 * @param maxNumStreams stream count forwarded to the read session request (default: 0)
 * @return a source of `(schema, batches)` pairs materializing a `Future[NotUsed]`; the stream fails
 *         with an `IllegalArgumentException` if the session schema is not Arrow
 */
def readMerged(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[(ArrowSchema, Source[ArrowRecordBatch, NotUsed]), Future[NotUsed]] = {
  val pairWithBatches: (ArrowSchema, BigQueryReadClient, ReadSession) => (ArrowSchema, Source[ArrowRecordBatch, NotUsed]) =
    (schema, client, session) => schema -> ArrowSource.readMerged(client, session)
  readAndMapTo(projectId, datasetId, tableId, readOptions, maxNumStreams, pairWithBatches)
}
/**
 * Reads a BigQuery table in Arrow format.
 * Emits the Arrow schema paired with the per-stream record-batch sources built by `ArrowSource.read`.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to the read session (default: none)
 * @param maxNumStreams stream count forwarded to the read session request (default: 0)
 * @return a source of `(schema, batchSources)` pairs materializing a `Future[NotUsed]`; the stream
 *         fails with an `IllegalArgumentException` if the session schema is not Arrow
 */
def read(projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions] = None,
    maxNumStreams: Int = 0): Source[(ArrowSchema, Seq[Source[ArrowRecordBatch, NotUsed]]), Future[NotUsed]] = {
  // Pairs the session's Arrow schema with the batch sources for each of its streams.
  def withStreamSources(schema: ArrowSchema, client: BigQueryReadClient, session: ReadSession) =
    (schema, ArrowSource.read(client, session))

  readAndMapTo(projectId, datasetId, tableId, readOptions, maxNumStreams, withStreamSources)
}
/**
 * Shared plumbing for the public read methods.
 *
 * At materialization time it:
 *  1. obtains a `BigQueryReadClient` from `reader`;
 *  2. opens a read session requesting `DataFormat.ARROW`;
 *  3. maps each emitted session through `fx`, together with its Arrow schema and the client.
 *
 * @param projectId     project that owns the table
 * @param datasetId     dataset that contains the table
 * @param tableId       table to read
 * @param readOptions   optional read options forwarded to `readSession`
 * @param maxNumStreams stream count forwarded to `readSession`
 * @param fx            builds the element to emit from the schema, client and session
 * @tparam T            element type produced by `fx`
 * @return a source of `fx` results materializing a `Future[NotUsed]`; if a session's schema is not
 *         Arrow, an `IllegalArgumentException` is thrown inside the stream, failing it
 */
private def readAndMapTo[T](projectId: String,
    datasetId: String,
    tableId: String,
    readOptions: Option[TableReadOptions],
    maxNumStreams: Int,
    fx: (ArrowSchema, BigQueryReadClient, ReadSession) => T): Source[T, Future[NotUsed]] =
  Source.fromMaterializer { (materializer, attributes) =>
    // The client is resolved per materialization from the materializer's system and attributes.
    val readClient = reader(materializer.system, attributes).client
    val sessions =
      readSession(readClient, projectId, datasetId, tableId, DataFormat.ARROW, readOptions, maxNumStreams)

    sessions.map { session =>
      session.schema match {
        case ReadSession.Schema.ArrowSchema(arrowSchema) =>
          fx(arrowSchema, readClient, session)
        case unsupported =>
          throw new IllegalArgumentException(s"Only Arrow format is supported, received: $unsupported")
      }
    }
  }