Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/core/src/aio.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,46 @@ TEST(core, operation_failed)

EXPECT_TRUE(utils::filesystem::remove_path(tmp_file));
}

// Pins the zero-length aio contract that the NFS file-copy plugin depends on:
// a zero-length write/read completes with ERR_HANDLE_EOF (not ERR_OK) and count 0.
// Because of this, nfs_service_impl::on_copy / nfs_client_impl::continue_write must
// special-case empty (size==0) files instead of issuing a zero-length read/write,
// which would otherwise be reported to the copy caller as a failure.
TEST(core, aio_zero_length)
{
    // Skip when the current task has no disk; per the original note this is the
    // case in dsn_mimic_app() with disk_io_mode == IOE_PER_QUEUE.
    if (task::get_current_disk() == nullptr)
    {
        return;
    }

    ASSERT_TRUE(::dsn::utils::test::prepare_test_tmp_dir("dsn.core.aio_zero"));
    const std::string zero_file_path =
        ::dsn::utils::test::test_tmp_path("dsn.core.aio_zero", "zero_test_file");

    // The buffer is never actually transferred: both requests below ask for 0 bytes.
    char scratch[8] = {};

    // --- Phase 1: zero-length write into a freshly created file ---------------
    // The byte counter starts at a sentinel so the test can tell that the
    // callback really stored a 0 into it.
    ::dsn::error_code write_err;
    size_t bytes_written = 0xdead;

    auto write_handle = dsn_file_open(zero_file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
    ASSERT_TRUE(write_handle != nullptr);

    auto pending = ::dsn::file::write(
        write_handle,
        scratch,
        0,
        0,
        LPC_AIO_TEST,
        nullptr,
        [&write_err, &bytes_written](::dsn::error_code code, size_t transferred)
        {
            write_err = code;
            bytes_written = transferred;
        },
        0);
    pending->wait();

    // Expected contract: ERR_HANDLE_EOF together with a transferred count of 0.
    EXPECT_TRUE(write_err == ERR_HANDLE_EOF);
    EXPECT_EQ(0u, bytes_written);
    dsn_file_close(write_handle);

    // --- Phase 2: zero-length read from the same (still empty) file -----------
    ::dsn::error_code read_err;
    size_t bytes_read = 0xdead;

    auto read_handle = dsn_file_open(zero_file_path.c_str(), O_RDONLY | O_BINARY, 0);
    ASSERT_TRUE(read_handle != nullptr);

    pending = ::dsn::file::read(
        read_handle,
        scratch,
        0,
        0,
        LPC_AIO_TEST,
        nullptr,
        [&read_err, &bytes_read](::dsn::error_code code, size_t transferred)
        {
            read_err = code;
            bytes_read = transferred;
        },
        0);
    pending->wait();

    EXPECT_TRUE(read_err == ERR_HANDLE_EOF);
    EXPECT_EQ(0u, bytes_read);
    dsn_file_close(read_handle);

    // Clean up the temporary file created for this test.
    EXPECT_TRUE(utils::filesystem::remove_path(zero_file_path));
}
27 changes: 19 additions & 8 deletions src/core/src/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,25 @@ aio_task* disk_file::on_write_completed(aio_task* wk, void* ctx, error_code err,
if (err == ERR_OK)
{
size_t this_size = (size_t)wk->aio()->buffer_size;
dassert(size >= this_size,
"written buffer size does not equal to input buffer's size: %d vs %d",
(int)size,
(int)this_size
);

wk->enqueue(err, this_size);
size -= this_size;
if (size < this_size)
{
// Short write from the underlying provider (e.g. the disk is full):
// fewer bytes were written than this task requested. Report a
// file-operation failure for this task and the remaining batched
// tasks (whose data was not written either) through the aio callback
// channel instead of aborting the whole process.
derror("disk write completed with fewer bytes than requested: %d vs %d",
(int)size,
(int)this_size
);
wk->enqueue(ERR_FILE_OPERATION_FAILED, size);
size = 0;
}
else
{
wk->enqueue(err, this_size);
size -= this_size;
}
}
else
{
Expand Down
14 changes: 12 additions & 2 deletions src/dev/cpp/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,18 @@ namespace dsn {
{
bool succ;

dassert((typeflag == FTW_F) || (typeflag == FTW_DP),
"Invalid typeflag = %d.", typeflag);
// nftw reports FTW_NS when stat() fails on an entry (a transient
// I/O error, a permission problem, or a concurrent removal) and
// FTW_DNR for an unreadable directory. These are recoverable, so
// stop the walk and let remove_directory()/remove_path() return
// false through their existing bool channel instead of aborting
// the whole process.
if ((typeflag != FTW_F) && (typeflag != FTW_DP))
{
dwarn("remove path %s failed: unexpected file tree walk typeflag = %d",
fpath, typeflag);
return FTW_STOP;
}
#ifdef _WIN32
if (typeflag != FTW_F)
{
Expand Down
7 changes: 6 additions & 1 deletion src/plugins/tools.common/native_aio_provider.linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ namespace dsn {
// Destructor: releases the AIO context held in _ctx via io_destroy().
// A non-zero return from io_destroy() is logged and otherwise ignored; it is
// deliberately not asserted on, so that tearing the provider down never aborts
// the process.
native_linux_aio_provider::~native_linux_aio_provider()
{
    const auto ret = io_destroy(_ctx);
    if (ret != 0)
    {
        // A destructor must not abort. io_destroy can fail (e.g. EINVAL on an
        // already-released context); log and continue so shutdown proceeds.
        derror("io_destroy error, ret = %d", ret);
    }
}

void native_linux_aio_provider::start(io_modifer& ctx)
Expand Down
195 changes: 137 additions & 58 deletions src/plugins/tools.nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ namespace dsn {
return;
}

// size_list and file_list are parallel arrays in the (remote, untrusted)
// response, but the loop below iterates size_list while indexing file_list.
// A malformed or corrupted response with mismatched lengths would otherwise
// read file_list out of bounds. Validate the lengths match and fail the
// request through the existing nfs_task error channel.
if (resp.size_list.size() != resp.file_list.size())
{
derror("invalid get file size response: size_list (%d) and file_list (%d) "
"have mismatched lengths",
(int)resp.size_list.size(),
(int)resp.file_list.size());
ureq->nfs_task->enqueue(ERR_INVALID_DATA, 0);
delete ureq;
return;
}

for (size_t i = 0; i < resp.size_list.size(); i++) // file list
{
file_context *filec;
Expand Down Expand Up @@ -234,7 +250,28 @@ namespace dsn {
handle_completion(reqc->file_ctx->user_req, err);
return;
}


// response.size and response.file_content are independent fields filled in by the
// (untrusted) remote server. The local write issues
// file::write(response.file_content.data(), response.size, ...), so a response whose
// declared size is negative or larger than the actual file_content buffer would make
// that write read past the end of the buffer -- corrupting the destination file with
// adjacent heap memory (and disclosing it into the replicated data) or crashing. This
// is the client-side counterpart of the size validation on_copy already performs on
// the request. A zero size is legal (empty-file segment) and is handled specially in
// continue_write. On the honest path file_content is the full block buffer and size is
// the valid prefix, so size <= file_content.length() always holds.
if (resp.size < 0 || static_cast<uint64_t>(resp.size) > resp.file_content.length())
{
derror("nfs: invalid copy response for file %s: declared size %d exceeds "
"content buffer length %u",
reqc->file_ctx->file_name.c_str(),
resp.size,
(uint32_t)resp.file_content.length());
handle_completion(reqc->file_ctx->user_req, ERR_INVALID_DATA);
return;
}

reqc->response = resp;
reqc->response.error.end_tracking(); // always ERR_OK
reqc->is_ready_for_write = true;
Expand Down Expand Up @@ -279,82 +316,121 @@ namespace dsn {
return;
}

// get write
dsn::ref_ptr<copy_request_ex> reqc;
// Loop rather than recurse on per-file failures: a persistent error
// (e.g. a full disk that fails every transfer) would otherwise make
// continue_write() invoke itself once per failed request and could
// grow the call stack without bound.
while (true)
{
// get write
dsn::ref_ptr<copy_request_ex> reqc;
while (true)
{
zauto_lock l(_local_writes_lock);
if (!_local_writes.empty())
{
reqc = _local_writes.front();
_local_writes.pop();
zauto_lock l(_local_writes_lock);
if (!_local_writes.empty())
{
reqc = _local_writes.front();
_local_writes.pop();
}
else
{
reqc = nullptr;
break;
}
}
else

{
reqc = nullptr;
break;
}
zauto_lock l(reqc->lock);
if (reqc->is_valid)
break;
}
}

if (nullptr == reqc)
{
zauto_lock l(reqc->lock);
if (reqc->is_valid)
break;
--_concurrent_local_write_count;
return;
}
}

if (nullptr == reqc)
{
--_concurrent_local_write_count;
return;
}
// real write
std::string file_path = dsn::utils::filesystem::path_combine(reqc->copy_req.dst_dir, reqc->file_ctx->file_name);
std::string path = dsn::utils::filesystem::remove_file_name(file_path.c_str());
if (!dsn::utils::filesystem::create_directory(path))
{
derror("create directory %s failed", path.c_str());
error_code err = ERR_FILE_OPERATION_FAILED;
handle_completion(reqc->file_ctx->user_req, err);
continue;
}

// real write
std::string file_path = dsn::utils::filesystem::path_combine(reqc->copy_req.dst_dir, reqc->file_ctx->file_name);
std::string path = dsn::utils::filesystem::remove_file_name(file_path.c_str());
if (!dsn::utils::filesystem::create_directory(path))
{
dassert(false, "Fail to create directory %s.", path.c_str());
}
dsn_handle_t hfile = reqc->file_ctx->file.load();
if (!hfile)
{
zauto_lock l(reqc->file_ctx->user_req->user_req_lock);
hfile = reqc->file_ctx->file.load();
if (!hfile)
{
hfile = dsn_file_open(file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
reqc->file_ctx->file = hfile;
}
}

dsn_handle_t hfile = reqc->file_ctx->file.load();
if (!hfile)
{
zauto_lock l(reqc->file_ctx->user_req->user_req_lock);
hfile = reqc->file_ctx->file.load();
if (!hfile)
{
hfile = dsn_file_open(file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
reqc->file_ctx->file = hfile;
derror("file open %s failed", file_path.c_str());
error_code err = ERR_FILE_OPERATION_FAILED;
handle_completion(reqc->file_ctx->user_req, err);
continue;
}
}

if (!hfile)
{
derror("file open %s failed", file_path.c_str());
error_code err = ERR_FILE_OPERATION_FAILED;
handle_completion(reqc->file_ctx->user_req, err);
--_concurrent_local_write_count;
continue_write();
return;
}
// An empty source file yields a copy request with response.size == 0. The
// destination file has just been created (or already exists) via the
// O_RDWR | O_CREAT open above, so there is nothing to write: a zero-length
// file::write would complete with ERR_HANDLE_EOF and be reported as a
// failure. Finalize this segment as a success inline -- mirroring the
// success bookkeeping in local_write_callback -- and continue the loop
// (no recursion) instead of issuing the zero-length write.
if (reqc->response.size == 0)
{
reqc->response.file_content = blob();

{
zauto_lock l(reqc->lock);
auto& reqc_save = *reqc.get();
reqc_save.local_write_task = file::write(
hfile,
reqc_save.response.file_content.data(),
reqc_save.response.size,
reqc_save.response.offset,
LPC_NFS_WRITE,
this,
[this, reqc_cap = std::move(reqc)] (error_code err, int sz)
bool completed = false;
{
local_write_callback(err, sz, std::move(reqc_cap));
zauto_lock l(reqc->file_ctx->user_req->user_req_lock);
if (++reqc->file_ctx->finished_segments == (int)reqc->file_ctx->copy_requests.size())
{
if (++reqc->file_ctx->user_req->finished_files == (int)reqc->file_ctx->user_req->file_context_map.size())
{
completed = true;
}
}
}

if (completed)
{
handle_completion(reqc->file_ctx->user_req, ERR_OK);
}
);
continue;
}

{
zauto_lock l(reqc->lock);
auto& reqc_save = *reqc.get();
reqc_save.local_write_task = file::write(
hfile,
reqc_save.response.file_content.data(),
reqc_save.response.size,
reqc_save.response.offset,
LPC_NFS_WRITE,
this,
[this, reqc_cap = std::move(reqc)] (error_code err, int sz)
{
local_write_callback(err, sz, std::move(reqc_cap));
}
);
}
return;
}
}

Expand Down Expand Up @@ -443,7 +519,10 @@ namespace dsn {
if (f.second->file)
{
auto err2 = dsn_file_close(f.second->file);
dassert(err2 == ERR_OK, "dsn_file_close failed, err = %s", dsn_error_to_string(err2));
if (err2 != ERR_OK)
{
dwarn("dsn_file_close failed, err = %s", dsn_error_to_string(err2));
}

f.second->file = nullptr;

Expand Down
Loading
Loading