Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 41 additions & 40 deletions src/perf/fiber-http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <Poco/Net/StreamSocket.h>

#include <cerrno>
#include <format>

#include <poll.h>
#include <unistd.h>
Expand Down Expand Up @@ -128,7 +129,8 @@ bool FiberSocketImpl::poll(const Poco::Timespan & timeout, int mode)

int FiberSocketImpl::sendBytes(const void * buffer, int length, int flags)
{
UNUSED(flags);
// Poco flags (MSG_NOSIGNAL, MSG_PEEK, etc.) are not plumbed through to io_uring read/write.
ASSERT(flags == 0, "FiberSocketImpl::sendBytes does not support flags (got {})", flags);

int total = 0;
const char * ptr = static_cast<const char *>(buffer);
Expand Down Expand Up @@ -165,27 +167,44 @@ int FiberSocketImpl::sendBytes(const void * buffer, int length, int flags)
return total;
}

void FiberSocketImpl::shutdown()
int FiberSocketImpl::receiveBytes(void * buffer, int length, int flags)
{
int fd = atomicFd.load(std::memory_order_relaxed);
if (fd < 0)
// Poco flags (MSG_NOSIGNAL, MSG_PEEK, etc.) are not plumbed through to io_uring read/write.
ASSERT(flags == 0, "FiberSocketImpl::receiveBytes does not support flags (got {})", flags);

#if defined(USE_IO_URING_RW)
uint64_t bytesRead = 0;
int r = silk::FiberScheduler::read(sockfd(), buffer, static_cast<uint64_t>(length), 0, &bytesRead);
if (r)
{
return;
error(r, "recv");
}

int r = ::shutdown(fd, SHUT_RDWR);
if (r < 0)
return static_cast<int>(bytesRead);
#else
for (;;)
{
r = errno;
error(r, "shutdown");
ssize_t count = ::recv(sockfd(), buffer, static_cast<size_t>(length), 0);
if (count >= 0)
{
return static_cast<int>(count);
}
int r = errno;
if (r == EAGAIN)
{
r = silk::FiberScheduler::poll(sockfd(), POLLIN);
if (!r)
{
continue;
}
}
error(r, "recv");
}
#endif
}

