From e7bccb8b852c9ffecdcfd6f7b7154f3dfd112d4b Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 02:04:22 +0800
Subject: [PATCH 1/8] use distribute tasks in shard reader

---
 .../engine/datasetops/source/mindrecord_op.cc |  1 +
 .../include/shard_distributed_sample.h        |  4 +++
 .../ccsrc/mindrecord/include/shard_reader.h   |  1 +
 .../ccsrc/mindrecord/include/shard_sample.h   |  2 ++
 .../ccsrc/mindrecord/include/shard_task.h     |  2 ++
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 27 +++++++++++++---
 .../meta/shard_distributed_sample.cc          | 32 ++++++++++++++++++-
 .../ccsrc/mindrecord/meta/shard_sample.cc     |  4 +++
 mindspore/ccsrc/mindrecord/meta/shard_task.cc |  4 +++
 9 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
index a2c496f5e39..3d6dd39fc2f 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
+++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
@@ -463,6 +463,7 @@ Status MindRecordOp::Reset() {
     shard_reader_->Reset();
     buffer_water_mark_ = 0;
   } else {
+    std::cout << "gzj in MindRecordOp::Reset" << std::endl;	  
     shard_reader_->ShuffleTask();
   }
   shard_reader_wait_post_.Set();
diff --git a/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h b/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h
index bfe1638fbf8..145d702c0f4 100644
--- a/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h
+++ b/mindspore/ccsrc/mindrecord/include/shard_distributed_sample.h
@@ -37,11 +37,15 @@ class ShardDistributedSample : public ShardSample {
 
   int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) override;
 
+  MSRStatus DoIt(ShardTask &tasks, ShardTask &tasks_distribute);
+
  private:
   bool shuffle_;
   int no_of_padded_samples_;
   bool first_epoch_;  // check  (num_sample + num_padded) % num_shards == 0 in first epoch
   ShardTask task_;    // maintain the input tasks in first epoch
