From e26f29b89fbb3ae469606b8c69a7cce2e55a7ab7 Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Thu, 27 Feb 2025 21:44:37 +0000 Subject: [PATCH] Fix high CPU utilization regression on event streaming introduced by #3302 --- .../aws/core/client/AWSClientAsyncCRTP.h | 5 ++++ .../client/AWSClientEventStreamingAsyncTask.h | 10 ++++++++ .../aws/core/utils/logging/ErrorMacros.h | 4 ++-- .../core/utils/stream/ConcurrentStreamBuf.h | 5 ++++ .../source/client/AWSClient.cpp | 8 +++++++ .../source/http/curl/CurlHttpClient.cpp | 23 ++++++++++++------- .../utils/stream/ConcurrentStreamBuf.cpp | 15 ++++++------ .../utils/threading/DefaultExecutor.cpp | 1 - .../TranscribeErrorCaseTests.cpp | 2 +- 9 files changed, 54 insertions(+), 19 deletions(-) diff --git a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h index 7210ee38d69..3ec8969caa3 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h +++ b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h @@ -14,6 +14,8 @@ namespace Aws namespace Client { class AsyncCallerContext; + template + class BidirectionalEventStreamingTask; /** * A helper to determine if AWS Operation is EventStream-enabled or not (based on const-ness of the request) @@ -204,6 +206,9 @@ namespace Client return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, clientThis->m_clientConfiguration.executor.get()); } protected: + template + friend class BidirectionalEventStreamingTask; // allow BidirectionalEventStreamingTask to access m_isInitialized + std::atomic m_isInitialized; mutable std::atomic m_operationsProcessed; mutable std::condition_variable m_shutdownSignal; diff --git a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientEventStreamingAsyncTask.h b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientEventStreamingAsyncTask.h index f3e6138af48..bcef3211914 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientEventStreamingAsyncTask.h +++ b/src/aws-cpp-sdk-core/include/aws/core/client/AWSClientEventStreamingAsyncTask.h @@ -91,6 +91,16 @@ class AWS_CORE_LOCAL BidirectionalEventStreamingTask final { * @return OutcomeT, operation response, NoResult on success. (Check InitialResponse for the actual service reply). */ OutcomeT operator()() { + if(!m_clientThis->m_isInitialized) + { + AWS_LOGSTREAM_ERROR("BidirectionalEventStreamingTask", "Unable to call " << + m_pRequest->GetServiceRequestName() << ": client is not initialized (or already terminated)"); + m_handler(m_clientThis, *m_pRequest, Aws::Client::AWSError(CoreErrors::NOT_INITIALIZED, + "NOT_INITIALIZED", "Client is not initialized or already terminated", false), m_handlerContext); + return OutcomeT(NoResult()); + } + Aws::Utils::RAIICounter raiiGuard(m_clientThis->m_operationsProcessed, &m_clientThis->m_shutdownSignal); + const auto outcome = m_clientThis->MakeRequest(*m_pRequest, m_endpoint, m_method, m_signerName); if (outcome.IsSuccess()) { m_handler(m_clientThis, *m_pRequest, OutcomeT(NoResult()), m_handlerContext); diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h index dbfd75e82d7..d167d598fe3 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h @@ -64,7 +64,7 @@ if(!m_isInitialized) \ AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \ return Aws::Client::AWSError(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false); \ } \ -Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal) +Aws::Utils::RAIICounter raiiGuard(this->m_operationsProcessed, &this->m_shutdownSignal) #define AWS_ASYNC_OPERATION_GUARD(OPERATION) \ if(!m_isInitialized) \ @@ -72,4 +72,4 @@ if(!m_isInitialized) \ AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \ return handler(this, request, Aws::Client::AWSError(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false), handlerContext); \ } \ -Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal) +Aws::Utils::RAIICounter raiiGuard(this->m_operationsProcessed, &this->m_shutdownSignal) diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h b/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h index 307178becc8..aef9ad6dd74 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h @@ -41,6 +41,11 @@ namespace Aws */ bool WaitForDrain(int64_t timeoutMs); + /** + * A flag returned by underflow() if there is no data available at the moment but stream must not be closed yet. + */ + static const int noData; + protected: std::streampos seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override; std::streampos seekpos(std::streampos pos, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override; diff --git a/src/aws-cpp-sdk-core/source/client/AWSClient.cpp b/src/aws-cpp-sdk-core/source/client/AWSClient.cpp index 801fdb74b27..205970795c8 100644 --- a/src/aws-cpp-sdk-core/source/client/AWSClient.cpp +++ b/src/aws-cpp-sdk-core/source/client/AWSClient.cpp @@ -357,6 +357,14 @@ HttpResponseOutcome AWSClient::AttemptExhaustively(const Aws::Http::URI& uri, { break; } + if (request.IsEventStreamRequest() && + (request.GetBody()->eof() || + (outcome.GetError().GetResponseCode() != Http::HttpResponseCode::REQUEST_NOT_MADE && + outcome.GetError().GetResponseCode() != Http::HttpResponseCode::NETWORK_CONNECT_TIMEOUT && + outcome.GetError().GetResponseCode() != Http::HttpResponseCode::SERVICE_UNAVAILABLE))) { + AWS_LOGSTREAM_ERROR(AWS_CLIENT_LOG_TAG, "SDK is not able to retry EventStream request after the connection was established"); + break; + } AWS_LOGSTREAM_WARN(AWS_CLIENT_LOG_TAG, "Request failed, now waiting " << sleepMillis << " ms before attempting again."); if(request.GetBody()) diff --git a/src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp index f67f123ea08..b34aeeb556a 100644 --- a/src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp +++ b/src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -315,12 +316,18 @@ static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, boo { size_t amountRead = 0; if (isStreaming) { - if (!ioStream->eof() && ioStream->peek() != EOF) { - amountRead = (size_t)ioStream->readsome(ptr, amountToRead); + if (ioStream->bad()) { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Input stream is bad!"); + return CURL_READFUNC_ABORT; } - if (amountRead == 0 && !ioStream->eof()) { + const int peekVal = ioStream->peek(); + if (peekVal == ConcurrentStreamBuf::noData) { return CURL_READFUNC_PAUSE; } + if (ioStream->eof() || peekVal == EOF) { + return 0; + } + amountRead = (size_t)ioStream->readsome(ptr, amountToRead); } else if (isAwsChunked && context->m_chunkedStream != nullptr) { amountRead = context->m_chunkedStream->BufferedRead(ptr, amountToRead); } else { @@ -417,9 +424,9 @@ int CurlHttpClient::CurlProgressCallback(void *userdata, double, double, double, } const int peekVal = ioStream->peek(); - if (ioStream->eof() && peekVal == std::char_traits::eof()) { + if (ioStream->eof() || peekVal == std::char_traits::eof()) { // curl won't call ReadBody after the last ReadBody call returns 0 - // however, this Progress method is still called few times for incoming data. + curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); return 0; } @@ -432,10 +439,10 @@ int CurlHttpClient::CurlProgressCallback(void *userdata, double, double, double, // we should use multi handle or another HTTP client in the future to avoid this curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); } else { - if (peekVal != std::char_traits::eof()) { - curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); - } else { + if (peekVal == ConcurrentStreamBuf::noData) { curl_easy_pause(context->m_curlHandle, CURLPAUSE_SEND); + } else { + curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); } } diff --git a/src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp b/src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp index 7f9aaa8771b..f2069a6f507 100644 --- a/src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp +++ b/src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp @@ -15,10 +15,11 @@ namespace Aws namespace Stream { const char TAG[] = "ConcurrentStreamBuf"; + + const int ConcurrentStreamBuf::noData = ((((('n' << 8) | 'z') << 8) | 'm') << 8) | 'a'; + ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) : - m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it. - m_eofInput(false), - m_eofOutput(false) + m_putArea(bufferLength) // we access [0] of the put area below so we must initialize it. { m_getArea.reserve(bufferLength); m_backbuf.reserve(bufferLength); @@ -163,7 +164,7 @@ namespace Aws if (!lock.try_lock()) { // don't block consumer, it will retry asking later - return 'z'; // just returning some valid value other than EOF + return noData; } if (m_eofInput && m_backbuf.empty()) @@ -187,10 +188,10 @@ namespace Aws char* gbegin = reinterpret_cast(m_getArea.data()); setg(gbegin, gbegin, gbegin + m_getArea.size()); - if (!m_getArea.empty()) + if (!m_getArea.empty()) { return std::char_traits::to_int_type(*gptr()); - else - return 'a'; // just returning some valid value other than EOF + } + return noData; } int ConcurrentStreamBuf::uflow() diff --git a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp index bd8c5682c1f..e8d4e1cee1a 100644 --- a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp +++ b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp @@ -126,7 +126,6 @@ void DefaultExecutor::impl::Detach(std::thread::id id) { assert(it != m_tasks.end()); it->second.first.detach(); m_tasks.erase(it); - m_state = State::Free; m_cv.notify_one(); } diff --git a/tests/aws-cpp-sdk-transcribestreaming-integ-tests/TranscribeErrorCaseTests.cpp b/tests/aws-cpp-sdk-transcribestreaming-integ-tests/TranscribeErrorCaseTests.cpp index 421b7590690..6721ff87c0a 100644 --- a/tests/aws-cpp-sdk-transcribestreaming-integ-tests/TranscribeErrorCaseTests.cpp +++ b/tests/aws-cpp-sdk-transcribestreaming-integ-tests/TranscribeErrorCaseTests.cpp @@ -217,7 +217,7 @@ TEST_F(TranscribeStreamingErrorTests, TranscribeTerminateByLowSpeedLimit) { } TestTrace(Aws::String("Writing good event")); if (!stream.WriteAudioEvent(event)) { - AWS_ADD_FAILURE("Failed to write an audio event"); + // the stream may be force closed by timeout, no test assertion here. break; } }