in core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala [41:76]
def apply(systemProvider: ClassicActorSystemProvider): EventsByTagMigration =
new EventsByTagMigration(systemProvider)
// Extracts a Cassandra Row, assuming the pre 0.80 schema into a [[RawEvent]]
def rawPayloadOldTagSchemaExtractor(
bucketSize: BucketSize,
systemProvider: ClassicActorSystemProvider): Extractor[RawEvent] =
new Extractor[RawEvent] {
// TODO check this is only created once
val columnDefinitionCache = new ColumnDefinitionCache
val serialization = SerializationExtension(systemProvider.classicSystem)
override def extract(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[RawEvent] = {
// Get the tags from the old location i.e. tag1, tag2, tag3
val tags: Set[String] =
if (columnDefinitionCache.hasOldTagsColumns(row)) {
(1 to 3).foldLeft(Set.empty[String]) {
case (acc, i) =>
val tag = row.getString(s"tag$i")
if (tag != null) {
acc + tag
} else acc
}
} else {
Set.empty
}
Extractors.deserializeRawEvent(
systemProvider.classicSystem,
bucketSize,
columnDefinitionCache,
tags,
serialization,
row)
}
}