From e7bccb8b852c9ffecdcfd6f7b7154f3dfd112d4b Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 02:04:22 +0800 Subject: [PATCH 1/8] use distribute tasks in shard reader --- .../engine/datasetops/source/mindrecord_op.cc | 1 + .../include/shard_distributed_sample.h | 4 +++ .../ccsrc/mindrecord/include/shard_reader.h | 1 + .../ccsrc/mindrecord/include/shard_sample.h | 2 ++ .../ccsrc/mindrecord/include/shard_task.h | 2 ++ mindspore/ccsrc/mindrecord/io/shard_reader.cc | 27 +++++++++++++--- .../meta/shard_distributed_sample.cc | 32 ++++++++++++++++++- .../ccsrc/mindrecord/meta/shard_sample.cc | 4 +++ mindspore/ccsrc/mindrecord/meta/shard_task.cc | 4 +++ 9 files changed, 71 insertions(+), 6 deletions(-) diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc index a2c496f5e39..3d6dd39fc2f 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc @@ -463,6 +463,7 @@ Status MindRecordOp::Reset() { shard_reader_->Reset(); buffer_water_mark_ = 0; } else { + std::cout << "gzj in MindRecordOp::Reset" << std::endl; shard_reader_->ShuffleTask(); } shard_reader_wait_post_.Set(); diff --git a/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h b/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h index bfe1638fbf8..145d702c0f4 100644 --- a/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h +++ b/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h @@ -37,11 +37,15 @@ class ShardDistributedSample : public ShardSample { int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) override; + MSRStatus DoIt(ShardTask &tasks, ShardTask &tasks_distribute); + private: bool shuffle_; int no_of_padded_samples_; bool first_epoch_; // check (num_sample + num_padded) % num_shards == 0 in first epoch ShardTask task_; // maintain the 
input tasks in first epoch + + uint32_t seed_; }; } // namespace mindrecord } // namespace mindspore diff --git a/mindspore/ccsrc/mindrecord/include/shard_reader.h b/mindspore/ccsrc/mindrecord/include/shard_reader.h index 9be017c6467..9da482881c0 100644 --- a/mindspore/ccsrc/mindrecord/include/shard_reader.h +++ b/mindspore/ccsrc/mindrecord/include/shard_reader.h @@ -327,6 +327,7 @@ class ShardReader { std::map<string, uint64_t> column_schema_id_; // column-schema map std::vector<std::shared_ptr<ShardOperator>> operators_; // data operators, including shuffle, sample and category ShardTask tasks_; // shard task + ShardTask tasks_distribute_; std::mutex shard_locker_; // locker of shard // flags diff --git a/mindspore/ccsrc/mindrecord/include/shard_sample.h b/mindspore/ccsrc/mindrecord/include/shard_sample.h index 111df3bc1aa..6654300a8bd 100644 --- a/mindspore/ccsrc/mindrecord/include/shard_sample.h +++ b/mindspore/ccsrc/mindrecord/include/shard_sample.h @@ -44,6 +44,8 @@ class ShardSample : public ShardOperator { int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) override; + int GetDenominator(); + protected: int numerator_; int denominator_; diff --git a/mindspore/ccsrc/mindrecord/include/shard_task.h b/mindspore/ccsrc/mindrecord/include/shard_task.h index 08c7be815ed..2b6f6fcb305 100644 --- a/mindspore/ccsrc/mindrecord/include/shard_task.h +++ b/mindspore/ccsrc/mindrecord/include/shard_task.h @@ -43,6 +43,8 @@ class ShardTask { void InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task); + void InsertTask(uint32_t &i, std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task); + void PopBack(); uint32_t Size() const; diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index d3863323237..c2b69e0932c 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1065,9 +1065,19 @@ MSRStatus 
ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u const auto &op = operators[operator_no]; if (std::dynamic_pointer_cast<ShardCategory>(op)) continue; if (block_reader_ && std::dynamic_pointer_cast<ShardShuffle>(op)) continue; - if (SUCCESS != (*op)(tasks_)) { + std::cout << "gzj 000" << std::endl; + if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) { + for (int i=0; i<std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator(); i++) { + tasks_distribute_.InsertTask(tasks_.GetTaskByID(i)); // init for distribute + tasks_distribute_.permutation_.push_back(i); + } + } + std::cout << "gzj 111" << std::endl; + assert(std::dynamic_pointer_cast<ShardDistributedSample>(op)); + if (SUCCESS != std::dynamic_pointer_cast<ShardDistributedSample>(op)->DoIt(tasks_, tasks_distribute_)) { return FAILED; } + std::cout << "gzj 222" << std::endl; } if (tasks_.permutation_.empty()) tasks_.MakePerm(); @@ -1079,13 +1089,16 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_id) { // All tasks are done - if (task_id >= static_cast<int>(tasks_.Size())) { + if (task_id >= static_cast<int>(tasks_distribute_.Size())) { return std::make_pair(FAILED, std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>())); } + std::cout << "gzj in ConsumerOneTask 000, task_id: " << task_id << ":" << consumer_id + << ", tasks_distribute_.Size: " << tasks_distribute_.Size() + << ", permutation_.size: " << tasks_distribute_.permutation_.size() << std::endl; // Pick up task from task list - auto task = tasks_.GetTaskByID(tasks_.permutation_[task_id]); + auto task = tasks_distribute_.GetTaskByID(tasks_distribute_.permutation_[task_id]); // check task type auto task_type = std::get<0>(task); @@ -1097,6 +1110,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_ auto shard_id = 
std::get<0>(std::get<1>(task)); auto group_id = std::get<1>(std::get<1>(task)); auto addr = std::get<2>(task); + std::cout << "gzj in ConsumerOneTask 111" << std::endl; const auto &ret = shard_header_->GetPageByGroupId(group_id, shard_id); if (SUCCESS != ret.first) { return std::make_pair(FAILED, @@ -1147,7 +1161,7 @@ MSRStatus ShardReader::ConsumerByRow(int consumer_id) { task_id = task_id_++; // All tasks are done - if (task_id >= static_cast<int>(tasks_.Size())) { + if (task_id >= static_cast<int>(tasks_distribute_.Size())) { return FAILED; } const auto &ret = ConsumerOneTask(task_id, consumer_id); @@ -1325,6 +1339,7 @@ std::vector<std::tuple<std::vector<uint8_t>, json>> ShardReader::GetNext() { std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardReader::GetNextById( const int64_t &task_id, const int32_t &consumer_id) { + std::cout << "gzj in GetNextById 000" << std::endl; if (interrupt_) { return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>()); } @@ -1333,6 +1348,7 @@ std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardRe } const auto &ret = ConsumerOneTask(task_id, consumer_id); if (SUCCESS != ret.first) { + std::cout << "gzj in GetNextById 1111" << std::endl; return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>()); } return std::move(ret.second); @@ -1385,6 +1401,7 @@ void ShardReader::Reset() { } void ShardReader::ShuffleTask() { + std::cout << "gzj in Shuffle Task" << std::endl; for (const auto &op : operators_) { if (block_reader_) { continue; @@ -1395,7 +1412,7 @@ void ShardReader::ShuffleTask() { MS_LOG(WARNING) << "Redo randomSampler failed."; } } else if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) { - if (SUCCESS != (*op)(tasks_)) { + if (SUCCESS != std::dynamic_pointer_cast<ShardDistributedSample>(op)->DoIt(tasks_, tasks_distribute_)) { MS_LOG(WARNING) << "Redo distributeSampler failed."; } } diff 
--git a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc index 4984c0d3cd0..37112eea624 100644 --- a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc +++ b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc @@ -27,7 +27,8 @@ ShardDistributedSample::ShardDistributedSample(int num_shards, int shard_id, int : ShardSample(1, num_shards, shard_id), shuffle_(shuffle), no_of_padded_samples_(no_of_padded_samples), - first_epoch_(true) { + first_epoch_(true), + seed_(0) { shuffle_op_ = std::make_shared<ShardShuffle>(seed, kShuffleSample); } @@ -72,5 +73,34 @@ MSRStatus ShardDistributedSample::PreExecute(ShardTask &tasks) { } return SUCCESS; } + +MSRStatus ShardDistributedSample::DoIt(ShardTask &tasks, ShardTask &tasks_distribute) { + MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size(); + seed_++; + auto total_no = tasks.Size(); + if (no_of_padded_samples_ > 0 && first_epoch_) { + if (total_no % denominator_ != 0) { + MS_LOG(ERROR) << "Dataset size plus number of padded samples is not divisible by number of shards. " + << "task size: " << total_no << ", number padded: " << no_of_padded_samples_ + << ", denominator: " << denominator_; + return FAILED; + } + } + first_epoch_ = false; + if (shuffle_ == true) { + if (tasks.permutation_.empty() == true) { + tasks.MakePerm(); + } + std::shuffle(tasks.permutation_.begin(), tasks.permutation_.end(), std::default_random_engine(seed_)); + } + int taking = (total_no + denominator_ - 1) / denominator_; + uint32_t start = 0; + for (int i = partition_id_ * taking; i < (partition_id_ + 1) * taking; i++) { + tasks_distribute.InsertTask(start, tasks.GetTaskByID(tasks.permutation_[i] % total_no)); // rounding up. 
if overflow, go back to start + start++; + } + MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size(); + return SUCCESS; +} } // namespace mindrecord } // namespace mindspore diff --git a/mindspore/ccsrc/mindrecord/meta/shard_sample.cc b/mindspore/ccsrc/mindrecord/meta/shard_sample.cc index c207747194a..23856c544dc 100644 --- a/mindspore/ccsrc/mindrecord/meta/shard_sample.cc +++ b/mindspore/ccsrc/mindrecord/meta/shard_sample.cc @@ -56,6 +56,10 @@ ShardSample::ShardSample(const std::vector<int64_t> &indices, uint32_t seed) shuffle_op_ = std::make_shared<ShardShuffle>(seed); } +int ShardSample::GetDenominator() { + return denominator_; +} + int64_t ShardSample::GetNumSamples(int64_t dataset_size, int64_t num_classes) { if (sampler_type_ == kCustomTopNSampler) { return no_of_samples_; diff --git a/mindspore/ccsrc/mindrecord/meta/shard_task.cc b/mindspore/ccsrc/mindrecord/meta/shard_task.cc index 50825e6fa2c..7a2208382a7 100644 --- a/mindspore/ccsrc/mindrecord/meta/shard_task.cc +++ b/mindspore/ccsrc/mindrecord/meta/shard_task.cc @@ -59,6 +59,10 @@ void ShardTask::InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vecto task_list_.push_back(std::move(task)); } +void ShardTask::InsertTask(uint32_t &i, std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task) { + task_list_[i] = std::move(task); +} + void ShardTask::PopBack() { task_list_.pop_back(); } uint32_t ShardTask::Size() const { return static_cast<uint32_t>(task_list_.size()); } -- Gitee From 75b18c98551ae3775199ad72c3e5228a50700026 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 08:53:47 +0800 Subject: [PATCH 2/8] size tasks_distribute_ by per-shard task count --- mindspore/ccsrc/mindrecord/io/shard_reader.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index c2b69e0932c..9402654b122 100644 ---
a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1066,8 +1066,10 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u if (std::dynamic_pointer_cast<ShardCategory>(op)) continue; if (block_reader_ && std::dynamic_pointer_cast<ShardShuffle>(op)) continue; std::cout << "gzj 000" << std::endl; + int total_no = static_cast<int>(tasks_.Size()); + int taking = (total_no + std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator() - 1) / std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator(); if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) { - for (int i=0; i<std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator(); i++) { + for (int i=0; i<taking; i++) { tasks_distribute_.InsertTask(tasks_.GetTaskByID(i)); // init for distribute tasks_distribute_.permutation_.push_back(i); } @@ -1081,7 +1083,7 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u } if (tasks_.permutation_.empty()) tasks_.MakePerm(); - num_rows_ = block_reader_ ? tasks_.SizeOfRows() : tasks_.Size(); + num_rows_ = block_reader_ ? tasks_.SizeOfRows() : tasks_distribute_.Size(); num_blocks_ = block_reader_ ? 
tasks_.Size() : 0; MS_LOG(INFO) << "Total rows is " << num_rows_; return SUCCESS; -- Gitee From d091edcad17f32b27993f2730a38ca5b88858ae8 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 09:54:18 +0800 Subject: [PATCH 3/8] preallocate task list and insert tasks by index --- .../ccsrc/mindrecord/include/shard_task.h | 5 +++++ mindspore/ccsrc/mindrecord/io/shard_reader.cc | 11 ++++++++++- mindspore/ccsrc/mindrecord/meta/shard_task.cc | 19 ++++++++++++++----- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/mindspore/ccsrc/mindrecord/include/shard_task.h b/mindspore/ccsrc/mindrecord/include/shard_task.h index 2b6f6fcb305..5484890d6c0 100644 --- a/mindspore/ccsrc/mindrecord/include/shard_task.h +++ b/mindspore/ccsrc/mindrecord/include/shard_task.h @@ -41,6 +41,9 @@ class ShardTask { void InsertTask(TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset, const json &label); + void InsertTask(uint32_t i, TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset, + const json &label); + void InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task); void InsertTask(uint32_t &i, std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task); @@ -57,6 +60,8 @@ class ShardTask { static ShardTask Combine(std::vector<ShardTask> &category_tasks, bool replacement, int64_t num_elements); + void ResizeTaskList(uint32_t size); + uint32_t categories; std::vector<int> permutation_; diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index 9402654b122..dd03521e3c8 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1017,11 +1017,20 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i auto offsets = std::get<1>(ret); auto local_columns = std::get<2>(ret); if (shard_count_ <= kMaxShardCount) { + int count = 0; for (int shard_id = 0;
shard_id < shard_count_; shard_id++) { for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) { - tasks_.InsertTask(TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1], + count++; + } + } + tasks_.ResizeTaskList(count); + int start = 0; + for (int shard_id = 0; shard_id < shard_count_; shard_id++) { + for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) { + tasks_.InsertTask(start, TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1], std::vector<uint64_t>{offsets[shard_id][i][2], offsets[shard_id][i][3]}, local_columns[shard_id][i]); + start++; } } } else { diff --git a/mindspore/ccsrc/mindrecord/meta/shard_task.cc b/mindspore/ccsrc/mindrecord/meta/shard_task.cc index 7a2208382a7..c102b2a5e7c 100644 --- a/mindspore/ccsrc/mindrecord/meta/shard_task.cc +++ b/mindspore/ccsrc/mindrecord/meta/shard_task.cc @@ -44,17 +44,26 @@ void ShardTask::MakePerm() { } } +void ShardTask::ResizeTaskList(uint32_t size) { + task_list_.resize(size); +} + void ShardTask::InsertTask(TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset, const json &label) { - MS_LOG(DEBUG) << "Into insert task, shard_id: " << shard_id << ", group_id: " << group_id - << ", label: " << label.dump() << ", size of task_list_: " << task_list_.size() << "."; + // MS_LOG(DEBUG) << "Into insert task, shard_id: " << shard_id << ", group_id: " << group_id + // << ", label: " << label.dump() << ", size of task_list_: " << task_list_.size() << "."; task_list_.emplace_back(task_type, std::make_tuple(shard_id, group_id), offset, label); } +void ShardTask::InsertTask(uint32_t i, TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset, + const json &label) { + task_list_[i] = {task_type, std::make_tuple(shard_id, group_id), offset, label}; +} + void ShardTask::InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task) { - MS_LOG(DEBUG) << "Into insert task, shard_id: " << 
std::get<0>(std::get<1>(task)) - << ", group_id: " << std::get<1>(std::get<1>(task)) << ", label: " << std::get<3>(task).dump() - << ", size of task_list_: " << task_list_.size() << "."; + // MS_LOG(DEBUG) << "Into insert task, shard_id: " << std::get<0>(std::get<1>(task)) + // << ", group_id: " << std::get<1>(std::get<1>(task)) << ", label: " << std::get<3>(task).dump() + // << ", size of task_list_: " << task_list_.size() << "."; task_list_.push_back(std::move(task)); } -- Gitee From df87bbaf48a30e90d091d285c6525afb5f82f2b4 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 10:13:24 +0800 Subject: [PATCH 4/8] multi thread insert tasks --- mindspore/ccsrc/mindrecord/io/shard_reader.cc | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index dd03521e3c8..66ebf056a48 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1024,14 +1024,23 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i } } tasks_.ResizeTaskList(count); - int start = 0; + int current_offset = 0; + std::vector<std::thread> thread_for_init_tasks(shard_count_); for (int shard_id = 0; shard_id < shard_count_; shard_id++) { - for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) { - tasks_.InsertTask(start, TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1], - std::vector<uint64_t>{offsets[shard_id][i][2], offsets[shard_id][i][3]}, - local_columns[shard_id][i]); - start++; - } + thread_for_init_tasks[shard_id] = std::thread([this, &offsets, &local_columns, shard_id, current_offset] () { + auto l_current_offset = current_offset; + for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) { + tasks_.InsertTask(l_current_offset, TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1], + 
std::vector<uint64_t>{offsets[shard_id][i][2], offsets[shard_id][i][3]}, + local_columns[shard_id][i]); + l_current_offset++; + } + }); + current_offset = current_offset + offsets[shard_id].size(); + } + + for (int shard_id = 0; shard_id < shard_count_; shard_id++) { + thread_for_init_tasks[shard_id].join(); } } else { return FAILED; -- Gitee From 57df6b9f79b9d2c33ccf9e44b26304ed56673047 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 10:59:05 +0800 Subject: [PATCH 5/8] del log --- .../engine/datasetops/source/mindrecord_op.cc | 2 +- mindspore/ccsrc/mindrecord/io/shard_reader.cc | 21 ++++++++++--------- .../meta/shard_distributed_sample.cc | 4 ++-- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc index 3d6dd39fc2f..fcd69aaca0c 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc @@ -463,7 +463,7 @@ Status MindRecordOp::Reset() { shard_reader_->Reset(); buffer_water_mark_ = 0; } else { - std::cout << "gzj in MindRecordOp::Reset" << std::endl; + // std::cout << "gzj in MindRecordOp::Reset" << std::endl; shard_reader_->ShuffleTask(); } shard_reader_wait_post_.Set(); diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index 66ebf056a48..90b93212c72 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1035,6 +1035,7 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i local_columns[shard_id][i]); l_current_offset++; } + MS_LOG(INFO) << "gzj task " << shard_id; }); current_offset = current_offset + offsets[shard_id].size(); } @@ -1083,7 +1084,7 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u const auto &op = 
operators[operator_no]; if (std::dynamic_pointer_cast<ShardCategory>(op)) continue; if (block_reader_ && std::dynamic_pointer_cast<ShardShuffle>(op)) continue; - std::cout << "gzj 000" << std::endl; + // std::cout << "gzj 000" << std::endl; int total_no = static_cast<int>(tasks_.Size()); int taking = (total_no + std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator() - 1) / std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator(); if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) { @@ -1092,12 +1093,12 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u tasks_distribute_.permutation_.push_back(i); } } - std::cout << "gzj 111" << std::endl; + // std::cout << "gzj 111" << std::endl; assert(std::dynamic_pointer_cast<ShardDistributedSample>(op)); if (SUCCESS != std::dynamic_pointer_cast<ShardDistributedSample>(op)->DoIt(tasks_, tasks_distribute_)) { return FAILED; } - std::cout << "gzj 222" << std::endl; + // std::cout << "gzj 222" << std::endl; } if (tasks_.permutation_.empty()) tasks_.MakePerm(); @@ -1113,9 +1114,9 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_ return std::make_pair(FAILED, std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>())); } - std::cout << "gzj in ConsumerOneTask 000, task_id: " << task_id << ":" << consumer_id - << ", tasks_distribute_.Size: " << tasks_distribute_.Size() - << ", permutation_.size: " << tasks_distribute_.permutation_.size() << std::endl; + // std::cout << "gzj in ConsumerOneTask 000, task_id: " << task_id << ":" << consumer_id + // << ", tasks_distribute_.Size: " << tasks_distribute_.Size() + // << ", permutation_.size: " << tasks_distribute_.permutation_.size() << std::endl; // Pick up task from task list auto task = tasks_distribute_.GetTaskByID(tasks_distribute_.permutation_[task_id]); @@ -1130,7 +1131,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t 
consumer_ auto shard_id = std::get<0>(std::get<1>(task)); auto group_id = std::get<1>(std::get<1>(task)); auto addr = std::get<2>(task); - std::cout << "gzj in ConsumerOneTask 111" << std::endl; + // std::cout << "gzj in ConsumerOneTask 111" << std::endl; const auto &ret = shard_header_->GetPageByGroupId(group_id, shard_id); if (SUCCESS != ret.first) { return std::make_pair(FAILED, @@ -1359,7 +1360,7 @@ std::vector<std::tuple<std::vector<uint8_t>, json>> ShardReader::GetNext() { std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardReader::GetNextById( const int64_t &task_id, const int32_t &consumer_id) { - std::cout << "gzj in GetNextById 000" << std::endl; + // std::cout << "gzj in GetNextById 000" << std::endl; if (interrupt_) { return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>()); } @@ -1368,7 +1369,7 @@ std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardRe } const auto &ret = ConsumerOneTask(task_id, consumer_id); if (SUCCESS != ret.first) { - std::cout << "gzj in GetNextById 1111" << std::endl; + // std::cout << "gzj in GetNextById 1111" << std::endl; return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>()); } return std::move(ret.second); @@ -1421,7 +1422,7 @@ void ShardReader::Reset() { } void ShardReader::ShuffleTask() { - std::cout << "gzj in Shuffle Task" << std::endl; + // std::cout << "gzj in Shuffle Task" << std::endl; for (const auto &op : operators_) { if (block_reader_) { continue; diff --git a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc index 37112eea624..4dda0dd2452 100644 --- a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc +++ b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc @@ -75,7 +75,7 @@ MSRStatus ShardDistributedSample::PreExecute(ShardTask &tasks) { } MSRStatus 
ShardDistributedSample::DoIt(ShardTask &tasks, ShardTask &tasks_distribute) { - MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size(); + // MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size(); seed_++; auto total_no = tasks.Size(); if (no_of_padded_samples_ > 0 && first_epoch_) { @@ -99,7 +99,7 @@ MSRStatus ShardDistributedSample::DoIt(ShardTask &tasks, ShardTask &tasks_distri tasks_distribute.InsertTask(start, tasks.GetTaskByID(tasks.permutation_[i] % total_no)); // rounding up. if overflow, go back to start start++; } - MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size(); + // MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size(); return SUCCESS; } } // namespace mindrecord -- Gitee From c42bc04d6b0e9fba2af845202b6c2c7f807078b1 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 11:25:13 +0800 Subject: [PATCH 6/8] add some log --- mindspore/ccsrc/mindrecord/io/shard_reader.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index 90b93212c72..69d126c6736 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -427,6 +427,7 @@ ROW_GROUPS ShardReader::ReadAllRowGroup(std::vector<std::string> &columns) { } else { // fetch raw data from Raw page while some field is not index. 
fields += ", PAGE_ID_RAW, PAGE_OFFSET_RAW, PAGE_OFFSET_RAW_END "; } + MS_LOG(INFO) << "gzj before ReadAllRowsInShard"; std::string sql = "SELECT " + fields + " FROM INDEXES ORDER BY ROW_ID ;"; @@ -1010,10 +1011,12 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i const std::vector<std::shared_ptr<ShardOperator>> &operators) { CheckIfColumnInIndex(selected_columns_); + MS_LOG(INFO) << "gzj before ReadAllRowGroup"; auto ret = ReadAllRowGroup(selected_columns_); if (std::get<0>(ret) != SUCCESS) { return FAILED; } + MS_LOG(INFO) << "gzj after ReadAllRowGroup"; auto offsets = std::get<1>(ret); auto local_columns = std::get<2>(ret); if (shard_count_ <= kMaxShardCount) { -- Gitee From 30fc766692334674fa5954dcff5336fef93405a1 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 11:56:51 +0800 Subject: [PATCH 7/8] optimize: count & add log --- mindspore/ccsrc/mindrecord/io/shard_reader.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index 69d126c6736..c641926ff1c 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1022,13 +1022,12 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i if (shard_count_ <= kMaxShardCount) { int count = 0; for (int shard_id = 0; shard_id < shard_count_; shard_id++) { - for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) { - count++; - } + count += offsets[shard_id].size(); } tasks_.ResizeTaskList(count); int current_offset = 0; std::vector<std::thread> thread_for_init_tasks(shard_count_); + MS_LOG(INFO) << "gzj befor multi thread"; for (int shard_id = 0; shard_id < shard_count_; shard_id++) { thread_for_init_tasks[shard_id] = std::thread([this, &offsets, &local_columns, shard_id, current_offset] () { auto l_current_offset = current_offset; -- Gitee From 
eb9b798fd7950d06e3456df38db9900a68d625e3 Mon Sep 17 00:00:00 2001 From: jonyguo <guozhijian@huawei.com> Date: Fri, 26 Jun 2020 12:15:49 +0800 Subject: [PATCH 8/8] add log around task count and resize in CreateTasksByRow --- mindspore/ccsrc/mindrecord/io/shard_reader.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc index c641926ff1c..814d513320c 100644 --- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc +++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc @@ -1020,12 +1020,15 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i auto offsets = std::get<1>(ret); auto local_columns = std::get<2>(ret); if (shard_count_ <= kMaxShardCount) { + MS_LOG(INFO) << "gzj before count"; int count = 0; for (int shard_id = 0; shard_id < shard_count_; shard_id++) { count += offsets[shard_id].size(); } + MS_LOG(INFO) << "gzj after count"; tasks_.ResizeTaskList(count); int current_offset = 0; + MS_LOG(INFO) << "gzj after resize"; std::vector<std::thread> thread_for_init_tasks(shard_count_); MS_LOG(INFO) << "gzj befor multi thread"; for (int shard_id = 0; shard_id < shard_count_; shard_id++) { -- Gitee