[SPEC-7257] [LYN-4472] [AWSMetrics] The background thread that monitors the metrics queue burns a lot of CPU (#1256)

[LYN-4472] [AWSMetrics] The background thread that monitors the metrics queue burns a lot of CPU
main
Junbo Liang 5 years ago committed by GitHub
parent b08245a7f5
commit f9fe2392f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -122,29 +122,22 @@ namespace AWSMetrics
//! @return Outcome of the operation. //! @return Outcome of the operation.
AZ::Outcome<void, AZStd::string> SendMetricsToFile(AZStd::shared_ptr<MetricsQueue> metricsQueue); AZ::Outcome<void, AZStd::string> SendMetricsToFile(AZStd::shared_ptr<MetricsQueue> metricsQueue);
//! Check whether the consumer should flush the metrics queue.
//! @return whether the limit is hit.
bool ShouldSendMetrics();
//! Push metrics events to the front of the queue for retry. //! Push metrics events to the front of the queue for retry.
//! @param metricsEventsForRetry Metrics events for retry. //! @param metricsEventsForRetry Metrics events for retry.
void PushMetricsForRetry(MetricsQueue& metricsEventsForRetry); void PushMetricsForRetry(MetricsQueue& metricsEventsForRetry);
void SubmitLocalMetricsAsync(); void SubmitLocalMetricsAsync();
//////////////////////////////////////////// AZStd::mutex m_metricsMutex; //!< Mutex to protect the metrics queue
// These data are protected by m_metricsMutex. MetricsQueue m_metricsQueue; //!< Queue for buffering the metrics events
AZStd::mutex m_metricsMutex;
AZStd::chrono::system_clock::time_point m_lastSendMetricsTime;
MetricsQueue m_metricsQueue;
////////////////////////////////////////////
AZStd::mutex m_metricsFileMutex; //!< Local metrics file is protected by m_metricsFileMutex AZStd::mutex m_metricsFileMutex; //!< Mutex to protect the local metrics file
AZStd::atomic<int> m_sendMetricsId;//!< Request ID for sending metrics AZStd::atomic<int> m_sendMetricsId;//!< Request ID for sending metrics
AZStd::thread m_consumerThread; //!< Thread to monitor and consume the metrics queue AZStd::thread m_monitorThread; //!< Thread to monitor and consume the metrics queue
AZStd::atomic<bool> m_consumerTerminated; AZStd::atomic<bool> m_monitorTerminated;
AZStd::binary_semaphore m_waitEvent;
// Client Configurations. // Client Configurations.
AZStd::unique_ptr<ClientConfiguration> m_clientConfiguration; AZStd::unique_ptr<ClientConfiguration> m_clientConfiguration;

@ -29,7 +29,7 @@ namespace AWSMetrics
MetricsManager::MetricsManager() MetricsManager::MetricsManager()
: m_clientConfiguration(AZStd::make_unique<ClientConfiguration>()) : m_clientConfiguration(AZStd::make_unique<ClientConfiguration>())
, m_clientIdProvider(IdentityProvider::CreateIdentityProvider()) , m_clientIdProvider(IdentityProvider::CreateIdentityProvider())
, m_consumerTerminated(true) , m_monitorTerminated(true)
, m_sendMetricsId(0) , m_sendMetricsId(0)
{ {
} }
@ -53,33 +53,29 @@ namespace AWSMetrics
void MetricsManager::StartMetrics() void MetricsManager::StartMetrics()
{ {
if (!m_consumerTerminated) if (!m_monitorTerminated)
{ {
// The background thread has been started. // The background thread has been started.
return; return;
} }
m_monitorTerminated = false;
m_consumerTerminated = false;
AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
m_lastSendMetricsTime = AZStd::chrono::system_clock::now();
// Start a separate thread to monitor and consume the metrics queue. // Start a separate thread to monitor and consume the metrics queue.
// Avoid using the job system since the worker is long-running over multiple frames // Avoid using the job system since the worker is long-running over multiple frames
m_consumerThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this)); m_monitorThread = AZStd::thread(AZStd::bind(&MetricsManager::MonitorMetricsQueue, this));
} }
void MetricsManager::MonitorMetricsQueue() void MetricsManager::MonitorMetricsQueue()
{ {
while (!m_consumerTerminated) // Continue to loop until the monitor is terminated.
while (!m_monitorTerminated)
{ {
if (ShouldSendMetrics()) // The thread will wake up either when the metrics event queue is full (try_acquire_for call returns true),
{ // or the flush period limit is hit (try_acquire_for call returns false).
// Flush the metrics queue when the accumulated metrics size or time period hits the limit m_waitEvent.try_acquire_for(AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds()));
FlushMetricsAsync(); FlushMetricsAsync();
} }
} }
}
void MetricsManager::SetupJobContext() void MetricsManager::SetupJobContext()
{ {
@ -114,6 +110,12 @@ namespace AWSMetrics
AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex); AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
m_metricsQueue.AddMetrics(metricsEvent); m_metricsQueue.AddMetrics(metricsEvent);
if (m_metricsQueue.GetSizeInBytes() >= m_clientConfiguration->GetMaxQueueSizeInBytes())
{
// Flush the metrics queue when the accumulated metrics size hits the limit
m_waitEvent.release();
}
return true; return true;
} }
@ -348,9 +350,6 @@ namespace AWSMetrics
void MetricsManager::FlushMetricsAsync() void MetricsManager::FlushMetricsAsync()
{ {
AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex); AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
m_lastSendMetricsTime = AZStd::chrono::system_clock::now();
if (m_metricsQueue.GetNumMetrics() == 0) if (m_metricsQueue.GetNumMetrics() == 0)
{ {
return; return;
@ -363,34 +362,20 @@ namespace AWSMetrics
SendMetricsAsync(metricsToFlush); SendMetricsAsync(metricsToFlush);
} }
bool MetricsManager::ShouldSendMetrics()
{
AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
auto secondsSinceLastFlush = AZStd::chrono::duration_cast<AZStd::chrono::seconds>(AZStd::chrono::system_clock::now() - m_lastSendMetricsTime);
if (secondsSinceLastFlush >= AZStd::chrono::seconds(m_clientConfiguration->GetQueueFlushPeriodInSeconds()) ||
m_metricsQueue.GetSizeInBytes() >= m_clientConfiguration->GetMaxQueueSizeInBytes())
{
return true;
}
return false;
}
void MetricsManager::ShutdownMetrics() void MetricsManager::ShutdownMetrics()
{ {
if (m_consumerTerminated) if (m_monitorTerminated)
{ {
return; return;
} }
// Terminate the consumer thread // Terminate the monitor thread
m_consumerTerminated = true; m_monitorTerminated = true;
FlushMetricsAsync(); m_waitEvent.release();
if (m_consumerThread.joinable()) if (m_monitorThread.joinable())
{ {
m_consumerThread.join(); m_monitorThread.join();
} }
} }
@ -449,6 +434,12 @@ namespace AWSMetrics
{ {
AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex); AZStd::lock_guard<AZStd::mutex> lock(m_metricsMutex);
m_metricsQueue.AddMetrics(offlineRecords[index]); m_metricsQueue.AddMetrics(offlineRecords[index]);
if (m_metricsQueue.GetSizeInBytes() >= m_clientConfiguration->GetMaxQueueSizeInBytes())
{
// Flush the metrics queue when the accumulated metrics size hits the limit
m_waitEvent.release();
}
} }
// Remove the local metrics file after reading all its content. // Remove the local metrics file after reading all its content.

