lang/php/lib/DataFile/AvroDataIOReader.php (145 lines of code) (raw):

<?php

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache\Avro\DataFile;

use Apache\Avro\AvroException;
use Apache\Avro\AvroIO;
use Apache\Avro\AvroUtil;
use Apache\Avro\Datum\AvroIOBinaryDecoder;
use Apache\Avro\Datum\AvroIODatumReader;
use Apache\Avro\IO\AvroStringIO;
use Apache\Avro\Schema\AvroSchema;

/**
 * Reads Avro data from an AvroIO source using an AvroSchema.
 *
 * On construction the reader validates the container header, loads the
 * container metadata and sync marker, and configures the datum reader with
 * the writer's schema found in the metadata. Datums are then obtained with
 * data().
 *
 * @package Avro
 */
class AvroDataIOReader
{
    /**
     * @var string sync marker read from the container header
     */
    public $sync_marker;

    /**
     * @var array object container metadata
     */
    public $metadata;

    /**
     * @var AvroIO source the container is read from
     */
    private $io;

    /**
     * @var AvroIOBinaryDecoder decoder bound to $io (i.e. to the raw container bytes)
     */
    private $decoder;

    /**
     * @var AvroIODatumReader reader used to decode the metadata and every datum
     */
    private $datum_reader;

    /**
     * @var int count of items remaining in the current block
     */
    private $block_count;

    /**
     * @var string|null compression codec named in the metadata, or null when absent
     */
    private $codec;

    /**
     * @param AvroIO $io source from which to read
     * @param AvroIODatumReader $datum_reader reader that understands
     *                                        the data schema
     * @throws AvroDataIOException if $io is not an instance of AvroIO
     *                             or the codec specified in the header
     *                             is not supported
     * @uses readHeader()
     */
    public function __construct($io, $datum_reader)
    {
        if (!($io instanceof AvroIO)) {
            throw new AvroDataIOException('io must be instance of AvroIO');
        }

        $this->io = $io;
        $this->decoder = new AvroIOBinaryDecoder($this->io);
        $this->datum_reader = $datum_reader;
        $this->readHeader();

        $codec = $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] ?? null;
        if ($codec && !AvroDataIO::isValidCodec($codec)) {
            throw new AvroDataIOException(sprintf('Unknown codec: %s', $codec));
        }
        $this->codec = $codec;

