Status Read()

in tensorflow_io/core/kernels/avro_kernels.cc [439:547]


  // Reads rows [start, stop) of column `component` into `value`.
  //
  // `stop` is clamped to the row count recorded in
  // shapes_[column_index].dim_size(0); if `start` is at or past that count the
  // call succeeds with *record_read == 0. Row `r` of the selection is written to
  // value->flat<T>()(r - start), where T follows the column's Avro type
  // (bool/int32/int64/float/double, or tstring for STRING/BYTES/FIXED/ENUM).
  // NOTE(review): `value` is assumed to be pre-sized by the caller to hold at
  // least (clamped stop - start) elements of that dtype — confirm at call site.
  // `label` is not used by this implementation.
  //
  // On success *record_read is set to the number of rows produced.
  // Returns InvalidArgument for an unknown component, a selection with
  // start > clamped stop, or an unsupported Avro field type; Internal if a
  // record cannot be decoded.
  Status Read(const int64 start, const int64 stop, const string& component,
              int64* record_read, Tensor* value, Tensor* label) override {
    const auto column_it = columns_index_.find(component);
    if (column_it == columns_index_.end()) {
      return errors::InvalidArgument("component ", component, " is invalid");
    }
    const int64 column_index = column_it->second;
    (*record_read) = 0;
    const int64 total_rows = shapes_[column_index].dim_size(0);
    if (start >= total_rows) {
      return Status::OK();
    }
    const string& column = component;
    // start < total_rows is guaranteed by the check above; only stop needs
    // clamping.
    const int64 element_start = start;
    const int64 element_stop = stop < total_rows ? stop : total_rows;

    if (element_start > element_stop) {
      return errors::InvalidArgument("dataset ", column,
                                     " selection is out of boundary");
    }
    if (element_start == element_stop) {
      return Status::OK();
    }

    avro::GenericDatum datum(reader_schema_);

    // positions_[i].first is the number of records in block i and
    // positions_[i].second is the position handed to reader_->seek() for that
    // block. Walk the blocks, tracking the global row range [block_begin,
    // block_end) each one covers.
    int64 block_end = 0;
    for (size_t i = 0; i < positions_.size(); i++) {
      const int64 block_begin = block_end;
      // Advance using the *current* block's count before any early exit, so
      // the running offset is always correct for the next iteration.
      block_end = block_begin + positions_[i].first;
      if (block_begin >= element_stop) {
        // Block offsets only grow (counts are presumed non-negative), so no
        // later block can overlap the selection either.
        break;
      }
      if (block_end <= element_start) {
        continue;
      }
      // TODO: Avro is sync point partitioned and each block is very similar to
      // a Row Group of parquet. Ideally each block should be cached with the
      // hope that slicing and indexing will happen around the same block
      // across multiple rows. Caching is not done yet.

      // Seek to the block's sync point, then decode sequentially.
      reader_->seek(positions_[i].second);
      for (int64 item_index = block_begin;
           item_index < block_end && item_index < element_stop; item_index++) {
        // Records before element_start must still be decoded to advance the
        // reader within the block.
        if (!reader_->read(datum)) {
          return errors::Internal("unable to read record at: ", item_index);
        }
        if (item_index < element_start) {
          continue;
        }
        const int64 out_index = item_index - element_start;
        const avro::GenericRecord& record = datum.value<avro::GenericRecord>();
        const avro::GenericDatum& field = record.field(column);
        switch (field.type()) {
          case avro::AVRO_BOOL:
            value->flat<bool>()(out_index) = field.value<bool>();
            break;
          case avro::AVRO_INT:
            value->flat<int32>()(out_index) = field.value<int32_t>();
            break;
          case avro::AVRO_LONG:
            value->flat<int64>()(out_index) = field.value<int64_t>();
            break;
          case avro::AVRO_FLOAT:
            value->flat<float>()(out_index) = field.value<float>();
            break;
          case avro::AVRO_DOUBLE:
            value->flat<double>()(out_index) = field.value<double>();
            break;
          case avro::AVRO_STRING:
            value->flat<tstring>()(out_index) = field.value<string>();
            break;
          case avro::AVRO_BYTES: {
            const std::vector<uint8_t>& field_value =
                field.value<std::vector<uint8_t>>();
            // data() is valid for an empty vector; &field_value[0] is not.
            value->flat<tstring>()(out_index) =
                string(reinterpret_cast<const char*>(field_value.data()),
                       field_value.size());
            break;
          }
          case avro::AVRO_FIXED: {
            const std::vector<uint8_t>& field_value =
                field.value<avro::GenericFixed>().value();
            value->flat<tstring>()(out_index) =
                string(reinterpret_cast<const char*>(field_value.data()),
                       field_value.size());
            break;
          }
          case avro::AVRO_ENUM:
            value->flat<tstring>()(out_index) =
                field.value<avro::GenericEnum>().symbol();
            break;
          default:
            return errors::InvalidArgument("unsupported data type: ",
                                           field.type());
        }
      }
    }
    (*record_read) = element_stop - element_start;
    return Status::OK();
  }