LYN-7473: Ensure Requestor background thread is shutdown correctly (#4744)

Signed-off-by: rppotter <rppotter@amazon.com>
monroegm-disable-blank-issue-2
Pip Potter 4 years ago committed by GitHub
parent a534fccc9b
commit d692cde146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -36,22 +36,27 @@ namespace HttpRequestor
desc.m_cpuId = AFFINITY_MASK_USERTHREADS; desc.m_cpuId = AFFINITY_MASK_USERTHREADS;
m_runThread = true; m_runThread = true;
AWSNativeSDKInit::InitializationManager::InitAwsApi(); AWSNativeSDKInit::InitializationManager::InitAwsApi();
auto function = AZStd::bind(&Manager::ThreadFunction, this); auto function = [this]
{
ThreadFunction();
};
m_thread = AZStd::thread(desc, function); m_thread = AZStd::thread(desc, function);
} }
Manager::~Manager() Manager::~Manager()
{ {
AWSNativeSDKInit::InitializationManager::Shutdown();
m_runThread = false; m_runThread = false;
m_requestConditionVar.notify_all(); m_requestConditionVar.notify_all();
if (m_thread.joinable()) if (m_thread.joinable())
{ {
m_thread.join(); m_thread.join();
} }
// Shutdown after background thread has closed.
AWSNativeSDKInit::InitializationManager::Shutdown();
} }
void Manager::AddRequest(Parameters && httpRequestParameters) void Manager::AddRequest(Parameters&& httpRequestParameters)
{ {
{ {
AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex); AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex);
@ -60,7 +65,7 @@ namespace HttpRequestor
m_requestConditionVar.notify_all(); m_requestConditionVar.notify_all();
} }
void Manager::AddTextRequest(TextParameters && httpTextRequestParameters) void Manager::AddTextRequest(TextParameters&& httpTextRequestParameters)
{ {
{ {
AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex); AZStd::lock_guard<AZStd::mutex> lock(m_requestMutex);
@ -80,9 +85,14 @@ namespace HttpRequestor
void Manager::HandleRequestBatch() void Manager::HandleRequestBatch()
{ {
// Lock mutex and wait for work to be signalled via the condition variable // Lock mutex and wait for work to be signaled via the condition variable
AZStd::unique_lock<AZStd::mutex> lock(m_requestMutex); AZStd::unique_lock<AZStd::mutex> lock(m_requestMutex);
m_requestConditionVar.wait(lock, [&] { return !m_runThread || !m_requestsToHandle.empty() || !m_textRequestsToHandle.empty(); }); m_requestConditionVar.wait(
lock,
[&]
{
return !m_runThread || !m_requestsToHandle.empty() || !m_textRequestsToHandle.empty();
});
// Swap queues // Swap queues
AZStd::queue<Parameters> requestsToHandle; AZStd::queue<Parameters> requestsToHandle;
@ -114,21 +124,22 @@ namespace HttpRequestor
config.enableTcpKeepAlive = AZ_TRAIT_AZFRAMEWORK_AWS_ENABLE_TCP_KEEP_ALIVE_SUPPORTED; config.enableTcpKeepAlive = AZ_TRAIT_AZFRAMEWORK_AWS_ENABLE_TCP_KEEP_ALIVE_SUPPORTED;
std::shared_ptr<Aws::Http::HttpClient> httpClient = Aws::Http::CreateHttpClient(config); std::shared_ptr<Aws::Http::HttpClient> httpClient = Aws::Http::CreateHttpClient(config);
auto httpRequest = Aws::Http::CreateHttpRequest(httpRequestParameters.GetURI(), httpRequestParameters.GetMethod(), Aws::Utils::Stream::DefaultResponseStreamFactoryMethod); auto httpRequest = Aws::Http::CreateHttpRequest(
httpRequestParameters.GetURI(), httpRequestParameters.GetMethod(), Aws::Utils::Stream::DefaultResponseStreamFactoryMethod);
AZ_Assert(httpRequest, "HttpRequest not created!"); AZ_Assert(httpRequest, "HttpRequest not created!");
for (const auto & it : httpRequestParameters.GetHeaders()) for (const auto& it : httpRequestParameters.GetHeaders())
{ {
httpRequest->SetHeaderValue(it.first.c_str(), it.second.c_str()); httpRequest->SetHeaderValue(it.first.c_str(), it.second.c_str());
} }
if( httpRequestParameters.GetBodyStream() != nullptr) if (httpRequestParameters.GetBodyStream() != nullptr)
{ {
httpRequest->AddContentBody(httpRequestParameters.GetBodyStream()); httpRequest->AddContentBody(httpRequestParameters.GetBodyStream());
httpRequest->SetContentLength(AZStd::to_string(httpRequestParameters.GetBodyStream()->str().length()).c_str()); httpRequest->SetContentLength(AZStd::to_string(httpRequestParameters.GetBodyStream()->str().length()).c_str());
} }
auto httpResponse = httpClient->MakeRequest(httpRequest); auto httpResponse = httpClient->MakeRequest(httpRequest);
if (!httpResponse) if (!httpResponse)
@ -154,15 +165,16 @@ namespace HttpRequestor
} }
} }
void Manager::HandleTextRequest(const TextParameters & httpRequestParameters) void Manager::HandleTextRequest(const TextParameters& httpRequestParameters)
{ {
Aws::Client::ClientConfiguration config; Aws::Client::ClientConfiguration config;
config.enableTcpKeepAlive = AZ_TRAIT_AZFRAMEWORK_AWS_ENABLE_TCP_KEEP_ALIVE_SUPPORTED; config.enableTcpKeepAlive = AZ_TRAIT_AZFRAMEWORK_AWS_ENABLE_TCP_KEEP_ALIVE_SUPPORTED;
std::shared_ptr<Aws::Http::HttpClient> httpClient = Aws::Http::CreateHttpClient(config); std::shared_ptr<Aws::Http::HttpClient> httpClient = Aws::Http::CreateHttpClient(config);
auto httpRequest = Aws::Http::CreateHttpRequest(httpRequestParameters.GetURI(), httpRequestParameters.GetMethod(), Aws::Utils::Stream::DefaultResponseStreamFactoryMethod); auto httpRequest = Aws::Http::CreateHttpRequest(
httpRequestParameters.GetURI(), httpRequestParameters.GetMethod(), Aws::Utils::Stream::DefaultResponseStreamFactoryMethod);
for (const auto & it : httpRequestParameters.GetHeaders())
for (const auto& it : httpRequestParameters.GetHeaders())
{ {
httpRequest->SetHeaderValue(it.first.c_str(), it.second.c_str()); httpRequest->SetHeaderValue(it.first.c_str(), it.second.c_str());
} }
@ -172,7 +184,7 @@ namespace HttpRequestor
httpRequest->AddContentBody(httpRequestParameters.GetBodyStream()); httpRequest->AddContentBody(httpRequestParameters.GetBodyStream());
} }
auto httpResponse = httpClient->MakeRequest(httpRequest); const auto httpResponse = httpClient->MakeRequest(httpRequest);
if (!httpResponse) if (!httpResponse)
{ {
@ -192,4 +204,4 @@ namespace HttpRequestor
AZStd::string data(std::istreambuf_iterator<char>(httpResponse->GetResponseBody()), eos); AZStd::string data(std::istreambuf_iterator<char>(httpResponse->GetResponseBody()), eos);
httpRequestParameters.GetCallback()(AZStd::move(data), httpResponse->GetResponseCode()); httpRequestParameters.GetCallback()(AZStd::move(data), httpResponse->GetResponseCode());
} }
} } // namespace HttpRequestor

