in csharp/Worker/Microsoft.Spark.CSharp/UDFCommand.cs [280:389]
private IEnumerable<dynamic> GetIterator(Stream inputStream, string serializedMode, int isFuncSqlUdf)
{
logger.LogInfo("Serialized mode in GetIterator: " + serializedMode);
IFormatter formatter = new BinaryFormatter();
var mode = (SerializedMode)Enum.Parse(typeof(SerializedMode), serializedMode);
int messageLength;
Stopwatch watch = Stopwatch.StartNew();
Row tempRow = null;
while ((messageLength = SerDe.ReadInt(inputStream)) != (int)SpecialLengths.END_OF_DATA_SECTION)
{
watch.Stop();
if (messageLength > 0 || messageLength == (int)SpecialLengths.NULL)
{
watch.Start();
byte[] buffer = messageLength > 0 ? SerDe.ReadBytes(inputStream, messageLength) : null;
watch.Stop();
switch (mode)
{
case SerializedMode.String:
{
if (messageLength > 0)
{
if (buffer == null)
{
logger.LogDebug("Buffer is null. Message length is {0}", messageLength);
}
yield return SerDe.ToString(buffer);
}
else
{
yield return null;
}
break;
}
case SerializedMode.Row:
{
Debug.Assert(messageLength > 0);
var unpickledObjects = PythonSerDe.GetUnpickledObjects(buffer);
if (isFuncSqlUdf == 0)
{
foreach (var row in unpickledObjects.Select(item => (item as RowConstructor).GetRow()))
{
yield return row;
}
}
else
{
foreach (var row in unpickledObjects)
{
yield return row;
}
}
break;
}
case SerializedMode.Pair:
{
byte[] pairKey = buffer;
byte[] pairValue;
watch.Start();
int valueLength = SerDe.ReadInt(inputStream);
if (valueLength > 0)
{
pairValue = SerDe.ReadBytes(inputStream, valueLength);
}
else if (valueLength == (int)SpecialLengths.NULL)
{
pairValue = null;
}
else
{
throw new Exception(string.Format("unexpected valueLength: {0}", valueLength));
}
watch.Stop();
yield return new Tuple<byte[], byte[]>(pairKey, pairValue);
break;
}
case SerializedMode.None: //just return raw bytes
{
yield return buffer;
break;
}
default:
{
if (buffer != null)
{
var ms = new MemoryStream(buffer);
yield return formatter.Deserialize(ms);
}
else
{
yield return null;
}
break;
}
}
}
watch.Start();
}
logger.LogInfo("total receive time: {0}", watch.ElapsedMilliseconds);
}