+
+  uint32_t seed_;
 };
 }  // namespace mindrecord
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/mindrecord/include/shard_reader.h b/mindspore/ccsrc/mindrecord/include/shard_reader.h
index 9be017c6467..9da482881c0 100644
--- a/mindspore/ccsrc/mindrecord/include/shard_reader.h
+++ b/mindspore/ccsrc/mindrecord/include/shard_reader.h
@@ -327,6 +327,7 @@ class ShardReader {
   std::map<string, uint64_t> column_schema_id_;            // column-schema map
   std::vector<std::shared_ptr<ShardOperator>> operators_;  // data operators, including shuffle, sample and category
   ShardTask tasks_;                                        // shard task
+  ShardTask tasks_distribute_;
   std::mutex shard_locker_;                                // locker of shard
 
   // flags
diff --git a/mindspore/ccsrc/mindrecord/include/shard_sample.h b/mindspore/ccsrc/mindrecord/include/shard_sample.h
index 111df3bc1aa..6654300a8bd 100644
--- a/mindspore/ccsrc/mindrecord/include/shard_sample.h
+++ b/mindspore/ccsrc/mindrecord/include/shard_sample.h
@@ -44,6 +44,8 @@ class ShardSample : public ShardOperator {
 
   int64_t GetNumSamples(int64_t dataset_size, int64_t num_classes) override;
 
+  int GetDenominator();
+
  protected:
   int numerator_;
   int denominator_;
diff --git a/mindspore/ccsrc/mindrecord/include/shard_task.h b/mindspore/ccsrc/mindrecord/include/shard_task.h
index 08c7be815ed..2b6f6fcb305 100644
--- a/mindspore/ccsrc/mindrecord/include/shard_task.h
+++ b/mindspore/ccsrc/mindrecord/include/shard_task.h
@@ -43,6 +43,8 @@ class ShardTask {
 
   void InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task);
 
+  void InsertTask(uint32_t &i, std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task);
+
   void PopBack();
 
   uint32_t Size() const;
diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index d3863323237..c2b69e0932c 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1065,9 +1065,19 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
     const auto &op = operators[operator_no];
     if (std::dynamic_pointer_cast<ShardCategory>(op)) continue;
     if (block_reader_ && std::dynamic_pointer_cast<ShardShuffle>(op)) continue;
-    if (SUCCESS != (*op)(tasks_)) {
+    std::cout << "gzj 000" << std::endl;
+    if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) {
+      for (int i=0; i<std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator(); i++) {
+        tasks_distribute_.InsertTask(tasks_.GetTaskByID(i));     // init for distribute
+	tasks_distribute_.permutation_.push_back(i);
+      }
+    }
+    std::cout << "gzj 111" << std::endl;
+    assert(std::dynamic_pointer_cast<ShardDistributedSample>(op));
+    if (SUCCESS != std::dynamic_pointer_cast<ShardDistributedSample>(op)->DoIt(tasks_, tasks_distribute_)) {
       return FAILED;
     }
+    std::cout << "gzj 222" << std::endl;
   }
 
   if (tasks_.permutation_.empty()) tasks_.MakePerm();
@@ -1079,13 +1089,16 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
 
 TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_id) {
   // All tasks are done
-  if (task_id >= static_cast<int>(tasks_.Size())) {
+  if (task_id >= static_cast<int>(tasks_distribute_.Size())) {
     return std::make_pair(FAILED,
                           std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>()));
   }
+  std::cout << "gzj in ConsumerOneTask 000, task_id: " << task_id << ":" << consumer_id
+	    << ", tasks_distribute_.Size: " << tasks_distribute_.Size()
+	    << ", permutation_.size: " << tasks_distribute_.permutation_.size() << std::endl;
 
   // Pick up task from task list
-  auto task = tasks_.GetTaskByID(tasks_.permutation_[task_id]);
+  auto task = tasks_distribute_.GetTaskByID(tasks_distribute_.permutation_[task_id]);
 
   // check task type
   auto task_type = std::get<0>(task);
@@ -1097,6 +1110,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
   auto shard_id = std::get<0>(std::get<1>(task));
   auto group_id = std::get<1>(std::get<1>(task));
   auto addr = std::get<2>(task);
+  std::cout << "gzj in ConsumerOneTask 111" << std::endl;
   const auto &ret = shard_header_->GetPageByGroupId(group_id, shard_id);
   if (SUCCESS != ret.first) {
     return std::make_pair(FAILED,
@@ -1147,7 +1161,7 @@ MSRStatus ShardReader::ConsumerByRow(int consumer_id) {
     task_id = task_id_++;
 
     // All tasks are done
-    if (task_id >= static_cast<int>(tasks_.Size())) {
+    if (task_id >= static_cast<int>(tasks_distribute_.Size())) {
       return FAILED;
     }
     const auto &ret = ConsumerOneTask(task_id, consumer_id);
@@ -1325,6 +1339,7 @@ std::vector<std::tuple<std::vector<uint8_t>, json>> ShardReader::GetNext() {
 
 std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardReader::GetNextById(
   const int64_t &task_id, const int32_t &consumer_id) {
+  std::cout << "gzj in GetNextById 000" << std::endl;
   if (interrupt_) {
     return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>());
   }
@@ -1333,6 +1348,7 @@ std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardRe
   }
   const auto &ret = ConsumerOneTask(task_id, consumer_id);
   if (SUCCESS != ret.first) {
+    std::cout << "gzj in GetNextById 1111" << std::endl;
     return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>());
   }
   return std::move(ret.second);
@@ -1385,6 +1401,7 @@ void ShardReader::Reset() {
 }
 
 void ShardReader::ShuffleTask() {
+  std::cout << "gzj in Shuffle Task" << std::endl;
   for (const auto &op : operators_) {
     if (block_reader_) {
       continue;
@@ -1395,7 +1412,7 @@ void ShardReader::ShuffleTask() {
         MS_LOG(WARNING) << "Redo randomSampler failed.";
       }
     } else if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) {
-      if (SUCCESS != (*op)(tasks_)) {
+      if (SUCCESS != std::dynamic_pointer_cast<ShardDistributedSample>(op)->DoIt(tasks_, tasks_distribute_)) {
         MS_LOG(WARNING) << "Redo distributeSampler failed.";
       }
     }
diff --git a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc
index 4984c0d3cd0..37112eea624 100644
--- a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc
+++ b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc
@@ -27,7 +27,8 @@ ShardDistributedSample::ShardDistributedSample(int num_shards, int shard_id, int
     : ShardSample(1, num_shards, shard_id),
       shuffle_(shuffle),
       no_of_padded_samples_(no_of_padded_samples),
-      first_epoch_(true) {
+      first_epoch_(true),
+      seed_(0) {
   shuffle_op_ = std::make_shared<ShardShuffle>(seed, kShuffleSample);
 }
 
@@ -72,5 +73,34 @@ MSRStatus ShardDistributedSample::PreExecute(ShardTask &tasks) {
   }
   return SUCCESS;
 }
+
+MSRStatus ShardDistributedSample::DoIt(ShardTask &tasks, ShardTask &tasks_distribute) {
+  MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size();
+  seed_++;
+  auto total_no = tasks.Size();
+  if (no_of_padded_samples_ > 0 && first_epoch_) {
+    if (total_no % denominator_ != 0) {
+      MS_LOG(ERROR) << "Dataset size plus number of padded samples is not divisible by number of shards. "
+                    << "task size: " << total_no << ", number padded: " << no_of_padded_samples_
+                    << ", denominator: " << denominator_;
+      return FAILED;
+    }
+  }
+  first_epoch_ = false;
+  if (shuffle_ == true) {
+    if (tasks.permutation_.empty() == true) {
+      tasks.MakePerm();
+    }
+    std::shuffle(tasks.permutation_.begin(), tasks.permutation_.end(), std::default_random_engine(seed_));
+  }
+  int taking = (total_no + denominator_ - 1) / denominator_;
+  uint32_t start = 0;
+  for (int i = partition_id_ * taking; i < (partition_id_ + 1) * taking; i++) {
+    tasks_distribute.InsertTask(start, tasks.GetTaskByID(tasks.permutation_[i] % total_no));  // rounding up. if overflow, go back to start
+    start++;
+  }
+  MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size();
+  return SUCCESS;
+}
 }  // namespace mindrecord
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/mindrecord/meta/shard_sample.cc b/mindspore/ccsrc/mindrecord/meta/shard_sample.cc
index c207747194a..23856c544dc 100644
--- a/mindspore/ccsrc/mindrecord/meta/shard_sample.cc
+++ b/mindspore/ccsrc/mindrecord/meta/shard_sample.cc
@@ -56,6 +56,10 @@ ShardSample::ShardSample(const std::vector<int64_t> &indices, uint32_t seed)
   shuffle_op_ = std::make_shared<ShardShuffle>(seed);
 }
 
+int ShardSample::GetDenominator() {
+  return denominator_;
+}
+
 int64_t ShardSample::GetNumSamples(int64_t dataset_size, int64_t num_classes) {
   if (sampler_type_ == kCustomTopNSampler) {
     return no_of_samples_;
diff --git a/mindspore/ccsrc/mindrecord/meta/shard_task.cc b/mindspore/ccsrc/mindrecord/meta/shard_task.cc
index 50825e6fa2c..7a2208382a7 100644
--- a/mindspore/ccsrc/mindrecord/meta/shard_task.cc
+++ b/mindspore/ccsrc/mindrecord/meta/shard_task.cc
@@ -59,6 +59,10 @@ void ShardTask::InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vecto
   task_list_.push_back(std::move(task));
 }
 
+void ShardTask::InsertTask(uint32_t &i, std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task) {
+  task_list_[i] = std::move(task);
+}
+
 void ShardTask::PopBack() { task_list_.pop_back(); }
 
 uint32_t ShardTask::Size() const { return static_cast<uint32_t>(task_list_.size()); }
-- 
Gitee


From 75b18c98551ae3775199ad72c3e5228a50700026 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 08:53:47 +0800
Subject: [PATCH 2/8] compute per-shard taking count and report row count from distributed tasks

---
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index c2b69e0932c..9402654b122 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1066,8 +1066,10 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
     if (std::dynamic_pointer_cast<ShardCategory>(op)) continue;
     if (block_reader_ && std::dynamic_pointer_cast<ShardShuffle>(op)) continue;
     std::cout << "gzj 000" << std::endl;
+    int total_no = static_cast<int>(tasks_.Size());
+    int taking = (total_no + std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator() - 1) / std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator();
     if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) {
-      for (int i=0; i<std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator(); i++) {
+      for (int i=0; i<taking; i++) {
         tasks_distribute_.InsertTask(tasks_.GetTaskByID(i));     // init for distribute
 	tasks_distribute_.permutation_.push_back(i);
       }
@@ -1081,7 +1083,7 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
   }
 
   if (tasks_.permutation_.empty()) tasks_.MakePerm();
-  num_rows_ = block_reader_ ? tasks_.SizeOfRows() : tasks_.Size();
+  num_rows_ = block_reader_ ? tasks_.SizeOfRows() : tasks_distribute_.Size();
   num_blocks_ = block_reader_ ? tasks_.Size() : 0;
   MS_LOG(INFO) << "Total rows is " << num_rows_;
   return SUCCESS;
-- 
Gitee


From d091edcad17f32b27993f2730a38ca5b88858ae8 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 09:54:18 +0800
Subject: [PATCH 3/8] pre-size the task list and insert tasks by index

---
 .../ccsrc/mindrecord/include/shard_task.h     |  5 +++++
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 11 ++++++++++-
 mindspore/ccsrc/mindrecord/meta/shard_task.cc | 19 ++++++++++++++-----
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/mindspore/ccsrc/mindrecord/include/shard_task.h b/mindspore/ccsrc/mindrecord/include/shard_task.h
index 2b6f6fcb305..5484890d6c0 100644
--- a/mindspore/ccsrc/mindrecord/include/shard_task.h
+++ b/mindspore/ccsrc/mindrecord/include/shard_task.h
@@ -41,6 +41,9 @@ class ShardTask {
   void InsertTask(TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset,
                   const json &label);
 
+  void InsertTask(uint32_t i, TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset,
+                           const json &label);
+
   void InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task);
 
   void InsertTask(uint32_t &i, std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task);
@@ -57,6 +60,8 @@ class ShardTask {
 
   static ShardTask Combine(std::vector<ShardTask> &category_tasks, bool replacement, int64_t num_elements);
 
+  void ResizeTaskList(uint32_t size);
+
   uint32_t categories;
 
   std::vector<int> permutation_;
diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index 9402654b122..dd03521e3c8 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1017,11 +1017,20 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i
   auto offsets = std::get<1>(ret);
   auto local_columns = std::get<2>(ret);
   if (shard_count_ <= kMaxShardCount) {
+    int count = 0;
     for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
       for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) {
-        tasks_.InsertTask(TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1],
+        count++;
+      }
+    }
+    tasks_.ResizeTaskList(count);
+    int start = 0;
+    for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
+      for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) {
+        tasks_.InsertTask(start, TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1],
                           std::vector<uint64_t>{offsets[shard_id][i][2], offsets[shard_id][i][3]},
                           local_columns[shard_id][i]);
+	start++;
       }
     }
   } else {
diff --git a/mindspore/ccsrc/mindrecord/meta/shard_task.cc b/mindspore/ccsrc/mindrecord/meta/shard_task.cc
index 7a2208382a7..c102b2a5e7c 100644
--- a/mindspore/ccsrc/mindrecord/meta/shard_task.cc
+++ b/mindspore/ccsrc/mindrecord/meta/shard_task.cc
@@ -44,17 +44,26 @@ void ShardTask::MakePerm() {
   }
 }
 
+void ShardTask::ResizeTaskList(uint32_t size) {
+  task_list_.resize(size);
+}
+
 void ShardTask::InsertTask(TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset,
                            const json &label) {
-  MS_LOG(DEBUG) << "Into insert task, shard_id: " << shard_id << ", group_id: " << group_id
-                << ", label: " << label.dump() << ", size of task_list_: " << task_list_.size() << ".";
+  // MS_LOG(DEBUG) << "Into insert task, shard_id: " << shard_id << ", group_id: " << group_id
+  //               << ", label: " << label.dump() << ", size of task_list_: " << task_list_.size() << ".";
   task_list_.emplace_back(task_type, std::make_tuple(shard_id, group_id), offset, label);
 }
 
+void ShardTask::InsertTask(uint32_t i, TaskType task_type, int shard_id, int group_id, const std::vector<uint64_t> &offset,
+                           const json &label) {
+  task_list_[i] = {task_type, std::make_tuple(shard_id, group_id), offset, label};
+}
+
 void ShardTask::InsertTask(std::tuple<TaskType, std::tuple<int, int>, std::vector<uint64_t>, json> task) {
-  MS_LOG(DEBUG) << "Into insert task, shard_id: " << std::get<0>(std::get<1>(task))
-                << ", group_id: " << std::get<1>(std::get<1>(task)) << ", label: " << std::get<3>(task).dump()
-                << ", size of task_list_: " << task_list_.size() << ".";
+  // MS_LOG(DEBUG) << "Into insert task, shard_id: " << std::get<0>(std::get<1>(task))
+  //               << ", group_id: " << std::get<1>(std::get<1>(task)) << ", label: " << std::get<3>(task).dump()
+  //               << ", size of task_list_: " << task_list_.size() << ".";
 
   task_list_.push_back(std::move(task));
 }
-- 
Gitee


From df87bbaf48a30e90d091d285c6525afb5f82f2b4 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 10:13:24 +0800
Subject: [PATCH 4/8] multi thread insert tasks

---
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 23 +++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index dd03521e3c8..66ebf056a48 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1024,14 +1024,23 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i
       }
     }
     tasks_.ResizeTaskList(count);
