in src/shell/commands/data_operations.cpp [879:1042]
// Shell command: atomically check one sort key under a hash key and, if the
// check passes, apply a batch of mutations under the same hash key.
//
// Arguments:
//   e    - command executor; not used by this command.
//   sc   - shell context; supplies the client (`pg_client`), `timeout_ms`
//          and `escape_all` used below.
//   args - argv-style arguments; argv[1] is the hash key, followed by
//            -c|--check_sort_key <key>     (required)
//            -t|--check_type <name>        (required; looked up as
//                                           "ct_value_<name>")
//            -o|--check_operand <operand>  (required when the check type is
//                                           >= CT_VALUE_MATCH_ANYWHERE)
//            -r|--return_check_value       (optional flag)
//
// Returns false on a usage error (missing/invalid option, or no mutations
// loaded). Returns true when the user aborts while loading mutations, and
// also after the request has been issued -- even if the request itself
// failed; that failure is reported on stderr only.
bool check_and_mutate(command_executor *e, shell_context *sc, arguments args)
{
    if (args.argc < 2)
        return false;

    // The hash key is captured before escape_sds_argv() touches argv.
    std::string hash_key = sds_to_string(args.argv[1]);

    bool check_sort_key_provided = false;
    std::string check_sort_key;
    ::dsn::apps::cas_check_type::type check_type = ::dsn::apps::cas_check_type::CT_NO_CHECK;
    std::string check_type_name;
    bool check_operand_provided = false;
    std::string check_operand;
    pegasus::pegasus_client::mutations mutations;
    pegasus::pegasus_client::check_and_mutate_options options;

    static struct option long_options[] = {{"check_sort_key", required_argument, nullptr, 'c'},
                                           {"check_type", required_argument, nullptr, 't'},
                                           {"check_operand", required_argument, nullptr, 'o'},
                                           {"return_check_value", no_argument, nullptr, 'r'},
                                           {nullptr, 0, nullptr, 0}};

    // NOTE(review): presumably prepares argv so that unescape_str() can decode
    // option values below -- confirm against the helper's definition.
    escape_sds_argv(args.argc, args.argv);

    // Setting optind to 0 makes GNU getopt re-initialize its scanning state,
    // which is needed because the shell parses many commands in one process.
    optind = 0;
    while (true) {
        int option_index = 0;
        const int c = getopt_long(args.argc, args.argv, "c:t:o:r", long_options, &option_index);
        if (c == -1)
            break;
        switch (c) {
        case 'c':
            check_sort_key_provided = true;
            check_sort_key = unescape_str(optarg);
            break;
        case 't':
            // CT_NO_CHECK doubles as the "unknown name" result of the lookup,
            // so it cannot be selected explicitly from the command line.
            check_type = type_from_string(::dsn::apps::_cas_check_type_VALUES_TO_NAMES,
                                          std::string("ct_value_") + optarg,
                                          ::dsn::apps::cas_check_type::CT_NO_CHECK);
            if (check_type == ::dsn::apps::cas_check_type::CT_NO_CHECK) {
                fprintf(stderr, "ERROR: invalid check_type param\n");
                return false;
            }
            check_type_name = optarg;
            break;
        case 'o':
            check_operand_provided = true;
            check_operand = unescape_str(optarg);
            break;
        case 'r':
            options.return_check_value = true;
            break;
        default:
            return false;
        }
    }

    if (!check_sort_key_provided) {
        fprintf(stderr, "ERROR: check_sort_key not provided\n");
        return false;
    }
    if (check_type == ::dsn::apps::cas_check_type::CT_NO_CHECK) {
        fprintf(stderr, "ERROR: check_type not provided\n");
        return false;
    }

    // Check types at or above CT_VALUE_MATCH_ANYWHERE compare against an
    // operand; the same predicate decides whether the operand is echoed below.
    const bool operand_required =
        check_type >= ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE;
    if (operand_required && !check_operand_provided) {
        fprintf(stderr, "ERROR: check_operand not provided\n");
        return false;
    }

    fprintf(stderr,
            "Load mutations, like\n"
            " set <sort_key> <value> [ttl]\n"
            " del <sort_key>\n"
            "Print \"ok\" to finish loading, \"abort\" to abort this command\n");
    // A true result from load_mutations() is treated as a user abort, which is
    // not a usage error, hence `return true`.
    if (load_mutations(sc, mutations)) {
        fprintf(stderr, "INFO: abort check_and_mutate command\n");
        return true;
    }
    if (mutations.is_empty()) {
        fprintf(stderr, "ERROR: mutations not provided\n");
        return false;
    }

    // Echo the fully parsed request before sending it.
    fprintf(stderr, "hash_key: \"%s\"\n", pegasus::utils::c_escape_string(hash_key).c_str());
    fprintf(stderr,
            "check_sort_key: \"%s\"\n",
            pegasus::utils::c_escape_string(check_sort_key).c_str());
    fprintf(stderr, "check_type: %s\n", check_type_name.c_str());
    if (operand_required) {
        fprintf(stderr,
                "check_operand: \"%s\"\n",
                pegasus::utils::c_escape_string(check_operand).c_str());
    }
    fprintf(stderr, "return_check_value: %s\n", options.return_check_value ? "true" : "false");

    std::vector<pegasus::pegasus_client::mutate> copy_of_mutations;
    mutations.get_mutations(copy_of_mutations);
    fprintf(stderr, "mutations:\n");
    // The index stays an int because it is printed with %d; converting the
    // size once avoids a signed/unsigned comparison in the loop condition.
    const int mutation_count = static_cast<int>(copy_of_mutations.size());
    for (int i = 0; i < mutation_count; ++i) {
        const auto &mu = copy_of_mutations[i];
        if (mu.operation == pegasus::pegasus_client::mutate::mutate_operation::MO_PUT) {
            fprintf(stderr,
                    " mutation[%d].type: SET\n mutation[%d].sort_key: \"%s\"\n "
                    "mutation[%d].value: "
                    "\"%s\"\n mutation[%d].expire_seconds: %d\n",
                    i,
                    i,
                    pegasus::utils::c_escape_string(mu.sort_key).c_str(),
                    i,
                    pegasus::utils::c_escape_string(mu.value).c_str(),
                    i,
                    mu.set_expire_ts_seconds);
        } else {
            fprintf(stderr,
                    " mutation[%d].type: DEL\n mutation[%d].sort_key: \"%s\"\n",
                    i,
                    i,
                    pegasus::utils::c_escape_string(mu.sort_key).c_str());
        }
    }
    fprintf(stderr, "\n");

    pegasus::pegasus_client::check_and_mutate_results results;
    pegasus::pegasus_client::internal_info info;
    int ret = sc->pg_client->check_and_mutate(
        hash_key,
        check_sort_key,
        static_cast<pegasus::pegasus_client::cas_check_type>(check_type),
        check_operand,
        mutations,
        options,
        results,
        sc->timeout_ms,
        &info);
    if (ret != pegasus::PERR_OK) {
        fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
    } else {
        if (results.mutate_succeed) {
            fprintf(stderr, "Mutate succeed.\n");
        } else {
            fprintf(stderr, "Mutate failed, because check not passed.\n");
        }
        if (results.check_value_returned) {
            fprintf(stderr, "\n");
            if (results.check_value_exist) {
                fprintf(
                    stderr,
                    "Check value: \"%s\"\n",
                    pegasus::utils::c_escape_string(results.check_value, sc->escape_all).c_str());
            } else {
                fprintf(stderr, "Check value not exist.\n");
            }
        }
    }

    // Request routing details are printed whether or not the call succeeded.
    fprintf(stderr, "\n");
    fprintf(stderr, "app_id : %d\n", info.app_id);
    fprintf(stderr, "partition_index : %d\n", info.partition_index);
    fmt::fprintf(stderr, "decree : %lld\n", info.decree);
    fprintf(stderr, "server : %s\n", info.server.c_str());
    return true;
}