in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java [229:407]
public SSTableReader(@NotNull TableMetadata metadata,
                     @NotNull SSTable ssTable,
                     @Nullable SparkRangeFilter sparkRangeFilter,
                     @NotNull List<PartitionKeyFilter> partitionKeyFilters,
                     @Nullable PruneColumnFilter columnFilter,
                     boolean readIndexOffset,
                     @NotNull Stats stats,
                     boolean useIncrementalRepair,
                     boolean isRepairPrimary,
                     @NotNull Function<StatsMetadata, Boolean> isRepaired) throws IOException
{
    long startTimeNanos = System.nanoTime();
    long now;
    this.ssTable = ssTable;
    this.stats = stats;
    this.isRepaired = isRepaired;
    this.sparkRangeFilter = sparkRangeFilter;
    Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
    this.version = descriptor.version;
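    // Attempt to read the first/last partition keys from the Summary.db component,
    // falling back to the Index.db component below if the summary is unreadable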
    SummaryDbUtils.Summary summary = null;
    Pair<DecoratedKey, DecoratedKey> keys = null;
    try
    {
        now = System.nanoTime();
        summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
        stats.readSummaryDb(ssTable, System.nanoTime() - now);
        keys = Pair.of(summary.first(), summary.last());
    }
    catch (IOException exception)
    {
        LOGGER.warn("Failed to read Summary.db file ssTable='{}'", ssTable, exception);
    }
    if (keys == null)
    {
        LOGGER.warn("Could not load first and last key from Summary.db file, so attempting Index.db fileName={}",
                    ssTable.getDataFileName());
        now = System.nanoTime();
        keys = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable);
        stats.readIndexDb(ssTable, System.nanoTime() - now);
    }
    if (keys == null)
    {
        throw new IOException("Could not load SSTable first or last tokens");
    }
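    // Cache the first/last keys and their tokens; these bound the token range covered by this SSTable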
    this.first = keys.left;
    this.last = keys.right;
    this.firstToken = ReaderUtils.tokenToBigInteger(first.getToken());
    this.lastToken = ReaderUtils.tokenToBigInteger(last.getToken());
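    // Keep only the partition key filters whose tokens fall within this SSTable's token range;
    // if nothing overlaps, skip the SSTable entirely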
    TokenRange readerRange = range();
    List<PartitionKeyFilter> matchingKeyFilters = partitionKeyFilters.stream()
                                                                     .filter(filter -> readerRange.contains(filter.token()))
                                                                     .collect(Collectors.toList());
    boolean overlapsSparkRange = sparkRangeFilter == null || SparkSSTableReader.overlaps(this, sparkRangeFilter.tokenRange());
    if (!overlapsSparkRange // SSTable doesn't overlap with Spark worker token range
        || (matchingKeyFilters.isEmpty() && !partitionKeyFilters.isEmpty())) // No matching partition key filters overlap with SSTable
    {
        this.partitionKeyFilters = Collections.emptyList();
        stats.skippedSSTable(sparkRangeFilter, partitionKeyFilters, firstToken, lastToken);
        LOGGER.info("Ignoring SSTableReader with firstToken={} lastToken={}, does not overlap with any filter",
                    firstToken, lastToken);
        statsMetadata = null;
        header = null;
        helper = null;
        this.metadata = null;
        return;
    }
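    // Narrow the matching filters further using the SSTable's bloom filter, then confirm at least
    // one filter key is actually present in the Index.db file; otherwise the SSTable is skipped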
    if (!matchingKeyFilters.isEmpty())
    {
        List<PartitionKeyFilter> matchInBloomFilter =
                ReaderUtils.filterKeyInBloomFilter(ssTable, metadata.partitioner, descriptor, matchingKeyFilters);
        this.partitionKeyFilters = ImmutableList.copyOf(matchInBloomFilter);
        // Check if required keys are actually present
        if (matchInBloomFilter.isEmpty() || !ReaderUtils.anyFilterKeyInIndex(ssTable, matchInBloomFilter))
        {
            if (matchInBloomFilter.isEmpty())
            {
                stats.missingInBloomFilter();
            }
            else
            {
                stats.missingInIndex();
            }
            LOGGER.info("Ignoring SSTable {}, no match found in index file for key filters",
                        this.ssTable.getDataFileName());
            statsMetadata = null;
            header = null;
            helper = null;
            this.metadata = null;
            return;
        }
    }
    else
    {
        this.partitionKeyFilters = ImmutableList.copyOf(partitionKeyFilters);
    }
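    // Read the Statistics.db component for validation, stats, and serialization header metadata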
    Map<MetadataType, MetadataComponent> componentMap = SSTableCache.INSTANCE.componentMapFromStats(ssTable, descriptor);
    ValidationMetadata validation = (ValidationMetadata) componentMap.get(MetadataType.VALIDATION);
    if (validation != null && !validation.partitioner.equals(metadata.partitioner.getClass().getName()))
    {
        throw new IllegalStateException("Partitioner in ValidationMetadata does not match TableMetaData: "
                                        + validation.partitioner + " vs. " + metadata.partitioner.getClass().getName());
    }
    this.statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
    SerializationHeader.Component headerComp = (SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
    if (headerComp == null)
    {
        throw new IOException("Cannot read SSTable if cannot deserialize stats header info");
    }
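    // When incremental repair is enabled, only the primary repair replica reads repaired
    // SSTables; non-primary replicas skip them so repaired data is not read twice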
    if (useIncrementalRepair && !isRepairPrimary && isRepaired())
    {
        stats.skippedRepairedSSTable(ssTable, statsMetadata.repairedAt);
        LOGGER.info("Ignoring repaired SSTable on non-primary repair replica ssTable='{}' repairedAt={}",
                    ssTable, statsMetadata.repairedAt);
        header = null;
        helper = null;
        this.metadata = null;
        return;
    }
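    // Columns present in the serialization header but absent from the current schema were dropped;
    // rebuild the table metadata with those dropped columns so deserialization can still succeed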
    Set<String> columnNames = Streams.concat(metadata.columns().stream(),
                                             metadata.staticColumns().stream())
                                     .map(column -> column.name.toString())
                                     .collect(Collectors.toSet());
    Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
    droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
                                              metadata.name,
                                              ssTable,
                                              headerComp.getRegularColumns(),
                                              columnNames,
                                              ColumnMetadata.Kind.REGULAR));
    droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
                                              metadata.name,
                                              ssTable,
                                              headerComp.getStaticColumns(),
                                              columnNames,
                                              ColumnMetadata.Kind.STATIC));
    if (!droppedColumns.isEmpty())
    {
        LOGGER.info("Rebuilding table metadata with dropped columns numDroppedColumns={} ssTable='{}'",
                    droppedColumns.size(), ssTable);
        metadata = metadata.unbuild().droppedColumns(droppedColumns).build();
    }
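    // Materialize the serialization header and deserialization helper against the
    // (possibly rebuilt) table metadata, applying any requested column pruning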
    this.header = headerComp.toHeader(metadata);
    this.helper = new DeserializationHelper(metadata,
                                            MessagingService.VERSION_30,
                                            DeserializationHelper.Flag.FROM_REMOTE,
                                            buildColumnFilter(metadata, columnFilter));
    this.metadata = metadata;
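    // If requested, use the in-memory index summary to look up start/end Data.db offsets for the
    // filtered token range so reads can skip over irrelevant portions of the data file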
    if (readIndexOffset && summary != null)
    {
        SummaryDbUtils.Summary finalSummary = summary;
        extractRange(sparkRangeFilter, partitionKeyFilters)
                .ifPresent(range -> readOffsets(finalSummary.summary(), range));
    }
    else
    {
        LOGGER.warn("Reading SSTable without looking up start/end offset, performance will potentially be degraded");
    }
    // Open the SSTableStreamReader here so it is opened in parallel inside the thread pool
    // and is buffered and ready to go when the CompactionIterator starts reading
    reader.set(new SSTableStreamReader());
    stats.openedSSTable(ssTable, System.nanoTime() - startTimeNanos);
    this.openedNanos = System.nanoTime();
}
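// Illustrative usage (hypothetical, not from this file): callers would typically construct
// readers inside a thread pool so each SSTableStreamReader opens and buffers in parallel, e.g.:
//
//     SSTableReader reader = new SSTableReader(tableMetadata, ssTable,
//                                              sparkRangeFilter, partitionKeyFilters,
//                                              null /* no column pruning */,
//                                              true /* readIndexOffset */,
//                                              stats,
//                                              false /* useIncrementalRepair */,
//                                              true /* isRepairPrimary */,
//                                              statsMetadata -> statsMetadata.repairedAt != 0);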