in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala [129:146]
  override def deserialize(in: InputStream): EventHubsSourceOffset = {
    in.read() // A zero byte is read first (SPARK-19517).
    val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    // HDFSMetadataLog guarantees that it never creates a partial file.
    assert(content.length != 0)
    if (content(0) == 'v') {
      // Versioned log format: a "v<version>" header line followed by the serialized offset.
      val indexOfNewLine = content.indexOf("\n")
      if (indexOfNewLine > 0) {
        // Validates the header; parseLogVersion throws if the version is unsupported.
        val version =
          parseLogVersion(content.substring(0, indexOfNewLine), VERSION)
        EventHubsSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
      } else {
        throw new IllegalStateException("Log file was malformed.")
      }
    } else {
      // No version header: the log file was written by Spark 2.1.
      EventHubsSourceOffset(SerializedOffset(content))
    }
  }
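
For context, below is a minimal standalone sketch of the two on-disk shapes this deserializer accepts. The JSON payload and the parseVersionSketch helper are illustrative stand-ins, not the project's actual serialized offset format or its parseLogVersion implementation.

object OffsetLogFormatSketch {
  // Illustrative stand-in for parseLogVersion: accepts a "v<N>" header and
  // rejects versions newer than maxVersion.
  def parseVersionSketch(header: String, maxVersion: Int): Int = {
    require(header.length > 1 && header.head == 'v' && header.drop(1).forall(_.isDigit),
            s"Malformed log header: $header")
    val version = header.drop(1).toInt
    require(version > 0 && version <= maxVersion, s"Unsupported log version: $version")
    version
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical serialized-offset payload, used only for illustration.
    val json = """{"my-event-hub":{"0":42,"1":7}}"""

    // Versioned layout (first char is 'v'): header line, newline, then the offset payload.
    val versioned = s"v1\n$json"
    val newlineAt = versioned.indexOf("\n")
    assert(parseVersionSketch(versioned.substring(0, newlineAt), maxVersion = 1) == 1)
    assert(versioned.substring(newlineAt + 1) == json)

    // Legacy Spark 2.1 layout: the offset payload alone, with no header line.
    val legacy = json
    assert(legacy(0) != 'v')
  }
}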