From f9fe2392f422655fa8803bb30296cf90f63ee331 Mon Sep 17 00:00:00 2001 From: Junbo Liang <68558268+junbo75@users.noreply.github.com> Date: Fri, 11 Jun 2021 15:34:31 -0700 Subject: [PATCH] [SPEC-7257] [LYN-4472] [AWSMetrics] The background thread that monitors the metrics queue burns a lot of CPU (#1256) [LYN-4472] [AWSMetrics] The background thread that monitors the metrics queue burns a lot of CPU --- .../Code/Include/Private/MetricsManager.h | 19 ++---- .../AWSMetrics/Code/Source/MetricsManager.cpp | 65 ++++++++----------- .../Code/Tests/MetricsManagerTest.cpp | 5 +- 3 files changed, 38 insertions(+), 51 deletions(-) diff --git a/Gems/AWSMetrics/Code/Include/Private/MetricsManager.h b/Gems/AWSMetrics/Code/Include/Private/MetricsManager.h index 28c8c3d762..b8b6a8ccce 100644 --- a/Gems/AWSMetrics/Code/Include/Private/MetricsManager.h +++ b/Gems/AWSMetrics/Code/Include/Private/MetricsManager.h @@ -122,29 +122,22 @@ namespace AWSMetrics //! @return Outcome of the operation. AZ::Outcome SendMetricsToFile(AZStd::shared_ptr metricsQueue); - //! Check whether the consumer should flush the metrics queue. - //! @return whether the limit is hit. - bool ShouldSendMetrics(); - //! Push metrics events to the front of the queue for retry. //! @param metricsEventsForRetry Metrics events for retry. void PushMetricsForRetry(MetricsQueue& metricsEventsForRetry); void SubmitLocalMetricsAsync(); - //////////////////////////////////////////// - // These data are protected by m_metricsMutex. - AZStd::mutex m_metricsMutex; - AZStd::chrono::system_clock::time_point m_lastSendMetricsTime; - MetricsQueue m_metricsQueue; - //////////////////////////////////////////// + AZStd::mutex m_metricsMutex; //!< Mutex to protect the metrics queue + MetricsQueue m_metricsQueue; //!< Queue fo buffering the metrics events - AZStd::mutex m_metricsFileMutex; //!< Local metrics file is protected by m_metricsFileMutex + AZStd::mutex m_metricsFileMutex; //!< Mutex to protect the local metrics file AZStd::atomic m_sendMetricsId;//!< Request ID for sending metrics - AZStd::thread m_consumerThread; //!< Thread to monitor and consume the metrics queue - AZStd::atomic m_consumerTerminated; + AZStd::thread m_monitorThread; //!< Thread to monitor and consume the metrics queue + AZStd::atomic m_monitorTerminated; + AZStd::binary_semaphore m_waitEvent; // Client Configurations. AZStd::unique_ptr m_clientConfiguration; diff --git a/Gems/AWSMetrics/Code/Source/MetricsManager.cpp b/Gems/AWSMetrics/Code/Source/MetricsManager.cpp index 2f4f1fb5e5..d48ec1a041 100644 --- a/Gems/AWSMetrics/Code/Source/MetricsManager.cpp +++ b/Gems/AWSMetrics/Code/Source/MetricsManager.cpp @@ -29,7 +29,7 @@ namespace AWSMetrics MetricsManager::MetricsManager() : m_clientConfiguration(AZStd::make_unique()) , m_clientIdProvider(IdentityProvider::CreateIdentityProvider()) - , m_consumerTerminated(true) + , m_monitorTerminated(true) , m_sendMetricsId(0) { } @@ -53,31 +53,27 @@ namespace AWSMetrics void MetricsManager::StartMetrics() { - if (!m_consumerTerminated) + if (!m_monitorTerminated) { // The background thread has been started. return; } - - m_consumerTerminated = false; - - AZStd::lock_guard lock(m_metricsMutex); - m_lastSendMetricsTime = AZStd::chrono::system_clock::now(); + m_monitorTerminated = false; // Start a separate thread to monitor and consume the metrics queue. // Avoid using the job system since the worker is long-running over multiple frames - m_consumerThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this)); + m_monitorThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this)); } void MetricsManager::MonitorMetricsQueue() { - while (!m_consumerTerminated) + // Continue to loop until the monitor is terminated. + while (!m_monitorTerminated) { - if (ShouldSendMetrics()) - { - // Flush the metrics queue when the accumulated metrics size or time period hits the limit - FlushMetricsAsync(); - } + // The thread will wake up either when the metrics event queue is full (try_acquire_for call returns true), + // or the flush period limit is hit (try_acquire_for call returns false). + m_waitEvent.try_acquire_for(AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds())); + FlushMetricsAsync(); } } @@ -114,6 +110,12 @@ namespace AWSMetrics AZStd::lock_guard lock(m_metricsMutex); m_metricsQueue.AddMetrics(metricsEvent); + if (m_metricsQueue.GetSizeInBytes() >= m_clientConfiguration->GetMaxQueueSizeInBytes()) + { + // Flush the metrics queue when the accumulated metrics size hits the limit + m_waitEvent.release(); + } + return true; } @@ -348,9 +350,6 @@ namespace AWSMetrics void MetricsManager::FlushMetricsAsync() { AZStd::lock_guard lock(m_metricsMutex); - - m_lastSendMetricsTime = AZStd::chrono::system_clock::now(); - if (m_metricsQueue.GetNumMetrics() == 0) { return; @@ -363,34 +362,20 @@ namespace AWSMetrics SendMetricsAsync(metricsToFlush); } - bool MetricsManager::ShouldSendMetrics() - { - AZStd::lock_guard lock(m_metricsMutex); - - auto secondsSinceLastFlush = AZStd::chrono::duration_cast(AZStd::chrono::system_clock::now() - m_lastSendMetricsTime); - if (secondsSinceLastFlush >= AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds()) || - m_metricsQueue.GetSizeInBytes() >= m_clientConfiguration->GetMaxQueueSizeInBytes()) - { - return true; - } - - return false; - } - void MetricsManager::ShutdownMetrics() { - if (m_consumerTerminated) + if (m_monitorTerminated) { return; } - // Terminate the consumer thread - m_consumerTerminated = true; - FlushMetricsAsync(); + // Terminate the monitor thread + m_monitorTerminated = true; + m_waitEvent.release(); - if (m_consumerThread.joinable()) + if (m_monitorThread.joinable()) { - m_consumerThread.join(); + m_monitorThread.join(); } } @@ -449,6 +434,12 @@ namespace AWSMetrics { AZStd::lock_guard lock(m_metricsMutex); m_metricsQueue.AddMetrics(offlineRecords[index]); + + if (m_metricsQueue.GetSizeInBytes() >= m_clientConfiguration->GetMaxQueueSizeInBytes()) + { + // Flush the metrics queue when the accumulated metrics size hits the limit + m_waitEvent.release(); + } } // Remove the local metrics file after reading all its content. diff --git a/Gems/AWSMetrics/Code/Tests/MetricsManagerTest.cpp b/Gems/AWSMetrics/Code/Tests/MetricsManagerTest.cpp index 82385b8bdd..bd7ecb722e 100644 --- a/Gems/AWSMetrics/Code/Tests/MetricsManagerTest.cpp +++ b/Gems/AWSMetrics/Code/Tests/MetricsManagerTest.cpp @@ -355,6 +355,9 @@ namespace AWSMetrics TEST_F(MetricsManagerTest, FlushMetrics_NonEmptyQueue_Success) { + ResetClientConfig(true, (double)TestMetricsEventSizeInBytes * (MaxNumMetricsEvents + 1) / MbToBytes, + DefaultFlushPeriodInSeconds, 1); + for (int index = 0; index < MaxNumMetricsEvents; ++index) { AZStd::vector metricsAttributes; @@ -377,7 +380,7 @@ namespace AWSMetrics TEST_F(MetricsManagerTest, ResetOfflineRecordingStatus_ResubmitLocalMetrics_Success) { // Disable offline recording in the config file. - ResetClientConfig(false, 0.0, 0, 0); + ResetClientConfig(false, (double)TestMetricsEventSizeInBytes * 2 / MbToBytes, 0, 0); // Enable offline recording after initialize the metric manager. m_metricsManager->UpdateOfflineRecordingStatus(true);