-    int start = 0;
+    int current_offset = 0;
+    std::vector<std::thread> thread_for_init_tasks(shard_count_);
     for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
-      for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) {
-        tasks_.InsertTask(start, TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1],
-                          std::vector<uint64_t>{offsets[shard_id][i][2], offsets[shard_id][i][3]},
-                          local_columns[shard_id][i]);
-	start++;
-      }
+      thread_for_init_tasks[shard_id] = std::thread([this, &offsets, &local_columns, shard_id, current_offset] () {
+        auto l_current_offset = current_offset;
+        for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) {
+          tasks_.InsertTask(l_current_offset, TaskType::kCommonTask, offsets[shard_id][i][0], offsets[shard_id][i][1],
+                            std::vector<uint64_t>{offsets[shard_id][i][2], offsets[shard_id][i][3]},
+                            local_columns[shard_id][i]);
+          l_current_offset++;
+        }
+      });
+      current_offset = current_offset + offsets[shard_id].size();
+    }
+
+    for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
+      thread_for_init_tasks[shard_id].join();
     }
   } else {
     return FAILED;
-- 
Gitee


From 57df6b9f79b9d2c33ccf9e44b26304ed56673047 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 10:59:05 +0800
Subject: [PATCH 5/8] remove temporary debug logging

---
 .../engine/datasetops/source/mindrecord_op.cc |  2 +-
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 21 ++++++++++---------
 .../meta/shard_distributed_sample.cc          |  4 ++--
 3 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
index 3d6dd39fc2f..fcd69aaca0c 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
+++ b/mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.cc
@@ -463,7 +463,7 @@ Status MindRecordOp::Reset() {
     shard_reader_->Reset();
     buffer_water_mark_ = 0;
   } else {
-    std::cout << "gzj in MindRecordOp::Reset" << std::endl;	  
+    // std::cout << "gzj in MindRecordOp::Reset" << std::endl;	  
     shard_reader_->ShuffleTask();
   }
   shard_reader_wait_post_.Set();
diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index 66ebf056a48..90b93212c72 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1035,6 +1035,7 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i
                             local_columns[shard_id][i]);
           l_current_offset++;
         }
