lang/php/lib/Datum/AvroIODatumWriter.php (119 lines of code) (raw):
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache\Avro\Datum;
use Apache\Avro\AvroException;
use Apache\Avro\Schema\AvroSchema;
/**
* Handles schema-specific writing of data to the encoder.
*
* Ensures that each datum written is consistent with the writer's schema.
*
* @package Avro
*/
class AvroIODatumWriter
{
/**
* Schema used by this instance to write Avro data.
* @var AvroSchema
*/
public $writersSchema;
/**
* @param AvroSchema $writers_schema
*/
public function __construct($writers_schema = null)
{
$this->writersSchema = $writers_schema;
}
/**
* @param $datum
* @param AvroIOBinaryEncoder $encoder
*/
public function write($datum, $encoder)
{
$this->writeData($this->writersSchema, $datum, $encoder);
}
/**
* @param AvroSchema $writers_schema
* @param $datum
* @param AvroIOBinaryEncoder $encoder
* @return mixed
*
* @throws AvroIOTypeException if $datum is invalid for $writers_schema
*/
public function writeData($writers_schema, $datum, $encoder)
{
if (!AvroSchema::isValidDatum($writers_schema, $datum)) {
throw new AvroIOTypeException($writers_schema, $datum);
}
return $this->writeValidatedData($writers_schema, $datum, $encoder);
}
/**
* @param AvroSchema $writers_schema
* @param $datum
* @param AvroIOBinaryEncoder $encoder
* @return mixed
*
* @throws AvroIOTypeException if $datum is invalid for $writers_schema
*/
private function writeValidatedData($writers_schema, $datum, $encoder)
{
switch ($writers_schema->type()) {
case AvroSchema::NULL_TYPE:
return $encoder->writeNull($datum);
case AvroSchema::BOOLEAN_TYPE:
return $encoder->writeBoolean($datum);
case AvroSchema::INT_TYPE:
return $encoder->writeInt($datum);
case AvroSchema::LONG_TYPE:
return $encoder->writeLong($datum);
case AvroSchema::FLOAT_TYPE:
return $encoder->writeFloat($datum);
case AvroSchema::DOUBLE_TYPE:
return $encoder->writeDouble($datum);
case AvroSchema::STRING_TYPE:
return $encoder->writeString($datum);
case AvroSchema::BYTES_TYPE:
return $encoder->writeBytes($datum);
case AvroSchema::ARRAY_SCHEMA:
return $this->writeArray($writers_schema, $datum, $encoder);
case AvroSchema::MAP_SCHEMA:
return $this->writeMap($writers_schema, $datum, $encoder);
case AvroSchema::FIXED_SCHEMA:
return $this->writeFixed($writers_schema, $datum, $encoder);
case AvroSchema::ENUM_SCHEMA:
return $this->writeEnum($writers_schema, $datum, $encoder);
case AvroSchema::RECORD_SCHEMA:
case AvroSchema::ERROR_SCHEMA:
case AvroSchema::REQUEST_SCHEMA:
return $this->writeRecord($writers_schema, $datum, $encoder);
case AvroSchema::UNION_SCHEMA:
return $this->writeUnion($writers_schema, $datum, $encoder);
default:
throw new AvroException(sprintf(
'Unknown type: %s',
$writers_schema->type
));
}
}
/**
* @param AvroSchema $writers_schema
* @param null|boolean|int|float|string|array $datum item to be written
* @param AvroIOBinaryEncoder $encoder
*/
private function writeArray($writers_schema, $datum, $encoder)
{
$datum_count = count($datum);
if (0 < $datum_count) {
$encoder->writeLong($datum_count);
$items = $writers_schema->items();
foreach ($datum as $item) {
$this->writeValidatedData($items, $item, $encoder);
}
}
return $encoder->writeLong(0);
}
/**
* @param $writers_schema
* @param $datum
* @param $encoder
* @throws AvroIOTypeException
*/
private function writeMap($writers_schema, $datum, $encoder)
{
$datum_count = count($datum);
if ($datum_count > 0) {
$encoder->writeLong($datum_count);
foreach ($datum as $k => $v) {
$encoder->writeString($k);
$this->writeValidatedData($writers_schema->values(), $v, $encoder);
}
}
$encoder->writeLong(0);
}
private function writeFixed($writers_schema, $datum, $encoder)
{
/**
* NOTE Unused $writers_schema parameter included for consistency
* with other write_* methods.
*/
return $encoder->write($datum);
}
private function writeEnum($writers_schema, $datum, $encoder)
{
$datum_index = $writers_schema->symbolIndex($datum);
return $encoder->writeInt($datum_index);
}
private function writeRecord($writers_schema, $datum, $encoder)
{
foreach ($writers_schema->fields() as $field) {
$this->writeValidatedData($field->type(), $datum[$field->name()] ?? null, $encoder);
}
}
private function writeUnion($writers_schema, $datum, $encoder)
{
$datum_schema_index = -1;
$datum_schema = null;
foreach ($writers_schema->schemas() as $index => $schema) {
if (AvroSchema::isValidDatum($schema, $datum)) {
$datum_schema_index = $index;
$datum_schema = $schema;
break;
}
}
if (is_null($datum_schema)) {
throw new AvroIOTypeException($writers_schema, $datum);
}
$encoder->writeLong($datum_schema_index);
$this->writeValidatedData($datum_schema, $datum, $encoder);
}
}