diff --git a/include/dsn/cpp/callocator.h b/include/dsn/cpp/callocator.h index 25b17bf34..18c466e22 100644 --- a/include/dsn/cpp/callocator.h +++ b/include/dsn/cpp/callocator.h @@ -36,7 +36,9 @@ # pragma once # include +# include # include +# include namespace dsn { @@ -49,7 +51,12 @@ namespace dsn { public: void* operator new(size_t size) { - return a(uint32_t(size)); + void* ptr = a(uint32_t(size)); + if (ptr == nullptr) + { + throw std::bad_alloc(); + } + return ptr; } void operator delete(void* p) @@ -59,7 +66,12 @@ namespace dsn { void* operator new[](size_t size) { - return a((uint32_t)size); + void* ptr = a((uint32_t)size); + if (ptr == nullptr) + { + throw std::bad_alloc(); + } + return ptr; } void operator delete[](void* p) @@ -86,7 +98,23 @@ namespace dsn { T* allocate(size_type n, const void *hint = 0) { - return static_cast(a(uint32_t(n * sizeof(T)))); + if (n == 0) + { + return nullptr; + } + + // The underlying rDSN allocation API takes uint32_t bytes. + if (n > (std::numeric_limits::max)() / sizeof(T)) + { + throw std::bad_alloc(); + } + + T* ptr = static_cast(a(uint32_t(n * sizeof(T)))); + if (ptr == nullptr) + { + throw std::bad_alloc(); + } + return ptr; } void deallocate(T* p, size_type n) diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index 0eb48f926..a50271645 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -193,7 +193,7 @@ class task_spec : public extensible_object { public: DSN_API static task_spec* get(int ec); - DSN_API static void register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool); + DSN_API static bool register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool); public: // not configurable [ diff --git a/include/dsn/utility/customizable_id.h b/include/dsn/utility/customizable_id.h index 5b038a501..3cd6ca5ec 100644 --- a/include/dsn/utility/customizable_id.h +++ b/include/dsn/utility/customizable_id.h @@ -206,16 +206,23 @@ int customized_id_mgr::register_id(const char* name) return -1; } - int id = get_id(name); - if (-1 != id) + try { - return id; + int id = get_id(name); + if (-1 != id) + { + return id; + } + + int code = static_cast(_names.size()); + _names[std::string(name)] = code; + _names2.push_back(std::string(name)); + return code; + } + catch (...) + { + return -1; } - - int code = static_cast(_names.size()); - _names[std::string(name)] = code; - _names2.push_back(std::string(name)); - return code; } }} // end namespace dsn::utils diff --git a/src/core/src/coredump.win.cpp b/src/core/src/coredump.win.cpp index 51ee3018f..f701782c4 100644 --- a/src/core/src/coredump.win.cpp +++ b/src/core/src/coredump.win.cpp @@ -63,7 +63,7 @@ namespace dsn { s_dump_dir = dump_dir; if (::GetModuleBaseNameA(::GetCurrentProcess(), - ::GetModuleHandleA(NULL), + ::GetModuleHandleA(nullptr), s_app_name, 256 ) == 0) @@ -128,7 +128,7 @@ namespace dsn { s_dump_dir.c_str(), s_app_name, ::GetCurrentProcessId(), - (int64_t)time(NULL)); + (int64_t)time(nullptr)); if (len < 0 || static_cast(len) >= sizeof(szDumpPath)) { szResult = "failed to format dump file path"; @@ -136,8 +136,8 @@ namespace dsn { } // create the file - fh = ::CreateFileA(szDumpPath, GENERIC_WRITE, FILE_SHARE_WRITE, NULL, CREATE_ALWAYS, - FILE_ATTRIBUTE_NORMAL, NULL); + fh = ::CreateFileA(szDumpPath, GENERIC_WRITE, FILE_SHARE_WRITE, nullptr, CREATE_ALWAYS, + FILE_ATTRIBUTE_NORMAL, nullptr); if (fh == INVALID_HANDLE_VALUE) { len = snprintf(szScratch, diff --git a/src/core/src/main.cpp b/src/core/src/main.cpp index 220ed9e53..efb9d0375 100644 --- a/src/core/src/main.cpp +++ b/src/core/src/main.cpp @@ -57,6 +57,7 @@ # include "transient_memory.h" # include "library_utils.h" # include +# include # include # if defined(_WIN32) @@ -305,9 +306,9 @@ DSN_API void dsn_config_dump(const char* file) extern bool dsn_log_init(); // load all modules: local components, tools, frameworks, apps -static void load_all_modules(::dsn::configuration_ptr config) +static bool load_all_modules(::dsn::configuration_ptr config) { - std::vector< std::pair > modules; + std::vector> modules; std::map module_map; // name -> index in modules // load local components, toollets, and tools @@ -373,9 +374,8 @@ static void load_all_modules(::dsn::configuration_ptr config) auto hmod = ::dsn::utils::load_dynamic_library(m.first.c_str(), search_dirs); if (nullptr == hmod) { - dassert(false, "cannot load shared library '%s' specified in config file", - m.first.c_str()); - break; + derror("cannot load shared library '%s' specified in config file", m.first.c_str()); + return false; } else { @@ -386,10 +386,12 @@ static void load_all_modules(::dsn::configuration_ptr config) # if !defined(_WIN32) typedef void(*dsn_module_init_fn)(); dsn_module_init_fn init_fn = (dsn_module_init_fn)::dsn::utils::load_symbol(hmod, "dsn_module_init"); - dassert(init_fn != nullptr, - "dsn_module_init is not present (%s), use MODULE_INIT_BEGIN/END to define it", - m.first.c_str() - ); + if (init_fn == nullptr) + { + derror("dsn_module_init is not present (%s), use MODULE_INIT_BEGIN/END to define it", + m.first.c_str()); + return false; + } init_fn(); # endif // ! _WIN32 @@ -397,11 +399,13 @@ static void load_all_modules(::dsn::configuration_ptr config) if (m.second.length() > 0) { dsn_app_bridge_t bridge_ptr = (dsn_app_bridge_t)::dsn::utils::load_symbol(hmod, "dsn_app_bridge"); - dassert(bridge_ptr != nullptr, - "when dmodule_bridge_arguments is present (%s), function dsn_app_bridge must be implemented in module %s", - m.second.c_str(), - m.first.c_str() - ); + if (bridge_ptr == nullptr) + { + derror("when dmodule_bridge_arguments is present (%s), function dsn_app_bridge must be implemented in module %s", + m.second.c_str(), + m.first.c_str()); + return false; + } ddebug("call %s.dsn_app_bridge(...%s...)", m.first.c_str(), @@ -420,6 +424,7 @@ static void load_all_modules(::dsn::configuration_ptr config) bridge_ptr((int)args_ptr.size(), &args_ptr[0]); } } + return true; } void run_all_unit_tests_prepare_when_necessary(); @@ -469,7 +474,10 @@ bool run( } // load plugged modules - load_all_modules(dsn_all.config); + if (!load_all_modules(dsn_all.config)) + { + return false; + } // prepare unit test run if necessary run_all_unit_tests_prepare_when_necessary(); @@ -619,7 +627,11 @@ bool run( if (create_it) { - ::dsn::service_engine::fast_instance().start_node(sp); + if (::dsn::service_engine::fast_instance().start_node(sp) == nullptr) + { + fprintf(stderr, "Fail to start app %s.\n", sp.name.c_str()); + return false; + } } } @@ -758,12 +770,22 @@ DSN_API void dsn_run(int argc, char** argv, bool sleep_after_init) } } - if (!run(config, - config_args.size() > 0 ? config_args.c_str() : nullptr, - overwrites.size() > 0 ? overwrites.c_str() : nullptr, - sleep_after_init, - app_list - )) + bool run_succeeded = false; + try + { + run_succeeded = run(config, + config_args.size() > 0 ? config_args.c_str() : nullptr, + overwrites.size() > 0 ? overwrites.c_str() : nullptr, + sleep_after_init, + app_list + ); + } + catch (const std::exception& err) + { + fprintf(stderr, "run the system failed due to exception: %s\n", err.what()); + } + + if (!run_succeeded) { fprintf(stderr, "run the system failed\n"); dsn_exit(-1); @@ -779,8 +801,17 @@ DSN_API bool dsn_run_config(const char* config, bool sleep_after_init) return false; } - std::string name; - return run(config, nullptr, nullptr, sleep_after_init, name); + try + { + std::string name; + return run(config, nullptr, nullptr, sleep_after_init, name); + } + catch (const std::exception& err) + { + fprintf(stderr, "run the system failed due to exception: %s\n", err.what()); + } + + return false; } DSN_API int dsn_get_all_apps(dsn_app_info* info_buffer, int count) diff --git a/src/core/src/rpc_engine.cpp b/src/core/src/rpc_engine.cpp index 675a7da0e..dd3e4464c 100644 --- a/src/core/src/rpc_engine.cpp +++ b/src/core/src/rpc_engine.cpp @@ -613,8 +613,7 @@ namespace dsn { } else { - // mem leak, don't care as it halts the program - dassert(false, "create network failed, error_code: %s", ret.to_string()); + derror("create network failed, error_code: %s", ret.to_string()); return nullptr; } } diff --git a/src/core/src/service_api_c.cpp b/src/core/src/service_api_c.cpp index 6ceeb157c..255b6d75f 100644 --- a/src/core/src/service_api_c.cpp +++ b/src/core/src/service_api_c.cpp @@ -231,8 +231,12 @@ DSN_API dsn_task_code_t dsn_task_code_register( return static_cast(::dsn::TASK_CODE_INVALID); } - auto r = static_cast(::dsn::utils::customized_id_mgr::instance().register_id(name)); - ::dsn::task_spec::register_task_code(r, type, pri, pool); + auto r = static_cast( + ::dsn::utils::customized_id_mgr::instance().register_id(name)); + if (!::dsn::task_spec::register_task_code(r, type, pri, pool)) + { + return static_cast(::dsn::TASK_CODE_INVALID); + } return r; } @@ -1731,7 +1735,7 @@ static BOOL SuspendAllThreads() threads.find(ti.th32ThreadID) == threads.end()) { HANDLE hThread = ::OpenThread(THREAD_ALL_ACCESS, FALSE, ti.th32ThreadID); - if (hThread == NULL) + if (hThread == nullptr) { derror("OpenThread failed, err = %d", ::GetLastError()); goto err; diff --git a/src/core/src/service_engine.cpp b/src/core/src/service_engine.cpp index 8d9bd22bc..2fde0960c 100644 --- a/src/core/src/service_engine.cpp +++ b/src/core/src/service_engine.cpp @@ -386,7 +386,11 @@ error_code service_node::start() // start io engines (only timer, disk and rpc), others are started in app start task for (auto& io : _ios) { - start_io_engine_in_main(io); + err = start_io_engine_in_main(io); + if (err != ERR_OK) + { + return err; + } } // start task engine @@ -673,7 +677,11 @@ service_node* service_engine::start_node(service_app_spec& app_spec) auto node = new service_node(app_spec); error_code err = node->start(); - dassert (err == ERR_OK, "service node start failed, err = %s", err.to_string()); + if (err != ERR_OK) + { + derror("service node start failed, err = %s", err.to_string()); + return nullptr; + } _nodes_by_app_id[node->id()] = node; for (auto p1 : node->spec().ports) diff --git a/src/core/src/task_spec.cpp b/src/core/src/task_spec.cpp index b23fd47f2..0166d5db5 100644 --- a/src/core/src/task_spec.cpp +++ b/src/core/src/task_spec.cpp @@ -37,6 +37,7 @@ # include # include # include +# include # include # include # include @@ -48,59 +49,99 @@ namespace dsn { -void task_spec::register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool) +bool task_spec::register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool) { - dassert(pool != THREAD_POOL_INVALID, - "registered pool cannot be THREAD_POOL_INVALID for task %s, " - "make sure it is registered AFTER the pool is registered", - dsn_task_code_to_string(code) - ); - - if (!dsn::utils::singleton_vector_store::instance().contains(code)) + if (code < 0) { - task_spec* spec = new task_spec(code, dsn_task_code_to_string(code), type, pri, pool); - dsn::utils::singleton_vector_store::instance().put(code, spec); + return false; + } - if (type == TASK_TYPE_RPC_REQUEST) - { - std::string ack_name = std::string(dsn_task_code_to_string(code)) + std::string("_ACK"); - auto ack_code = dsn_task_code_register(ack_name.c_str(), TASK_TYPE_RPC_RESPONSE, pri, pool); - spec->rpc_paired_code = ack_code; - task_spec::get(ack_code)->rpc_paired_code = code; - } + if (pool == THREAD_POOL_INVALID) + { + derror("registered pool cannot be THREAD_POOL_INVALID for task %s, " + "make sure it is registered AFTER the pool is registered", + dsn_task_code_to_string(code)); + return false; } - else + + try { - auto spec = task_spec::get(code); - if (spec->type != type) + auto& store = dsn::utils::singleton_vector_store::instance(); + if (!store.contains(code)) { - dassert(false, "task code %s registerd for %s, which does not match with previously registered %s", - dsn_task_code_to_string(code), - enum_to_string(type), - enum_to_string(spec->type) - ); - return; + std::unique_ptr spec( + new task_spec(code, dsn_task_code_to_string(code), type, pri, pool)); + + if (type == TASK_TYPE_RPC_REQUEST) + { + std::string ack_name = std::string(dsn_task_code_to_string(code)) + std::string("_ACK"); + auto ack_code = dsn_task_code_register(ack_name.c_str(), TASK_TYPE_RPC_RESPONSE, pri, pool); + auto ack_spec = task_spec::get(ack_code); + if (ack_spec == nullptr) + { + return false; + } + spec->rpc_paired_code = ack_code; + + if (!store.put(code, spec.get())) + { + return false; + } + ack_spec->rpc_paired_code = code; + spec.release(); + } + else + { + if (!store.put(code, spec.get())) + { + return false; + } + spec.release(); + } } - - if (spec->priority != pri) + else { - dwarn("overwrite priority for task %s from %s to %s", - dsn_task_code_to_string(code), - enum_to_string(spec->priority), - enum_to_string(pri) - ); - spec->priority = pri; - } + auto spec = task_spec::get(code); + if (spec == nullptr) + { + return false; + } - if (spec->pool_code != pool) - { - dwarn("overwrite default thread pool for task %s from %s to %s", - dsn_task_code_to_string(code), - dsn_threadpool_code_to_string(spec->pool_code), - dsn_threadpool_code_to_string(pool) - ); - spec->pool_code = pool; + if (spec->type != type) + { + derror("task code %s registerd for %s, which does not match with previously registered %s", + dsn_task_code_to_string(code), + enum_to_string(type), + enum_to_string(spec->type) + ); + return false; + } + + if (spec->priority != pri) + { + dwarn("overwrite priority for task %s from %s to %s", + dsn_task_code_to_string(code), + enum_to_string(spec->priority), + enum_to_string(pri) + ); + spec->priority = pri; + } + + if (spec->pool_code != pool) + { + dwarn("overwrite default thread pool for task %s from %s to %s", + dsn_task_code_to_string(code), + dsn_threadpool_code_to_string(spec->pool_code), + dsn_threadpool_code_to_string(pool) + ); + spec->pool_code = pool; + } } + return true; + } + catch (...) + { + return false; } } diff --git a/src/dev/cpp/utils.cpp b/src/dev/cpp/utils.cpp index b68188da4..b722db8ee 100644 --- a/src/dev/cpp/utils.cpp +++ b/src/dev/cpp/utils.cpp @@ -401,7 +401,8 @@ namespace dsn // now ret should be sizeof(len). if (len < 0) { - dassert(false, "len is negative: %d", len); + derror("binary_reader::read got negative string length: %d", len); + return 0; } else if (len == 0) { @@ -416,7 +417,8 @@ namespace dsn } else { - dassert(false, "read beyond the end of buffer"); + derror("binary_reader::read string beyond the end of buffer"); + return 0; } return ret; @@ -435,7 +437,8 @@ namespace dsn // now ret should be sizeof(len). if (len < 0) { - dassert(false, "len is negative: %d", len); + derror("binary_reader::read got negative blob length: %d", len); + return 0; } else if (len == 0) { @@ -459,7 +462,8 @@ namespace dsn } else { - dassert(false, "read beyond the end of buffer"); + derror("binary_reader::read blob beyond the end of buffer"); + return 0; } return ret; @@ -488,7 +492,7 @@ namespace dsn } else { - dassert(false, "read beyond the end of buffer"); + derror("binary_reader::read beyond the end of buffer"); return 0; } } diff --git a/src/plugins/apps.skv/simple_kv.server.impl.cpp b/src/plugins/apps.skv/simple_kv.server.impl.cpp index 798bce3e3..3ce8a0472 100644 --- a/src/plugins/apps.skv/simple_kv.server.impl.cpp +++ b/src/plugins/apps.skv/simple_kv.server.impl.cpp @@ -35,6 +35,7 @@ #include "simple_kv.server.impl.h" #include +#include #include #include @@ -49,7 +50,7 @@ namespace dsn { namespace replication { namespace application { - static bool parse_checkpoint_version(const std::string &name, int64_t *version) + static bool parse_checkpoint_version(const std::string& name, int64_t& version) { const char *prefix = "checkpoint."; if (name.substr(0, strlen(prefix)) != std::string(prefix)) @@ -57,13 +58,53 @@ namespace dsn { return false; } - if (!::dsn::utils::lexical_cast_integer(name.substr(strlen(prefix)), *version)) + if (!::dsn::utils::lexical_cast_integer(name.substr(strlen(prefix)), version)) { return false; } return true; } + + static bool read_checkpoint_exact(std::ifstream& is, char* buffer, size_t size, size_t& remaining) + { + if (remaining < size) + { + return false; + } + + is.read(buffer, size); + if (!is) + { + return false; + } + + remaining -= size; + return true; + } + + static bool read_checkpoint_string(std::ifstream& is, std::string& value, size_t& remaining) + { + uint32_t sz; + if (!read_checkpoint_exact(is, reinterpret_cast(&sz), sizeof(sz), remaining)) + { + return false; + } + + if (remaining < static_cast(sz)) + { + return false; + } + + if (sz == 0) + { + value.clear(); + return true; + } + + value.resize(sz); + return read_checkpoint_exact(is, &value[0], sz, remaining); + } simple_kv_service_impl::simple_kv_service_impl(dsn_gpid gpid) : ::dsn::replicated_service_app_type_1(gpid), _lock(true) @@ -125,7 +166,11 @@ namespace dsn { { zauto_lock l(_lock); set_last_durable_decree(0); - recover(); + auto err = recover(); + if (err != ERR_OK) + { + return err; + } } open_service(get_gpid()); @@ -153,81 +198,117 @@ namespace dsn { } // checkpoint related - void simple_kv_service_impl::recover() + // Fault injection showed checkpoint files can be left truncated or malformed when + // writes fail. Do not trust the newest checkpoint blindly: scan checkpoint candidates + // from newest to oldest, validate the complete file before loading it into _store, and + // only report corruption when every checkpoint candidate is invalid. + ::dsn::error_code simple_kv_service_impl::recover() { zauto_lock l(_lock); _store.clear(); - int64_t maxVersion = 0; - std::string name; - std::vector sub_list; std::string path = data_dir(); if (!dsn::utils::filesystem::get_subfiles(path, sub_list, false)) { - dassert(false, "Fail to get subfiles in %s.", path.c_str()); + derror("Fail to get subfiles in %s.", path.c_str()); + return ERR_FILE_OPERATION_FAILED; } + if (sub_list.empty()) + { + ddebug("simple_kv_service_impl found no checkpoint files in %s.", path.c_str()); + return ERR_OK; + } + + typedef std::pair checkpoint_info; + std::vector checkpoints; for (auto& fpath : sub_list) { auto&& s = dsn::utils::filesystem::get_file_name(fpath); int64_t version = 0; - if (!parse_checkpoint_version(s, &version)) + if (!parse_checkpoint_version(s, version)) { continue; } - if (version > maxVersion) - { - maxVersion = version; - name = std::string(data_dir()) + "/" + s; - } + checkpoints.push_back( + checkpoint_info(version, std::string(data_dir()) + "/" + s)); } sub_list.clear(); + if (checkpoints.empty()) + { + ddebug("simple_kv_service_impl found no checkpoint files in %s.", path.c_str()); + return ERR_OK; + } - if (maxVersion > 0) + std::sort(checkpoints.rbegin(), checkpoints.rend()); + for (const auto &checkpoint : checkpoints) { - recover(name, maxVersion); - set_last_durable_decree(maxVersion); + if (recover(checkpoint.second, checkpoint.first)) + { + set_last_durable_decree(checkpoint.first); + return ERR_OK; + } + + derror("simple_kv_service_impl ignored invalid checkpoint %s", + checkpoint.second.c_str()); } + + return ERR_CORRUPTION; } - void simple_kv_service_impl::recover(const std::string& name, int64_t version) + bool simple_kv_service_impl::recover(const std::string& name, int64_t version) { zauto_lock l(_lock); + int64_t file_size = 0; + if (!dsn::utils::filesystem::file_size(name, file_size) || file_size < 0) + { + derror("simple_kv_service_impl get checkpoint file size failed: %s", + name.c_str()); + return false; + } + size_t remaining = static_cast(file_size); + std::ifstream is(name.c_str(), std::ios::binary); if (!is.is_open()) - return; + { + derror("simple_kv_service_impl open checkpoint failed: %s", name.c_str()); + return false; + } - _store.clear(); + simple_kv store; uint64_t count; int magic; - is.read((char*)&count, sizeof(count)); - is.read((char*)&magic, sizeof(magic)); - dassert(magic == 0xdeadbeef, "invalid checkpoint"); + if (!read_checkpoint_exact(is, reinterpret_cast(&count), sizeof(count), remaining) || + !read_checkpoint_exact(is, reinterpret_cast(&magic), sizeof(magic), remaining) || + magic != 0xdeadbeef) + { + derror("simple_kv_service_impl invalid checkpoint header: %s", name.c_str()); + return false; + } for (uint64_t i = 0; i < count; i++) { std::string key; std::string value; - uint32_t sz; - is.read((char*)&sz, (uint32_t)sizeof(sz)); - key.resize(sz); - - is.read((char*)&key[0], sz); - - is.read((char*)&sz, (uint32_t)sizeof(sz)); - value.resize(sz); - - is.read((char*)&value[0], sz); + if (!read_checkpoint_string(is, key, remaining) || + !read_checkpoint_string(is, value, remaining)) + { + derror("simple_kv_service_impl invalid checkpoint body: %s", + name.c_str()); + return false; + } - _store[key] = value; + store[key] = value; } - is.close(); + + _store.swap(store); + return true; } ::dsn::error_code simple_kv_service_impl::sync_checkpoint(int64_t last_commit) @@ -251,7 +332,15 @@ namespace dsn { return ERR_OK; } - std::ofstream os(name, std::ios::binary); + std::string tmp_name = std::string(name) + ".tmp"; + dsn::utils::filesystem::remove_path(tmp_name); + std::ofstream os(tmp_name.c_str(), std::ios::binary | std::ios::trunc); + if (!os.is_open()) + { + derror("simple_kv_service_impl open checkpoint failed: %s", + tmp_name.c_str()); + return ERR_CHECKPOINT_FAILED; + } uint64_t count = (uint64_t)_store.size(); int magic = 0xdeadbeef; @@ -265,16 +354,39 @@ namespace dsn { uint32_t sz = (uint32_t)k.length(); os.write((const char*)&sz, (uint32_t)sizeof(sz)); - os.write((const char*)&k[0], sz); + if (sz > 0) + { + os.write((const char*)&k[0], sz); + } const std::string& v = it->second; sz = (uint32_t)v.length(); os.write((const char*)&sz, (uint32_t)sizeof(sz)); - os.write((const char*)&v[0], sz); + if (sz > 0) + { + os.write((const char*)&v[0], sz); + } } + os.flush(); + bool write_succeed = os.good(); os.close(); + write_succeed = write_succeed && os.good(); + if (!write_succeed) + { + derror("simple_kv_service_impl write checkpoint failed: %s", + tmp_name.c_str()); + dsn::utils::filesystem::remove_path(tmp_name); + return ERR_CHECKPOINT_FAILED; + } + + if (!utils::filesystem::rename_path(tmp_name, name)) + { + derror("simple_kv_service_impl publish checkpoint failed: %s", name); + dsn::utils::filesystem::remove_path(tmp_name); + return ERR_CHECKPOINT_FAILED; + } // TODO: gc checkpoints set_last_durable_decree(last_commit); @@ -322,7 +434,13 @@ namespace dsn { { if (mode == DSN_CHKPT_LEARN) { - recover(state.files[0], state.to_decree_included); + if (state.file_state_count <= 0 || + !recover(state.files[0], state.to_decree_included)) + { + derror("simple_kv_service_impl learn checkpoint failed"); + return ERR_CHECKPOINT_FAILED; + } + return ERR_OK; } else diff --git a/src/plugins/apps.skv/simple_kv.server.impl.h b/src/plugins/apps.skv/simple_kv.server.impl.h index 03198c83d..aa231a2cd 100644 --- a/src/plugins/apps.skv/simple_kv.server.impl.h +++ b/src/plugins/apps.skv/simple_kv.server.impl.h @@ -78,8 +78,8 @@ namespace dsn { ) override; private: - void recover(); - void recover(const std::string& name, int64_t version); + ::dsn::error_code recover(); + bool recover(const std::string& name, int64_t version); const char* data_dir() const { return _data_dir.c_str(); } int64_t last_durable_decree() const { return _last_durable_decree; } void set_last_durable_decree(int64_t d) { _last_durable_decree = d; } diff --git a/src/plugins/tools.common/asio_net_provider.cpp b/src/plugins/tools.common/asio_net_provider.cpp index b7ddcd01e..dcf862786 100644 --- a/src/plugins/tools.common/asio_net_provider.cpp +++ b/src/plugins/tools.common/asio_net_provider.cpp @@ -36,6 +36,8 @@ #include "asio_net_provider.h" #include "asio_rpc_session.h" +#include + # ifdef __TITLE__ # undef __TITLE__ # endif @@ -57,24 +59,32 @@ namespace dsn { int io_service_worker_count = (int)dsn_config_get_value_uint64("network", "io_service_worker_count", 1, "thread number for io service (timer and boost network)"); - for (int i = 0; i < io_service_worker_count; i++) + try { - _workers.push_back(std::shared_ptr(new std::thread([this, ctx, i]() + for (int i = 0; i < io_service_worker_count; i++) { - task::set_tls_dsn_context(node(), nullptr, ctx.queue); - - const char* name = ::dsn::tools::get_service_node_name(node()); - char buffer[128]; - int name_len = snprintf(buffer, sizeof(buffer), "%s.asio.%d", name, i); - if (name_len < 0 || static_cast(name_len) >= sizeof(buffer)) + _workers.push_back(std::shared_ptr(new std::thread([this, ctx, i]() { - dwarn("asio worker name is too long: %s", name); - } - task_worker::set_name(buffer); - - boost::asio::io_service::work work(_io_service); - _io_service.run(); - }))); + task::set_tls_dsn_context(node(), nullptr, ctx.queue); + + const char* name = ::dsn::tools::get_service_node_name(node()); + char buffer[128]; + int name_len = snprintf(buffer, sizeof(buffer), "%s.asio.%d", name, i); + if (name_len < 0 || static_cast(name_len) >= sizeof(buffer)) + { + dwarn("asio worker name is too long: %s", name); + } + task_worker::set_name(buffer); + + boost::asio::io_service::work work(_io_service); + _io_service.run(); + }))); + } + } + catch (const std::exception& err) + { + derror("failed to start asio tcp worker thread, err: %s", err.what()); + return ERR_NETWORK_START_FAILED; } _acceptor = nullptr; @@ -283,7 +293,9 @@ namespace dsn { if (client_only) { - do + static const int max_client_port_retry_count = 10; + int retry_count = 0; + for (; retry_count < max_client_port_retry_count; retry_count++) { //FIXME: we actually do not need to set a random port for client if the rpc_engine is refactored _address.assign_ipv4(get_local_ipv4(), (std::numeric_limits::max)() - @@ -297,9 +309,23 @@ namespace dsn { } catch (boost::system::system_error& err) { - ddebug("asio udp listen on port %u failed, err: %s", _address.port(), err.what()); + if (err.code() == boost::asio::error::address_in_use) + { + ddebug("asio udp listen on port %u failed, err: %s", _address.port(), err.what()); + continue; + } + + derror("asio udp listen on port %u failed, err: %s", _address.port(), err.what()); + return ERR_NETWORK_START_FAILED; } - } while (true); + } + + if (retry_count >= max_client_port_retry_count) + { + derror("asio udp failed to find an available client port after %d retries", + max_client_port_retry_count); + return ERR_ADDRESS_ALREADY_USED; + } } else { @@ -316,24 +342,32 @@ namespace dsn { } } - for (int i = 0; i < io_service_worker_count; i++) + try { - _workers.push_back(std::shared_ptr(new std::thread([this, ctx, i]() + for (int i = 0; i < io_service_worker_count; i++) { - task::set_tls_dsn_context(node(), nullptr, ctx.queue); - - const char* name = ::dsn::tools::get_service_node_name(node()); - char buffer[128]; - int name_len = snprintf(buffer, sizeof(buffer), "%s.asio.udp.%d.%d", name, (int)(this->address().port()), i); - if (name_len < 0 || static_cast(name_len) >= sizeof(buffer)) + _workers.push_back(std::shared_ptr(new std::thread([this, ctx, i]() { - dwarn("asio udp worker name is too long: %s", name); - } - task_worker::set_name(buffer); - - boost::asio::io_service::work work(_io_service); - _io_service.run(); - }))); + task::set_tls_dsn_context(node(), nullptr, ctx.queue); + + const char* name = ::dsn::tools::get_service_node_name(node()); + char buffer[128]; + int name_len = snprintf(buffer, sizeof(buffer), "%s.asio.udp.%d.%d", name, (int)(this->address().port()), i); + if (name_len < 0 || static_cast(name_len) >= sizeof(buffer)) + { + dwarn("asio udp worker name is too long: %s", name); + } + task_worker::set_name(buffer); + + boost::asio::io_service::work work(_io_service); + _io_service.run(); + }))); + } + } + catch (const std::exception& err) + { + derror("failed to start asio udp worker thread, err: %s", err.what()); + return ERR_NETWORK_START_FAILED; } do_receive(); diff --git a/src/plugins/tools.common/native_aio_provider.linux.cpp b/src/plugins/tools.common/native_aio_provider.linux.cpp index f858d8eef..a1c7846dc 100644 --- a/src/plugins/tools.common/native_aio_provider.linux.cpp +++ b/src/plugins/tools.common/native_aio_provider.linux.cpp @@ -138,7 +138,7 @@ namespace dsn { while (true) { - ret = io_getevents(_ctx, 1, 1, events, NULL); + ret = io_getevents(_ctx, 1, 1, events, nullptr); if (ret > 0) // should be 1 { dassert(ret == 1, ""); diff --git a/src/plugins/tools.common/native_aio_provider.win.cpp b/src/plugins/tools.common/native_aio_provider.win.cpp index 2e41485a7..c62bff29a 100644 --- a/src/plugins/tools.common/native_aio_provider.win.cpp +++ b/src/plugins/tools.common/native_aio_provider.win.cpp @@ -52,8 +52,8 @@ namespace dsn { namespace tools { native_win_aio_provider::native_win_aio_provider(disk_engine* disk, aio_provider* inner_provider) : aio_provider(disk, inner_provider) { - _iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0); - if ((_iocp == NULL) || (_iocp == INVALID_HANDLE_VALUE)) + _iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, nullptr, 0); + if ((_iocp == nullptr) || (_iocp == INVALID_HANDLE_VALUE)) { derror("CreateIoCompletionPort failed, err = %d", ::GetLastError()); _iocp = INVALID_HANDLE_VALUE; @@ -62,9 +62,9 @@ native_win_aio_provider::native_win_aio_provider(disk_engine* disk, aio_provider native_win_aio_provider::~native_win_aio_provider() { - if (_worker_thr != nullptr && _iocp != NULL && _iocp != INVALID_HANDLE_VALUE) + if (_worker_thr != nullptr && _iocp != nullptr && _iocp != INVALID_HANDLE_VALUE) { - if (::PostQueuedCompletionStatus(_iocp, 0, 1, NULL) == FALSE) + if (::PostQueuedCompletionStatus(_iocp, 0, 1, nullptr) == FALSE) { derror("PostQueuedCompletionStatus failed, err = %d", ::GetLastError()); } @@ -81,7 +81,7 @@ native_win_aio_provider::~native_win_aio_provider() void native_win_aio_provider::start(io_modifer& ctx) { - if ((_iocp == NULL) || (_iocp == INVALID_HANDLE_VALUE)) + if ((_iocp == nullptr) || (_iocp == INVALID_HANDLE_VALUE)) { derror("cannot start native win aio provider without a valid IO completion port"); return; @@ -118,7 +118,7 @@ dsn_handle_t native_win_aio_provider::open(const char* file_name, int oflag, int SECURITY_ATTRIBUTES SecurityAttributes; SecurityAttributes.nLength = sizeof(SecurityAttributes); - SecurityAttributes.lpSecurityDescriptor = NULL; + SecurityAttributes.lpSecurityDescriptor = nullptr; if (oflag & _O_NOINHERIT) { SecurityAttributes.bInheritHandle = FALSE; @@ -206,7 +206,7 @@ dsn_handle_t native_win_aio_provider::open(const char* file_name, int oflag, int if (fileHandle != INVALID_HANDLE_VALUE && fileHandle != nullptr) { HANDLE iocp = ::CreateIoCompletionPort(fileHandle, _iocp, 0, 0); - if ((iocp == NULL) || (iocp != _iocp)) + if ((iocp == nullptr) || (iocp != _iocp)) { derror("cannot associate file handle %s to io completion port, err = 0x%x", file_name, @@ -307,10 +307,10 @@ error_code native_win_aio_provider::aio_internal(aio_task* aio_tsk, bool async, switch (aio->type) { case AIO_Read: - r = ::ReadFile((HANDLE)aio->file, aio->buffer, aio->buffer_size, NULL, &aio->olp); + r = ::ReadFile((HANDLE)aio->file, aio->buffer, aio->buffer_size, nullptr, &aio->olp); break; case AIO_Write: - r = ::WriteFile((HANDLE)aio->file, aio->buffer, aio->buffer_size, NULL, &aio->olp); + r = ::WriteFile((HANDLE)aio->file, aio->buffer, aio->buffer_size, nullptr, &aio->olp); break; default: dassert (false, "unknown aio type %u", static_cast(aio->type)); diff --git a/src/plugins/tools.common/profiler_command.cpp b/src/plugins/tools.common/profiler_command.cpp index de72d5097..7fa83a231 100644 --- a/src/plugins/tools.common/profiler_command.cpp +++ b/src/plugins/tools.common/profiler_command.cpp @@ -249,7 +249,7 @@ namespace dsn { counter_type = find_counter_type(val[1].c_str()); percentile_type = find_percentail_type(val[2].c_str()); - if ((task_id != TASK_CODE_INVALID) && (counter_type != PREF_COUNTER_INVALID) && (s_spec_profilers[task_id].ptr[counter_type] != NULL) && (s_spec_profilers[task_id].is_profile != false)) + if ((task_id != TASK_CODE_INVALID) && (counter_type != PREF_COUNTER_INVALID) && (s_spec_profilers[task_id].ptr[counter_type] != nullptr) && (s_spec_profilers[task_id].is_profile != false)) { ss << dsn_task_code_to_string(task_id) << ":" << counter_info_ptr[counter_type]->title << ":" << percentail_counter_string[percentile_type] << ":"; if (counter_info_ptr[counter_type]->type != COUNTER_TYPE_NUMBER_PERCENTILES) @@ -265,7 +265,7 @@ namespace dsn { { for (int j = 0; j < PREF_COUNTER_COUNT; j++) { - if ((s_spec_profilers[task_id].ptr[j] != NULL) && (counter_info_ptr[j]->type == COUNTER_TYPE_NUMBER_PERCENTILES)) + if ((s_spec_profilers[task_id].ptr[j] != nullptr) && (counter_info_ptr[j]->type == COUNTER_TYPE_NUMBER_PERCENTILES)) { ss << dsn_task_code_to_string(i) << ":" << counter_info_ptr[j]->title << ":" << percentail_counter_string[percentile_type] << ":" << s_spec_profilers[task_id].ptr[j]->get_percentile(percentile_type) << " "; } @@ -348,7 +348,7 @@ namespace dsn { { counter_type = static_cast(k); - if (s_spec_profilers[task_id].ptr[counter_type] == NULL) + if (s_spec_profilers[task_id].ptr[counter_type] == nullptr) { ss << ",\"\""; } @@ -432,7 +432,7 @@ namespace dsn { { counter_sample_resp resp; - if (s_spec_profilers[task_id].ptr[counter_type] == NULL) + if (s_spec_profilers[task_id].ptr[counter_type] == nullptr) continue; char name[20] = { 0 }; @@ -532,7 +532,7 @@ namespace dsn { if (counter_info_ptr[counter_type]->type == COUNTER_TYPE_NUMBER_PERCENTILES) { - if (s_spec_profilers[task_id].ptr[counter_type] == NULL) + if (s_spec_profilers[task_id].ptr[counter_type] == nullptr) continue; double timeGet = 0; @@ -615,7 +615,7 @@ namespace dsn { if (counter_info_ptr[counter_type]->type == COUNTER_TYPE_NUMBER_PERCENTILES) { - if (s_spec_profilers[task_id].ptr[counter_type] == NULL) + if (s_spec_profilers[task_id].ptr[counter_type] == nullptr) continue; char name[20] = { 0 }; diff --git a/src/plugins/tools.emulator/scheduler.cpp b/src/plugins/tools.emulator/scheduler.cpp index eaf6bab33..7746c43d8 100644 --- a/src/plugins/tools.emulator/scheduler.cpp +++ b/src/plugins/tools.emulator/scheduler.cpp @@ -91,7 +91,7 @@ std::vector* event_wheel::pop_next_events(/*out*/ uint64_t& ts) { utils::auto_lock< ::dsn::utils::ex_lock> l(_lock); - std::vector* evts = NULL; + std::vector* evts = nullptr; auto itr = _events.begin(); if (itr != _events.end()){ evts = itr->second; diff --git a/src/plugins_ext/rDSN.dist.service b/src/plugins_ext/rDSN.dist.service index 4317efe52..4757232bb 160000 --- a/src/plugins_ext/rDSN.dist.service +++ b/src/plugins_ext/rDSN.dist.service @@ -1 +1 @@ -Subproject commit 4317efe529000abbaecf11dcfa18a89fdbeb65e6 +Subproject commit 4757232bbdbb3a13d2a8d85e8d65dc65777aa454 diff --git a/src/plugins_ext/rDSN.tools.hpc b/src/plugins_ext/rDSN.tools.hpc index 8ce49b23d..842ba7ceb 160000 --- a/src/plugins_ext/rDSN.tools.hpc +++ b/src/plugins_ext/rDSN.tools.hpc @@ -1 +1 @@ -Subproject commit 8ce49b23df4b4ee41c0ade8470da938b6cd7ed50 +Subproject commit 842ba7ceb7cab73c90dbe3093cf965dca090f8a2