diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 19f897d2d36f..35d81f304d04 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1142,6 +1142,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTypeRewrite()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 50a7c9efad91..941472cd0bb7 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -581,6 +581,51 @@ class TKqpExecuterBase : public TActor { static_cast(this)->CheckExecutionComplete(); } + void HandleHttpInfo(NMon::TEvHttpInfo::TPtr& ev) { + TStringStream str; + HTML(str) { + PRE() { + str << "KQP Executer, SelfId=" << SelfId() << Endl; + + TABLE_SORTABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() {str << "TxId";} + TABLEH() {str << "StageId";} + TABLEH() {str << "TaskId";} + TABLEH() {str << "NodeId";} + TABLEH() {str << "ActorId";} + TABLEH() {str << "Completed";} + } + } + TABLEBODY() { + for (const auto& task : TasksGraph.GetTasks()) { + TABLER() { + TABLED() {str << task.StageId.TxId;} + TABLED() {str << task.StageId.StageId;} + TABLED() {str << task.Id;} + TABLED() {str << task.Meta.NodeId;} + TABLED() { + if (task.ComputeActorId) { + HREF(TStringBuilder() << "/node/" << task.ComputeActorId.NodeId() << "/actors/kqp_node?ca=" << task.ComputeActorId) { + str << task.ComputeActorId; + } + } else { + str << "N/A"; + } + str << Endl; + } + TABLED() {str << task.Meta.Completed;} + } + } + } + } + } + } + + this->Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); + } + STATEFN(ReadyState) { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpExecuter::TEvTxRequest, HandleReady); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 6b2f0fe8af95..a92946278139 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -105,6 +105,7 @@ class TKqpScanExecuter : public TKqpExecuterBaseGetTypeRewrite()); diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index b8aea9a7427a..33e9f70530e2 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -510,14 +510,28 @@ class TKqpNodeService : public TActorBootstrapped { } void HandleWork(NMon::TEvHttpInfo::TPtr& ev) { + + const TCgiParameters &cgi = ev->Get()->Request.GetParams(); + TActorId id; + + auto caId = cgi.Get("ca"); + if (caId && State_->ValidateComputeActorId(caId, id)) { + TActivationContext::Send(ev->Forward(id)); + return; + } + + auto exId = cgi.Get("ex"); + if (exId && State_->ValidateKqpExecuterId(exId, SelfId().NodeId(), id)) { + TActivationContext::Send(ev->Forward(id)); + return; + } + TStringStream str; HTML(str) { PRE() { - str << "Current config:" << Endl; + str << "TKqpNodeService, SelfId=" << SelfId() << Endl; + str << Endl << "Current config:" << Endl; str << Config.DebugString() << Endl; - str << Endl; - - str << Endl << "Transactions:" << Endl; State_->DumpInfo(str); } } diff --git a/ydb/core/kqp/node_service/kqp_node_state.cpp b/ydb/core/kqp/node_service/kqp_node_state.cpp index af1bcb9b0bc2..2316f64895c9 100644 --- a/ydb/core/kqp/node_service/kqp_node_state.cpp +++ b/ydb/core/kqp/node_service/kqp_node_state.cpp @@ -113,26 +113,86 @@ std::vector TNodeState::GetTasksByTxId(ui64 txId) const } void TNodeState::DumpInfo(TStringStream& str) const { - for (const auto& bucket : Buckets) { - TReadGuard guard(bucket.Mutex); - TMap>> byTx; + HTML(str) { + str << Endl << "Transactions:" << Endl; + TABLE_SORTABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() {str << "TxId";} + TABLEH() {str << "Executer";} + TABLEH() {str << "StartTime";} + TABLEH() {str << "Deadline";} + } + } + TABLEBODY() { + for (const auto& bucket : Buckets) { + TReadGuard guard(bucket.Mutex); + TMap>> byTx; - for (const auto& [txId, request] : bucket.Requests) { - byTx[txId].emplace_back(request.ExecuterId, &request); + for (const auto& [txId, request] : bucket.Requests) { + byTx[txId].emplace_back(request.ExecuterId, &request); + } + + for (const auto& [txId, requests] : byTx) { + for (auto& [requester, request] : requests) { + TABLER() { + TABLED() {str << txId;} + TABLED() { + HREF(TStringBuilder() << "/node/" << requester.NodeId() << "/actors/kqp_node?ex=" << requester) { + str << requester; + } + } + TABLED() {str << request->StartTime;} + TABLED() {str << request->Deadline;} + } + } + } + } + } } - for (const auto& [txId, requests] : byTx) { - str << " Requests:" << Endl; - for (auto& [requester, request] : requests) { - str << " Requester: " << requester << Endl; - str << " StartTime: " << request->StartTime << Endl; - str << " Deadline: " << request->Deadline << Endl; - str << " In-fly tasks:" << Endl; - for (auto& [taskId, actorId] : request->Tasks) { - str << " Task: " << taskId << Endl; - if (actorId) { - str << " Compute actor: " << *actorId << Endl; - } else { - str << " Compute actor: (task not started yet)" << Endl; + + str << Endl << "Tasks:" << Endl; + TABLE_SORTABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() {str << "TxId";} + TABLEH() {str << "Executer";} + TABLEH() {str << "TaskId";} + TABLEH() {str << "ComputeActorId";} + } + } + TABLEBODY() { + for (const auto& bucket : Buckets) { + TReadGuard guard(bucket.Mutex); + TMap>> byTx; + + for (const auto& [txId, request] : bucket.Requests) { + byTx[txId].emplace_back(request.ExecuterId, &request); + } + + for (const auto& [txId, requests] : byTx) { + for (auto& [requester, request] : requests) { + for (auto& [taskId, actorId] : request->Tasks) { + TABLER() { + TABLED() {str << txId;} + TABLED() { + HREF(TStringBuilder() << "/node/" << requester.NodeId() << "/actors/kqp_node?ex=" << requester) { + str << requester; + } + } + TABLED() {str << taskId;} + TABLED() { + if (actorId) { + HREF(TStringBuilder() << "/node/" << requester.NodeId() << "/actors/kqp_node?ca=" << *actorId) { + str << *actorId; + } + } else { + str << "N/A"; + } + } + } + } + } } } } @@ -140,4 +200,32 @@ void TNodeState::DumpInfo(TStringStream& str) const { } } +bool TNodeState::ValidateComputeActorId(const TString& computeActorId, TActorId& id) const { + for (const auto& bucket : Buckets) { + TReadGuard guard(bucket.Mutex); + for (const auto& [_, request] : bucket.Requests) { + for (auto& [_, actorId] : request.Tasks) { + if (actorId && ToString(*actorId) == computeActorId) { + id = *actorId; + return true; + } + } + } + } + return false; +} + +bool TNodeState::ValidateKqpExecuterId(const TString& kqpExecuterId, ui32 nodeId, TActorId& id) const { + for (const auto& bucket : Buckets) { + TReadGuard guard(bucket.Mutex); + for (const auto& [_, request] : bucket.Requests) { + if (ToString(request.ExecuterId) == kqpExecuterId && request.ExecuterId.NodeId() == nodeId) { + id = request.ExecuterId; + return true; + } + } + } + return false; +} + } // namespace NKikimr::NKqp::NKqpNode diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h index 97bf9d732530..747b94b1d92d 100644 --- a/ydb/core/kqp/node_service/kqp_node_state.h +++ b/ydb/core/kqp/node_service/kqp_node_state.h @@ -61,6 +61,8 @@ class TNodeState { std::vector GetTasksByTxId(ui64 txId) const; void DumpInfo(TStringStream& str) const; + bool ValidateComputeActorId(const TString& computeActorId, TActorId& id) const; + bool ValidateKqpExecuterId(const TString& kqpExecuterId, ui32 nodeId, TActorId& id) const; private: inline auto& GetBucketByTxId(ui64 txId) {