Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 74 additions & 33 deletions IntelPresentMon/PresentMonService/RealtimePresentMonSession.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2022-2023 Intel Corporation
// Copyright (C) 2022-2023 Intel Corporation
// SPDX-License-Identifier: MIT
#include "Logging.h"
#include "RealtimePresentMonSession.h"
Expand All @@ -17,53 +17,97 @@ RealtimePresentMonSession::RealtimePresentMonSession(svc::FrameBroadcaster& broa
{
pBroadcaster = &broadcaster;
ResetEtwFlushPeriod();
StartEtwSession();
}

bool RealtimePresentMonSession::IsTraceSessionActive() {
return session_active_.load(std::memory_order_acquire);
}

// Transitions the session to an inactive state without tearing down the ETW session.
// Safe to call multiple times.
void RealtimePresentMonSession::StopProvidersAndResetConsumer(bool shrink)
{
if (pm_consumer_) {
pm_consumer_->SetEventProcessingEnabled(false);
}

trace_session_.StopProviders();

if (pm_consumer_) {
pm_consumer_->ResetPresentTrackingData(shrink);
}

evtStreamingStarted_.Reset();
}

PM_STATUS RealtimePresentMonSession::UpdateTracking(const std::unordered_set<uint32_t>& trackedPids) {
const bool wasActive = HasLiveTargets();
// Ensure ETW session exists (StartTraceW done once; providers may be off).
if (!IsTraceSessionActive()) {
// If the session isn't active, then we need to start it before we can update tracking.
auto const status = StartEtwSession();
if (status != PM_STATUS_SUCCESS) {
return status;
}
}

// Snapshot state so we can rollback tracking on failure.
std::unordered_map<uint32_t, bool> previousState;
{
std::lock_guard lock(tracked_processes_mutex_);
previousState = tracked_pid_live_;
}

SyncTrackedPidState(trackedPids);
const bool isActive = HasLiveTargets();
if (isActive && (!wasActive || !IsTraceSessionActive())) {
auto status = StartTraceSession();
if (status != PM_STATUS_SUCCESS) {
bool const providersEnabled = (bool)util::win::WaitAnyEventFor(0ms, evtStreamingStarted_);

// Stop transition: targets went from some->none; providers currently enabled
if(!isActive && providersEnabled) {
pmlog_info("All targets inactive: Disabling ETW Providers");
StopProvidersAndResetConsumer(true);
evtStreamingStarted_.Reset();
return PM_STATUS::PM_STATUS_SUCCESS;
}

// Start transition: targets went from none->some; providers currently disabled
// This also handles the case where there was a StartProviders failure for some
// reason.
if (isActive && !providersEnabled) {
// Enable present tracking before enabling providers so any immediately-arriving
// events are accounted for by the quiesce logic on StopStreaming.
if (pm_consumer_) {
// Drop any lingering present tracking state from previous streams
pm_consumer_->ResetPresentTrackingData(false);
// Allow event processing before enabling providers
pm_consumer_->SetEventProcessingEnabled(true);
}
pmlog_info("Active targets detected: Enabling ETW Providers");
auto const providerStatus = trace_session_.StartProviders();
if (providerStatus != ERROR_SUCCESS) {
pmlog_info("Enabling of ETW Providers failed");
StopProvidersAndResetConsumer(true);
evtStreamingStarted_.Reset();
{
std::lock_guard lock(tracked_processes_mutex_);
tracked_pid_live_ = std::move(previousState);
}
return status;
return PM_STATUS::PM_STATUS_FAILURE;
}
}
if (isActive && evtStreamingStarted_) {
evtStreamingStarted_.Set();
return PM_STATUS::PM_STATUS_SUCCESS;
}
if (!isActive) {
if (evtStreamingStarted_) {
evtStreamingStarted_.Reset();
}
StopTraceSession();
}

// No transition: either active with providers enabled, or inactive with providers disabled
return PM_STATUS::PM_STATUS_SUCCESS;
}

bool RealtimePresentMonSession::CheckTraceSessions(bool forceTerminate) {
if (forceTerminate) {
StopTraceSession();
StopEtwSession();
ClearTrackedProcesses();
return true;
}
if (!HasLiveTargets() && (IsTraceSessionActive() == true)) {
StopTraceSession();
return true;
}
return false;
}

Expand All @@ -83,16 +127,15 @@ void RealtimePresentMonSession::FlushEvents()
pmlog_warn("Failed manual flush of ETW event buffer").hr();
}
}

}