Poco::Net::SocketImpl * FiberServerSocketImpl::acceptConnection(Poco::Net::SocketAddress & clientAddr)
{
silk::FiberScheduler::IoFuture pollFuture;
silk::FiberScheduler::poll(sockfd(), POLLIN, nullptr, &pollFuture);
int r = pollFuture.wait();
int r = silk::FiberScheduler::poll(sockfd(), POLLIN);
if (r)
{
error(r, "accept poll");
Expand All @@ -204,36 +223,18 @@ Poco::Net::SocketImpl * FiberServerSocketImpl::acceptConnection(Poco::Net::Socke
return new FiberSocketImpl(fd);
}

int FiberSocketImpl::receiveBytes(void * buffer, int length, int flags)
void FiberSocketImpl::shutdown()
{
UNUSED(flags);

#if defined(USE_IO_URING_RW)
uint64_t bytesRead = 0;
int r = silk::FiberScheduler::read(sockfd(), buffer, static_cast<uint64_t>(length), 0, &bytesRead);
if (r)
int fd = atomicFd.load(std::memory_order_relaxed);
if (fd < 0)
{
error(r, "recv");
return;
}
return static_cast<int>(bytesRead);
#else
for (;;)

int r = ::shutdown(fd, SHUT_RDWR);
if (r < 0)
{
ssize_t count = ::recv(sockfd(), buffer, static_cast<size_t>(length), 0);
if (count >= 0)
{
return static_cast<int>(count);
}
int r = errno;
if (r == EAGAIN)
{
r = silk::FiberScheduler::poll(sockfd(), POLLIN);
if (!r)
{
continue;
}
}
error(r, "recv");
r = errno;
error(r, "shutdown");
}
#endif
}
43 changes: 41 additions & 2 deletions src/perf/s3-perf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpClientFactory.h>
#include <aws/core/http/HttpRequest.h>
Expand Down Expand Up @@ -183,12 +184,16 @@ makeS3Client(const ClientConfig & cfg, std::shared_ptr<Aws::Utils::Threading::Ex
s3Config.useVirtualAddressing = false;
s3Config.payloadSigningPolicy = Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never;
s3Config.executor = std::move(executor);
// The bench measures single-attempt latency. With retries enabled the SDK
// would re-read slot.body on retry, which conflicts with the slot's reuse
// across iterations and would muddy the latency distribution.
s3Config.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(0L, 0L);

return std::make_unique<Aws::S3::S3Client>(
creds, std::make_shared<CachingS3EndpointProvider>(cfg.endpointUrl + "/" + cfg.bucket), s3Config);
}

static void printJson(std::vector<uint64_t> & latNs, const ClientConfig & cfg)
static void printJson(std::vector<uint64_t> & latNs, const ClientConfig & cfg, uint32_t failedSessions)
{
uint64_t total = latNs.size();
double durationS = static_cast<double>(cfg.durationNs) / 1e9;
Expand All @@ -206,6 +211,7 @@ static void printJson(std::vector<uint64_t> & latNs, const ClientConfig & cfg)
printf(" \"duration_s\": %.3f,\n", durationS);
printf(" \"total\": %lu,\n", total);
printf(" \"rps\": %.1f,\n", opsPerSec);
printf(" \"errors\": %u,\n", failedSessions);
printLatencyUs(latNs);
printCounters();
printf("}\n");
Expand Down Expand Up @@ -463,6 +469,12 @@ class FiberHttpClientFactory final : public PocoHttpClientFactory

/**
* FiberExecutor - runs SDK async tasks as fibers instead of OS threads.
*
* Note: SubmitToThread is unbounded -- FiberScheduler::run always succeeds, so
* unlike PooledThreadExecutor (which blocks when the pool saturates) this
* executor offers no backpressure. The s3-perf bench is bounded externally by
* numJobs * ioDepth, so this is harmless here. Code copying this class into
* production should add a bound (counting semaphore, slot accounting, etc.).
*/
class FiberExecutor final : public Aws::Utils::Threading::Executor
{
Expand Down Expand Up @@ -518,6 +530,8 @@ class S3Bench

std::vector<uint64_t> collectLatencies();

uint32_t failedSessions() const;

private:
struct Slot
{
Expand All @@ -531,6 +545,7 @@ class S3Bench
{
std::thread thread;
std::vector<uint64_t> latencies;
bool failed = false;
};

void submit(Slot & slot, uint32_t & count);
Expand Down Expand Up @@ -580,6 +595,19 @@ std::vector<uint64_t> S3Bench::collectLatencies()
return all;
}

uint32_t S3Bench::failedSessions() const
{
uint32_t count = 0;
for (const Session & session : sessions)
{
if (session.failed)
{
count++;
}
}
return count;
}

void S3Bench::submit(Slot & slot, uint32_t & count)
{
bool doPut;
Expand Down Expand Up @@ -664,7 +692,12 @@ void S3Bench::runSession(Session * session)

if (slots[head].error)
{
// Fail-fast: a single S3 error terminates the session. With a misconfigured
// endpoint or a transient failure every session exits after one bad outcome,
// and the run produces no useful latency data. failedSessions exposes the
// count so the caller can warn.
LOG_ERROR("S3 request failed: {}", slots[head].error->GetMessage());
session->failed = true;
head = (head + 1) % cfg.ioDepth;
break;
}
Expand Down Expand Up @@ -802,8 +835,14 @@ int main(int argc, char ** argv)
LOG_INFO("stopping benchmark");
bench.stop();

uint32_t failedSessions = bench.failedSessions();
if (failedSessions > 0)
{
LOG_WARN("{}/{} sessions exited early due to S3 errors", failedSessions, cfg.numJobs);
}

auto latencies = bench.collectLatencies();
printJson(latencies, cfg);
printJson(latencies, cfg, failedSessions);

s3.reset();
Aws::ShutdownAPI(sdkOptions);
Expand Down
Loading