in zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java [776:941]
private void pRequestHelper(Request request) {
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
break;
case OpCode.delete:
DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
break;
case OpCode.check:
CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
break;
case OpCode.multi:
MultiOperationRecord multiRequest;
try {
multiRequest = request.readRequestRecord(MultiOperationRecord::new);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;
//Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
for (Op op : multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;
/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
/* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest);
type = op.getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting"
+ " remaining multi ops. Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}
// TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
byte[] bb = RequestRecord.fromRecord(txn).readBytes();
txns.add(new Txn(type, bb));
}
request.setTxn(new MultiTxn(txns));
if (digestEnabled) {
setTxnDigest(request);
}
break;
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null);
}
break;
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getAllChildrenNumber:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.setWatches2:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
case OpCode.addWatch:
case OpCode.whoAmI:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info(
"Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
String digest = request.requestDigest();
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
}
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}