Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
settings.json
*.o
.dep*
build/
build/
.joycode
38 changes: 37 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,42 @@
CMAKE_MINIMUM_REQUIRED (VERSION 2.8)
CMAKE_MINIMUM_REQUIRED (VERSION 3.10)

# Set modern CMake policies
if(POLICY CMP0074)
cmake_policy(SET CMP0074 NEW) # Find_package uses <PackageName>_ROOT variables
endif()
if(POLICY CMP0077)
cmake_policy(SET CMP0077 NEW) # option() honors normal variables
endif()

PROJECT(dtc)

# Set default build type if not specified
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE RelWithDebInfo CACHE STRING "Build type" FORCE)
endif()

# Enable modern compiler features while maintaining GCC 4.9 compatibility
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# Compiler-specific flags for modern GCC support
if(CMAKE_COMPILER_IS_GNUCXX)
# Add compiler version checks for better compatibility
if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL "6.0")
# Modern GCC flags
add_compile_options(-Wno-misleading-indentation)
add_compile_options(-Wno-address-of-packed-member)
endif()

# Common flags for all GCC versions
add_compile_options("$<$<COMPILE_LANGUAGE:CXX>:-fpermissive>")
add_compile_options(-Wno-builtin-macro-redefined)

# Use old ABI for compatibility with bundled libraries
add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
endif()

#Use optional command `cmake -DCMAKE_TEST_OPTION=ON ..` to build test demo.
option(CMAKE_TEST_OPTION "option for test_demo" OFF)

Expand Down
6 changes: 6 additions & 0 deletions conf/server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
rules:
- !AUTHORITY
users:
- sharding@%:sharding
provider:
type: ALL_PERMITTED
15 changes: 9 additions & 6 deletions src/agent-watchdog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ INCLUDE_DIRECTORIES(
${PROJECT_SOURCE_DIR}/src/daemons
${PROJECT_SOURCE_DIR}/src/libs/common
${PROJECT_SOURCE_DIR}/src/libs/stat
${PROJECT_SOURCE_DIR}/src/core
${PROJECT_SOURCE_DIR}/src/libs/log4cplus/include
${PROJECT_SOURCE_DIR}/src/libs/yaml-cpp/include
${PROJECT_SOURCE_DIR}/src/core
${PROJECT_SOURCE_DIR}/src/libs/mxml/include)