@ -355,6 +355,9 @@ namespace AWSMetrics
TEST_F(MetricsManagerTest, FlushMetrics_NonEmptyQueue_Success) TEST_F(MetricsManagerTest, FlushMetrics_NonEmptyQueue_Success)
{ {
ResetClientConfig(true, (double)TestMetricsEventSizeInBytes * (MaxNumMetricsEvents + 1) / MbToBytes,
DefaultFlushPeriodInSeconds, 1);
for (int index = 0; index < MaxNumMetricsEvents; ++index) for (int index = 0; index < MaxNumMetricsEvents; ++index)
{ {
AZStd::vector<MetricsAttribute> metricsAttributes; AZStd::vector<MetricsAttribute> metricsAttributes;
@ -377,7 +380,7 @@ namespace AWSMetrics
TEST_F(MetricsManagerTest, ResetOfflineRecordingStatus_ResubmitLocalMetrics_Success) TEST_F(MetricsManagerTest, ResetOfflineRecordingStatus_ResubmitLocalMetrics_Success)
{ {
// Disable offline recording in the config file. // Disable offline recording in the config file.
ResetClientConfig(false, 0.0, 0, 0); ResetClientConfig(false, (double)TestMetricsEventSizeInBytes * 2 / MbToBytes, 0, 0);
// Enable offline recording after initializing the metrics manager. // Enable offline recording after initializing the metrics manager.
m_metricsManager->UpdateOfflineRecordingStatus(true); m_metricsManager->UpdateOfflineRecordingStatus(true);

Loading…
Cancel
Save