public SSTableReader()

in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java [229:408]


    public SSTableReader(@NotNull TableMetadata metadata,
                         @NotNull SSTable ssTable,
                         @Nullable SparkRangeFilter sparkRangeFilter,
                         @NotNull List<PartitionKeyFilter> partitionKeyFilters,
                         @Nullable PruneColumnFilter columnFilter,
                         boolean readIndexOffset,
                         @NotNull Stats stats,
                         boolean useIncrementalRepair,
                         boolean isRepairPrimary,
                         @NotNull Function<StatsMetadata, Boolean> isRepaired) throws IOException
    {
        long startTimeNanos = System.nanoTime();
        long now;
        this.ssTable = ssTable;
        this.stats = stats;
        this.isRepaired = isRepaired;
        this.sparkRangeFilter = sparkRangeFilter;

        File file = constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
        Descriptor descriptor = Descriptor.fromFilename(file);
        this.version = descriptor.version;

        SummaryDbUtils.Summary summary = null;
        Pair<DecoratedKey, DecoratedKey> keys = Pair.create(null, null);
        try
        {
            now = System.nanoTime();
            summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
            stats.readSummaryDb(ssTable, System.nanoTime() - now);
            keys = Pair.create(summary.first(), summary.last());
        }
        catch (IOException exception)
        {
            LOGGER.warn("Failed to read Summary.db file ssTable='{}'", ssTable, exception);
        }

        if (keys.left == null || keys.right == null)
        {
            LOGGER.warn("Could not load first and last key from Summary.db file, so attempting Index.db fileName={}",
                        ssTable.getDataFileName());
            now = System.nanoTime();
            keys = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable);
            stats.readIndexDb(ssTable, System.nanoTime() - now);
        }

        if (keys.left == null || keys.right == null)
        {
            throw new IOException("Could not load SSTable first or last tokens");
        }

        this.first = keys.left;
        this.last = keys.right;
        this.firstToken = ReaderUtils.tokenToBigInteger(first.getToken());
        this.lastToken = ReaderUtils.tokenToBigInteger(last.getToken());
        TokenRange readerRange = range();

        List<PartitionKeyFilter> matchingKeyFilters = partitionKeyFilters.stream()
                .filter(filter -> readerRange.contains(filter.token()))
                .collect(Collectors.toList());
        boolean overlapsSparkRange = sparkRangeFilter == null || SparkSSTableReader.overlaps(this, sparkRangeFilter.tokenRange());
        if (!overlapsSparkRange  // SSTable doesn't overlap with Spark worker token range
                || (matchingKeyFilters.isEmpty() && !partitionKeyFilters.isEmpty()))  // No matching partition key filters overlap with SSTable
        {
            this.partitionKeyFilters = Collections.emptyList();
            stats.skippedSSTable(sparkRangeFilter, partitionKeyFilters, firstToken, lastToken);
            LOGGER.info("Ignoring SSTableReader with firstToken={} lastToken={}, does not overlap with any filter",
                        firstToken, lastToken);
            statsMetadata = null;
            header = null;
            helper = null;
            this.metadata = null;
            return;
        }

        if (!matchingKeyFilters.isEmpty())
        {
            List<PartitionKeyFilter> matchInBloomFilter =
                    ReaderUtils.filterKeyInBloomFilter(ssTable, metadata.partitioner, descriptor, matchingKeyFilters);
            this.partitionKeyFilters = ImmutableList.copyOf(matchInBloomFilter);

            // Check if required keys are actually present
            if (matchInBloomFilter.isEmpty() || !ReaderUtils.anyFilterKeyInIndex(ssTable, matchInBloomFilter))
            {
                if (matchInBloomFilter.isEmpty())
                {
                    stats.missingInBloomFilter();
                }
                else
                {
                    stats.missingInIndex();
                }
                LOGGER.info("Ignoring SSTable {}, no match found in index file for key filters",
                            this.ssTable.getDataFileName());
                statsMetadata = null;
                header = null;
                helper = null;
                this.metadata = null;
                return;
            }
        }
        else
        {
            this.partitionKeyFilters = ImmutableList.copyOf(partitionKeyFilters);
        }

        Map<MetadataType, MetadataComponent> componentMap = SSTableCache.INSTANCE.componentMapFromStats(ssTable, descriptor);

        ValidationMetadata validation = (ValidationMetadata) componentMap.get(MetadataType.VALIDATION);
        if (validation != null && !validation.partitioner.equals(metadata.partitioner.getClass().getName()))
        {
            throw new IllegalStateException("Partitioner in ValidationMetadata does not match TableMetaData: "
                                          + validation.partitioner + " vs. " + metadata.partitioner.getClass().getName());
        }

        this.statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
        SerializationHeader.Component headerComp = (SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
        if (headerComp == null)
        {
            throw new IOException("Cannot read SSTable if cannot deserialize stats header info");
        }

        if (useIncrementalRepair && !isRepairPrimary && isRepaired())
        {
            stats.skippedRepairedSSTable(ssTable, statsMetadata.repairedAt);
            LOGGER.info("Ignoring repaired SSTable on non-primary repair replica ssTable='{}' repairedAt={}",
                        ssTable, statsMetadata.repairedAt);
            header = null;
            helper = null;
            this.metadata = null;
            return;
        }

        Set<String> columnNames = Streams.concat(metadata.columns().stream(),
                                                 metadata.staticColumns().stream())
                                         .map(column -> column.name.toString())
                                         .collect(Collectors.toSet());
        Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
        droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
                                                  metadata.name,
                                                  ssTable,
                                                  headerComp.getRegularColumns(),
                                                  columnNames,
                                                  ColumnMetadata.Kind.REGULAR));
        droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
                                                  metadata.name,
                                                  ssTable,
                                                  headerComp.getStaticColumns(),
                                                  columnNames,
                                                  ColumnMetadata.Kind.STATIC));
        if (!droppedColumns.isEmpty())
        {
            LOGGER.info("Rebuilding table metadata with dropped columns numDroppedColumns={} ssTable='{}'",
                        droppedColumns.size(), ssTable);
            metadata = metadata.unbuild().droppedColumns(droppedColumns).build();
        }

        this.header = headerComp.toHeader(metadata);
        this.helper = new DeserializationHelper(metadata,
                                                MessagingService.VERSION_30,
                                                DeserializationHelper.Flag.FROM_REMOTE,
                                                buildColumnFilter(metadata, columnFilter));
        this.metadata = metadata;

        if (readIndexOffset && summary != null)
        {
            SummaryDbUtils.Summary finalSummary = summary;
            extractRange(sparkRangeFilter, partitionKeyFilters)
                    .ifPresent(range -> readOffsets(finalSummary.summary(), range));
        }
        else
        {
            LOGGER.warn("Reading SSTable without looking up start/end offset, performance will potentially be degraded");
        }

        // Open SSTableStreamReader so opened in parallel inside thread pool
        // and buffered + ready to go when CompactionIterator starts reading
        reader.set(new SSTableStreamReader());
        stats.openedSSTable(ssTable, System.nanoTime() - startTimeNanos);
        this.openedNanos = System.nanoTime();
    }