void printStatus()

in fdbcli/StatusCommand.actor.cpp [203:1178]


void printStatus(StatusObjectReader statusObj,
                 StatusClient::StatusLevel level,
                 bool displayDatabaseAvailable,
                 bool hideErrorMessages) {
	if (FlowTransport::transport().incompatibleOutgoingConnectionsPresent()) {
		fprintf(
		    stderr,
		    "WARNING: One or more of the processes in the cluster is incompatible with this version of fdbcli.\n\n");
	}

	try {
		bool printedCoordinators = false;

		// status or status details
		if (level == StatusClient::NORMAL || level == StatusClient::DETAILED) {

			StatusObjectReader statusObjClient;
			statusObj.get("client", statusObjClient);

			// The way the output string is assembled is to add new line character before addition to the string rather
			// than after
			std::string outputString = "";
			std::string clusterFilePath;
			if (statusObjClient.get("cluster_file.path", clusterFilePath))
				outputString = format("Using cluster file `%s'.\n", clusterFilePath.c_str());
			else
				outputString = "Using unknown cluster file.\n";

			StatusObjectReader statusObjCoordinators;
			StatusArray coordinatorsArr;

			if (statusObjClient.get("coordinators", statusObjCoordinators)) {
				// Look for a second "coordinators", under the first one.
				if (statusObjCoordinators.has("coordinators"))
					coordinatorsArr = statusObjCoordinators.last().get_array();
			}

			// Check if any coordination servers are unreachable
			bool quorum_reachable;
			if (statusObjCoordinators.get("quorum_reachable", quorum_reachable) && !quorum_reachable) {
				outputString += "\nCould not communicate with a quorum of coordination servers:";
				outputString += getCoordinatorsInfoString(statusObj);

				printf("%s\n", outputString.c_str());
				return;
			} else {
				for (StatusObjectReader coor : coordinatorsArr) {
					bool reachable;
					if (coor.get("reachable", reachable) && !reachable) {
						outputString += "\nCould not communicate with all of the coordination servers."
						                "\n  The database will remain operational as long as we"
						                "\n  can connect to a quorum of servers, however the fault"
						                "\n  tolerance of the system is reduced as long as the"
						                "\n  servers remain disconnected.\n";
						outputString += getCoordinatorsInfoString(statusObj);
						outputString += "\n";
						printedCoordinators = true;
						break;
					}
				}
			}

			// print any client messages
			if (statusObjClient.has("messages")) {
				for (StatusObjectReader message : statusObjClient.last().get_array()) {
					std::string desc;
					if (message.get("description", desc))
						outputString += "\n" + lineWrap(desc.c_str(), 80);
				}
			}

			bool fatalRecoveryState = false;
			StatusObjectReader statusObjCluster;
			try {
				if (statusObj.get("cluster", statusObjCluster)) {

					StatusObjectReader recoveryState;
					if (statusObjCluster.get("recovery_state", recoveryState)) {
						std::string name;
						std::string description;
						if (recoveryState.get("name", name) && recoveryState.get("description", description) &&
						    name != "accepting_commits" && name != "all_logs_recruited" &&
						    name != "storage_recovered" && name != "fully_recovered") {
							fatalRecoveryState = true;

							if (name == "recruiting_transaction_servers") {
								description +=
								    format("\nNeed at least %d log servers across unique zones, %d commit proxies, "
								           "%d GRV proxies and %d resolvers.",
								           recoveryState["required_logs"].get_int(),
								           recoveryState["required_commit_proxies"].get_int(),
								           recoveryState["required_grv_proxies"].get_int(),
								           recoveryState["required_resolvers"].get_int());
								if (statusObjCluster.has("machines") && statusObjCluster.has("processes")) {
									auto numOfNonExcludedProcessesAndZones =
									    getNumOfNonExcludedProcessAndZones(statusObjCluster);
									description +=
									    format("\nHave %d non-excluded processes on %d machines across %d zones.",
									           numOfNonExcludedProcessesAndZones.first,
									           getNumofNonExcludedMachines(statusObjCluster),
									           numOfNonExcludedProcessesAndZones.second);
								}
							} else if (name == "locking_old_transaction_servers" &&
							           recoveryState["missing_logs"].get_str().size()) {
								description += format("\nNeed one or more of the following log servers: %s",
								                      recoveryState["missing_logs"].get_str().c_str());
							}
							description = lineWrap(description.c_str(), 80);
							if (!printedCoordinators &&
							    (name == "reading_coordinated_state" || name == "locking_coordinated_state" ||
							     name == "configuration_never_created" || name == "writing_coordinated_state")) {
								description += getCoordinatorsInfoString(statusObj);
								description += "\n";
								printedCoordinators = true;
							}

							outputString += "\n" + description;
						}
					}
				}
			} catch (std::runtime_error&) {
			}

			// Check if cluster controllable is reachable
			try {
				// print any cluster messages
				if (statusObjCluster.has("messages") && statusObjCluster.last().get_array().size()) {

					// any messages we don't want to display
					std::set<std::string> skipMsgs = { "unreachable_process", "" };
					if (fatalRecoveryState) {
						skipMsgs.insert("status_incomplete");
						skipMsgs.insert("unreadable_configuration");
						skipMsgs.insert("immediate_priority_transaction_start_probe_timeout");
						skipMsgs.insert("batch_priority_transaction_start_probe_timeout");
						skipMsgs.insert("transaction_start_probe_timeout");
						skipMsgs.insert("read_probe_timeout");
						skipMsgs.insert("commit_probe_timeout");
					}

					for (StatusObjectReader msgObj : statusObjCluster.last().get_array()) {
						std::string messageName;
						if (!msgObj.get("name", messageName)) {
							continue;
						}
						if (skipMsgs.count(messageName)) {
							continue;
						} else if (messageName == "client_issues") {
							if (msgObj.has("issues")) {
								for (StatusObjectReader issue : msgObj["issues"].get_array()) {
									std::string issueName;
									if (!issue.get("name", issueName)) {
										continue;
									}

									std::string description;
									if (!issue.get("description", description)) {
										description = issueName;
									}

									std::string countStr;
									StatusArray addresses;
									if (!issue.has("addresses")) {
										countStr = "Some client(s)";
									} else {
										addresses = issue["addresses"].get_array();
										countStr = format("%d client(s)", addresses.size());
									}
									outputString +=
									    format("\n%s reported: %s\n", countStr.c_str(), description.c_str());

									if (level == StatusClient::StatusLevel::DETAILED) {
										for (int i = 0; i < addresses.size() && i < 4; ++i) {
											outputString += format("  %s\n", addresses[i].get_str().c_str());
										}
										if (addresses.size() > 4) {
											outputString += "  ...\n";
										}
									}
								}
							}
						} else {
							if (msgObj.has("description"))
								outputString += "\n" + lineWrap(msgObj.last().get_str().c_str(), 80);
						}
					}
				}
			} catch (std::runtime_error&) {
			}

			if (fatalRecoveryState) {
				printf("%s", outputString.c_str());
				return;
			}

			StatusObjectReader statusObjConfig;
			StatusArray excludedServersArr;
			Optional<std::string> activePrimaryDC;

			if (statusObjCluster.has("active_primary_dc")) {
				activePrimaryDC = statusObjCluster["active_primary_dc"].get_str();
			}
			if (statusObjCluster.get("configuration", statusObjConfig)) {
				if (statusObjConfig.has("excluded_servers"))
					excludedServersArr = statusObjConfig.last().get_array();
			}

			// If there is a configuration message then there is no configuration information to display
			outputString += "\nConfiguration:";
			std::string outputStringCache = outputString;
			bool isOldMemory = false;
			try {
				// Configuration section
				// FIXME: Should we suppress this if there are cluster messages implying that the database has no
				// configuration?

				outputString += "\n  Redundancy mode        - ";
				std::string strVal;

				if (statusObjConfig.get("redundancy_mode", strVal)) {
					outputString += strVal;
				} else
					outputString += "unknown";

				outputString += "\n  Storage engine         - ";
				if (statusObjConfig.get("storage_engine", strVal)) {
					if (strVal == "memory-1") {
						isOldMemory = true;
					}
					outputString += strVal;
				} else
					outputString += "unknown";

				int intVal;
				outputString += "\n  Coordinators           - ";
				if (statusObjConfig.get("coordinators_count", intVal)) {
					outputString += std::to_string(intVal);
				} else
					outputString += "unknown";

				if (excludedServersArr.size()) {
					outputString += format("\n  Exclusions             - %d (type `exclude' for details)",
					                       excludedServersArr.size());
				}

				if (statusObjConfig.get("commit_proxies", intVal))
					outputString += format("\n  Desired Commit Proxies - %d", intVal);

				if (statusObjConfig.get("grv_proxies", intVal))
					outputString += format("\n  Desired GRV Proxies    - %d", intVal);

				if (statusObjConfig.get("resolvers", intVal))
					outputString += format("\n  Desired Resolvers      - %d", intVal);

				if (statusObjConfig.get("logs", intVal))
					outputString += format("\n  Desired Logs           - %d", intVal);

				if (statusObjConfig.get("remote_logs", intVal))
					outputString += format("\n  Desired Remote Logs    - %d", intVal);

				if (statusObjConfig.get("log_routers", intVal))
					outputString += format("\n  Desired Log Routers    - %d", intVal);

				if (statusObjConfig.get("tss_count", intVal) && intVal > 0) {
					int activeTss = 0;
					if (statusObjCluster.has("active_tss_count")) {
						statusObjCluster.get("active_tss_count", activeTss);
					}
					outputString += format("\n  TSS                    - %d/%d", activeTss, intVal);

					if (statusObjConfig.get("tss_storage_engine", strVal))
						outputString += format("\n  TSS Storage Engine     - %s", strVal.c_str());
				}

				outputString += "\n  Usable Regions         - ";
				if (statusObjConfig.get("usable_regions", intVal)) {
					outputString += std::to_string(intVal);
				} else {
					outputString += "unknown";
				}

				StatusArray regions;
				if (statusObjConfig.has("regions")) {
					outputString += "\n  Regions: ";
					regions = statusObjConfig["regions"].get_array();
					for (StatusObjectReader region : regions) {
						bool isPrimary = false;
						std::vector<std::string> regionSatelliteDCs;
						std::string regionDC;
						for (StatusObjectReader dc : region["datacenters"].get_array()) {
							if (!dc.has("satellite")) {
								regionDC = dc["id"].get_str();
								if (activePrimaryDC.present() && dc["id"].get_str() == activePrimaryDC.get()) {
									isPrimary = true;
								}
							} else if (dc["satellite"].get_int() == 1) {
								regionSatelliteDCs.push_back(dc["id"].get_str());
							}
						}
						if (activePrimaryDC.present()) {
							if (isPrimary) {
								outputString += "\n    Primary -";
							} else {
								outputString += "\n    Remote -";
							}
						} else {
							outputString += "\n    Region -";
						}
						outputString += format("\n        Datacenter                    - %s", regionDC.c_str());
						if (regionSatelliteDCs.size() > 0) {
							outputString += "\n        Satellite datacenters         - ";
							for (int i = 0; i < regionSatelliteDCs.size(); i++) {
								if (i != regionSatelliteDCs.size() - 1) {
									outputString += format("%s, ", regionSatelliteDCs[i].c_str());
								} else {
									outputString += format("%s", regionSatelliteDCs[i].c_str());
								}
							}
						}
						isPrimary = false;
						if (region.get("satellite_redundancy_mode", strVal)) {
							outputString += format("\n        Satellite Redundancy Mode     - %s", strVal.c_str());
						}
						if (region.get("satellite_anti_quorum", intVal)) {
							outputString += format("\n        Satellite Anti Quorum         - %d", intVal);
						}
						if (region.get("satellite_logs", intVal)) {
							outputString += format("\n        Satellite Logs                - %d", intVal);
						}
						if (region.get("satellite_log_policy", strVal)) {
							outputString += format("\n        Satellite Log Policy          - %s", strVal.c_str());
						}
						if (region.get("satellite_log_replicas", intVal)) {
							outputString += format("\n        Satellite Log Replicas        - %d", intVal);
						}
						if (region.get("satellite_usable_dcs", intVal)) {
							outputString += format("\n        Satellite Usable DCs          - %d", intVal);
						}
					}
				}
			} catch (std::runtime_error&) {
				outputString = outputStringCache;
				outputString += "\n  Unable to retrieve configuration status";
			}

			// Cluster section
			outputString += "\n\nCluster:";
			StatusObjectReader processesMap;
			StatusObjectReader machinesMap;

			outputStringCache = outputString;

			bool machinesAreZones = true;
			std::map<std::string, int> zones;
			try {
				outputString += "\n  FoundationDB processes - ";
				if (statusObjCluster.get("processes", processesMap)) {

					outputString += format("%d", processesMap.obj().size());

					int errors = 0;
					int processExclusions = 0;
					for (auto p : processesMap.obj()) {
						StatusObjectReader process(p.second);
						bool excluded = process.has("excluded") && process.last().get_bool();
						if (excluded) {
							processExclusions++;
						}
						if (process.has("messages") && process.last().get_array().size()) {
							errors++;
						}

						std::string zoneId;
						if (process.get("locality.zoneid", zoneId)) {
							std::string machineId;
							if (!process.get("locality.machineid", machineId) || machineId != zoneId) {
								machinesAreZones = false;
							}
							int& nonExcluded = zones[zoneId];
							if (!excluded) {
								nonExcluded = 1;
							}
						}
					}

					if (errors > 0 || processExclusions) {
						outputString += format(" (less %d excluded; %d with errors)", processExclusions, errors);
					}

				} else
					outputString += "unknown";

				if (zones.size() > 0) {
					outputString += format("\n  Zones                  - %d", zones.size());
					int zoneExclusions = 0;
					for (auto itr : zones) {
						if (itr.second == 0) {
							++zoneExclusions;
						}
					}
					if (zoneExclusions > 0) {
						outputString += format(" (less %d excluded)", zoneExclusions);
					}
				} else {
					outputString += "\n  Zones                  - unknown";
				}

				outputString += "\n  Machines               - ";
				if (statusObjCluster.get("machines", machinesMap)) {
					outputString += format("%d", machinesMap.obj().size());

					int machineExclusions = 0;
					for (auto mach : machinesMap.obj()) {
						StatusObjectReader machine(mach.second);
						if (machine.has("excluded") && machine.last().get_bool())
							machineExclusions++;
					}

					if (machineExclusions) {
						outputString += format(" (less %d excluded)", machineExclusions);
					}

					int64_t minMemoryAvailable = std::numeric_limits<int64_t>::max();
					for (auto proc : processesMap.obj()) {
						StatusObjectReader process(proc.second);
						int64_t availBytes;
						if (process.get("memory.available_bytes", availBytes)) {
							minMemoryAvailable = std::min(minMemoryAvailable, availBytes);
						}
					}

					if (minMemoryAvailable < std::numeric_limits<int64_t>::max()) {
						double worstServerGb = minMemoryAvailable / (1024.0 * 1024 * 1024);
						outputString += "\n  Memory availability    - ";
						outputString += format("%.1f GB per process on machine with least available", worstServerGb);
						outputString += minMemoryAvailable < 4294967296
						                    ? "\n                           >>>>> (WARNING: 4.0 GB recommended) <<<<<"
						                    : "";
					}

					double retransCount = 0;
					for (auto mach : machinesMap.obj()) {
						StatusObjectReader machine(mach.second);
						double hz;
						if (machine.get("network.tcp_segments_retransmitted.hz", hz))
							retransCount += hz;
					}

					if (retransCount > 0) {
						outputString += format("\n  Retransmissions rate   - %d Hz", (int)round(retransCount));
					}
				} else
					outputString += "\n  Machines               - unknown";

				StatusObjectReader faultTolerance;
				if (statusObjCluster.get("fault_tolerance", faultTolerance)) {
					int availLoss, dataLoss;

					if (faultTolerance.get("max_zone_failures_without_losing_availability", availLoss) &&
					    faultTolerance.get("max_zone_failures_without_losing_data", dataLoss)) {

						outputString += "\n  Fault Tolerance        - ";

						int minLoss = std::min(availLoss, dataLoss);
						const char* faultDomain = machinesAreZones ? "machine" : "zone";
						outputString += format("%d %ss", minLoss, faultDomain);

						if (dataLoss > availLoss) {
							outputString += format(" (%d without data loss)", dataLoss);
						}

						if (dataLoss == -1) {
							ASSERT_WE_THINK(availLoss == -1);
							outputString += format(
							    "\n\n  Warning: the database may have data loss and availability loss. Please restart "
							    "following tlog interfaces, otherwise storage servers may never be able to catch "
							    "up.\n");
							StatusObjectReader logs;
							if (statusObjCluster.has("logs")) {
								for (StatusObjectReader logEpoch : statusObjCluster.last().get_array()) {
									bool possiblyLosingData;
									if (logEpoch.get("possibly_losing_data", possiblyLosingData) &&
									    !possiblyLosingData) {
										continue;
									}
									// Current epoch doesn't have an end version.
									int64_t epoch, beginVersion, endVersion = invalidVersion;
									bool current;
									logEpoch.get("epoch", epoch);
									logEpoch.get("begin_version", beginVersion);
									logEpoch.get("end_version", endVersion);
									logEpoch.get("current", current);
									std::string missing_log_interfaces;
									if (logEpoch.has("log_interfaces")) {
										for (StatusObjectReader logInterface : logEpoch.last().get_array()) {
											bool healthy;
											std::string address, id;
											if (logInterface.get("healthy", healthy) && !healthy) {
												logInterface.get("id", id);
												logInterface.get("address", address);
												missing_log_interfaces += format("%s,%s ", id.c_str(), address.c_str());
											}
										}
									}
									outputString += format(
									    "  %s log epoch: %ld begin: %ld end: %s, missing "
									    "log interfaces(id,address): %s\n",
									    current ? "Current" : "Old",
									    epoch,
									    beginVersion,
									    endVersion == invalidVersion ? "(unknown)" : format("%ld", endVersion).c_str(),
									    missing_log_interfaces.c_str());
								}
							}
						}
					}
				}

				std::string serverTime = getDateInfoString(statusObjCluster, "cluster_controller_timestamp");
				if (serverTime != "") {
					outputString += "\n  Server time            - " + serverTime;
				}
			} catch (std::runtime_error&) {
				outputString = outputStringCache;
				outputString += "\n  Unable to retrieve cluster status";
			}

			StatusObjectReader statusObjData;
			statusObjCluster.get("data", statusObjData);

			// Data section
			outputString += "\n\nData:";
			outputStringCache = outputString;
			try {
				outputString += "\n  Replication health     - ";

				StatusObjectReader statusObjDataState;
				statusObjData.get("state", statusObjDataState);

				std::string dataState;
				statusObjDataState.get("name", dataState);

				std::string description = "";
				statusObjDataState.get("description", description);

				bool healthy;
				if (statusObjDataState.get("healthy", healthy) && healthy) {
					outputString += "Healthy" + (description != "" ? " (" + description + ")" : "");
				} else if (dataState == "missing_data") {
					outputString += "UNHEALTHY" + (description != "" ? ": " + description : "");
				} else if (dataState == "healing") {
					outputString += "HEALING" + (description != "" ? ": " + description : "");
				} else if (description != "") {
					outputString += description;
				} else {
					outputString += "unknown";
				}

				if (statusObjData.has("moving_data")) {
					StatusObjectReader movingData = statusObjData.last();
					double dataInQueue, dataInFlight;
					if (movingData.get("in_queue_bytes", dataInQueue) &&
					    movingData.get("in_flight_bytes", dataInFlight))
						outputString += format("\n  Moving data            - %.3f GB",
						                       ((double)dataInQueue + (double)dataInFlight) / 1e9);
				} else if (dataState == "initializing") {
					outputString += "\n  Moving data            - unknown (initializing)";
				} else {
					outputString += "\n  Moving data            - unknown";
				}

				outputString += "\n  Sum of key-value sizes - ";

				if (statusObjData.has("total_kv_size_bytes")) {
					double totalDBBytes = statusObjData.last().get_int64();

					if (totalDBBytes >= 1e12)
						outputString += format("%.3f TB", (totalDBBytes / 1e12));

					else if (totalDBBytes >= 1e9)
						outputString += format("%.3f GB", (totalDBBytes / 1e9));

					else
						// no decimal points for MB
						outputString += format("%d MB", (int)round(totalDBBytes / 1e6));
				} else {
					outputString += "unknown";
				}

				outputString += "\n  Disk space used        - ";

				if (statusObjData.has("total_disk_used_bytes")) {
					double totalDiskUsed = statusObjData.last().get_int64();

					if (totalDiskUsed >= 1e12)
						outputString += format("%.3f TB", (totalDiskUsed / 1e12));

					else if (totalDiskUsed >= 1e9)
						outputString += format("%.3f GB", (totalDiskUsed / 1e9));

					else
						// no decimal points for MB
						outputString += format("%d MB", (int)round(totalDiskUsed / 1e6));
				} else
					outputString += "unknown";

			} catch (std::runtime_error&) {
				outputString = outputStringCache;
				outputString += "\n  Unable to retrieve data status";
			}

			// Operating space section
			outputString += "\n\nOperating space:";
			std::string operatingSpaceString = "";
			try {
				int64_t val;
				if (statusObjData.get("least_operating_space_bytes_storage_server", val))
					operatingSpaceString += format("\n  Storage server         - %.1f GB free on most full server",
					                               std::max(val / 1e9, 0.0));

				if (statusObjData.get("least_operating_space_bytes_log_server", val))
					operatingSpaceString += format("\n  Log server             - %.1f GB free on most full server",
					                               std::max(val / 1e9, 0.0));

			} catch (std::runtime_error&) {
				operatingSpaceString = "";
			}

			if (operatingSpaceString.empty()) {
				operatingSpaceString += "\n  Unable to retrieve operating space status";
			}
			outputString += operatingSpaceString;

			// Workload section
			outputString += "\n\nWorkload:";
			outputStringCache = outputString;
			bool foundLogAndStorage = false;
			try {
				// Determine which rates are unknown
				StatusObjectReader statusObjWorkload;
				statusObjCluster.get("workload", statusObjWorkload);

				std::string performanceLimited = "";
				bool unknownMCT = false;
				bool unknownRP = false;

				// Print performance limit details if known.
				try {
					StatusObjectReader limit = statusObjCluster["qos.performance_limited_by"];
					std::string name = limit["name"].get_str();
					if (name != "workload") {
						std::string desc = limit["description"].get_str();
						std::string serverID;
						limit.get("reason_server_id", serverID);
						std::string procAddr = getProcessAddressByServerID(processesMap, serverID);
						performanceLimited = format("\n  Performance limited by %s: %s",
						                            (procAddr == "unknown")
						                                ? ("server" + (serverID == "" ? "" : (" " + serverID))).c_str()
						                                : "process",
						                            desc.c_str());
						if (procAddr != "unknown")
							performanceLimited += format("\n  Most limiting process: %s", procAddr.c_str());
					}
				} catch (std::exception&) {
					// If anything here throws (such as for an incompatible type) ignore it.
				}

				// display the known rates
				outputString += "\n  Read rate              - ";
				outputString += getWorkloadRates(statusObjWorkload, unknownRP, "reads", "hz");

				outputString += "\n  Write rate             - ";
				outputString += getWorkloadRates(statusObjWorkload, unknownMCT, "writes", "hz");

				outputString += "\n  Transactions started   - ";
				outputString += getWorkloadRates(statusObjWorkload, unknownMCT, "started", "hz", true);

				outputString += "\n  Transactions committed - ";
				outputString += getWorkloadRates(statusObjWorkload, unknownMCT, "committed", "hz", true);

				outputString += "\n  Conflict rate          - ";
				outputString += getWorkloadRates(statusObjWorkload, unknownMCT, "conflicted", "hz", true);

				outputString += unknownRP ? "" : performanceLimited;

				// display any process messages
				// FIXME:  Above comment is not what this code block does, it actually just looks for a specific message
				// in the process map, *by description*, and adds process addresses that have it to a vector.  Either
				// change the comment or the code.
				std::vector<std::string> messagesAddrs;
				for (auto proc : processesMap.obj()) {
					StatusObjectReader process(proc.second);
					if (process.has("roles")) {
						StatusArray rolesArray = proc.second.get_obj()["roles"].get_array();
						bool storageRole = false;
						bool logRole = false;
						for (StatusObjectReader role : rolesArray) {
							if (role["role"].get_str() == "storage") {
								storageRole = true;
							} else if (role["role"].get_str() == "log") {
								logRole = true;
							}
						}
						if (storageRole && logRole) {
							foundLogAndStorage = true;
						}
					}
					if (process.has("messages")) {
						StatusArray processMessagesArr = process.last().get_array();
						if (processMessagesArr.size()) {
							for (StatusObjectReader msg : processMessagesArr) {
								std::string desc;
								std::string addr;
								if (msg.get("description", desc) && desc == "Unable to update cluster file." &&
								    process.get("address", addr)) {
									messagesAddrs.push_back(addr);
								}
							}
						}
					}
				}
				if (messagesAddrs.size()) {
					outputString += format("\n\n%d FoundationDB processes reported unable to update cluster file:",
					                       messagesAddrs.size());
					for (auto msg : messagesAddrs) {
						outputString += "\n  " + msg;
					}
				}
			} catch (std::runtime_error&) {
				outputString = outputStringCache;
				outputString += "\n  Unable to retrieve workload status";
			}

			// Backup and DR section
			outputString += "\n\nBackup and DR:";

			std::map<std::string, std::string> backupTags;
			getBackupDRTags(statusObjCluster, "backup", backupTags);

			std::map<std::string, std::string> drPrimaryTags;
			getBackupDRTags(statusObjCluster, "dr_backup", drPrimaryTags);

			std::map<std::string, std::string> drSecondaryTags;
			getBackupDRTags(statusObjCluster, "dr_backup_dest", drSecondaryTags);

			outputString += format("\n  Running backups        - %d", backupTags.size());
			outputString += format("\n  Running DRs            - ");

			if (drPrimaryTags.size() == 0 && drSecondaryTags.size() == 0) {
				outputString += format("%d", 0);
			} else {
				if (drPrimaryTags.size() > 0) {
					outputString += format("%d as primary", drPrimaryTags.size());
					if (drSecondaryTags.size() > 0) {
						outputString += ", ";
					}
				}
				if (drSecondaryTags.size() > 0) {
					outputString += format("%d as secondary", drSecondaryTags.size());
				}
			}

			// status details
			if (level == StatusClient::DETAILED) {
				outputString += logBackupDR("Running backup tags", backupTags);
				outputString += logBackupDR("Running DR tags (as primary)", drPrimaryTags);
				outputString += logBackupDR("Running DR tags (as secondary)", drSecondaryTags);

				outputString += "\n\nProcess performance details:";
				outputStringCache = outputString;
				try {
					// constructs process performance details output
					std::map<NetworkAddress, std::string> workerDetails;
					for (auto proc : processesMap.obj()) {
						StatusObjectReader procObj(proc.second);
						std::string address;
						procObj.get("address", address);

						std::string line;

						NetworkAddress parsedAddress;
						try {
							parsedAddress = NetworkAddress::parse(address);
						} catch (Error&) {
							// Groups all invalid IP address/port pair in the end of this detail group.
							line = format("  %-22s (invalid IP address or port)", address.c_str());
							IPAddress::IPAddressStore maxIp;
							for (int i = 0; i < maxIp.size(); ++i) {
								maxIp[i] = std::numeric_limits<std::remove_reference<decltype(maxIp[0])>::type>::max();
							}
							std::string& lastline =
							    workerDetails[NetworkAddress(IPAddress(maxIp), std::numeric_limits<uint16_t>::max())];
							if (!lastline.empty())
								lastline.append("\n");
							lastline += line;
							continue;
						}

						try {
							double tx = -1, rx = -1, mCPUUtil = -1;
							int64_t processTotalSize;

							// Get the machine for this process
							// StatusObjectReader mach = machinesMap[procObj["machine_id"].get_str()];
							StatusObjectReader mach;
							if (machinesMap.get(procObj["machine_id"].get_str(), mach, false)) {
								StatusObjectReader machCPU;
								if (mach.get("cpu", machCPU)) {

									machCPU.get("logical_core_utilization", mCPUUtil);

									StatusObjectReader network;
									if (mach.get("network", network)) {
										network.get("megabits_sent.hz", tx);
										network.get("megabits_received.hz", rx);
									}
								}
							}

							procObj.get("memory.used_bytes", processTotalSize);

							StatusObjectReader procCPUObj;
							procObj.get("cpu", procCPUObj);

							line = format("  %-22s (", address.c_str());

							double usageCores;
							if (procCPUObj.get("usage_cores", usageCores))
								line += format("%3.0f%% cpu;", usageCores * 100);

							line += mCPUUtil != -1 ? format("%3.0f%% machine;", mCPUUtil * 100) : "";
							line += std::min(tx, rx) != -1 ? format("%6.3f Gbps;", std::max(tx, rx) / 1000.0) : "";

							double diskBusy;
							if (procObj.get("disk.busy", diskBusy))
								line += format("%3.0f%% disk IO;", 100.0 * diskBusy);

							line += processTotalSize != -1
							            ? format("%4.1f GB", processTotalSize / (1024.0 * 1024 * 1024))
							            : "";

							double availableBytes;
							if (procObj.get("memory.available_bytes", availableBytes))
								line += format(" / %3.1f GB RAM  )", availableBytes / (1024.0 * 1024 * 1024));
							else
								line += "  )";

							if (procObj.has("messages")) {
								for (StatusObjectReader message : procObj.last().get_array()) {
									std::string desc;
									if (message.get("description", desc)) {
										if (message.has("type")) {
											line += "\n    Last logged error: " + desc;
										} else {
											line += "\n    " + desc;
										}
									}
								}
							}

							workerDetails[parsedAddress] = line;
						}

						catch (std::runtime_error&) {
							std::string noMetrics = format("  %-22s (no metrics available)", address.c_str());
							workerDetails[parsedAddress] = noMetrics;
						}
					}
					for (auto w : workerDetails)
						outputString += "\n" + format("%s", w.second.c_str());
				} catch (std::runtime_error&) {
					outputString = outputStringCache;
					outputString += "\n  Unable to retrieve process performance details";
				}

				if (!printedCoordinators) {
					printedCoordinators = true;
					outputString += "\n\nCoordination servers:";
					outputString += getCoordinatorsInfoString(statusObj);
				}
			}

			// client time
			std::string clientTime = getDateInfoString(statusObjClient, "timestamp");
			if (clientTime != "") {
				outputString += "\n\nClient time: " + clientTime;
			}

			if (processesMap.obj().size() > 1 && isOldMemory) {
				outputString += "\n\nWARNING: type `configure memory' to switch to a safer method of persisting data "
				                "on the transaction logs.";
			}
			if (processesMap.obj().size() > 9 && foundLogAndStorage) {
				outputString +=
				    "\n\nWARNING: A single process is both a transaction log and a storage server.\n  For best "
				    "performance use dedicated disks for the transaction logs by setting process classes.";
			}

			if (statusObjCluster.has("data_distribution_disabled")) {
				outputString += "\n\nWARNING: Data distribution is off.";
			} else {
				if (statusObjCluster.has("data_distribution_disabled_for_ss_failures")) {
					outputString += "\n\nWARNING: Data distribution is currently turned on but disabled for all "
					                "storage server failures.";
				}
				if (statusObjCluster.has("data_distribution_disabled_for_rebalance")) {
					outputString += "\n\nWARNING: Data distribution is currently turned on but shard size balancing is "
					                "currently disabled.";
				}
			}

			printf("%s\n", outputString.c_str());
		}

		// status minimal
		else if (level == StatusClient::MINIMAL) {
			// Checking for field exsistence is not necessary here because if a field is missing there is no additional
			// information that we would be able to display if we continued execution. Instead, any missing fields will
			// throw and the catch will display the proper message.
			try {
				// If any of these throw, can't get status because the result makes no sense.
				StatusObjectReader statusObjClient = statusObj["client"].get_obj();
				StatusObjectReader statusObjClientDatabaseStatus = statusObjClient["database_status"].get_obj();

				bool available = statusObjClientDatabaseStatus["available"].get_bool();

				// Database unavailable
				if (!available) {
					printf("%s", "The database is unavailable; type `status' for more information.\n");
				} else {
					try {
						bool healthy = statusObjClientDatabaseStatus["healthy"].get_bool();

						// Database available without issues
						if (healthy) {
							if (displayDatabaseAvailable) {
								printf("The database is available.\n");
							}
						} else { // Database running but with issues
							printf("The database is available, but has issues (type 'status' for more information).\n");
						}
					} catch (std::runtime_error&) {
						printf("The database is available, but has issues (type 'status' for more information).\n");
					}
				}

				bool upToDate;
				if (!statusObjClient.get("cluster_file.up_to_date", upToDate) || !upToDate) {
					fprintf(stderr,
					        "WARNING: The cluster file is not up to date. Type 'status' for more information.\n");
				}
			} catch (std::runtime_error&) {
				printf("Unable to determine database state, type 'status' for more information.\n");
			}

		}

		// status JSON
		else if (level == StatusClient::JSON) {
			printf("%s\n",
			       json_spirit::write_string(json_spirit::mValue(statusObj.obj()),
			                                 json_spirit::Output_options::pretty_print)
			           .c_str());
		}
	} catch (Error&) {
		if (hideErrorMessages)
			return;
		if (level == StatusClient::MINIMAL) {
			printf("Unable to determine database state, type 'status' for more information.\n");
		} else if (level == StatusClient::JSON) {
			printf("Could not retrieve status json.\n\n");
		} else {
			printf("Could not retrieve status, type 'status json' for more information.\n");
		}
	}
}