@ -63,20 +63,17 @@ namespace HttpRequestor
void HttpRequestorSystemComponent::Reflect(AZ::ReflectContext* context) void HttpRequestorSystemComponent::Reflect(AZ::ReflectContext* context)
{ {
if (AZ::SerializeContext* serialize = azrtti_cast<AZ::SerializeContext*>(context)) if (auto serialize = azrtti_cast<AZ::SerializeContext*>(context))
{ {
serialize->Class<HttpRequestorSystemComponent, AZ::Component>() serialize->Class<HttpRequestorSystemComponent, AZ::Component>()->Version(1);
->Version(1);
;
if (AZ::EditContext* ec = serialize->GetEditContext()) if (AZ::EditContext* ec = serialize->GetEditContext())
{ {
ec->Class<HttpRequestorSystemComponent>("HttpRequestor", "Will make HTTP Rest calls") ec->Class<HttpRequestorSystemComponent>("HttpRequestor", "Will make HTTP Rest calls")
->ClassElement(AZ::Edit::ClassElements::EditorData, "") ->ClassElement(AZ::Edit::ClassElements::EditorData, "")
// ->Attribute(AZ::Edit::Attributes::Category, "") Set a category // ->Attribute(AZ::Edit::Attributes::Category, "") Set a category
->Attribute(AZ::Edit::Attributes::AppearsInAddComponentMenu, AZ_CRC("System")) ->Attribute(AZ::Edit::Attributes::AppearsInAddComponentMenu, AZ_CRC("System"))
->Attribute(AZ::Edit::Attributes::AutoExpand, true) ->Attribute(AZ::Edit::Attributes::AutoExpand, true);
;
} }
} }
} }

@ -19,7 +19,7 @@ class HttpTest
{ {
}; };
TEST_F(HttpTest, DISABLED_HttpRequesterTest) TEST_F(HttpTest, HttpRequesterTest)
{ {
HttpRequestor::Manager httpRequestManager; HttpRequestor::Manager httpRequestManager;
@ -35,17 +35,14 @@ TEST_F(HttpTest, DISABLED_HttpRequesterTest)
requestConditionVar.wait_for(lock, AZStd::chrono::milliseconds(10)); requestConditionVar.wait_for(lock, AZStd::chrono::milliseconds(10));
} }
httpRequestManager.AddTextRequest( httpRequestManager.AddTextRequest(HttpRequestor::TextParameters(
HttpRequestor::TextParameters("https://httpbin.org/ip", "https://httpbin.org/ip", Aws::Http::HttpMethod::HTTP_GET,
Aws::Http::HttpMethod::HTTP_GET, [&resultData, &resultCode, &requestConditionVar](const AZStd::string& data, Aws::Http::HttpResponseCode code)
[&resultData, &resultCode, &requestConditionVar](const AZStd::string& data, Aws::Http::HttpResponseCode code) {
{ resultData = data;
resultData = data; resultCode = code;
resultCode = code; requestConditionVar.notify_all();
requestConditionVar.notify_all(); }));
}
)
);
{ {
AZStd::unique_lock<AZStd::mutex> lock(requestMutex); AZStd::unique_lock<AZStd::mutex> lock(requestMutex);

Loading…
Cancel
Save