1 Star 0 Fork 23

carter.j.cheng/curve

forked from Gitee 极速下载/curve 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mds_client.cpp 37.84 KB
一键复制 编辑 原始数据 按行查看 历史
charisu 提交于 2020-06-02 11:06 +08:00 . curve_ops_tool prove multiple pool CLDCFS-2617
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000
/*
* Copyright (c) 2020 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Project: curve
* Created Date: 2019-11-25
* Author: charisu
*/
#include "src/tools/mds_client.h"
DECLARE_uint64(rpcTimeout);
DECLARE_uint64(rpcRetryTimes);
namespace curve {
namespace tool {
int MDSClient::Init(const std::string& mdsAddr) {
return Init(mdsAddr, std::to_string(kDefaultMdsDummyPort));
}
int MDSClient::Init(const std::string& mdsAddr,
const std::string& dummyPort) {
if (isInited_) {
return 0;
}
// 初始化channel
curve::common::SplitString(mdsAddr, ",", &mdsAddrVec_);
if (mdsAddrVec_.empty()) {
std::cout << "Split mds address fail!" << std::endl;
return -1;
}
int res = InitDummyServerMap(dummyPort);
if (res != 0) {
std::cout << "init dummy server map fail!" << std::endl;
return -1;
}
for (uint64_t i = 0; i < mdsAddrVec_.size(); ++i) {
if (channel_.Init(mdsAddrVec_[i].c_str(), nullptr) != 0) {
std::cout << "Init channel to " << mdsAddr << "fail!" << std::endl;
continue;
}
// 寻找哪个mds存活
curve::mds::topology::ListPhysicalPoolRequest request;
curve::mds::topology::ListPhysicalPoolResponse response;
curve::mds::topology::TopologyService_Stub stub(&channel_);
brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeout);
stub.ListPhysicalPool(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
continue;
}
currentMdsIndex_ = i;
isInited_ = true;
return 0;
}
std::cout << "Init channel to all mds fail!" << std::endl;
return -1;
}
int MDSClient::InitDummyServerMap(const std::string& dummyPort) {
std::vector<std::string> dummyPortVec;
curve::common::SplitString(dummyPort, ",", &dummyPortVec);
if (dummyPortVec.size() == 0) {
std::cout << "split dummy server fail!" << std::endl;
return -1;
}
// 只指定了一个端口,对所有mds采用这个端口
if (dummyPortVec.size() == 1) {
for (uint64_t i = 0; i < mdsAddrVec_.size() - 1; ++i) {
dummyPortVec.emplace_back(dummyPortVec[0]);
}
}
if (dummyPortVec.size() != mdsAddrVec_.size()) {
std::cout << "mds dummy port list must be correspond as"
" mds addr list" << std::endl;
return -1;
}
for (uint64_t i = 0; i < mdsAddrVec_.size(); ++i) {
std::vector<std::string> strs;
curve::common::SplitString(mdsAddrVec_[i], ":", &strs);
if (strs.size() != 2) {
std::cout << "split mds addr fail!" << std::endl;
return -1;
}
std::string dummyAddr = strs[0] + ":" + dummyPortVec[i];
dummyServerMap_.emplace(mdsAddrVec_[i], dummyAddr);
}
return 0;
}
int MDSClient::GetFileInfo(const std::string &fileName,
FileInfo* fileInfo) {
assert(fileInfo != nullptr);
curve::mds::GetFileInfoRequest request;
curve::mds::GetFileInfoResponse response;
request.set_filename(fileName);
FillUserInfo(&request);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::GetFileInfoRequest*,
curve::mds::GetFileInfoResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::GetFileInfo;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "GetFileInfo info from all mds fail!" << std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == StatusCode::kOK) {
fileInfo->CopyFrom(response.fileinfo());
return 0;
}
std::cout << "GetFileInfo fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::GetAllocatedSize(const std::string& fileName,
uint64_t* allocSize, AllocMap* allocMap) {
assert(allocSize != nullptr);
curve::mds::GetAllocatedSizeRequest request;
curve::mds::GetAllocatedSizeResponse response;
request.set_filename(fileName);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::GetAllocatedSizeRequest*,
curve::mds::GetAllocatedSizeResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::GetAllocatedSize;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "GetAllocatedSize info from all mds fail!" << std::endl;
return -1;
}
if (response.statuscode() == StatusCode::kOK) {
*allocSize = response.allocatedsize();
if (allocMap) {
for (auto it = response.allocsizemap().begin();
it != response.allocsizemap().end(); ++it) {
allocMap->emplace(it->first, it->second);
}
}
return 0;
}
std::cout << "GetAllocatedSize fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::GetFileSize(const std::string& fileName,
uint64_t* fileSize) {
assert(fileSize != nullptr);
curve::mds::GetFileSizeRequest request;
curve::mds::GetFileSizeResponse response;
request.set_filename(fileName);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::GetFileSizeRequest*,
curve::mds::GetFileSizeResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::GetFileSize;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "GetFileSize info from all mds fail!" << std::endl;
return -1;
}
if (response.statuscode() == StatusCode::kOK) {
*fileSize = response.filesize();
return 0;
}
std::cout << "GetAllocatedSize fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListDir(const std::string& dirName,
std::vector<FileInfo>* files) {
if (!files) {
std::cout << "The argument is a null pointer!" << std::endl;
return -1;
}
curve::mds::ListDirRequest request;
curve::mds::ListDirResponse response;
request.set_filename(dirName);
FillUserInfo(&request);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::ListDirRequest*,
curve::mds::ListDirResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::ListDir;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "ListDir from all mds fail!" << std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == StatusCode::kOK) {
for (int i = 0; i < response.fileinfo_size(); ++i) {
files->emplace_back(response.fileinfo(i));
}
return 0;
}
std::cout << "ListDir fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
return -1;
}
GetSegmentRes MDSClient::GetSegmentInfo(const std::string& fileName,
uint64_t offset,
PageFileSegment* segment) {
if (!segment) {
std::cout << "The argument is a null pointer!" << std::endl;
return GetSegmentRes::kOtherError;
}
curve::mds::GetOrAllocateSegmentRequest request;
curve::mds::GetOrAllocateSegmentResponse response;
request.set_filename(fileName);
request.set_offset(offset);
request.set_allocateifnotexist(false);
FillUserInfo(&request);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::GetOrAllocateSegmentRequest*,
curve::mds::GetOrAllocateSegmentResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::GetOrAllocateSegment;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "GetOrAllocateSegment from all mds fail!" << std::endl;
return GetSegmentRes::kOtherError;
}
if (response.has_statuscode()) {
if (response.statuscode() == StatusCode::kOK) {
segment->CopyFrom(response.pagefilesegment());
return GetSegmentRes::kOK;
} else if (response.statuscode() == StatusCode::kSegmentNotAllocated) {
return GetSegmentRes::kSegmentNotAllocated;
} else if (response.statuscode() == StatusCode::kFileNotExists) {
return GetSegmentRes::kFileNotExists;
}
}
return GetSegmentRes::kOtherError;
}
int MDSClient::DeleteFile(const std::string& fileName, bool forcedelete) {
curve::mds::DeleteFileRequest request;
curve::mds::DeleteFileResponse response;
request.set_filename(fileName);
request.set_forcedelete(forcedelete);
FillUserInfo(&request);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::DeleteFileRequest*,
curve::mds::DeleteFileResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::DeleteFile;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "DeleteFile from all mds fail!" << std::endl;
return -1;
}
if (response.has_statuscode() &&
(response.statuscode() == StatusCode::kOK ||
response.statuscode() == StatusCode::kFileNotExists ||
response.statuscode() == StatusCode::kFileUnderDeleting)) {
return 0;
}
std::cout << "DeleteFile fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::CreateFile(const std::string& fileName, uint64_t length) {
curve::mds::CreateFileRequest request;
curve::mds::CreateFileResponse response;
request.set_filename(fileName);
request.set_filetype(curve::mds::FileType::INODE_PAGEFILE);
request.set_filelength(length);
FillUserInfo(&request);
curve::mds::CurveFSService_Stub stub(&channel_);
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::CreateFileRequest*,
curve::mds::CreateFileResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::CreateFile;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "CreateFile from all mds fail!" << std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == StatusCode::kOK) {
return 0;
}
std::cout << "CreateFile fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListClient(std::vector<std::string>* clientAddrs,
bool listClientsInRepo) {
if (!clientAddrs) {
std::cout << "The argument is a null pointer!" << std::endl;
return -1;
}
curve::mds::ListClientRequest request;
curve::mds::ListClientResponse response;
curve::mds::CurveFSService_Stub stub(&channel_);
if (listClientsInRepo) {
request.set_listallclient(true);
}
void (curve::mds::CurveFSService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::ListClientRequest*,
curve::mds::ListClientResponse*,
google::protobuf::Closure*);
fp = &curve::mds::CurveFSService_Stub::ListClient;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "ListClient from all mds fail!" << std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == StatusCode::kOK) {
for (int i = 0; i < response.clientinfos_size(); ++i) {
const auto& clientInfo = response.clientinfos(i);
std::string clientAddr = clientInfo.ip() + ":" +
std::to_string(clientInfo.port());
clientAddrs->emplace_back(clientAddr);
}
return 0;
}
std::cout << "ListClient fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::GetChunkServerListInCopySet(const PoolIdType& logicalPoolId,
const CopySetIdType& copysetId,
std::vector<ChunkServerLocation>* csLocs) {
if (!csLocs) {
std::cout << "The argument is a null pointer!" << std::endl;
return -1;
}
std::vector<CopySetServerInfo> csServerInfos;
int res = GetChunkServerListInCopySets(logicalPoolId,
{copysetId}, &csServerInfos);
if (res != 0) {
std::cout << "GetChunkServerListInCopySets fail" << std::endl;
return -1;
}
for (int i = 0; i < csServerInfos[0].cslocs_size(); ++i) {
auto location = csServerInfos[0].cslocs(i);
csLocs->emplace_back(location);
}
return 0;
}
int MDSClient::GetChunkServerListInCopySets(const PoolIdType& logicalPoolId,
const std::vector<CopySetIdType>& copysetIds,
std::vector<CopySetServerInfo>* csServerInfos) {
if (!csServerInfos) {
std::cout << "The argument is a null pointer!" << std::endl;
return -1;
}
curve::mds::topology::GetChunkServerListInCopySetsRequest request;
curve::mds::topology::GetChunkServerListInCopySetsResponse response;
request.set_logicalpoolid(logicalPoolId);
for (const auto& copysetId : copysetIds) {
request.add_copysetid(copysetId);
}
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::GetChunkServerListInCopySetsRequest*,
curve::mds::topology::GetChunkServerListInCopySetsResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::GetChunkServerListInCopySets; // NOLINT
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "GetChunkServerListInCopySets from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i = 0; i < response.csinfo_size(); ++i) {
csServerInfos->emplace_back(response.csinfo(i));
}
return 0;
}
std::cout << "GetChunkServerListInCopySets fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListPhysicalPoolsInCluster(
std::vector<PhysicalPoolInfo>* pools) {
if (!pools) {
std::cout << "The argument is a null pointer!" << std::endl;
return -1;
}
curve::mds::topology::ListPhysicalPoolRequest request;
curve::mds::topology::ListPhysicalPoolResponse response;
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::ListPhysicalPoolRequest*,
curve::mds::topology::ListPhysicalPoolResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::ListPhysicalPool;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "ListPhysicalPool from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i = 0; i < response.physicalpoolinfos_size(); ++i) {
pools->emplace_back(response.physicalpoolinfos(i));
}
return 0;
}
std::cout << "ListPhysicalPool fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListLogicalPoolsInCluster(std::vector<LogicalPoolInfo>* pools) {
std::vector<PhysicalPoolInfo> phyPools;
int ret = ListPhysicalPoolsInCluster(&phyPools);
if (ret != 0) {
std::cout << "ListPhysicalPoolsInCluster fail!" << std::endl;
return -1;
}
for (const auto& phyPool : phyPools) {
std::vector<LogicalPoolInfo> lgPools;
ret = ListLogicalPoolsInPhysicalPool(phyPool.physicalpoolid(),
&lgPools);
if (ret != 0) {
std::cout << "ListLogicalPoolsInPhysicalPool "
<< phyPool.physicalpoolid() << " fail" << std::endl;
return -1;
}
pools->insert(pools->end(), lgPools.begin(), lgPools.end());
}
return 0;
}
int MDSClient::ListLogicalPoolsInPhysicalPool(const PoolIdType& id,
std::vector<LogicalPoolInfo>* pools) {
assert(pools != nullptr);
curve::mds::topology::ListLogicalPoolRequest request;
curve::mds::topology::ListLogicalPoolResponse response;
request.set_physicalpoolid(id);
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::ListLogicalPoolRequest*,
curve::mds::topology::ListLogicalPoolResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::ListLogicalPool;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "ListLogicalPool from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i = 0; i < response.logicalpoolinfos_size(); ++i) {
pools->emplace_back(response.logicalpoolinfos(i));
}
return 0;
}
std::cout << "ListLogicalPool fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListZoneInPhysicalPool(const PoolIdType& id,
std::vector<ZoneInfo>* zones) {
assert(zones != nullptr);
curve::mds::topology::ListPoolZoneRequest request;
curve::mds::topology::ListPoolZoneResponse response;
request.set_physicalpoolid(id);
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::ListPoolZoneRequest*,
curve::mds::topology::ListPoolZoneResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::ListPoolZone;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "ListPoolZone from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i = 0; i < response.zones_size(); ++i) {
zones->emplace_back(response.zones(i));
}
return 0;
}
std::cout << "ListPoolZone fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListServersInZone(const ZoneIdType& id,
std::vector<ServerInfo>* servers) {
assert(servers != nullptr);
curve::mds::topology::ListZoneServerRequest request;
curve::mds::topology::ListZoneServerResponse response;
request.set_zoneid(id);
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::ListZoneServerRequest*,
curve::mds::topology::ListZoneServerResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::ListZoneServer;
if (SendRpcToMds(&request, &response, &stub, fp) != 0) {
std::cout << "ListZoneServer from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i = 0; i < response.serverinfo_size(); ++i) {
servers->emplace_back(response.serverinfo(i));
}
return 0;
}
std::cout << "ListZoneServer fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListChunkServersOnServer(const ServerIdType& id,
std::vector<ChunkServerInfo>* chunkservers) {
assert(chunkservers != nullptr);
curve::mds::topology::ListChunkServerRequest request;
request.set_serverid(id);
return ListChunkServersOnServer(&request, chunkservers);
}
int MDSClient::ListChunkServersOnServer(const std::string& ip,
std::vector<ChunkServerInfo>* chunkservers) {
assert(chunkservers != nullptr);
curve::mds::topology::ListChunkServerRequest request;
request.set_ip(ip);
return ListChunkServersOnServer(&request, chunkservers);
}
int MDSClient::ListChunkServersOnServer(ListChunkServerRequest* request,
std::vector<ChunkServerInfo>* chunkservers) {
curve::mds::topology::ListChunkServerResponse response;
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::ListChunkServerRequest*,
curve::mds::topology::ListChunkServerResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::ListChunkServer;
if (SendRpcToMds(request, &response, &stub, fp) != 0) {
std::cout << "ListChunkServer from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i = 0; i < response.chunkserverinfos_size(); ++i) {
const auto& chunkserver = response.chunkserverinfos(i);
// 跳过retired状态的chunkserver
if (chunkserver.status() == ChunkServerStatus::RETIRED) {
continue;
}
chunkservers->emplace_back(chunkserver);
}
return 0;
}
std::cout << "ListChunkServer fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::GetChunkServerInfo(const ChunkServerIdType& id,
ChunkServerInfo* chunkserver) {
assert(chunkserver != nullptr);
curve::mds::topology::GetChunkServerInfoRequest request;
curve::mds::topology::GetChunkServerInfoResponse response;
request.set_chunkserverid(id);
return GetChunkServerInfo(&request, chunkserver);
}
int MDSClient::GetChunkServerInfo(const std::string& csAddr,
ChunkServerInfo* chunkserver) {
assert(chunkserver != nullptr);
curve::mds::topology::GetChunkServerInfoRequest request;
curve::mds::topology::GetChunkServerInfoResponse response;
if (!curve::common::NetCommon::CheckAddressValid(csAddr)) {
std::cout << "chunkserver address invalid!" << std::endl;
return -1;
}
std::vector<std::string> strs;
curve::common::SplitString(csAddr, ":", &strs);
std::string ip = strs[0];
uint64_t port;
curve::common::StringToUll(strs[1], &port);
request.set_hostip(ip);
request.set_port(port);
return GetChunkServerInfo(&request, chunkserver);
}
int MDSClient::GetChunkServerInfo(GetChunkServerInfoRequest* request,
ChunkServerInfo* chunkserver) {
curve::mds::topology::GetChunkServerInfoResponse response;
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::GetChunkServerInfoRequest*,
curve::mds::topology::GetChunkServerInfoResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::GetChunkServer;
if (SendRpcToMds(request, &response, &stub, fp) != 0) {
std::cout << "GetChunkServer from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
chunkserver->CopyFrom(response.chunkserverinfo());
return 0;
}
std::cout << "GetChunkServer fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::GetCopySetsInChunkServer(const ChunkServerIdType& id,
std::vector<CopysetInfo>* copysets) {
assert(copysets != nullptr);
curve::mds::topology::GetCopySetsInChunkServerRequest request;
curve::mds::topology::GetCopySetsInChunkServerResponse response;
request.set_chunkserverid(id);
return GetCopySetsInChunkServer(&request, copysets);
}
int MDSClient::GetCopySetsInChunkServer(const std::string& csAddr,
std::vector<CopysetInfo>* copysets) {
assert(copysets != nullptr);
curve::mds::topology::GetCopySetsInChunkServerRequest request;
curve::mds::topology::GetCopySetsInChunkServerResponse response;
if (!curve::common::NetCommon::CheckAddressValid(csAddr)) {
std::cout << "chunkserver address invalid!" << std::endl;
return -1;
}
std::vector<std::string> strs;
curve::common::SplitString(csAddr, ":", &strs);
std::string ip = strs[0];
uint64_t port;
curve::common::StringToUll(strs[1], &port);
request.set_hostip(ip);
request.set_port(port);
return GetCopySetsInChunkServer(&request, copysets);
}
int MDSClient::GetCopySetsInChunkServer(
GetCopySetsInChunkServerRequest* request,
std::vector<CopysetInfo>* copysets) {
curve::mds::topology::GetCopySetsInChunkServerResponse response;
curve::mds::topology::TopologyService_Stub stub(&channel_);
void (curve::mds::topology::TopologyService_Stub::*fp)(
google::protobuf::RpcController*,
const curve::mds::topology::GetCopySetsInChunkServerRequest*,
curve::mds::topology::GetCopySetsInChunkServerResponse*,
google::protobuf::Closure*);
fp = &curve::mds::topology::TopologyService_Stub::GetCopySetsInChunkServer;
if (SendRpcToMds(request, &response, &stub, fp) != 0) {
std::cout << "GetCopySetsInChunkServer from all mds fail!"
<< std::endl;
return -1;
}
if (response.has_statuscode() &&
response.statuscode() == kTopoErrCodeSuccess) {
for (int i =0; i < response.copysetinfos_size(); ++i) {
copysets->emplace_back(response.copysetinfos(i));
}
return 0;
}
std::cout << "GetCopySetsInChunkServer fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::ListServersInCluster(std::vector<ServerInfo>* servers) {
assert(servers != nullptr);
// 先列出逻辑池
std::vector<PhysicalPoolInfo> phyPools;
if (ListPhysicalPoolsInCluster(&phyPools) != 0) {
std::cout << "ListPhysicalPoolsInCluster fail!" << std::endl;
return -1;
}
for (const auto& phyPool : phyPools) {
std::vector<ZoneInfo> zones;
if (ListZoneInPhysicalPool(phyPool.physicalpoolid(), &zones) != 0) {
std::cout << "ListZoneInPhysicalPool fail, physicalPoolId: "
<< phyPool.physicalpoolid() << std::endl;
return -1;
}
for (const auto& zone : zones) {
if (ListServersInZone(zone.zoneid(), servers) != 0) {
std::cout << "ListServersInZone fail, zoneId :"
<< zone.zoneid() << std::endl;
return -1;
}
}
}
return 0;
}
int MDSClient::ListChunkServersInCluster(
std::vector<ChunkServerInfo>* chunkservers) {
assert(chunkservers != nullptr);
std::vector<ServerInfo> servers;
if (ListServersInCluster(&servers) != 0) {
std::cout << "ListServersInCluster fail!" << std::endl;
return -1;
}
for (const auto& server : servers) {
if (ListChunkServersOnServer(server.serverid(), chunkservers) != 0) {
std::cout << "ListChunkServersOnServer fail!" << std::endl;
return -1;
}
}
return 0;
}
int MDSClient::GetListenAddrFromDummyPort(const std::string& dummyAddr,
std::string* listenAddr) {
assert(listenAddr != nullptr);
MetricRet res = metricClient_.GetConfValueFromMetric(dummyAddr,
kMdsListenAddrMetricName, listenAddr);
if (res != MetricRet::kOK) {
return -1;
}
return 0;
}
void MDSClient::GetMdsOnlineStatus(std::map<std::string, bool>* onlineStatus) {
assert(onlineStatus != nullptr);
onlineStatus->clear();
for (const auto item : dummyServerMap_) {
std::string listenAddr;
int res = GetListenAddrFromDummyPort(item.second, &listenAddr);
// 如果获取到的监听地址与记录的mds地址不一致,也认为不在线
if (res != 0 || listenAddr != item.first) {
onlineStatus->emplace(item.first, false);
continue;
}
onlineStatus->emplace(item.first, true);
}
}
int MDSClient::GetMetric(const std::string& metricName, uint64_t* value) {
assert(value != nullptr);
std::string str;
int res = GetMetric(metricName, &str);
if (res != 0) {
return -1;
}
if (!curve::common::StringToUll(str, value)) {
std::cout << "parse metric as uint64_t fail!" << std::endl;
return -1;
}
return 0;
}
int MDSClient::GetMetric(const std::string& metricName, std::string* value) {
assert(value != nullptr);
int changeTimeLeft = mdsAddrVec_.size() - 1;
while (changeTimeLeft >= 0) {
brpc::Controller cntl;
MetricRet res = metricClient_.GetMetric(mdsAddrVec_[currentMdsIndex_],
metricName, value);
if (res == MetricRet::kOK) {
return 0;
}
changeTimeLeft--;
while (!ChangeMDServer() && changeTimeLeft > 0) {
changeTimeLeft--;
}
}
std::cout << "GetMetric " << metricName << " from all mds fail!"
<< std::endl;
return -1;
}
bool MDSClient::ChangeMDServer() {
currentMdsIndex_++;
if (currentMdsIndex_ > mdsAddrVec_.size() - 1) {
currentMdsIndex_ = 0;
}
if (channel_.Init(mdsAddrVec_[currentMdsIndex_].c_str(),
nullptr) != 0) {
return false;
}
return true;
}
std::vector<std::string> MDSClient::GetCurrentMds() {
std::vector<std::string> leaderAddrs;
for (const auto item : dummyServerMap_) {
// 获取status来判断正在服务的地址
std::string status;
MetricRet ret = metricClient_.GetMetric(item.second,
kMdsStatusMetricName, &status);
if (ret != MetricRet::kOK) {
std::cout << "Get status metric from " << item.second
<< " fail" << std::endl;
continue;
}
if (status == kMdsStatusLeader) {
leaderAddrs.emplace_back(item.first);
}
}
return leaderAddrs;
}
int MDSClient::RapidLeaderSchedule(PoolIdType lpoolId) {
::curve::mds::schedule::RapidLeaderScheduleRequst request;
::curve::mds::schedule::RapidLeaderScheduleResponse response;
::curve::mds::schedule::ScheduleService_Stub stub(&channel_);
request.set_logicalpoolid(lpoolId);
void (curve::mds::schedule::ScheduleService_Stub::*fp)(
google::protobuf::RpcController*,
const ::curve::mds::schedule::RapidLeaderScheduleRequst *,
::curve::mds::schedule::RapidLeaderScheduleResponse*,
google::protobuf::Closure*);
fp = &::curve::mds::schedule::ScheduleService_Stub::RapidLeaderSchedule;
if (0 != SendRpcToMds(&request, &response, &stub, fp)) {
std::cout << "RapidLeaderSchedule fail" << std::endl;
return -1;
}
if (response.statuscode() ==
::curve::mds::schedule::kScheduleErrCodeSuccess) {
std::cout << "RapidLeaderSchedule success" << std::endl;
return 0;
}
std::cout << "RapidLeaderSchedule fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
int MDSClient::QueryChunkServerRecoverStatus(
const std::vector<ChunkServerIdType>& cs,
std::map<ChunkServerIdType, bool> *statusMap) {
assert(statusMap != nullptr);
::curve::mds::schedule::QueryChunkServerRecoverStatusRequest request;
::curve::mds::schedule::QueryChunkServerRecoverStatusResponse response;
::curve::mds::schedule::ScheduleService_Stub stub(&channel_);
for (auto id : cs) {
request.add_chunkserverid(id);
}
void (curve::mds::schedule::ScheduleService_Stub::*fp)(
google::protobuf::RpcController*,
const ::curve::mds::schedule::QueryChunkServerRecoverStatusRequest*,
::curve::mds::schedule::QueryChunkServerRecoverStatusResponse*,
google::protobuf::Closure*);
fp = &::curve::mds::schedule::ScheduleService_Stub::QueryChunkServerRecoverStatus; // NOLINT
if (0 != SendRpcToMds(&request, &response, &stub, fp)) {
std::cout << "QueryChunkServerRecoverStatus fail" << std::endl;
return -1;
}
if (response.statuscode() ==
::curve::mds::schedule::kScheduleErrCodeSuccess) {
for (auto it = response.recoverstatusmap().begin();
it != response.recoverstatusmap().end(); ++it) {
(*statusMap)[it->first] = it->second;
}
return 0;
}
std::cout << "QueryChunkServerRecoverStatus fail with errCode: "
<< response.statuscode() << std::endl;
return -1;
}
template <typename T, typename Request, typename Response>
int MDSClient::SendRpcToMds(Request* request, Response* response, T* obp,
void (T::*func)(google::protobuf::RpcController*,
const Request*, Response*,
google::protobuf::Closure*)) {
int changeTimeLeft = mdsAddrVec_.size() - 1;
while (changeTimeLeft >= 0) {
brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeout);
(obp->*func)(&cntl, request, response, nullptr);
if (!cntl.Failed()) {
// 如果成功了,就返回0,对response的判断放到上一层
return 0;
}
bool needRetry = (cntl.ErrorCode() != EHOSTDOWN &&
cntl.ErrorCode() != ETIMEDOUT &&
cntl.ErrorCode() != brpc::ELOGOFF);
uint64_t retryTimes = 0;
while (needRetry && retryTimes < FLAGS_rpcRetryTimes) {
cntl.Reset();
(obp->*func)(&cntl, request, response, nullptr);
if (cntl.Failed()) {
retryTimes++;
continue;
}
return 0;
}
// 对于需要重试的错误,重试次数用完了还没成功就返回错误不切换
// ERPCTIMEDOUT比较特殊,这种情况下,mds可能切换了也可能没切换,所以
// 需要重试并且重试次数用完后切换
// 只有不需要重试的,也就是mds不在线的才会去切换mds
if (needRetry && cntl.ErrorCode() != brpc::ERPCTIMEDOUT) {
std::cout << "Send RPC to mds fail, error content: "
<< cntl.ErrorText() << std::endl;
return -1;
}
changeTimeLeft--;
while (!ChangeMDServer() && changeTimeLeft > 0) {
changeTimeLeft--;
}
}
return -1;
}
template <class T>
void MDSClient::FillUserInfo(T* request) {
uint64_t date = curve::common::TimeUtility::GetTimeofDayUs();
request->set_owner(userName_);
request->set_date(date);
if (!userName_.compare("root") &&
password_.compare("")) {
std::string str2sig = Authenticator::GetString2Signature(date,
userName_);
std::string sig = Authenticator::CalcString2Signature(str2sig,
password_);
request->set_signature(sig);
}
}
} // namespace tool
} // namespace curve
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C/C++
1
https://gitee.com/carters/curve.git
git@gitee.com:carters/curve.git
carters
curve
curve
master

搜索帮助

371d5123 14472233 46e8bd33 14472233