+	MS_LOG(INFO) << "gzj task " << shard_id;
       });
       current_offset = current_offset + offsets[shard_id].size();
     }
@@ -1083,7 +1084,7 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
     const auto &op = operators[operator_no];
     if (std::dynamic_pointer_cast<ShardCategory>(op)) continue;
     if (block_reader_ && std::dynamic_pointer_cast<ShardShuffle>(op)) continue;
-    std::cout << "gzj 000" << std::endl;
+    // std::cout << "gzj 000" << std::endl;
     int total_no = static_cast<int>(tasks_.Size());
     int taking = (total_no + std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator() - 1) / std::dynamic_pointer_cast<ShardDistributedSample>(op)->GetDenominator();
     if (std::dynamic_pointer_cast<ShardDistributedSample>(op)) {
@@ -1092,12 +1093,12 @@ MSRStatus ShardReader::CreateTasks(const std::vector<std::tuple<int, int, int, u
 	tasks_distribute_.permutation_.push_back(i);
       }
     }
-    std::cout << "gzj 111" << std::endl;
+    // std::cout << "gzj 111" << std::endl;
     assert(std::dynamic_pointer_cast<ShardDistributedSample>(op));
     if (SUCCESS != std::dynamic_pointer_cast<ShardDistributedSample>(op)->DoIt(tasks_, tasks_distribute_)) {
       return FAILED;
     }
