def apply()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigration.scala [41:76]


  /**
   * Factory for [[EventsByTagMigration]].
   *
   * @param systemProvider provider handed unchanged to the [[EventsByTagMigration]] constructor
   * @return a newly constructed [[EventsByTagMigration]]
   */
  def apply(systemProvider: ClassicActorSystemProvider): EventsByTagMigration = {
    val migration = new EventsByTagMigration(systemProvider)
    migration
  }

  /**
   * Builds an [[Extractor]] that turns a Cassandra Row, assumed to follow the pre 0.80 schema,
   * into a [[RawEvent]].
   *
   * When `columnDefinitionCache.hasOldTagsColumns(row)` holds, the tags are collected from the
   * `tag1`, `tag2` and `tag3` columns (null values are skipped); otherwise the tag set is empty.
   * The row is then handed to `Extractors.deserializeRawEvent` together with those tags.
   *
   * @param bucketSize     passed through to `Extractors.deserializeRawEvent`
   * @param systemProvider supplies the classic actor system used for serialization and deserialization
   * @return an extractor producing a `Future[RawEvent]` per row
   */
  def rawPayloadOldTagSchemaExtractor(
      bucketSize: BucketSize,
      systemProvider: ClassicActorSystemProvider): Extractor[RawEvent] =
    new Extractor[RawEvent] {

      // TODO check this is only created once
      val columnDefinitionCache = new ColumnDefinitionCache
      val serialization = SerializationExtension(systemProvider.classicSystem)

      override def extract(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[RawEvent] = {
        // Old tag location: one nullable column per tag, i.e. tag1, tag2, tag3
        val legacyTags: Set[String] =
          if (!columnDefinitionCache.hasOldTagsColumns(row)) Set.empty
          else (1 to 3).flatMap(idx => Option(row.getString(s"tag$idx"))).toSet

        Extractors.deserializeRawEvent(
          systemProvider.classicSystem,
          bucketSize,
          columnDefinitionCache,
          legacyTags,
          serialization,
          row)
      }
    }