private IEnumerable<dynamic> GetIterator()

in csharp/Worker/Microsoft.Spark.CSharp/UDFCommand.cs [280:389]


        /// <summary>
        /// Lazily reads length-prefixed messages from <paramref name="inputStream"/> and yields the
        /// decoded element(s) of each message until the END_OF_DATA_SECTION length marker is read.
        /// </summary>
        /// <remarks>
        /// This is an iterator: nothing is logged, parsed or read from the stream until the caller
        /// starts enumerating, and an invalid <paramref name="serializedMode"/> only surfaces then.
        /// A length of 0, or any negative length other than NULL and END_OF_DATA_SECTION, is
        /// consumed without yielding anything.
        /// </remarks>
        /// <param name="inputStream">Stream the length-prefixed messages are read from.</param>
        /// <param name="serializedMode">
        /// Name of a <c>SerializedMode</c> member; it is converted with <c>Enum.Parse</c>.
        /// </param>
        /// <param name="isFuncSqlUdf">
        /// Only consulted in Row mode: when 0, every unpickled object is cast to
        /// <c>RowConstructor</c> and converted with <c>GetRow()</c>; for any other value the
        /// unpickled objects are yielded unchanged.
        /// </param>
        /// <returns>
        /// Per message, depending on the mode: a string (or null), one item per unpickled object,
        /// a <c>Tuple&lt;byte[], byte[]&gt;</c> key/value pair, the raw byte array, or the
        /// object produced by the binary formatter (or null).
        /// </returns>
        private IEnumerable<dynamic> GetIterator(Stream inputStream, string serializedMode, int isFuncSqlUdf)
        {
            logger.LogInfo("Serialized mode in GetIterator: " + serializedMode);
            // SECURITY NOTE(review): BinaryFormatter is unsafe on untrusted payloads. Confirm that
            // inputStream only ever carries data produced by a trusted peer before relying on it.
            IFormatter formatter = new BinaryFormatter();
            var mode = (SerializedMode)Enum.Parse(typeof(SerializedMode), serializedMode);
            int messageLength;
            // The stopwatch runs only around stream reads, so the total logged at the end excludes
            // decoding time and the time the consumer spends between yielded items.
            Stopwatch watch = Stopwatch.StartNew();

            while ((messageLength = SerDe.ReadInt(inputStream)) != (int)SpecialLengths.END_OF_DATA_SECTION)
            {
                watch.Stop();
                if (messageLength > 0 || messageLength == (int)SpecialLengths.NULL)
                {
                    watch.Start();
                    // A NULL length carries no payload, so there is nothing to read for it.
                    byte[] buffer = messageLength > 0 ? SerDe.ReadBytes(inputStream, messageLength) : null;
                    watch.Stop();
                    switch (mode)
                    {
                        case SerializedMode.String:
                            {
                                if (messageLength > 0)
                                {
                                    if (buffer == null)
                                    {
                                        logger.LogDebug("Buffer is null. Message length is {0}", messageLength);
                                    }
                                    yield return SerDe.ToString(buffer);
                                }
                                else
                                {
                                    yield return null;
                                }
                                break;
                            }

                        case SerializedMode.Row:
                            {
                                Debug.Assert(messageLength > 0);
                                var unpickledObjects = PythonSerDe.GetUnpickledObjects(buffer);

                                if (isFuncSqlUdf == 0)
                                {
                                    foreach (var item in unpickledObjects)
                                    {
                                        // Direct cast instead of `as`: an element of an unexpected type
                                        // fails with an InvalidCastException naming the type rather than
                                        // an uninformative NullReferenceException.
                                        yield return ((RowConstructor)item).GetRow();
                                    }
                                }
                                else
                                {
                                    foreach (var row in unpickledObjects)
                                    {
                                        yield return row;
                                    }
                                }

                                break;
                            }

                        case SerializedMode.Pair:
                            {
                                // The message just read is the key; the value follows as a second
                                // length-prefixed message on the same stream.
                                byte[] pairKey = buffer;
                                byte[] pairValue;

                                watch.Start();
                                int valueLength = SerDe.ReadInt(inputStream);
                                if (valueLength > 0)
                                {
                                    pairValue = SerDe.ReadBytes(inputStream, valueLength);
                                }
                                else if (valueLength == (int)SpecialLengths.NULL)
                                {
                                    pairValue = null;
                                }
                                else
                                {
                                    throw new Exception(string.Format("unexpected valueLength: {0}", valueLength));
                                }
                                watch.Stop();

                                yield return new Tuple<byte[], byte[]>(pairKey, pairValue);
                                break;
                            }

                        case SerializedMode.None: //just return raw bytes
                            {
                                yield return buffer;
                                break;
                            }

                        default:
                            {
                                if (buffer != null)
                                {
                                    // Deserialize and release the stream before yielding, so no
                                    // disposable is kept alive while the iterator is suspended.
                                    object deserialized;
                                    using (var ms = new MemoryStream(buffer))
                                    {
                                        deserialized = formatter.Deserialize(ms);
                                    }
                                    yield return deserialized;
                                }
                                else
                                {
                                    yield return null;
                                }
                                break;
                            }
                    }
                }
                // Resume timing so the next length read in the loop condition is counted.
                watch.Start();
            }

            logger.LogInfo("total receive time: {0}", watch.ElapsedMilliseconds);
        }