// Excerpt: private unsafe void CopyRawEvents(TraceEventDispatcher rawEvents, IStreamWriter writer)
// From: src/TraceEvent/TraceLog.cs [lines 1835:2281]

        /// <summary>
        /// Drives the ETL -> ETLX conversion: scans every event produced by 'rawEvents' and copies
        /// the raw event bytes to 'writer', while building up the in-memory index structures
        /// (event pages, per-event counts, processes/threads, code addresses, stacks).
        /// Honors options.SkipMSec (skip the start of the trace) and options.MaxEventCount
        /// (truncate), and additionally truncates when the output approaches the ETLX 4GB limit.
        /// After the scan it performs end-of-trace fix-up: flushes deferred kernel stacks, gives
        /// every thread a process, simulates missing module unloads, clamps times to the session
        /// end, sorts the event-to-stack tables, and logs summary statistics.
        /// NOTE: although 'options' is null-checked in places, it is dereferenced unconditionally
        /// inside the event callback and in the summary logging below, so callers effectively
        /// must supply a non-null options with a ConversionLog.
        /// </summary>
        private unsafe void CopyRawEvents(TraceEventDispatcher rawEvents, IStreamWriter writer)
        {
            SetupCallbacks(rawEvents);

            // Fix up MemInfoWS records so that we get one per process rather than one per machine
            rawEvents.Kernel.MemoryProcessMemInfo += delegate (MemoryProcessMemInfoTraceData data)
            {
                if (!processingDisabled)
                {
                    GenerateMemInfoRecordsPerProcess(data, writer);
                }
            };

            const int defaultMaxEventCount = 20000000;                   // 20M events produces about 3GB of data.  which is close to the limit of ETLX.
            int maxEventCount = defaultMaxEventCount;
            double startMSec = 0;
            if (options != null)
            {
                if (options.SkipMSec != 0)
                {
                    options.ConversionLog.WriteLine("Skipping the {0:n3} MSec of the trace.", options.SkipMSec);
                    processingDisabled = true;
                    startMSec = options.SkipMSec;
                }
                if (options.MaxEventCount >= 1000)      // Numbers smaller than this are almost certainly errors
                {
                    maxEventCount = options.MaxEventCount;
                }
                else if (options.MaxEventCount != 0)
                {
                    options.ConversionLog.WriteLine("MaxEventCount {0} < 1000, assumed in error, ignoring", options.MaxEventCount);
                }
                // Guarded (was unconditional) for consistency with the null checks above and below.
                options.ConversionLog.WriteLine("Collecting a maximum of {0:n0} events.", maxEventCount);
            }

            uint rawEventCount = 0;
            double rawInputSizeMB = rawEvents.Size / 1000000.0;
            var startTime = DateTime.Now;
            long lastQPCEventTime = long.MinValue;     // We want the times to be ordered.
#if DEBUG
            long lastTimeStamp = 0;
#endif

            // While scanning over the stream, copy all data to the file.
            rawEvents.AllEvents += delegate (TraceEvent data)
            {
                Debug.Assert(_syncTimeQPC != 0);         // We should have set this in the Header event (or on session start if it is read time
#if DEBUG
                Debug.Assert(lastTimeStamp <= data.TimeStampQPC);     // Ensure they are in order
                lastTimeStamp = data.TimeStampQPC;
#endif
                // Show status every 128K events
                if ((rawEventCount & 0x1FFFF) == 0)
                {
                    var curOutputSizeMB = ((double)(uint)writer.GetLabel()) / 1000000.0;
                    // Currently ETLX has a size restriction of 4Gig.  Thus if we are getting big, start truncating.
                    if (curOutputSizeMB > 3500)
                    {
                        processingDisabled = true;
                    }

                    if (options != null && options.ConversionLog != null)
                    {
                        if (rawEventCount == 0)
                        {
                            options.ConversionLog.WriteLine("[Opening a log file of size {0:n0} MB.]",
                                rawInputSizeMB);
                        }
                        else
                        {
                            // Progress report: estimate the final output size from the ratio of
                            // kept-to-read events so far, and project the remaining time linearly.
                            var curDurationSec = (DateTime.Now - startTime).TotalSeconds;

                            var ratioOutputToInput = (double)eventCount / (double)rawEventCount;
                            var estimatedFinalSizeMB = Math.Max(rawInputSizeMB * ratioOutputToInput * 1.15, curOutputSizeMB * 1.02);
                            var ratioSizeComplete = curOutputSizeMB / estimatedFinalSizeMB;
                            var estTimeLeftSec = (int)(curDurationSec / ratioSizeComplete - curDurationSec);

                            var message = "";
                            if (0 < startMSec && data.TimeStampRelativeMSec < startMSec)
                            {
                                message = "  Before StartMSec truncating";
                            }
                            else if (eventCount >= maxEventCount)
                            {
                                message = "  Hit MaxEventCount, truncating.";
                            }
                            else if (curOutputSizeMB > 3500)
                            {
                                message = "  Hit File size limit (3.5Gig) truncating.";
                            }

                            options.ConversionLog.WriteLine(
                                "[Sec {0,4:f0} Read {1,10:n0} events. At {2,7:n0}ms.  Wrote {3,4:f0}MB ({4,3:f0}%).  EstDone {5,2:f0} min {6,2:f0} sec.{7}]",
                                curDurationSec,
                                rawEventCount,
                                data.TimeStampRelativeMSec,
                                curOutputSizeMB,
                                ratioSizeComplete * 100.0,
                                estTimeLeftSec / 60,
                                estTimeLeftSec % 60,
                                message);
                        }
                    }
                }
                rawEventCount++;
#if DEBUG
                if (data is UnhandledTraceEvent)
                {
                    Debug.Assert((byte)data.opcode != unchecked((byte)-1));        // Means PrepForCallback not done.
                    Debug.Assert(data.TaskName != "ERRORTASK");
                    Debug.Assert(data.OpcodeName != "ERROROPCODE");
                }
#endif
                if (processingDisabled)
                {
                    // Re-enable once we reach the SkipMSec point (only if we were disabled for that reason).
                    if (startMSec != 0 && startMSec <= data.TimeStampRelativeMSec)
                    {
                        startMSec = 0;                  // Marking it 0 indicates that we have triggered on it already.
                        processingDisabled = false;
                    }
                    return;
                }
                else
                {
                    if (maxEventCount <= eventCount)
                    {
                        processingDisabled = true;
                    }
                }
                // Sadly we have seen cases of merged ETL files where there are events past the end of the session.
                // This confuses later logic so ensure that this does not happen.  Note that we also want the
                // any module-DCStops to happen at sessionEndTime so we have to do this after processing all events
                if (data.TimeStampQPC > sessionEndTimeQPC)
                {
                    sessionEndTimeQPC = data.TimeStampQPC;
                }

                if (data.TimeStampQPC < lastQPCEventTime)
                {
                    options.ConversionLog.WriteLine("WARNING, events out of order! This breaks event search.  Jumping from {0:n3} back to {1:n3} for {2} EventID {3} Thread {4}",
                        QPCTimeToRelMSec(lastQPCEventTime), data.TimeStampRelativeMSec, data.ProviderName, data.ID, data.ThreadID);
                    firstTimeInversion = (EventIndex) (uint) eventCount;
                }

                lastQPCEventTime = data.TimeStampQPC;

                // Update the counts
                var countForEvent = stats.GetEventCounts(data);
                countForEvent.m_count++;
                countForEvent.m_eventDataLenTotal += data.EventDataLength;

                // Extended data (e.g. activity IDs, container info, stacks) may mark this as a
                // bookkeeping event that should not be emitted into the ETLX stream.
                var extendedDataCount = data.eventRecord->ExtendedDataCount;
                if (extendedDataCount != 0)
                {
                    bookKeepingEvent |= ProcessExtendedData(data, extendedDataCount, countForEvent);
                }

                if (bookKeepingEvent)
                {
                    bookKeepingEvent = false;
                    if (bookeepingEventThatMayHaveStack)
                    {
                        // We log the event so that we don't get spurious warnings about not finding the event for a stack,
                        // but we mark the EventIndex as invalid so that we know not to actually log this stack.
                        pastEventInfo.LogEvent(data, EventIndex.Invalid, countForEvent);
                        bookeepingEventThatMayHaveStack = false;
                    }
                    // But unless the user explicitly asked for them, we remove them from the trace.
                    if (!options.KeepAllEvents)
                    {
                        return;
                    }
                }
                else
                {
                    // Remember the event (to attach latter Stack Events) and also log event counts in TraceStats
                    if (!noStack)
                    {
                        pastEventInfo.LogEvent(data, removeFromStream ? EventIndex.Invalid : ((EventIndex)eventCount), countForEvent);
                    }
                    else
                    {
                        noStack = false;
                    }

                    if (removeFromStream)
                    {
                        removeFromStream = false;
                        if (!options.KeepAllEvents)
                        {
                            return;
                        }
                    }
                    else // Remember any code address in the event.
                    {
                        data.LogCodeAddresses(fnAddAddressToCodeAddressMap);
                    }
                }
                // We want all events to have a TraceProcess and TraceThread.
                // We force this to happen here.  We may have created a thread already, in which
                // case the 'thread' instance variable will hold it.  Use that if it is accurate.
                // Otherwise make a new one here.
                // NOTE(review): '&&' binds tighter than '||', so this reads
                // 'thread == null || (thread.ThreadID != data.ThreadID && data.ProcessID != -1)'
                // -- the ProcessID guard only applies to the mismatch case.  Looks intentional
                // (a null thread always forces a lookup) but confirm before changing.
                if (thread == null || thread.ThreadID != data.ThreadID && data.ProcessID != -1)
                {
                    TraceProcess process = processes.GetOrCreateProcess(data.ProcessID, data.TimeStampQPC);
                    if (data.ThreadID != -1)
                    {
                        // All Thread events should already be handled (since we are passing the wrong args for those here).
                        Debug.Assert(!(data is ThreadTraceData));
                        thread = Threads.GetOrCreateThread(data.ThreadID, data.TimeStampQPC, process);
                    }
                }

                // Start a new event page (index entry) whenever the current one is full.
                if (numberOnPage >= eventsPerPage)
                {
                    // options.ConversionLog.WriteLine("Writing page " + this.eventPages.BatchCount, " Start " + writer.GetLabel());
                    eventPages.Add(new EventPageEntry(data.TimeStampQPC, writer.GetLabel()));
                    numberOnPage = 0;
                }
                unsafe
                {
                    // Copy the fixed-size header followed by the payload, padded to a 4-byte boundary.
                    Debug.Assert(data.eventRecord->EventHeader.TimeStamp < long.MaxValue);
                    WriteBlob((IntPtr)data.eventRecord, writer, headerSize);
                    WriteBlob(data.userData, writer, (data.EventDataLength + 3 & ~3));
                }
                numberOnPage++;
                eventCount++;
            };

#if DEBUG
            // This is a guard against code running in TraceLog.CopyRawEvents that attempts to use
            // the EventIndex for an event returned by ETWTraceEventSource. It is unsafe to do so
            // because the EventIndex returned represents the index in the ETW stream, but user
            // code needs the index in the newly created ETLX stream (which does not include
            // "bookkeeping" events. User code should use the TraceLog.EventCount instead (the
            // way TraceLog.ProcessExtendedData and Activities.HandleActivityCreation do)
            var rawEtwEvents = rawEvents as ETWTraceEventSource;
            if (rawEtwEvents != null)
            {
                rawEtwEvents.DisallowEventIndexAccess = true;
            }
#endif
            try
            {
                rawEvents.Process();                  // Run over the data.
            }
            catch (Exception e)
            {
                options.ConversionLog.WriteLine("[ERROR: processing events ****]");
                if (options.ContinueOnError)
                {
                    options.ConversionLog.WriteLine("***** The following Exception was thrown during processing *****");
                    options.ConversionLog.WriteLine(e.ToString());
                    options.ConversionLog.WriteLine("***** However ContinueOnError is set, so we continue processing  what we have *****");
                    options.ConversionLog.WriteLine("Continuing Processing...");
                }
                else
                {
                    options.ConversionLog.WriteLine("***** Consider using /ContinueOnError to ignore the bad part of the trace.  *****");
                    throw;
                }
            }
#if DEBUG
            if (rawEtwEvents != null)
            {
                rawEtwEvents.DisallowEventIndexAccess = false;
            }
#endif

            // EventPipe doesn't set EventsLost until after Process is called.
            if(rawEvents is EventPipeEventSource)
            {
                eventsLost = rawEvents.EventsLost;
            }

            if (eventCount >= maxEventCount)
            {
                if (options != null && options.ConversionLog != null)
                {
                    if (options.OnLostEvents != null)
                    {
                        options.OnLostEvents(true, EventsLost, eventCount);
                    }

                    // Fixed message text ("there is a hard limit", "of processed", "events in the
                    // file") and use n0 like the other count messages (n alone prints decimals).
                    options.ConversionLog.WriteLine("Truncated events to {0:n0} events.  Use /MaxEventCount to change.", maxEventCount);
                    options.ConversionLog.WriteLine("However there is a hard limit of 4GB of processed (ETLX) data, increasing it over 15M will probably hit that.");
                    options.ConversionLog.WriteLine("Instead you can use /SkipMSec:X to skip the beginning events and thus see the next window of /MaxEventCount events in the file.");
                }
            }

            // Release the stack bookkeeping structures; warn about stacks that never got defined.
            freeEventStackInfos = null;
            pastEventInfo.Dispose();
            if (kernelStackKeyToInfo.Count != 0)
            {
                DebugWarn(false, "Warning: " + kernelStackKeyToInfo.Count + " undefined kernel stacks at the end of the trace.", null);
            }

            kernelStackKeyToInfo = null;
            if (userStackKeyToInfo.Count != 0)
            {
                DebugWarn(false, "Warning: " + userStackKeyToInfo.Count + " undefined user stacks at the end of the trace.", null);
            }

            userStackKeyToInfo = null;

            // TODO FIX NOW hack because unloadMethod not present
            foreach (var jittedMethod in jittedMethods)
            {
                codeAddresses.AddMethod(jittedMethod);
            }

            foreach (var jsJittedMethod in jsJittedMethods)
            {
                codeAddresses.AddMethod(jsJittedMethod, sourceFilesByID);
            }

            // Make sure that all threads have a process
            foreach (var curThread in Threads)
            {
                // Finish off the processing of the ETW compressed stacks.  This means doing all the deferred Kernel stack processing
                // and connecting all pseudo-callStack indexes into real ones.
                if (curThread.lastEntryIntoKernel != null)
                {
                    EmitStackOnExitFromKernel(ref curThread.lastEntryIntoKernel, TraceCallStacks.GetRootForThread(curThread.ThreadIndex), null);
                }

                if (curThread.process == null)
                {
                    DebugWarn(true, "Warning: could not determine the process for thread " + curThread.ThreadID, null);
                    var unknownProcess = Processes.GetOrCreateProcess(-1, 0);
                    unknownProcess.imageFileName = "UNKNOWN_PROCESS";
                    curThread.process = unknownProcess;
                }
                curThread.Process.cpuSamples += curThread.cpuSamples;         // Roll up CPU to the process.
            }

            // Make sure we are not missing any ImageEnds that we have ImageStarts for.
            foreach (var process in Processes)
            {
                foreach (var module in process.LoadedModules)
                {
                    // We did not unload the module
                    if (module.unloadTimeQPC == long.MaxValue && module.ImageBase != 0)
                    {
                        // simulate a module unload, and resolve all code addresses in the module's range.
                        CodeAddresses.ForAllUnresolvedCodeAddressesInRange(process, module.ImageBase, module.ModuleFile.ImageSize, false, delegate (ref TraceCodeAddresses.CodeAddressInfo info)
                        {
                            info.SetModuleFileIndex(module.ModuleFile);
                        });
                    }
                    if (module.unloadTimeQPC > sessionEndTimeQPC)
                    {
                        module.unloadTimeQPC = sessionEndTimeQPC;
                    }
                }

                if (process.endTimeQPC > sessionEndTimeQPC)
                {
                    process.endTimeQPC = sessionEndTimeQPC;
                }

                if (options != null && options.ConversionLog != null)
                {
                    if (process.unresolvedCodeAddresses.Count > 0)
                    {
                        // Fixed plural: "addresses" (the count is almost always > 1).
                        options.ConversionLog.WriteLine("There were {0} addresses that did not resolve to a module or method in process {1} ({2})",
                            process.unresolvedCodeAddresses.Count, process.Name, process.ProcessID);
                        //options.ConversionLog.WriteLine(process.unresolvedCodeAddresses.Foreach(x => CodeAddresses.Address(x).ToString("x")));
                    }
                }

                // We are done with these data structures.
                process.codeAddressesInProcess = null;
                process.unresolvedCodeAddresses.Clear();
                // Link up all the 'Parent' fields of the process.
                process.SetParentForProcess();
            }

#if DEBUG
            // Confirm that there are no infinite chains (we guarantee this for sanity).
            foreach (var process in Processes)
            {
                Debug.Assert(process.ParentDepth() < Processes.Count);
            }
#endif

            // Sum up the module level statistics for code addresses.
            for (int codeAddrIdx = 0; codeAddrIdx < CodeAddresses.Count; codeAddrIdx++)
            {
                var inclusiveCount = CodeAddresses.codeAddresses[codeAddrIdx].InclusiveCount;

                var moduleIdx = CodeAddresses.ModuleFileIndex((CodeAddressIndex)codeAddrIdx);
                if (moduleIdx != ModuleFileIndex.Invalid)
                {
                    var module = CodeAddresses.ModuleFiles[moduleIdx];
                    module.codeAddressesInModule += inclusiveCount;
                }
                CodeAddresses.totalCodeAddresses += inclusiveCount;
            }

            // Ensure the event to stack table is in sorted order.
            eventsToStacks.Sort(delegate (EventsToStackIndex x, EventsToStackIndex y)
            {
                return (int)x.EventIndex - (int)y.EventIndex;
            });
            cswitchBlockingEventsToStacks.Sort(delegate (EventsToStackIndex x, EventsToStackIndex y)
            {
                return (int)x.EventIndex - (int)y.EventIndex;
            });

#if DEBUG
            // Confirm that the CPU stats make sense.
            foreach (var process in Processes)
            {
                float cpuFromThreads = 0;
                foreach (var curThread in process.Threads)
                {
                    cpuFromThreads += curThread.CPUMSec;
                }

                Debug.Assert(Math.Abs(cpuFromThreads - process.CPUMSec) < .01);     // We add up
            }

            // The eventsToStacks array is sorted.
            // We sort this array above, so this should only fail if we have EQUAL EventIndex.
            // This means we tried to add two stacks to an event  (we should not do that).
            // See the asserts in AddStackToEvent for more.
            for (int i = 0; i < eventsToStacks.Count - 1; i++)
            {
                Debug.Assert(eventsToStacks[i].EventIndex < eventsToStacks[i + 1].EventIndex);
            }
#endif

            Debug.Assert(eventCount % eventsPerPage == numberOnPage || numberOnPage == eventsPerPage || eventCount == 0);
            options.ConversionLog.WriteLine("{0} distinct processes.", processes.Count);
            options.ConversionLog.WriteLine("Totals");
            options.ConversionLog.WriteLine("  {0,8:n0} events.", eventCount);
            options.ConversionLog.WriteLine("  {0,8:n0} events with stack traces.", eventsToStacks.Count);
            options.ConversionLog.WriteLine("  {0,8:n0} events with code addresses in them.", eventsToCodeAddresses.Count);
            options.ConversionLog.WriteLine("  {0,8:n0} total code address instances. (stacks or other)", codeAddresses.TotalCodeAddresses);
            options.ConversionLog.WriteLine("  {0,8:n0} unique code addresses. ", codeAddresses.Count);
            options.ConversionLog.WriteLine("  {0,8:n0} unique stacks.", callStacks.Count);
            options.ConversionLog.WriteLine("  {0,8:n0} unique managed methods parsed.", codeAddresses.Methods.Count);
            options.ConversionLog.WriteLine("  {0,8:n0} CLR method event records.", codeAddresses.ManagedMethodRecordCount);
            options.ConversionLog.WriteLine("[Conversion complete {0:n0} events.  Conversion took {1:n0} sec.]",
                eventCount, (DateTime.Now - startTime).TotalSeconds);
        }