LINK_DIRECTORIES(
Expand All @@ -16,8 +16,8 @@ LINK_DIRECTORIES(

include(../utils.cmake)

FILE(GLOB_RECURSE SRC_LIST1 ./*.cc ./*.c)

FILE(GLOB_RECURSE CXX_SRC_LIST ./*.cc)
FILE(GLOB_RECURSE C_SRC_LIST ./*.c)

LINK_LIBRARIES(liblog4cplus.a)
LINK_LIBRARIES(libcommon.a)
Expand All @@ -26,10 +26,13 @@ LINK_LIBRARIES(libstat.a)
LINK_LIBRARIES(pthread)
LINK_LIBRARIES(dl)

ADD_DEFINITIONS("-g -fPIC -fpermissive -std=gnu++11")
ADD_DEFINITIONS(-Wno-builtin-macro-redefined)
# Set properties for C++ files
set_source_files_properties(${CXX_SRC_LIST} PROPERTIES COMPILE_FLAGS "-g -fPIC -fpermissive -std=gnu++11 -D_GLIBCXX_USE_CXX11_ABI=0 -Wno-builtin-macro-redefined")

# Set properties for C files
set_source_files_properties(${C_SRC_LIST} PROPERTIES COMPILE_FLAGS "-g -fPIC -Wno-builtin-macro-redefined")

ADD_EXECUTABLE(agent-watchdog ${SRC_LIST1})
ADD_EXECUTABLE(agent-watchdog ${CXX_SRC_LIST} ${C_SRC_LIST})

TARGET_LINK_LIBRARIES(agent-watchdog libstat.a libcommon.a libyaml-cpp.a liblog4cplus.a mxml)
redefine_file_macro(agent-watchdog)
3 changes: 2 additions & 1 deletion src/agent-watchdog/daemons.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void close_sharding()
p[0] = NULL;
execv("../sharding/bin/stop.sh", p);

system("../sharding/bin/stop.sh");
int result = system("../sharding/bin/stop.sh");
(void)result; // Silence unused result warning
}

void WatchDog::run_loop()
Expand Down
4 changes: 2 additions & 2 deletions src/agent-watchdog/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ std::map<std::string, std::string> map_dtc_conf; //key:value --> dtc addr:conf f
#define DA_STRING(x) DA_STRING_HELPER(x)

#define DA_VERSION_STR \
DA_STRING(DA_VERSION_MAJOR)"."DA_STRING(DA_VERSION_MINOR)"."\
DA_STRING(DA_VERSION_MAJOR) "." DA_STRING(DA_VERSION_MINOR) "." \
DA_STRING(DA_VERSION_BUILD)

static int show_help;
Expand Down Expand Up @@ -368,7 +368,7 @@ int get_all_dtc_confs()
int content_len = 0;
log4cplus_debug("addr:%s", addr.c_str());
std::string str = send_select_dtcyaml((addr.substr(0, addr.find(':'))).c_str(), atoi((addr.substr(addr.find(':')+1)).c_str()));
content = str.c_str();
content = const_cast<char*>(str.c_str());
content_len = str.length();
log4cplus_debug("content:%s", content);

Expand Down
4 changes: 2 additions & 2 deletions src/agent-watchdog/sharding_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ void ShardingEntry::exec()
{
set_proc_title("agent_sharding");
argv[0] = (char *)"../sharding/bin/start.sh";
argv[1] = "3307";
argv[2] = "../conf";
argv[1] = const_cast<char*>("3307");
argv[2] = const_cast<char*>("../conf");
argv[3] = NULL;
execv(argv[0], argv);
}
Expand Down
4 changes: 2 additions & 2 deletions src/agent/common/da_string.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ int string_compare_nonsentive(const struct string *s1, const struct string *s2)
return s1->len > s2->len ? 1 : -1;
}

string_copy(&tmp1, s1, s1->len);
string_copy(&tmp2, s2, s2->len);
string_copy(&tmp1, s1->data, s1->len);
string_copy(&tmp2, s2->data, s2->len);
string_upper(&tmp1);
string_upper(&tmp2);
return da_strncmp(&tmp1.data, &tmp2.data, &tmp1.len);
Expand Down
1 change: 1 addition & 0 deletions src/agent/da_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include <inttypes.h>
#include <sys/uio.h>
#include "da_mem_pool.h"
#include "da_conn.h"
#include "da_listener.h"
Expand Down
1 change: 1 addition & 0 deletions src/agent/da_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "da_core.h"
#include "da_event.h"
#include "da_stats.h"
#include "my/my_net_send.h"
#include "da_msg.h"

void listener_ref(struct conn *l, void *owner)
Expand Down
5 changes: 3 additions & 2 deletions src/agent/da_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "da_server.h"
#include "da_time.h"
#include "da_protocal.h"
#include "da_request.h"
#include "my/my_parse.h"
#include "my/my_comm.h"
#include "da_core.h"
Expand Down Expand Up @@ -426,7 +427,7 @@ static int msg_recv_chain(struct context *ctx, struct conn *conn,
ASSERT((mbuf->last + n) <= mbuf->end);
mbuf->last += n;
msg->mlen += (uint32_t) n;
log_debug("mbuf recv %d bytes data actually.(%p %p %p)", mbuf->last - mbuf->pos, mbuf->last, mbuf->pos, msg->pos);
log_debug("mbuf recv %ld bytes data actually.(%p %p %p)", mbuf->last - mbuf->pos, mbuf->last, mbuf->pos, msg->pos);
for (;;) {
status = msg_parse(ctx, conn, msg);
if (status != 0) {
Expand Down Expand Up @@ -587,7 +588,7 @@ static int msg_send_chain(struct context *ctx, struct conn *conn,
}

mlen = mbuf_length(mbuf);
log_debug("mbuf len, len:%d, msg len:%d; %p %p", mlen, msg->mlen, mbuf->last, mbuf->pos);
log_debug("mbuf len, len:%zu, msg len:%d; %p %p", mlen, msg->mlen, mbuf->last, mbuf->pos);
if ((nsend + mlen) > limit) {
mlen = limit - nsend;
}
Expand Down
12 changes: 7 additions & 5 deletions src/agent/da_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "da_errno.h"
#include "da_stats.h"
#include "da_time.h"
#include "my/my_net_send.h"
#include "my/my_parse.h"
#include "my/my_comm.h"

#define LAYER3_DEF "layer3"
Expand Down Expand Up @@ -343,7 +345,7 @@ int dtc_header_add(struct msg *msg, enum enum_agent_admin admin, char* dbname)
dtc_header.packet_len = mbuf_length(mbuf) + sizeof(dtc_header) + dtc_header.dbname_len;
dtc_header.layer = msg->layer;

mbuf_copy(new_buf, &dtc_header, sizeof(dtc_header));
mbuf_copy(new_buf, (uint8_t*)&dtc_header, sizeof(dtc_header));
if(dbname && strlen(dbname) > 0)
mbuf_copy(new_buf, dbname, dtc_header.dbname_len);

Expand All @@ -356,7 +358,7 @@ int dtc_header_add(struct msg *msg, enum enum_agent_admin admin, char* dbname)
mbuf_insert(&msg->buf_q, new_buf);

msg->mlen = mbuf_length(new_buf);
log_debug("msg->mlen:%d sizeof(dtc_header):%d mbuf_length(mbuf):%d",
log_debug("msg->mlen:%d sizeof(dtc_header):%zu mbuf_length(mbuf):%d",
msg->mlen, sizeof(dtc_header), mbuf_length(mbuf));

return 0;
Expand Down Expand Up @@ -411,13 +413,13 @@ void req_process(struct context *ctx, struct conn *c_conn, struct msg *msg)
case NEXT_FORWARD:
dtc_header_add(msg, CMD_NOP, c_conn->dbname);
log_debug(
"FORWARD. msg len: %d, msg id: %d",
"FORWARD. msg len: %d, msg id: %lu",
msg->mlen, msg->id);
req_forward(ctx, c_conn, msg);
break;
case NEXT_RSP_OK:
log_debug(
"RSP OK. msg len: %d, msg id: %d",
"RSP OK. msg len: %d, msg id: %lu",
msg->mlen, msg->id);
if (net_send_ok(msg, c_conn) <
0) /* default resp login success. */
Expand All @@ -426,7 +428,7 @@ void req_process(struct context *ctx, struct conn *c_conn, struct msg *msg)
break;
case NEXT_RSP_ERROR:
log_debug(
"RSP ERROR. msg len: %d, msg id: %d",
"RSP ERROR. msg len: %d, msg id: %lu",
msg->mlen, msg->id);
if (net_send_error(msg, c_conn) <
0) /* default resp login success. */
Expand Down
2 changes: 1 addition & 1 deletion src/agent/da_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,

struct rbnode tnode;
tnode.key = msg->peerid; //peer msg_id for search,get from package
log_debug("rbtree node key: %"PRIu64", id:%d, peerid:%d, tree:%p %d %d",tnode.key, msg->id, msg->peerid, &conn->msg_tree, conn->msg_tree.root->key, conn->msg_tree.sentinel->key);
log_debug("rbtree node key: %"PRIu64", id:%lu, peerid:%lu, tree:%p %"PRIu64" %"PRIu64"",tnode.key, msg->id, msg->peerid, &conn->msg_tree, conn->msg_tree.root->key, conn->msg_tree.sentinel->key);
tarnode = rbtree_search(&conn->msg_tree, &tnode);
if (tarnode == NULL) { //node has been deleted by timeout
log_debug("rsp msg id: %"PRIu64" peerid :%"PRIu64" search peer msg error,msg is not in the tree",
Expand Down
4 changes: 2 additions & 2 deletions src/agent/da_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ struct cache_instance *get_instance_from_array(struct array replica_array, uint1
t = 1;
t = t << ci->failure_num;
t = t * 1000;
printf("i :%.*s now_ms%"PRIu64" failtime%"PRIu64" t%"PRIu64" \n", ci->pname.len, ci->pname.data, now_ms, ci->last_failure_ms, t);
printf("i :%.*s now_ms%"PRIu64" failtime%"PRIu64" t%d \n", ci->pname.len, ci->pname.data, now_ms, ci->last_failure_ms, t);
if ((now_ms - ci->last_failure_ms) > t) {
(*cnt) = 0;
(*array_idx) = idx + 1;
Expand Down Expand Up @@ -569,7 +569,7 @@ void instance_deinit(struct array *instance) {
for (i = 0, nserver = array_n(instance); i < nserver; i++) {
struct cache_instance *ci;
ci = array_pop(instance);
printf("ip : %.*s, num : %d, fail_time : %d\n", ci->pname.len, ci->pname.data, ci->num, ci->last_failure_ms);
printf("ip : %.*s, num : %d, fail_time : %"PRIu64"\n", ci->pname.len, ci->pname.data, ci->num, ci->last_failure_ms);
string_deinit(&ci->pname);
ASSERT(TAILQ_EMPTY(&ci->s_conn_q) && ci->ns_conn_q == 0);
}
Expand Down
1 change: 1 addition & 0 deletions src/agent/my/my_net_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "../da_request.h"
#include "../da_buf.h"
#include "../da_util.h"
#include "my_net_write.h"
#include "../da_errno.h"
#include "../da_time.h"
#include "../da_core.h"
Expand Down
5 changes: 3 additions & 2 deletions src/agent/my/my_net_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ MYSQL Protocol Definition, See more detail:
*/

int net_send_ok(struct msg *smsg, struct conn *c_conn);
int net_send_error(struct msg *smsg, struct msg *dmsg);
int net_send_server_greeting(struct msg *smsg, struct msg *dmsg);
int net_send_error(struct msg *smsg, struct conn *c_conn);
int net_send_switch(struct msg *smsg, struct conn *c_conn);
int net_send_server_greeting(struct conn* c, struct msg *smsg);

struct msg *net_send_desc_dtctable(struct conn *c_conn);

Expand Down
22 changes: 14 additions & 8 deletions src/agent/my/my_parse.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
#include "../da_core.h"
#include "my_comm.h"
#include "my_command.h"
#include "my_parse.h"
#include "my_protocol_classic.h"

// Forward declaration for functions used before defined
int my_get_command(uint8_t *input_raw_packet, uint32_t input_packet_length,
struct msg *r, enum enum_server_command *cmd);

#define MYSQL_HEADER_SIZE 4
#define MAXPACKETSIZE (64 << 20)
Expand Down Expand Up @@ -83,7 +89,7 @@ void my_parse_req(struct msg *r)
if (p < b->last) {
if (b->last - p < MYSQL_HEADER_SIZE) {
log_error(
"receive size small than package header. id:%d",
"receive size small than package header. id:%lu",
r->id);
p = b->last;
goto end;
Expand Down Expand Up @@ -238,7 +244,7 @@ void my_parse_rsp(struct msg *r)
if(b->last - b->start < sizeof(struct DTC_HEADER_V2) + MYSQL_HEADER_SIZE)
{
log_error(
"receive size small than package header. id:%d",
"receive size small than package header. id:%lu",
r->id);
p = b->last;
goto error;
Expand All @@ -265,7 +271,7 @@ void my_parse_rsp(struct msg *r)
r->ismysql = 1;
}

log_debug("pkt_nr:%d, peerid:%d, id:%d, admin:%d, packet_len:%d, db_len:%d", r->pkt_nr,
log_debug("pkt_nr:%d, peerid:%lu, id:%lu, admin:%d, packet_len:%d, db_len:%d", r->pkt_nr,
r->peerid, r->id, r->admin, packet_len, db_len);
}

Expand Down Expand Up @@ -483,7 +489,7 @@ int my_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq)
int status, i;
struct keypos *temp_kpos;
CValue val;
log_debug("key count:%d, cmd:%d", r->keyCount, r->cmd);
log_debug("key count:%lu, cmd:%d", r->keyCount, r->cmd);

if (r->cmd == MSG_NOP || r->admin != CMD_NOP) {
uint64_t randomkey = randomHashSeed++;
Expand Down Expand Up @@ -516,7 +522,7 @@ int my_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq)
log_error("decode value:%d", status);
return -1;
}
log_debug("val.u64:%d", val.u64);
log_debug("val.u64:%lu", val.u64);
r->idx = msg_backend_idx(r, (uint8_t *)&val.u64,
sizeof(uint64_t));
log_debug("r->idx:%d", r->idx);
Expand Down Expand Up @@ -653,14 +659,14 @@ int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
log_debug("sql: %s", str.data);
if(dbsession && strlen(dbsession))
{
log_debug("dbsession len:%d, dbsession: %s", strlen(dbsession), dbsession);
log_debug("dbsession len:%zu, dbsession: %s", strlen(dbsession), dbsession);
}

char strkey[1024] = {0};
memset(strkey, 0, 1024);

//agent sql route, rule engine
layer = rule_sql_match(str.data, ostr.data, dbsession, &strkey, &r->keytype);
layer = rule_sql_match(str.data, ostr.data, dbsession, strkey, (int*)&r->keytype);
log_debug("rule layer: %d", layer);

if(layer != 1)
Expand Down Expand Up @@ -692,7 +698,7 @@ int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
for (; i < str.len; i++) {
if (str.len - i >= strlen(strkey)) {
log_debug(
"key: %s, key len:%d, str.len:%d i:%d dtc_key_len:%d str.data + i:%s ", strkey, strlen(strkey),
"key: %s, key len:%zu, str.len:%d i:%d dtc_key_len:%zu str.data + i:%s ", strkey, strlen(strkey),
str.len, i, strlen(strkey), str.data + i);
if (da_strncmp(str.data + i, strkey, strlen(strkey)) == 0)
{
Expand Down
14 changes: 13 additions & 1 deletion src/agent/my/my_parse.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,22 @@ MYSQL Protocol Definition, See more detail:
void my_parse_req(struct msg *r);
void my_parse_rsp(struct msg *r);

int my_do_command(struct context *ctx, struct conn *c_conn, struct msg *msg);
int my_do_command(struct msg *msg);
int my_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);

int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
int *end_offset, const char* dbsession, struct msg* r);

int my_get_command(uint8_t *input_raw_packet, uint32_t input_packet_length,
struct msg *r, enum enum_server_command *cmd);

#ifdef __cplusplus
extern "C" {
#endif
int rule_sql_match(const char* szsql, const char* osql, const char* dbsession, char* out_dtckey, int* out_keytype);
int get_statement_value(char* str, int len, const char* strkey, int* start_offset, int* end_offset);
#ifdef __cplusplus
}
#endif

#endif /* _MY_PARSE_H_ */
Loading
Loading