private SqlFieldSchema convertField()

in samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java [110:148]


  /**
   * Translates a single Avro field schema into the equivalent {@link SqlFieldSchema}.
   *
   * <p>Container types are handled recursively: ARRAY element schemas and MAP value schemas are
   * converted with both flags set to {@code false}, RECORD fields are converted through
   * {@code convertSchema}, and UNION member types are delegated to
   * {@code getSqlTypeFromUnionTypes}. Every other supported Avro type maps onto a primitive
   * SQL type.
   *
   * @param fieldSchema Avro schema of the field being converted
   * @param isNullable nullability flag forwarded to the created schema
   * @param isOptional optionality flag forwarded to the created schema
   * @return the SQL field schema corresponding to {@code fieldSchema}
   * @throws SamzaException if the Avro type has no mapping in this method
   */
  private SqlFieldSchema convertField(Schema fieldSchema, boolean isNullable, boolean isOptional) {
    // Set by every primitive branch below; composite branches return directly.
    final SamzaSqlFieldType primitiveType;

    switch (fieldSchema.getType()) {
      case UNION:
        return getSqlTypeFromUnionTypes(fieldSchema.getTypes(), isNullable, isOptional);
      case RECORD:
        return SqlFieldSchema.createRowFieldSchema(
            convertSchema(fieldSchema.getFields(), false), isNullable, isOptional);
      case ARRAY:
        return SqlFieldSchema.createArraySchema(
            convertField(fieldSchema.getElementType(), false, false), isNullable, isOptional);
      case MAP:
        // Map values are treated as neither nullable nor optional (the original author's
        // working assumption: value types presumably cannot be nullable or carry defaults).
        return SqlFieldSchema.createMapSchema(
            convertField(fieldSchema.getValueType(), false, false), isNullable, isOptional);

      case BOOLEAN:
        primitiveType = SamzaSqlFieldType.BOOLEAN;
        break;
      case INT:
        primitiveType = SamzaSqlFieldType.INT32;
        break;
      case LONG:
        primitiveType = SamzaSqlFieldType.INT64;
        break;
      case FLOAT:
        // A 4-byte Avro FLOAT corresponds to Sql REAL; Sql FLOAT is the 8-byte type.
        primitiveType = SamzaSqlFieldType.REAL;
        break;
      case DOUBLE:
        primitiveType = SamzaSqlFieldType.DOUBLE;
        break;
      case ENUM:
      case STRING:
        // Enum fields are represented as strings on the SQL side.
        primitiveType = SamzaSqlFieldType.STRING;
        break;
      case FIXED:
      case BYTES:
        // Fixed fields share the BYTES representation.
        primitiveType = SamzaSqlFieldType.BYTES;
        break;

      default:
        String errorMessage = String.format("Field Type %s is not supported", fieldSchema.getType());
        LOG.error(errorMessage);
        throw new SamzaException(errorMessage);
    }

    return SqlFieldSchema.createPrimitiveSchema(primitiveType, isNullable, isOptional);
  }