Skip to content

Fix high CPU utilization regression on event streaming #3318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Aws
namespace Client
{
class AsyncCallerContext;
template <typename OutcomeT, typename ClientT, typename AWSEndpointT, typename RequestT, typename HandlerT>
class BidirectionalEventStreamingTask;

/**
* A helper to determine if AWS Operation is EventStream-enabled or not (based on const-ness of the request)
Expand Down Expand Up @@ -204,6 +206,9 @@ namespace Client
return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, clientThis->m_clientConfiguration.executor.get());
}
protected:
template <typename OutcomeT, typename ClientT, typename AWSEndpointT, typename RequestT, typename HandlerT>
friend class BidirectionalEventStreamingTask; // allow BidirectionalEventStreamingTask to access m_isInitialized

std::atomic<bool> m_isInitialized;
mutable std::atomic<size_t> m_operationsProcessed;
mutable std::condition_variable m_shutdownSignal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ class AWS_CORE_LOCAL BidirectionalEventStreamingTask final {
* @return OutcomeT, operation response, NoResult on success. (Check InitialResponse for the actual service reply).
*/
OutcomeT operator()() {
if(!m_clientThis->m_isInitialized)
{
AWS_LOGSTREAM_ERROR("BidirectionalEventStreamingTask", "Unable to call " <<
m_pRequest->GetServiceRequestName() << ": client is not initialized (or already terminated)");
m_handler(m_clientThis, *m_pRequest, Aws::Client::AWSError<CoreErrors>(CoreErrors::NOT_INITIALIZED,
"NOT_INITIALIZED", "Client is not initialized or already terminated", false), m_handlerContext);
return OutcomeT(NoResult());
}
Aws::Utils::RAIICounter raiiGuard(m_clientThis->m_operationsProcessed, &m_clientThis->m_shutdownSignal);

const auto outcome = m_clientThis->MakeRequest(*m_pRequest, m_endpoint, m_method, m_signerName);
if (outcome.IsSuccess()) {
m_handler(m_clientThis, *m_pRequest, OutcomeT(NoResult()), m_handlerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ if(!m_isInitialized) \
AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \
return Aws::Client::AWSError<CoreErrors>(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false); \
} \
Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal)
Aws::Utils::RAIICounter raiiGuard(this->m_operationsProcessed, &this->m_shutdownSignal)

#define AWS_ASYNC_OPERATION_GUARD(OPERATION) \
if(!m_isInitialized) \
{ \
AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \
return handler(this, request, Aws::Client::AWSError<CoreErrors>(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false), handlerContext); \
} \
Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal)
Aws::Utils::RAIICounter raiiGuard(this->m_operationsProcessed, &this->m_shutdownSignal)
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ namespace Aws
*/
bool WaitForDrain(int64_t timeoutMs);

/**
* A flag returned by underflow() if there is no data available at the moment but stream must not be closed yet.
*/
static const int noData;

protected:
std::streampos seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;
std::streampos seekpos(std::streampos pos, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;
Expand Down
8 changes: 8 additions & 0 deletions src/aws-cpp-sdk-core/source/client/AWSClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ HttpResponseOutcome AWSClient::AttemptExhaustively(const Aws::Http::URI& uri,
{
break;
}
if (request.IsEventStreamRequest() &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also extend this change to the smithy client? Otherwise, pending fixes will pile up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer a completely different streaming design for the smithy client.
Also, the smithy client does not use a for loop for its retry logic.

(request.GetBody()->eof() ||
(outcome.GetError().GetResponseCode() != Http::HttpResponseCode::REQUEST_NOT_MADE &&
outcome.GetError().GetResponseCode() != Http::HttpResponseCode::NETWORK_CONNECT_TIMEOUT &&
outcome.GetError().GetResponseCode() != Http::HttpResponseCode::SERVICE_UNAVAILABLE))) {
AWS_LOGSTREAM_ERROR(AWS_CLIENT_LOG_TAG, "SDK is not able to retry EventStream request after the connection was established");
break;
}

AWS_LOGSTREAM_WARN(AWS_CLIENT_LOG_TAG, "Request failed, now waiting " << sleepMillis << " ms before attempting again.");
if(request.GetBody())
Expand Down
23 changes: 15 additions & 8 deletions src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <aws/core/utils/logging/LogMacros.h>
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
#include <aws/core/utils/stream/AwsChunkedStream.h>
#include <aws/core/utils/stream/ConcurrentStreamBuf.h>

#include <algorithm>
#include <cassert>
Expand Down Expand Up @@ -315,12 +316,18 @@ static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, boo
{
size_t amountRead = 0;
if (isStreaming) {
if (!ioStream->eof() && ioStream->peek() != EOF) {
amountRead = (size_t)ioStream->readsome(ptr, amountToRead);
if (ioStream->bad()) {
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Input stream is bad!");
return CURL_READFUNC_ABORT;
}
if (amountRead == 0 && !ioStream->eof()) {
const int peekVal = ioStream->peek();
if (peekVal == ConcurrentStreamBuf::noData) {
return CURL_READFUNC_PAUSE;
}
if (ioStream->eof() || peekVal == EOF) {
return 0;
}
amountRead = (size_t)ioStream->readsome(ptr, amountToRead);
} else if (isAwsChunked && context->m_chunkedStream != nullptr) {
amountRead = context->m_chunkedStream->BufferedRead(ptr, amountToRead);
} else {
Expand Down Expand Up @@ -417,9 +424,9 @@ int CurlHttpClient::CurlProgressCallback(void *userdata, double, double, double,
}

const int peekVal = ioStream->peek();
if (ioStream->eof() && peekVal == std::char_traits<char>::eof()) {
if (ioStream->eof() || peekVal == std::char_traits<char>::eof()) {
// curl won't call ReadBody after the last ReadBody call returns 0
// however, this Progress method is still called few times for incoming data.
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
return 0;
}

Expand All @@ -432,10 +439,10 @@ int CurlHttpClient::CurlProgressCallback(void *userdata, double, double, double,
// we should use multi handle or another HTTP client in the future to avoid this
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
} else {
if (peekVal != std::char_traits<char>::eof()) {
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
} else {
if (peekVal == ConcurrentStreamBuf::noData) {
curl_easy_pause(context->m_curlHandle, CURLPAUSE_SEND);
} else {
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
}
}

Expand Down
15 changes: 8 additions & 7 deletions src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ namespace Aws
namespace Stream
{
const char TAG[] = "ConcurrentStreamBuf";

const int ConcurrentStreamBuf::noData = ((((('n' << 8) | 'z') << 8) | 'm') << 8) | 'a';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are 'n' and 'm' evaluated? I see that the 'z' and 'a' states are now treated as the same state, whether try_lock fails or m_getArea is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentStreamBuf::noData is a magic constant with a value of 1853517153 (0x6E7A6D61), i.e. ('n' << 24) | ('z' << 16) | ('m' << 8) | 'a'.
On a little-endian machine its bytes in memory are 'a', 'm', 'z', and 'n' (1634564718 would be the same four bytes read in the opposite order). Just a bit of fun. I thought about also using leetspeak, but decided to use something that we can claim as a reserved variable/keyword.
As mentioned in the doc comment on the declaration, it is:

A flag returned by underflow() if there is no data available at the moment but stream must not be closed yet.

We can't return -1: it is already reserved for EOF.
We can't return a single-byte value: it would be treated as a valid user byte from the stream.
But we can still return some other int that is outside the range of valid byte values.
Honestly, it is still kind of an "implementation-defined hack", but it works.


ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) :
m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it.
m_eofInput(false),
m_eofOutput(false)
m_putArea(bufferLength) // we access [0] of the put area below so we must initialize it.
{
m_getArea.reserve(bufferLength);
m_backbuf.reserve(bufferLength);
Expand Down Expand Up @@ -163,7 +164,7 @@ namespace Aws
if (!lock.try_lock())
{
// don't block consumer, it will retry asking later
return 'z'; // just returning some valid value other than EOF
return noData;
}

if (m_eofInput && m_backbuf.empty())
Expand All @@ -187,10 +188,10 @@ namespace Aws
char* gbegin = reinterpret_cast<char*>(m_getArea.data());
setg(gbegin, gbegin, gbegin + m_getArea.size());

if (!m_getArea.empty())
if (!m_getArea.empty()) {
return std::char_traits<char>::to_int_type(*gptr());
else
return 'a'; // just returning some valid value other than EOF
}
return noData;
}

int ConcurrentStreamBuf::uflow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ void DefaultExecutor::impl::Detach(std::thread::id id) {
assert(it != m_tasks.end());
it->second.first.detach();
m_tasks.erase(it);
m_state = State::Free;
m_cv.notify_one();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ TEST_F(TranscribeStreamingErrorTests, TranscribeTerminateByLowSpeedLimit) {
}
TestTrace(Aws::String("Writing good event"));
if (!stream.WriteAudioEvent(event)) {
AWS_ADD_FAILURE("Failed to write an audio event");
// the stream may be force closed by timeout, no test assertion here.
break;
}
}
Expand Down
Loading