void RealtimePresentMonSession::ResetEtwFlushPeriod()
{
etw_flush_period_ms_ = default_realtime_etw_flush_period_ms_;
}

PM_STATUS RealtimePresentMonSession::StartTraceSession() {
std::lock_guard lock(session_mutex_);
PM_STATUS RealtimePresentMonSession::StartEtwSession() {
std::lock_guard<std::mutex> lock(session_mutex_);

if (pm_consumer_) {
return PM_STATUS::PM_STATUS_SERVICE_ERROR;
Expand Down Expand Up @@ -120,6 +163,9 @@ PM_STATUS RealtimePresentMonSession::StartTraceSession() {
pm_consumer_->mTrackAppTiming = true;
pm_consumer_->mTrackPcLatency = true;

// Service uses provider toggling; enable quiesce gate for safe state reset on start/stop
pm_consumer_->SetProviderToggleMode(true);

auto& opt = clio::Options::Get();
pm_session_name_ = util::str::ToWide(*opt.etwSessionName);

Expand All @@ -128,12 +174,12 @@ PM_STATUS RealtimePresentMonSession::StartTraceSession() {
// it and start a new session. This is useful if a previous process failed to
// properly shut down the session for some reason.
trace_session_.mPMConsumer = pm_consumer_.get();
auto status = trace_session_.Start(etl_file_name, pm_session_name_.c_str());
auto status = trace_session_.Start(etl_file_name, pm_session_name_.c_str(), false);

if (status == ERROR_ALREADY_EXISTS) {
status = StopNamedTraceSession(pm_session_name_.c_str());
if (status == ERROR_SUCCESS) {
status = trace_session_.Start(etl_file_name, pm_session_name_.c_str());
status = trace_session_.Start(etl_file_name, pm_session_name_.c_str(), false);
}
}

Expand Down Expand Up @@ -167,7 +213,7 @@ PM_STATUS RealtimePresentMonSession::StartTraceSession() {
return PM_STATUS::PM_STATUS_SUCCESS;
}

void RealtimePresentMonSession::StopTraceSession() {
void RealtimePresentMonSession::StopEtwSession() {
// PHASE 1: Signal shutdown and wait for threads to observe it
// this also enforces "only_once" semantics for multiple stop callers
if (session_active_.exchange(false, std::memory_order_acq_rel)) {
Expand All @@ -182,9 +228,7 @@ void RealtimePresentMonSession::StopTraceSession() {
// PHASE 2: Safe cleanup after threads have finished
std::lock_guard<std::mutex> lock(session_mutex_);

if (evtStreamingStarted_) {
evtStreamingStarted_.Reset();
}
evtStreamingStarted_.Reset();

if (pm_consumer_) {
pm_consumer_.reset();
Expand Down Expand Up @@ -479,11 +523,8 @@ void RealtimePresentMonSession::UpdateProcesses(
}

void RealtimePresentMonSession::HandleTerminatedProcess(uint32_t processId) {
// TODO(megalvan): Need to figure this out
// Close this process' CSV.
// CloseOutputCsv(processInfo);
MarkProcessExited(processId);
if (!HasLiveTrackedProcesses() && evtStreamingStarted_) {
if (!HasLiveTrackedProcesses()) {
evtStreamingStarted_.Reset();
}
}
Expand All @@ -496,4 +537,4 @@ void RealtimePresentMonSession::HandleTerminatedProcess(uint32_t processId) {
void RealtimePresentMonSession::CheckForTerminatedRealtimeProcesses(
std::vector<std::pair<uint32_t, uint64_t>>* terminatedProcesses) {
(void)terminatedProcesses;
}
}
5 changes: 3 additions & 2 deletions IntelPresentMon/PresentMonService/RealtimePresentMonSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class RealtimePresentMonSession : public PresentMonSession

private:
// functions
PM_STATUS StartTraceSession();
void StopTraceSession();
PM_STATUS StartEtwSession();
void StopEtwSession();

void DequeueAnalyzedInfo(
std::vector<ProcessEvent>* processEvents,
Expand Down Expand Up @@ -51,6 +51,7 @@ class RealtimePresentMonSession : public PresentMonSession

void CheckForTerminatedRealtimeProcesses(
std::vector<std::pair<uint32_t, uint64_t>>* terminatedProcesses);
void StopProvidersAndResetConsumer(bool shrink);

// data
std::wstring pm_session_name_;
Expand Down
Loading