        $this->block_count = 0;
        // FIXME: Seems unsanitary to set writers_schema here.
        // Can't constructor take it as an argument?
        $this->datum_reader->setWritersSchema(
            AvroSchema::parse($this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR])
        );
    }

    /**
     * Reads header of object container: the magic bytes, the metadata map
     * and the sync marker, in that order, starting from offset 0.
     *
     * @throws AvroDataIOException if the file is not an Avro data file.
     */
    private function readHeader()
    {
        $this->seek(0, AvroIO::SEEK_SET);

        $magic = $this->read(AvroDataIO::magicSize());

        if (strlen($magic) < AvroDataIO::magicSize()) {
            throw new AvroDataIOException(
                'Not an Avro data file: shorter than the Avro magic block'
            );
        }

        // Strict comparison: both sides are byte strings and must match exactly.
        if (AvroDataIO::magic() !== $magic) {
            throw new AvroDataIOException(
                sprintf(
                    'Not an Avro data file: %s does not match %s',
                    $magic,
                    AvroDataIO::magic()
                )
            );
        }

        // The metadata schema serves as both writer's and reader's schema.
        $this->metadata = $this->datum_reader->readData(
            AvroDataIO::metadataSchema(),
            AvroDataIO::metadataSchema(),
            $this->decoder
        );
        $this->sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
    }

    /**
     * @uses AvroIO::seek()
     */
    private function seek($offset, $whence)
    {
        return $this->io->seek($offset, $whence);
    }

    /**
     * @uses AvroIO::read()
     */
    private function read($len)
    {
        return $this->io->read($len);
    }

    /**
     * Reads every remaining datum from the object container.
     *
     * @internal Would be nice to implement data() as an iterator, I think
     * @return array of data from object container.
     * @throws AvroException if the PHP extension required by the codec is not loaded
     * @throws AvroDataIOException if a compressed block cannot be decompressed
     */
    public function data()
    {
        $data = [];
        $decoder = $this->decoder;
        while (true) {
            if (0 == $this->block_count) {
                if ($this->isEof()) {
                    break;
                }

                // A sync marker may sit between the previous block and the
                // next one; after consuming it the source may be exhausted.
                if ($this->skipSync() && $this->isEof()) {
                    break;
                }

                $length = $this->readBlockHeader();
                $decoder = $this->blockDecoder($length);
            }
            $data[] = $this->datum_reader->read($decoder);
            --$this->block_count;
        }
        return $data;
    }

    /**
     * Returns the decoder from which the datums of the block whose header
     * was just read must be decoded.
     *
     * For the compressed codecs the block's bytes are always read through
     * the decoder bound to the underlying AvroIO ($this->decoder) and never
     * through the in-memory decoder created for a previous block; otherwise
     * every block after the first would be read from an exhausted buffer.
     * For any other codec value the file decoder itself is returned.
     *
     * @param int $length length in bytes of the (possibly compressed) block
     * @return AvroIOBinaryDecoder
     * @throws AvroException if the PHP extension required by the codec is not loaded
     * @throws AvroDataIOException if the block cannot be decompressed
     */
    private function blockDecoder($length)
    {
        if ($this->codec === AvroDataIO::DEFLATE_CODEC) {
            return $this->inMemoryDecoder(gzinflate($this->decoder->read($length)));
        }

        if ($this->codec === AvroDataIO::ZSTANDARD_CODEC) {
            if (!extension_loaded('zstd')) {
                throw new AvroException('Please install ext-zstd to use zstandard compression.');
            }
            return $this->inMemoryDecoder(zstd_uncompress($this->decoder->read($length)));
        }

        if ($this->codec === AvroDataIO::SNAPPY_CODEC) {
            if (!extension_loaded('snappy')) {
                throw new AvroException('Please install ext-snappy to use snappy compression.');
            }
            $compressed = $this->decoder->read($length);
            // The last 4 bytes of the block are a big-endian CRC32; the rest
            // is the snappy payload.
            $crc32 = unpack('N', substr($compressed, -4))[1];
            $datum = snappy_uncompress(substr($compressed, 0, -4));
            if ($crc32 !== crc32($datum)) {
                // NOTE(review): on a CRC mismatch the existing behavior is to
                // run snappy_uncompress a second time rather than fail; this
                // is preserved as-is -- confirm the intent.
                $datum = snappy_uncompress($datum);
            }
            return $this->inMemoryDecoder($datum);
        }

        if ($this->codec === AvroDataIO::BZIP2_CODEC) {
            if (!extension_loaded('bz2')) {
                throw new AvroException('Please install ext-bz2 to use bzip2 compression.');
            }
            return $this->inMemoryDecoder(bzdecompress($this->decoder->read($length)));
        }

        // No decompression needed: datums are decoded straight from the file.
        return $this->decoder;
    }

    /**
     * Wraps a decompressed block in a decoder over an in-memory AvroStringIO.
     *
     * @param mixed $datum result of the decompression call; the decompression
     *                     functions used by this class return a non-string
     *                     value (false or an error number) on failure
     * @return AvroIOBinaryDecoder
     * @throws AvroDataIOException if $datum is not a string
     */
    private function inMemoryDecoder($datum)
    {
        if (!is_string($datum)) {
            throw new AvroDataIOException(
                sprintf('Failed to decompress data block using codec: %s', $this->codec)
            );
        }

        return new AvroIOBinaryDecoder(new AvroStringIO($datum));
    }

    /**
     * @uses AvroIO::isEof()
     */
    private function isEof()
    {
        return $this->io->isEof();
    }

    /**
     * Consumes the sync marker if it is the next thing in the source.
     *
     * Reads AvroDataIO::SYNC_SIZE bytes; when they are not the container's
     * sync marker the source is rewound by AvroDataIO::SYNC_SIZE bytes.
     *
     * @return bool true if the sync marker was found (and consumed)
     */
    private function skipSync()
    {
        $proposed_sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
        // Strict comparison: markers are raw bytes and must match exactly.
        if ($proposed_sync_marker !== $this->sync_marker) {
            $this->seek(-AvroDataIO::SYNC_SIZE, AvroIO::SEEK_CUR);
            return false;
        }
        return true;
    }

    /**
     * Reads the block header (which includes the count of items in the block
     * and the length in bytes of the block)
     *
     * The item count is stored in $block_count as a side effect.
     *
     * @return int length in bytes of the block.
     */
    private function readBlockHeader()
    {
        $this->block_count = $this->decoder->readLong();
        return $this->decoder->readLong();
    }

    /**
     * Closes this reader (and its AvroIO object.)
     * @uses AvroIO::close()
     */
    public function close()
    {
        return $this->io->close();
    }
}