ErrorCode HdfsFileSystem::rename()

in aios/filesystem/fslib/fslib/fs/hdfs/HdfsFileSystem.cpp [350:494]


/**
 * Renames (moves) oldPath to newPath.
 *
 * Both paths are parsed into an optional config part, a server and a path on
 * that server. When source and destination share the same server and connect
 * user, a single connection is used and hdfsRename is issued; otherwise two
 * connections are opened and hdfsMove is issued between them.
 *
 * Before moving, the missing direct parent directory of the destination is
 * created.
 *
 * @param oldPath source path.
 * @param newPath destination path; must not already exist.
 * @return EC_OK on success;
 *         EC_PARSEFAIL if either path cannot be parsed;
 *         the error of splitServerAndPath / connectServer if they fail;
 *         EC_EXIST if the destination already exists;
 *         EC_NOTDIR if the source is a file and the destination ends with '/';
 *         EC_NOTSUP if the destination is "/";
 *         EC_NOENT if the destination path has no usable parent component;
 *         otherwise the converted errno of the failing hdfs call.
 */
ErrorCode HdfsFileSystem::rename(const string &oldPath, const string &newPath) {
    string oldServer;
    string oldDstPath;
    string oldPathWithoutConfig;
    HdfsFsConfig oldFsConfig;
    string newServer;
    string newDstPath;
    string newPathWithoutConfig;
    HdfsFsConfig newFsConfig;

    if (!parsePath(oldPath, oldPathWithoutConfig, oldFsConfig)) {
        return EC_PARSEFAIL;
    }
    ErrorCode ec = splitServerAndPath(oldPathWithoutConfig, oldFsConfig._prefix, oldServer, oldDstPath);
    if (ec != EC_OK) {
        return ec;
    }

    if (!parsePath(newPath, newPathWithoutConfig, newFsConfig)) {
        return EC_PARSEFAIL;
    }
    ec = splitServerAndPath(newPathWithoutConfig, newFsConfig._prefix, newServer, newDstPath);
    if (ec != EC_OK) {
        return ec;
    }

    string oldUser;
    string newUser;
    getConnectUser(oldFsConfig, oldUser);
    getConnectUser(newFsConfig, newUser);
    const bool isSameHdfs = (oldServer == newServer && oldUser == newUser);

    // The source connection is always needed; the destination connection is
    // shared with it when both sides refer to the same server and user.
    HdfsConnectionPtr oldConnection;
    HdfsConnectionPtr newConnection;
    ec = connectServer(oldServer, oldFsConfig, oldConnection);
    if (ec != EC_OK) {
        return ec;
    }
    if (isSameHdfs) {
        newConnection = oldConnection;
    } else {
        ec = connectServer(newServer, newFsConfig, newConnection);
        if (ec != EC_OK) {
            return ec;
        }
    }

    hdfsFileInfo *fileInfo = hdfsGetPathInfo(oldConnection->getHdfsFs(), oldDstPath.c_str());
    if (!fileInfo) {
        // Snapshot errno before logging: the logger may overwrite it.
        const int savedErrno = errno;
        AUTIL_LOG(ERROR,
                  "hdfs rename path <%s> to path <%s> fail, fail to get "
                  "src path info, %s",
                  oldPath.c_str(),
                  newPath.c_str(),
                  strerror(savedErrno));
        if (savedErrno == EINTERNAL) {
            oldConnection->setError(true);
        }
        return convertErrno(savedErrno);
    }

    const bool isFile = (fileInfo->mKind == kObjectKindFile);
    hdfsFreeFileInfo(fileInfo, 1);

    // Clear errno first so that a stale value from an earlier call cannot be
    // mistaken for an internal error reported by hdfsExists.
    errno = 0;
    const bool dstExists = (hdfsExists(newConnection->getHdfsFs(), newDstPath.c_str()) == 0);
    const int existsErrno = errno;
    if (dstExists) {
        AUTIL_LOG(ERROR,
                  "hdfs rename path <%s> to path <%s> fail, dst path "
                  "already exist",
                  oldPath.c_str(),
                  newPath.c_str());
        return EC_EXIST;
    }
    if (existsErrno == EINTERNAL) {
        newConnection->setError(true);
    }

    // A file cannot be renamed to a path that is spelled as a directory.
    if (!newDstPath.empty() && newDstPath[newDstPath.size() - 1] == '/' && isFile) {
        AUTIL_LOG(ERROR,
                  "hdfs rename path <%s> to path <%s> fail, src path is "
                  "not a directory",
                  oldPath.c_str(),
                  newPath.c_str());
        return EC_NOTDIR;
    }

    // Locate the '/' that separates the destination's parent from its last
    // component, skipping one trailing '/' if present.
    size_t pos = newDstPath.rfind('/');
    if (pos != string::npos && pos == newDstPath.size() - 1) {
        if (pos == 0) {
            AUTIL_LOG(ERROR,
                      "hdfs rename path <%s> to path <%s> fail, "
                      "dst path should not be /",
                      oldPath.c_str(),
                      newPath.c_str());
            return EC_NOTSUP;
        }
        pos = newDstPath.rfind('/', pos - 1);
    }
    if (pos == string::npos) {
        // No parent separator at all (e.g. "name" or "name/").
        AUTIL_LOG(ERROR, "hdfs rename path <%s> to path <%s> fail, invalid dst path", oldPath.c_str(), newPath.c_str());
        return EC_NOENT;
    }

    // pos == 0 means the parent is the root directory, which needs no creation.
    if (pos != 0) {
        const string parentDir = newDstPath.substr(0, pos);
        if (hdfsExists(newConnection->getHdfsFs(), parentDir.c_str()) != 0 &&
            hdfsCreateDirectory(newConnection->getHdfsFs(), parentDir.c_str()) < 0) {
            const int savedErrno = errno;
            AUTIL_LOG(ERROR,
                      "hdfs rename path <%s> to path <%s> fail, create"
                      " parent dir of dst path fail, %s.",
                      oldPath.c_str(),
                      newPath.c_str(),
                      strerror(savedErrno));
            return convertErrno(savedErrno);
        }
    }

    if (isSameHdfs) {
        if (hdfsRename(oldConnection->getHdfsFs(), oldDstPath.c_str(), newDstPath.c_str()) < 0) {
            const int savedErrno = errno;
            AUTIL_LOG(ERROR,
                      "hdfs rename path <%s> to path <%s> fail, %s",
                      oldPath.c_str(),
                      newPath.c_str(),
                      strerror(savedErrno));
            ec = convertErrno(savedErrno);
        }
        return ec;
    }

    if (hdfsMove(oldConnection->getHdfsFs(), oldDstPath.c_str(), newConnection->getHdfsFs(), newDstPath.c_str()) <
        0) {
        const int savedErrno = errno;
        AUTIL_LOG(
            ERROR, "hdfsMove path <%s> to path <%s> fail, %s", oldPath.c_str(), newPath.c_str(), strerror(savedErrno));
        ec = convertErrno(savedErrno);
    }
    return ec;
}