aios/filesystem/fslib/fslib/fs/zookeeper/ZookeeperFileSystem.cpp (846 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "fslib/fs/zookeeper/ZookeeperFileSystem.h" #include <errno.h> #include "autil/EnvUtil.h" #include "autil/StringUtil.h" #include "fslib/fs/zookeeper/ZookeeperFile.h" #include "fslib/util/SafeBuffer.h" using namespace std; using namespace autil; using namespace fslib::util; FSLIB_PLUGIN_BEGIN_NAMESPACE(zookeeper); AUTIL_DECLARE_AND_SETUP_LOGGER(zookeeper, ZookeeperFileSystem); int ZookeeperFileSystem::RECV_TIMEOUT = 6000; const int ZookeeperFileSystem::ERROR_INIT_HANDLE_FAIL = -200; const string ZookeeperFileSystem::CONFIG_SEPARATOR("?"); const string ZookeeperFileSystem::ZOOKEEPER_PREFIX("zfs://"); const string ZookeeperFileSystem::KV_SEPARATOR("="); const string ZookeeperFileSystem::KV_PAIR_SEPARATOR("&"); const string ZookeeperFileSystem::ZOOFS_RETRY("retry"); #define ZOOKEEPER_CLOSE(zkHandle) \ { \ int ret = zookeeper_close(zkHandle); \ if (ZOK != ret) { \ AUTIL_LOG(WARN, "close zk handle [%p] failed, ret [%d].", zkHandle, ret); \ } \ } ZookeeperFileSystem::ZookeeperFileSystem() { zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); string zk_log_str("zookeeper_log"); string tmp_dir = autil::EnvUtil::getEnv("TEST_TMPDIR"); if (!tmp_dir.empty()) { zk_log_str.insert(0, tmp_dir); } _logFile = fopen(zk_log_str.c_str(), "a"); if (_logFile) { zoo_set_log_stream(_logFile); } else { cerr << "open ./zookeeper_log for logging fail, " << strerror(errno) << endl; } } ZookeeperFileSystem::~ZookeeperFileSystem() { if (_logFile) { fclose(_logFile); } } void ZookeeperFileSystem::reconnect(zhandle_t *&zh, const char *server) { ZOOKEEPER_CLOSE(zh); zh = zookeeper_init(server, NULL, RECV_TIMEOUT, 0, NULL, 0); if (!zh) { AUTIL_LOG(ERROR, "zoo init fail, %s", zerror(errno)); } } int ZookeeperFileSystem::zooWExistWithRetry(zhandle_t *&zh, const char *path, watcher_fn watcher, void *watcherCtx, struct Stat *stat, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_wexists(zh, path, watcher, watcherCtx, stat); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } default: return zrc; } } return zrc; } int ZookeeperFileSystem::zooExistWithRetry( zhandle_t *&zh, const char *path, int watch, struct Stat *stat, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_exists(zh, path, 0, stat); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } default: return zrc; } } return zrc; } int ZookeeperFileSystem::zooCreateWithRetry(zhandle_t *&zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_create(zh, path, value, valuelen, acl, flags, path_buffer, path_buffer_len); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } default: return zrc; } } return zrc; } int ZookeeperFileSystem::zooGetChildrenWithRetry( zhandle_t *&zh, const char *path, int watch, struct String_vector *strings, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_get_children(zh, path, watch, strings); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } case ZOK: { if (checkReturnResults(strings)) { return zrc; } else { zrc = ZCONNECTIONLOSS; continue; } } default: { return zrc; } } } return zrc; } bool ZookeeperFileSystem::checkReturnResults(struct String_vector *result) { for (int32_t i = 0; i < result->count; i++) { if (result->data[i] == NULL) { return false; } } return true; } int ZookeeperFileSystem::zooGetWithRetry(zhandle_t *&zh, const char *path, int watch, char *buffer, int *buffer_len, struct Stat *stat, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_get(zh, path, watch, buffer, buffer_len, stat); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } default: return zrc; } } return zrc; } int ZookeeperFileSystem::zooSetWithRetry(zhandle_t *&zh, const char *path, const char *buffer, int buflen, int version, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_set(zh, path, buffer, buflen, version); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } default: return zrc; } } return zrc; } int ZookeeperFileSystem::zooDeleteWithRetry( zhandle_t *&zh, const char *path, int version, int8_t retryCnt, const char *server) { int zrc = ZOK; int8_t actualRetryCnt = 0; for (; actualRetryCnt <= retryCnt; actualRetryCnt++) { zrc = zoo_delete(zh, path, version); switch (zrc) { case ZCONNECTIONLOSS: case ZOPERATIONTIMEOUT: continue; case ZINVALIDSTATE: case ZSESSIONEXPIRED: { reconnect(zh, server); if (zh) { continue; } else { return ERROR_INIT_HANDLE_FAIL; } } default: return zrc; } } return zrc; } File *ZookeeperFileSystem::openFile(const std::string &fileName, Flag flag) { string server; string path; zhandle_t *zh = NULL; ZookeeperFsConfig fsConfig; ErrorCode ec = internalGetZhandleAndPath(fileName, fsConfig, zh, server, path); if (ec != EC_OK) { return new ZookeeperFile(NULL, server, path, flag, 0, fsConfig._retry, ec); } struct Stat stat; stat.dataLength = 0; int zrc = zooExistWithRetry(zh, path.c_str(), 0, &stat, fsConfig._retry, server.c_str()); flag = (Flag)(flag & FLAG_CMD_MASK); switch (flag) { case READ: break; case WRITE: if (zrc == ZNONODE) { ec = internalCreateParentNode(zh, path, fsConfig._retry, server); if (ec != EC_OK) { ZOOKEEPER_CLOSE(zh); return new ZookeeperFile(NULL, server, path, flag, 0, fsConfig._retry, ec); } zrc = zooCreateWithRetry( zh, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0, fsConfig._retry, server.c_str()); } break; default: AUTIL_LOG(ERROR, "openFile %s fail, flag %d not support", fileName.c_str(), flag); ZOOKEEPER_CLOSE(zh); return new ZookeeperFile(NULL, server, path, flag, 0, fsConfig._retry, EC_NOTSUP); } if (zrc != ZOK && zrc != ZNODEEXISTS) { AUTIL_LOG(ERROR, "open file %s fail! %s", fileName.c_str(), zerror(zrc)); ZOOKEEPER_CLOSE(zh); return new ZookeeperFile(NULL, server, path, flag, 0, fsConfig._retry, convertErrno(zrc)); } return new ZookeeperFile(zh, server, path, flag, stat.dataLength, fsConfig._retry); } MMapFile *ZookeeperFileSystem::mmapFile( const std::string &fileName, Flag openMode, char *start, size_t length, int prot, int mapFlag, off_t offset) { return new MMapFile(fileName, -1, NULL, -1, -1, EC_NOTSUP); } ErrorCode ZookeeperFileSystem::rename(const std::string &oldPath, const std::string &newPath) { string oldServer; string oldDstPath; zhandle_t *oldZh = NULL; ZookeeperFsConfig oldFsConfig; ErrorCode ec = internalGetZhandleAndPath(oldPath, oldFsConfig, oldZh, oldServer, oldDstPath); if (ec != EC_OK) { return ec; } int8_t oldRetryCnt = oldFsConfig._retry; int zrc = zooExistWithRetry(oldZh, oldDstPath.c_str(), 0, NULL, oldRetryCnt, oldServer.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "rename node %s to node %s fail! %s", oldPath.c_str(), newPath.c_str(), zerror(zrc)); ZOOKEEPER_CLOSE(oldZh); return convertErrno(zrc); } string newServer; string newDstPath; zhandle_t *newZh = NULL; ZookeeperFsConfig newFsConfig; ec = internalGetZhandleAndPath(newPath, newFsConfig, newZh, newServer, newDstPath); int8_t newRetryCnt = newFsConfig._retry; if (ec != EC_OK) { return ec; } if (oldServer == newServer && oldDstPath == newDstPath) { return EC_OK; } zrc = zooExistWithRetry(newZh, newDstPath.c_str(), 0, NULL, newRetryCnt, newServer.c_str()); if (zrc == ZOK) { AUTIL_LOG(ERROR, "rename %s to %s fail, newfile already exist", oldPath.c_str(), newPath.c_str()); ZOOKEEPER_CLOSE(oldZh); ZOOKEEPER_CLOSE(newZh); return EC_EXIST; } ErrorCode ret = internalCreateParentNode(newZh, newDstPath, newRetryCnt, newServer); if (ret != EC_OK) { ZOOKEEPER_CLOSE(oldZh); ZOOKEEPER_CLOSE(newZh); return ret; } ret = internalCopyNode(oldZh, oldDstPath, oldRetryCnt, oldServer, newZh, newDstPath, newRetryCnt, newServer); if (ret != EC_OK) { ZOOKEEPER_CLOSE(oldZh); ZOOKEEPER_CLOSE(newZh); return ret; } ret = internalRemoveNode(oldZh, oldDstPath, oldRetryCnt, oldServer); if (ret == EC_NOENT) { ret = EC_OK; } ZOOKEEPER_CLOSE(oldZh); ZOOKEEPER_CLOSE(newZh); return ret; } ErrorCode ZookeeperFileSystem::link(const std::string &oldPath, const std::string &newPath) { return EC_NOTSUP; } ErrorCode ZookeeperFileSystem::getFileMeta(const std::string &fileName, FileMeta &fileMeta) { string server; string path; zhandle_t *zh = NULL; ZookeeperFsConfig fsConfig; ErrorCode ret = internalGetZhandleAndPath(fileName, fsConfig, zh, server, path); if (ret != EC_OK) { return ret; } struct Stat stat; int zrc = zooExistWithRetry(zh, path.c_str(), 0, &stat, fsConfig._retry, server.c_str()); ZOOKEEPER_CLOSE(zh); if (zrc != ZOK) { AUTIL_LOG(ERROR, "get file %s meta fail, %s", fileName.c_str(), zerror(zrc)); return convertErrno(zrc); } fileMeta.fileLength = stat.dataLength; fileMeta.createTime = stat.ctime; fileMeta.lastModifyTime = stat.mtime; return EC_OK; } ErrorCode ZookeeperFileSystem::isFile(const std::string &path) { ErrorCode ret = isExist(path); if (ret == EC_TRUE) { return EC_TRUE; } else if (ret == EC_FALSE) { return EC_NOENT; } else { return ret; } return EC_OK; } FileChecksum ZookeeperFileSystem::getFileChecksum(const std::string &fileName) { return 0; } ErrorCode ZookeeperFileSystem::mkDir(const std::string &dirName, bool recursive) { string server; string path; zhandle_t *zh = NULL; ZookeeperFsConfig fsConfig; ErrorCode ret = internalGetZhandleAndPath(dirName, fsConfig, zh, server, path); if (ret != EC_OK) { return ret; } size_t size = path.size(); if (path[size - 1] == '/') { if (size == 1) { AUTIL_LOG(ERROR, "create node %s fail! dir / already exist", dirName.c_str()); ZOOKEEPER_CLOSE(zh); return EC_EXIST; } else { path.resize(size - 1); } } if (recursive) { ret = internalCreateParentNode(zh, path, fsConfig._retry, server); if (ret != EC_OK) { ZOOKEEPER_CLOSE(zh); return ret; } } int zrc = zooCreateWithRetry( zh, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0, fsConfig._retry, server.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "create node %s fail! %s", dirName.c_str(), zerror(zrc)); ZOOKEEPER_CLOSE(zh); return convertErrno(zrc); } ZOOKEEPER_CLOSE(zh); return EC_OK; } ErrorCode ZookeeperFileSystem::listDir(const std::string &dirName, FileList &fileList) { string server; string path; zhandle_t *zh = NULL; ZookeeperFsConfig fsConfig; ErrorCode ret = internalGetZhandleAndPath(dirName, fsConfig, zh, server, path); if (ret != EC_OK) { return ret; } struct String_vector strings; int zrc = zooGetChildrenWithRetry(zh, path.c_str(), 0, &strings, fsConfig._retry, server.c_str()); ZOOKEEPER_CLOSE(zh); if (zrc == EC_OK) { for (int i = 0; i < strings.count; i++) { fileList.push_back(strings.data[i]); } freeStringVector(&strings); return EC_OK; } AUTIL_LOG(ERROR, "list dir of %s fail, %s", dirName.c_str(), zerror(zrc)); return convertErrno(zrc); } ErrorCode ZookeeperFileSystem::listDir(const std::string &dirName, EntryList &entryList) { FileList fileList; auto ec = listDir(dirName, fileList); std::for_each(fileList.begin(), fileList.end(), [&entryList](const string &file) { entryList.push_back({true, file}); }); return ec; } ErrorCode ZookeeperFileSystem::isDirectory(const std::string &path) { ErrorCode ret = isExist(path); if (ret == EC_TRUE) { return EC_TRUE; } else if (ret == EC_FALSE) { return EC_NOENT; } else { return ret; } return EC_OK; } ErrorCode ZookeeperFileSystem::isExist(const std::string &pathName) { zhandle_t *zh = NULL; string server; string path; ZookeeperFsConfig fsConfig; ErrorCode ret = internalGetZhandleAndPath(pathName, fsConfig, zh, server, path); if (ret != EC_OK) { return ret; } int zrc = zooExistWithRetry(zh, path.c_str(), 0, NULL, fsConfig._retry, server.c_str()); ZOOKEEPER_CLOSE(zh); if (zrc == ZOK) { return EC_TRUE; } else if (zrc == ZNONODE) { return EC_FALSE; } AUTIL_LOG(ERROR, "zoo exists fail, %s", zerror(zrc)); return convertErrno(zrc); } ErrorCode ZookeeperFileSystem::convertErrno(int errNum) { switch (errNum) { case ZOK: return EC_OK; case ZCONNECTIONLOSS: return EC_CONNECTIONLOSS; case ZNONODE: return EC_NOENT; case ZNODEEXISTS: return EC_EXIST; case ZBADARGUMENTS: return EC_BADARGS; case ZOPERATIONTIMEOUT: return EC_OPERATIONTIMEOUT; case ZINVALIDSTATE: return EC_INVALIDSTATE; case 2: return EC_BADARGS; case ERROR_INIT_HANDLE_FAIL: return EC_INIT_ZOOKEEPER_FAIL; case ZSESSIONEXPIRED: return EC_SESSIONEXPIRED; default: AUTIL_LOG(ERROR, "unknown errNum: %d", errNum); return EC_UNKNOWN; } } ErrorCode ZookeeperFileSystem::internalCopyNode(zhandle_t *oldZh, const string &oldNode, int8_t oldRetryCnt, const string &oldServer, zhandle_t *newZh, const string &newNode, int8_t newRetryCnt, const string &newServer) { string oldNodeStr(oldNode); string newNodeStr(newNode); struct Stat stat; int zrc = zooExistWithRetry(oldZh, oldNode.c_str(), 0, &stat, oldRetryCnt, oldServer.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } int bufferLen = stat.dataLength; SafeBuffer contentBuf(bufferLen); zrc = zooGetWithRetry( oldZh, oldNode.c_str(), 0, contentBuf.getBuffer(), &bufferLen, NULL, oldRetryCnt, oldServer.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } zrc = zooExistWithRetry(newZh, newNode.c_str(), 0, NULL, newRetryCnt, newServer.c_str()); if (zrc == ZOK) { struct String_vector strings; zrc = zooGetChildrenWithRetry(newZh, newNode.c_str(), 0, &strings, newRetryCnt, newServer.c_str()); if (zrc != ZOK) { AUTIL_LOG( ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } if (strings.count != 0) { AUTIL_LOG(ERROR, "internal rename node %s to node %s fail! which has " "child nodes", oldNode.c_str(), newNode.c_str()); freeStringVector(&strings); return EC_NOTEMPTY; } freeStringVector(&strings); zrc = zooSetWithRetry( newZh, newNode.c_str(), contentBuf.getBuffer(), bufferLen, -1, newRetryCnt, newServer.c_str()); if (zrc != ZOK) { AUTIL_LOG( ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } } else if (zrc == ZNONODE) { zrc = zooCreateWithRetry(newZh, newNode.c_str(), contentBuf.getBuffer(), bufferLen, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0, newRetryCnt, newServer.c_str()); if (zrc != ZOK && zrc != ZNODEEXISTS) { AUTIL_LOG( ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } } else { AUTIL_LOG(ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } struct String_vector strings; zrc = zooGetChildrenWithRetry(oldZh, oldNode.c_str(), 0, &strings, oldRetryCnt, oldServer.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "internal rename node %s to node %s fail! %s", oldNode.c_str(), newNode.c_str(), zerror(zrc)); return convertErrno(zrc); } for (int i = 0; i < strings.count; i++) { string oldName = oldNode + "/" + string(strings.data[i]); string newName = newNode + "/" + string(strings.data[i]); ErrorCode ret = internalCopyNode( oldZh, oldName, oldRetryCnt, oldServer.c_str(), newZh, newName, newRetryCnt, newServer.c_str()); if (ret != EC_OK) { freeStringVector(&strings); return ret; } } freeStringVector(&strings); return EC_OK; } void ZookeeperFileSystem::freeStringVector(struct String_vector *v) { if (v->data) { for (int32_t i = 0; i < v->count; i++) { free(v->data[i]); } free(v->data); v->data = NULL; } } ErrorCode ZookeeperFileSystem::remove(const std::string &pathName) { string server; string path; zhandle_t *zh = NULL; ZookeeperFsConfig fsConfig; ErrorCode ret = internalGetZhandleAndPath(pathName, fsConfig, zh, server, path); if (ret != EC_OK) { return ret; } struct String_vector strings; int zrc = zooGetChildrenWithRetry(zh, path.c_str(), 0, &strings, fsConfig._retry, server.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "get children of node %s fail! %s", path.c_str(), zerror(zrc)); ZOOKEEPER_CLOSE(zh); return convertErrno(zrc); } for (int i = 0; i < strings.count; i++) { string childName = path + "/" + string(strings.data[i]); ret = internalRemoveNode(zh, childName, fsConfig._retry, server.c_str()); if (ret != EC_OK) { freeStringVector(&strings); ZOOKEEPER_CLOSE(zh); return ret; } } freeStringVector(&strings); zrc = zooDeleteWithRetry(zh, path.c_str(), -1, fsConfig._retry, server.c_str()); if (zrc != ZOK) { AUTIL_LOG(ERROR, "remove node %s fail! %s", path.c_str(), zerror(zrc)); ZOOKEEPER_CLOSE(zh); return convertErrno(zrc); } ZOOKEEPER_CLOSE(zh); return EC_OK; } ErrorCode ZookeeperFileSystem::internalRemoveNode(zhandle_t *zh, const string &node, int8_t retryCnt, const string &server) { struct String_vector strings; int zrc = zooGetChildrenWithRetry(zh, node.c_str(), 0, &strings, retryCnt, server.c_str()); if (zrc == ZNONODE) { return EC_OK; } else if (zrc != ZOK) { AUTIL_LOG(ERROR, "internal remove node %s fail! %s", node.c_str(), zerror(zrc)); return convertErrno(zrc); } for (int i = 0; i < strings.count; i++) { string childName = node + "/" + string(strings.data[i]); ErrorCode ret = internalRemoveNode(zh, childName, retryCnt, server.c_str()); if (ret != EC_OK) { freeStringVector(&strings); return ret; } } freeStringVector(&strings); zrc = zooDeleteWithRetry(zh, node.c_str(), -1, retryCnt, server.c_str()); if (zrc != ZOK && zrc != ZNONODE) { AUTIL_LOG(ERROR, "internal remove node %s fail! %s", node.c_str(), zerror(zrc)); return convertErrno(zrc); } return EC_OK; } ErrorCode ZookeeperFileSystem::internalCreateParentNode(zhandle_t *zh, const string &node, int8_t retryCnt, const string &server) { size_t pos = node.rfind("/"); if (pos == 0) { return EC_OK; } string parentNode = node.substr(0, pos); int zrc = zooExistWithRetry(zh, parentNode.c_str(), 0, NULL, retryCnt, server.c_str()); if (zrc == ZNONODE) { if (pos == 0) { return EC_OK; } else { ErrorCode ret = internalCreateParentNode(zh, parentNode, retryCnt, server); if (ret != EC_OK) { return ret; } zrc = zooCreateWithRetry( zh, parentNode.c_str(), NULL, 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0, retryCnt, server.c_str()); if (zrc != ZOK && zrc != ZNODEEXISTS) { AUTIL_LOG(ERROR, "internal create node %s fail! %s", parentNode.c_str(), zerror(zrc)); return convertErrno(zrc); } } } else if (zrc != ZOK && zrc != ZNODEEXISTS) { AUTIL_LOG(ERROR, "internal create node %s fail! %s", parentNode.c_str(), zerror(zrc)); return convertErrno(zrc); } return EC_OK; } ErrorCode ZookeeperFileSystem::internalGetZhandleAndPath( const string &pathName, ZookeeperFsConfig &config, zhandle_t *&zh, string &server, string &path) { size_t prefixPos = pathName.find(ZookeeperFileSystem::ZOOKEEPER_PREFIX, 0); if (prefixPos == string::npos) { AUTIL_LOG(ERROR, "parse zookeeper config fail for path %s which " "is an invalid zfs path", pathName.c_str()); return EC_NOTSUP; } string NonConfigPath; if (!parsePath(pathName, NonConfigPath, config)) { return EC_PARSEFAIL; } if (!parsePath(NonConfigPath, server, path)) { return EC_PARSEFAIL; } zh = zookeeper_init(server.c_str(), NULL, RECV_TIMEOUT, 0, NULL, 0); if (!zh) { AUTIL_LOG(ERROR, "zoo init fail, %s", zerror(errno)); return EC_BADARGS; } return EC_OK; } bool ZookeeperFileSystem::parsePath(const string &pathName, string &server, string &dstPath) { size_t pos = pathName.find("/"); if (pos == string::npos) { AUTIL_LOG(ERROR, "cannot find / between servername and pathname for %s", pathName.c_str()); return false; } server = pathName.substr(0, pos); dstPath = pathName.substr(pos); return true; } bool ZookeeperFileSystem::parsePath(const string &fullPath, string &filePath, ZookeeperFsConfig &config) { filePath.reserve(fullPath.size()); filePath.resize(0); size_t prefixLen = ZookeeperFileSystem::ZOOKEEPER_PREFIX.size(); size_t beginPosForFileName = fullPath.find('/', prefixLen); if (beginPosForFileName == string::npos) { AUTIL_LOG(ERROR, "parse zookeeper config fail for path %s which " "is an invalid path", fullPath.c_str()); return false; } string configUri; configUri.reserve(beginPosForFileName); configUri.append(fullPath.c_str(), beginPosForFileName); size_t beginPosForConfig = configUri.find(ZookeeperFileSystem::CONFIG_SEPARATOR.c_str(), prefixLen); configUri.resize(0); if (beginPosForConfig != string::npos) { configUri.append(fullPath.c_str() + beginPosForConfig + 1, beginPosForFileName - beginPosForConfig - 1); filePath.append(fullPath.c_str() + prefixLen, beginPosForConfig - prefixLen); } else { filePath.append(fullPath.c_str() + prefixLen, beginPosForFileName - prefixLen); } if (!eliminateSeqSlash( fullPath.c_str() + beginPosForFileName, fullPath.size() - beginPosForFileName, filePath, false)) { AUTIL_LOG(ERROR, "parse zookeeper config fail for path %s which " "is an invalid path", fullPath.c_str()); return false; } // parse config config._retry = 3; return parseConfig(configUri, config); } bool ZookeeperFileSystem::eliminateSeqSlash(const char *src, size_t len, string &dst, bool keepLastSlash) { int slashNum = 0; bool flag = false; for (size_t i = 0; i < len; i++) { if (src[i] == '/') { if (flag == false) { flag = true; dst.append(1, src[i]); slashNum++; } else { continue; } } else { flag = false; dst.append(1, src[i]); } } if (*dst.rbegin() == '/' && slashNum > 1 && !keepLastSlash) { dst.resize(dst.size() - 1); } return slashNum > 0; } bool ZookeeperFileSystem::parseConfig(const string &configUri, ZookeeperFsConfig &config) { vector<string> confVec = StringUtil::split(configUri, KV_PAIR_SEPARATOR); for (uint32_t i = 0; i < confVec.size(); i++) { vector<string> subConfVec = StringUtil::split(confVec[i], KV_SEPARATOR); if (subConfVec.size() != 2) { AUTIL_LOG(ERROR, "parse ZooFsConfig string <%s> fail! " "parse key value pair error", configUri.c_str()); return false; } switch (subConfVec[0][0]) { case 'r': { if (strcmp(subConfVec[0].c_str(), ZOOFS_RETRY.c_str()) == 0) { if (!StringUtil::strToInt8(subConfVec[1].c_str(), config._retry)) { AUTIL_LOG(ERROR, "parse ZooFsConfig string <%s> fail " "while parsing retry times, convert string to " "int8_t error", configUri.c_str()); return false; } else { continue; } } break; } default: break; } AUTIL_LOG(ERROR, "parse ZooFsConfig string <%s> fail! " "not support config: %s", configUri.c_str(), subConfVec[0].c_str()); return false; } return true; } FileLock *ZookeeperFileSystem::createFileLock(const std::string &fileName) { ZookeeperFileLockCreator creator; return creator.createFileLock(fileName); } FSLIB_PLUGIN_END_NAMESPACE(zookeeper);