in cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java [81:134]
public void streamCdcCommitLogSegment(CassandraInstance instance, String segment, HttpRange httpRange, StreamConsumer streamConsumer)
{
sidecarClient.streamCdcSegments(toSidecarInstance(instance), segment, httpRange, new org.apache.cassandra.sidecar.client.StreamConsumer()
{
@Override
public void onRead(StreamBuffer streamBuffer)
{
streamConsumer.onRead(new org.apache.cassandra.spark.utils.streaming.StreamBuffer()
{
@Override
public void getBytes(int index, ByteBuffer destination, int length)
{
streamBuffer.copyBytes(index, destination, length);
}
@Override
public void getBytes(int index, byte[] destination, int destinationIndex, int length)
{
streamBuffer.copyBytes(index, destination, destinationIndex, length);
}
@Override
public byte getByte(int index)
{
return streamBuffer.getByte(index);
}
@Override
public int readableBytes()
{
return streamBuffer.readableBytes();
}
@Override
public void release()
{
streamBuffer.release();
}
});
}
@Override
public void onComplete()
{
streamConsumer.onEnd();
}
@Override
public void onError(Throwable throwable)
{
streamConsumer.onError(throwable);
}
});
}