-    std::cout << "gzj 222" << std::endl;
+    // std::cout << "gzj 222" << std::endl;
   }
 
   if (tasks_.permutation_.empty()) tasks_.MakePerm();
@@ -1113,9 +1114,9 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
     return std::make_pair(FAILED,
                           std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>()));
   }
-  std::cout << "gzj in ConsumerOneTask 000, task_id: " << task_id << ":" << consumer_id
-	    << ", tasks_distribute_.Size: " << tasks_distribute_.Size()
-	    << ", permutation_.size: " << tasks_distribute_.permutation_.size() << std::endl;
+  // std::cout << "gzj in ConsumerOneTask 000, task_id: " << task_id << ":" << consumer_id
+  //    << ", tasks_distribute_.Size: " << tasks_distribute_.Size()
+  //    << ", permutation_.size: " << tasks_distribute_.permutation_.size() << std::endl;
 
   // Pick up task from task list
   auto task = tasks_distribute_.GetTaskByID(tasks_distribute_.permutation_[task_id]);
@@ -1130,7 +1131,7 @@ TASK_RETURN_CONTENT ShardReader::ConsumerOneTask(int task_id, uint32_t consumer_
   auto shard_id = std::get<0>(std::get<1>(task));
   auto group_id = std::get<1>(std::get<1>(task));
   auto addr = std::get<2>(task);
-  std::cout << "gzj in ConsumerOneTask 111" << std::endl;
+  // std::cout << "gzj in ConsumerOneTask 111" << std::endl;
   const auto &ret = shard_header_->GetPageByGroupId(group_id, shard_id);
   if (SUCCESS != ret.first) {
     return std::make_pair(FAILED,
@@ -1359,7 +1360,7 @@ std::vector<std::tuple<std::vector<uint8_t>, json>> ShardReader::GetNext() {
 
 std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardReader::GetNextById(
   const int64_t &task_id, const int32_t &consumer_id) {
-  std::cout << "gzj in GetNextById 000" << std::endl;
+  // std::cout << "gzj in GetNextById 000" << std::endl;
   if (interrupt_) {
     return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>());
   }
@@ -1368,7 +1369,7 @@ std::pair<TaskType, std::vector<std::tuple<std::vector<uint8_t>, json>>> ShardRe
   }
   const auto &ret = ConsumerOneTask(task_id, consumer_id);
   if (SUCCESS != ret.first) {
-    std::cout << "gzj in GetNextById 1111" << std::endl;
+    // std::cout << "gzj in GetNextById 1111" << std::endl;
     return std::make_pair(TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, json>>());
   }
   return std::move(ret.second);
@@ -1421,7 +1422,7 @@ void ShardReader::Reset() {
 }
 
 void ShardReader::ShuffleTask() {
-  std::cout << "gzj in Shuffle Task" << std::endl;
+  // std::cout << "gzj in Shuffle Task" << std::endl;
   for (const auto &op : operators_) {
     if (block_reader_) {
       continue;
diff --git a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc
index 37112eea624..4dda0dd2452 100644
--- a/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc
+++ b/mindspore/ccsrc/mindrecord/meta/shard_distributed_sample.cc
@@ -75,7 +75,7 @@ MSRStatus ShardDistributedSample::PreExecute(ShardTask &tasks) {
 }
 
 MSRStatus ShardDistributedSample::DoIt(ShardTask &tasks, ShardTask &tasks_distribute) {
-  MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size();
+  // MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size();
   seed_++;
   auto total_no = tasks.Size();
   if (no_of_padded_samples_ > 0 && first_epoch_) {
@@ -99,7 +99,7 @@ MSRStatus ShardDistributedSample::DoIt(ShardTask &tasks, ShardTask &tasks_distri
     tasks_distribute.InsertTask(start, tasks.GetTaskByID(tasks.permutation_[i] % total_no));  // rounding up. if overflow, go back to start
     start++;
   }
-  MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size();
+  // MS_LOG(INFO) << "gzj tasks.Size: " << tasks.Size() << ", tasks_distribute.Size: " << tasks_distribute.Size();
   return SUCCESS;
 }
 }  // namespace mindrecord
-- 
Gitee


From c42bc04d6b0e9fba2af845202b6c2c7f807078b1 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 11:25:13 +0800
Subject: [PATCH 6/8] add some log

---
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index 90b93212c72..69d126c6736 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -427,6 +427,7 @@ ROW_GROUPS ShardReader::ReadAllRowGroup(std::vector<std::string> &columns) {
   } else {  // fetch raw data from Raw page while some field is not index.
     fields += ", PAGE_ID_RAW, PAGE_OFFSET_RAW, PAGE_OFFSET_RAW_END ";
   }
+  MS_LOG(INFO) << "gzj before ReadAllRowsInShard";
 
   std::string sql = "SELECT " + fields + " FROM INDEXES ORDER BY ROW_ID ;";
 
@@ -1010,10 +1011,12 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i
                                         const std::vector<std::shared_ptr<ShardOperator>> &operators) {
   CheckIfColumnInIndex(selected_columns_);
 
+  MS_LOG(INFO) << "gzj before ReadAllRowGroup";
   auto ret = ReadAllRowGroup(selected_columns_);
   if (std::get<0>(ret) != SUCCESS) {
     return FAILED;
   }
+  MS_LOG(INFO) << "gzj after ReadAllRowGroup";
   auto offsets = std::get<1>(ret);
   auto local_columns = std::get<2>(ret);
   if (shard_count_ <= kMaxShardCount) {
-- 
Gitee


From 30fc766692334674fa5954dcff5336fef93405a1 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 11:56:51 +0800
Subject: [PATCH 7/8] optimize: count & add log

---
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index 69d126c6736..c641926ff1c 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1022,13 +1022,12 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i
   if (shard_count_ <= kMaxShardCount) {
     int count = 0;
     for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
-      for (uint32_t i = 0; i < offsets[shard_id].size(); i += 1) {
-        count++;
-      }
+      count += offsets[shard_id].size();
     }
     tasks_.ResizeTaskList(count);
     int current_offset = 0;
     std::vector<std::thread> thread_for_init_tasks(shard_count_);
+    MS_LOG(INFO) << "gzj befor multi thread";
     for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
       thread_for_init_tasks[shard_id] = std::thread([this, &offsets, &local_columns, shard_id, current_offset] () {
         auto l_current_offset = current_offset;
-- 
Gitee


From eb9b798fd7950d06e3456df38db9900a68d625e3 Mon Sep 17 00:00:00 2001
From: jonyguo <guozhijian@huawei.com>
Date: Fri, 26 Jun 2020 12:15:49 +0800
Subject: [PATCH 8/8] add timing logs around task counting and task list resize

---
 mindspore/ccsrc/mindrecord/io/shard_reader.cc | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/mindspore/ccsrc/mindrecord/io/shard_reader.cc b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
index c641926ff1c..814d513320c 100644
--- a/mindspore/ccsrc/mindrecord/io/shard_reader.cc
+++ b/mindspore/ccsrc/mindrecord/io/shard_reader.cc
@@ -1020,12 +1020,15 @@ MSRStatus ShardReader::CreateTasksByRow(const std::vector<std::tuple<int, int, i
   auto offsets = std::get<1>(ret);
   auto local_columns = std::get<2>(ret);
   if (shard_count_ <= kMaxShardCount) {
+    MS_LOG(INFO) << "gzj before count";
     int count = 0;
     for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
       count += offsets[shard_id].size();
     }
+    MS_LOG(INFO) << "gzj after count";
     tasks_.ResizeTaskList(count);
     int current_offset = 0;
+    MS_LOG(INFO) << "gzj after resize";
     std::vector<std::thread> thread_for_init_tasks(shard_count_);
     MS_LOG(INFO) << "gzj befor multi thread";
     for (int shard_id = 0; shard_id < shard_count_; shard_id++) {
-- 
Gitee