fn open()

in datafusion/core/src/datasource/physical_plan/csv.rs [449:589]


    /// Opens the CSV file described by `file_meta` and returns a future that resolves
    /// to a stream of decoded record batches.
    ///
    /// When `file_meta.range` is set, only that byte range of the file is read. The
    /// range boundaries are first moved onto line boundaries (using
    /// `find_first_newline`) so that the partition never starts or ends in the middle
    /// of a line.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error if the object store request fails, if
    /// locating the line boundaries fails, or if the decoder cannot be constructed.
    /// Errors encountered while decoding are yielded as items of the stream.
    ///
    /// # Panics
    ///
    /// Panics if a byte range is requested for a compressed file: reading a compressed
    /// .csv in parallel is not supported.
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        // `self.config.has_header` controls whether to skip reading the 1st line header
        // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
        // partition, then don't skip first line
        let mut csv_has_header = self.config.has_header;
        if let Some(FileRange { start, .. }) = file_meta.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvConfig {
            has_header: csv_has_header,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if file_meta.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        Ok(Box::pin(async move {
            let file_size = file_meta.object_meta.size;
            // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
            let (start_byte, end_byte) = match file_meta.range {
                None => (0, file_size),
                Some(FileRange { start, end }) => {
                    let (start, end) = (start as usize, end as usize);
                    // Partition byte range is [start, end), the boundary might be in the middle of
                    // some line. Need to find out the exact line boundaries.
                    //
                    // The search begins one byte before each boundary so that a boundary
                    // sitting exactly at the start of a line is left where it is
                    // (presumably `find_first_newline` returns the offset of the newline
                    // relative to the search start -- confirm against its definition).
                    let start_delta = if start != 0 {
                        find_first_newline(
                            &config.object_store,
                            file_meta.location(),
                            start - 1,
                            file_size,
                        )
                        .await?
                    } else {
                        0
                    };
                    let end_delta = if end != file_size {
                        find_first_newline(
                            &config.object_store,
                            file_meta.location(),
                            end - 1,
                            file_size,
                        )
                        .await?
                    } else {
                        0
                    };
                    (start + start_delta, end + end_delta)
                }
            };

            // An empty byte range contains no records, so yield an empty stream without
            // contacting the object store. This matters for correctness: if `Range` has
            // equal `start` and `end`, an object store may fetch the whole file, which
            // would emit rows that belong to other partitions.
            //
            // The check is deliberately applied to every store. The concrete store type
            // cannot be identified through `type_id()` on an `Arc<dyn ObjectStore>`
            // (that reports the type of the pointer / trait object, not of the store
            // behind it), so a "local file system only" exception cannot be expressed
            // that way.
            if start_byte == end_byte {
                return Ok(futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed());
            }

            let options = GetOptions {
                range: Some(Range {
                    start: start_byte,
                    end: end_byte,
                }),
                ..Default::default()
            };

            match config
                .object_store
                .get_opts(file_meta.location(), options)
                .await?
            {
                GetResult::File(file, _) => {
                    let is_whole_file_scanned = file_meta.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // For special case: `get_range()` will interpret `start` and `end` as the
                        // byte range after decompression for compressed files, so a whole-file
                        // scan reads directly from the returned file handle instead.
                        file_compression_type.convert_read(file)?
                    } else {
                        // Range currently is ignored for GetResult::File(...), so fetch the
                        // requested bytes explicitly and decode them from memory.
                        let bytes = Cursor::new(
                            config
                                .object_store
                                .get_range(
                                    file_meta.location(),
                                    Range {
                                        start: start_byte,
                                        end: end_byte,
                                    },
                                )
                                .await?,
                        );
                        file_compression_type.convert_read(bytes)?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
                }
                GetResult::Stream(s) => {
                    let mut decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    // `fuse()` makes it safe to keep polling `input` after it has ended,
                    // which the loop below does once the stream is exhausted.
                    let mut input =
                        file_compression_type.convert_stream(s.boxed())?.fuse();
                    let mut buffered = Bytes::new();

                    let s = futures::stream::poll_fn(move |cx| {
                        loop {
                            // Refill the buffer only after the previous chunk has been
                            // fully consumed by the decoder.
                            if buffered.is_empty() {
                                match ready!(input.poll_next_unpin(cx)) {
                                    Some(Ok(b)) => buffered = b,
                                    Some(Err(e)) => {
                                        return Poll::Ready(Some(Err(e.into())))
                                    }
                                    // End of input: fall through with an empty buffer.
                                    None => {}
                                };
                            }
                            let decoded = match decoder.decode(buffered.as_ref()) {
                                // Note: the decoder needs to be called with an empty
                                // array to delimit the final record
                                Ok(0) => break,
                                Ok(decoded) => decoded,
                                Err(e) => return Poll::Ready(Some(Err(e))),
                            };
                            buffered.advance(decoded);
                        }

                        // The decoder consumed nothing more: either a batch is complete
                        // or the input is exhausted. Emit whatever has been buffered.
                        Poll::Ready(decoder.flush().transpose())
                    });
                    Ok(s.boxed())
                }
            }
        }))
    }