in spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java [85:182]
public void registerClasses(final Kryo kryo, final Map<Class<?>, Serializer<?>> serializerOverrides, final Set<Class<?>> blacklist) {
    // Registers classes with the supplied Kryo instance in three passes, in this order:
    //   1. the entries returned by getExtraRegistrations(),
    //   2. the type registrations of a GryoMapper built for GryoVersion.V1_0,
    //   3. the (class, serializer) pairs that configured IoRegistry instances expose for GryoIo.
    // In passes 1 and 2 a blacklisted class is skipped outright; otherwise an entry in
    // serializerOverrides (applied via checkForAndApplySerializerOverride) wins over the
    // serializer that would have been picked here. Pass 3 consults neither collection.
    // NOTE(review): the sequence of kryo.register calls is kept exactly as it was, on the
    // assumption that registration order matters to Kryo - confirm before reordering.

    // Pass 1: extra TinkerPop registrations (per the original note, copied from GryoSerializer's constructor)
    for (final Map.Entry<Class<?>, Serializer<?>> extra : getExtraRegistrations().entrySet()) {
        final Class<?> type = extra.getKey();
        final Serializer<?> serializer = extra.getValue();

        // The blacklist is checked first, so it beats serializerOverrides
        if (blacklist.contains(type)) {
            log.debug("Not registering serializer for {} (blacklisted)", type);
            continue;
        }

        // An applied override means there is nothing left to do for this class
        if (checkForAndApplySerializerOverride(serializerOverrides, kryo, type)) {
            continue;
        }

        if (null == serializer) {
            log.debug("Registering {} with default serializer", type);
            kryo.register(type);
            continue;
        }

        log.debug("Registering {} with serializer {}", type, serializer);
        kryo.register(type, serializer);
    }

    // Remembers which classes ended up behind a SerializerShim adapter during pass 2
    final Set<Class<?>> shimRegistered = new HashSet<>();

    // Pass 2: GryoMapper's default registrations
    for (final TypeRegistration<?> registration : GryoMapper.build().version(GryoVersion.V1_0).create().getTypeRegistrations()) {
        final Class<?> type = registration.getTargetClass();

        // The blacklist is checked first, so it beats serializerOverrides
        if (blacklist.contains(type)) {
            log.debug("Not registering serializer for {} (blacklisted)", type);
            continue;
        }

        final org.apache.tinkerpop.shaded.kryo.Serializer<?> shaded = registration.getShadedSerializer();
        final SerializerShim<?> shim = registration.getSerializerShim();
        final java.util.function.Function<
                org.apache.tinkerpop.shaded.kryo.Kryo,
                org.apache.tinkerpop.shaded.kryo.Serializer> shadedFactory = registration.getFunctionOfShadedKryo();

        // Overrides outrank every case below
        if (checkForAndApplySerializerOverride(serializerOverrides, kryo, type)) {
            continue;
        }

        if (null != shaded) {
            final boolean isShadedJavaSerializer =
                    org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class.equals(shaded.getClass());
            if (isShadedJavaSerializer) {
                // A shaded JavaSerializer mapping is swapped for the unshaded JavaSerializer
                log.debug("Registering {} with JavaSerializer", type);
                kryo.register(type, new JavaSerializer());
            } else {
                // Any other shaded serializer is unexpected here; report it and register the class
                // without an explicit serializer
                log.error("GryoMapper's default serialization registration for {} is a {}. " +
                                "This is probably a bug in TinkerPop (this is not a valid default registration). " +
                                "I am configuring Spark to use Kryo's default serializer for this class, " +
                                "but this may cause serialization failures at runtime.",
                        type,
                        org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
                kryo.register(type);
            }
            continue;
        }

        if (null != shim) {
            // Shim serializers are wrapped in an adapter before being handed to this Kryo instance
            log.debug("Registering {} to serializer shim {} (serializer shim {})",
                    type, shim, shim.getClass());
            kryo.register(type, new UnshadedSerializerAdapter<>(shim));
            shimRegistered.add(type);
            continue;
        }

        if (null != shadedFactory) {
            // A function of shaded Kryo is equally unexpected; report it and register the class
            // without an explicit serializer
            log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>. " +
                            "This is probably a bug in TinkerPop (this is not a valid default registration). " +
                            "I am configuring Spark to use Kryo's default serializer instead of this function, " +
                            "but this may cause serialization failures at runtime.",
                    type,
                    org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
                    org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
            kryo.register(type);
            continue;
        }

        // Nothing specific was supplied: register the class without an explicit serializer
        log.debug("Registering {} with default serializer", type);
        kryo.register(type);
    }

    // Warn when StarGraph did not go through the shim branch above; the original note calls its
    // shim serializer especially important on Spark for efficiency reasons
    if (!shimRegistered.contains(StarGraph.class)) {
        log.warn("No SerializerShim found for StarGraph");
    }

    // Pass 3: classes contributed by IoRegistry instances created from the "tinkerpop" system properties
    for (final IoRegistry registry : IoRegistryHelper.createRegistries(SystemUtil.getSystemPropertiesConfiguration("tinkerpop", true))) {
        for (final Pair<Class, Object> pair : registry.find(GryoIo.class)) {
            final Class registeredClass = pair.getValue0();
            final Object candidate = pair.getValue1();
            if (candidate instanceof SerializerShim) {
                // A bare shim is adapted directly
                kryo.register(registeredClass, new UnshadedSerializerAdapter((SerializerShim) candidate));
            } else if (candidate instanceof ShadedSerializerAdapter) {
                // A shaded adapter is unwrapped to its shim, which is then re-wrapped in the unshaded adapter
                kryo.register(registeredClass, new UnshadedSerializerAdapter(((ShadedSerializerAdapter) candidate).getSerializerShim()));
            } else {
                // Anything else falls back to whatever kryo.getDefaultSerializer returns for the class
                kryo.register(registeredClass, kryo.getDefaultSerializer(registeredClass));
            }
        }
    }
}