Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ using namespace ErrorCode;

DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state)
: _state(state),
_req(*req),
std::shared_ptr<WorkloadGroup> workload_group)
: _req(*req),
_workload_group(std::move(workload_group)),
_tablet_schema(new TabletSchema),
_memtable_writer(new MemTableWriter(*req)),
_streams(streams) {}
Expand Down Expand Up @@ -127,19 +127,17 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (_state->get_query_ctx()) {
wg_sptr = _state->get_query_ctx()->workload_group();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
wg_sptr, _streams[0]->enable_unique_mow(_req.index_id)));
_workload_group,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
_streams.clear();
return Status::OK();
}

Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs,
const std::function<Status()>& cancel_check) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand All @@ -155,9 +153,8 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<ui
DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
{ std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
if (_state->is_cancelled()) {
return _state->cancel_reason();
}
DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", DBUG_RUN_CALLBACK());
RETURN_IF_ERROR(cancel_check());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
Expand Down Expand Up @@ -186,14 +183,10 @@ Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile)
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";

if (_state->profile_level() >= 2 && profile != nullptr) {
if (profile != nullptr) {
_update_profile(profile);
}
if (_state->profile_level() >= 2) {
RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
} else {
RETURN_IF_ERROR(_memtable_writer->close_wait());
}
RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
num_segments = _rowset_writer->next_segment_id();

_delta_written_success = true;
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <stdint.h>

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
Expand Down Expand Up @@ -52,6 +53,7 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class BetaRowsetWriterV2;
class LoadStreamStub;
class WorkloadGroup;

namespace vectorized {
class Block;
Expand All @@ -64,13 +66,14 @@ class DeltaWriterV2 {

public:
DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state);
std::shared_ptr<WorkloadGroup> workload_group);

~DeltaWriterV2();

Status init();

Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs,
const std::function<Status()>& cancel_check);

// flush the last memtable to flush queue, must call it before close_wait()
Status close();
Expand All @@ -90,11 +93,10 @@ class DeltaWriterV2 {

void _update_profile(RuntimeProfile* profile);

RuntimeState* _state = nullptr;

bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
std::shared_ptr<WorkloadGroup> _workload_group;
std::shared_ptr<BetaRowsetWriterV2> _rowset_writer;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success = false;
Expand Down
18 changes: 15 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,11 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
<< " not found in schema, load_id=" << print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
}
return DeltaWriterV2::create_unique(&req, streams, _state);
std::shared_ptr<WorkloadGroup> workload_group = nullptr;
if (_state->get_query_ctx()) {
workload_group = _state->workload_group();
}
return DeltaWriterV2::create_unique(&req, streams, workload_group);
});
if (delta_writer == nullptr) {
LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
Expand All @@ -594,7 +598,12 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
}
}
SCOPED_TIMER(_write_memtable_timer);
st = delta_writer->write(block.get(), rows.row_idxes);
st = delta_writer->write(block.get(), rows.row_idxes, [state = _state]() {
if (state->is_cancelled()) {
return state->cancel_reason();
}
return Status::OK();
});
return st;
}

Expand Down Expand Up @@ -677,7 +686,10 @@ Status VTabletWriterV2::close(Status exec_status) {
std::unordered_map<int64_t, int32_t> segments_for_tablet;
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
auto st = _delta_writer_for_tablet->close(segments_for_tablet, _operator_profile);
RuntimeProfile* delta_writer_profile =
_state->enable_profile() && _state->profile_level() >= 2 ? _operator_profile
: nullptr;
auto st = _delta_writer_for_tablet->close(segments_for_tablet, delta_writer_profile);
_delta_writer_for_tablet.reset();
if (!st.ok()) {
_cancel(st);
Expand Down
13 changes: 6 additions & 7 deletions be/test/vec/exec/delta_writer_v2_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,17 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
RuntimeState state;
auto writer = map->get_or_create(100, [&req, &state]() {
auto writer = map->get_or_create(100, [&req]() {
return std::make_unique<DeltaWriterV2>(
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, nullptr);
});
auto writer2 = map->get_or_create(101, [&req, &state]() {
auto writer2 = map->get_or_create(101, [&req]() {
return std::make_unique<DeltaWriterV2>(
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, nullptr);
});
auto writer3 = map->get_or_create(100, [&req, &state]() {
auto writer3 = map->get_or_create(100, [&req]() {
return std::make_unique<DeltaWriterV2>(
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, nullptr);
});
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
Expand Down
Loading
Loading