in zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java [274:493]
/**
 * Handles one serialized message received on a notebook websocket connection.
 *
 * <p>The message is deserialized, its ticket is compared with the ticket registered for the
 * claimed principal and, once accepted, the message is dispatched to the handler for its
 * {@code op}. Messages that fail the ticket check are dropped; on a ticket mismatch a non-PING
 * sender is additionally sent {@code SESSION_LOGOUT}. Any exception raised while handling the
 * message is logged and reported back to the sender as {@code ERROR_INFO}.
 *
 * <p>Ticket values are deliberately kept out of the debug output written here: the ticket
 * registered on the server side is never logged, and the received ticket is only logged after
 * it has been found to be invalid.
 *
 * @param conn the connection the message arrived on; replies are sent through it
 * @param msg the raw serialized message
 */
public void onMessage(NotebookSocket conn, String msg) {
  try {
    Message receivedMessage = deserializeMessage(msg);
    if (receivedMessage == null) {
      // Routed through the catch block below so the sender gets a readable ERROR_INFO
      // instead of the (message-less) NullPointerException this used to end in.
      throw new IllegalArgumentException("Invalid message: empty payload");
    }
    final OP op = receivedMessage.op;

    // Parameterized logging: nothing (in particular not the data payload) is stringified
    // unless the level is enabled. The ticket is intentionally not part of this line.
    if (op != OP.PING) {
      LOGGER.debug("RECEIVE: {}, RECEIVE PRINCIPAL: {}, RECEIVE ROLES: {}, RECEIVE DATA: {}",
          op, receivedMessage.principal, receivedMessage.roles, receivedMessage.data);
    }
    // NOTE(review): Message#toString is not visible here - confirm it does not print the ticket.
    LOGGER.trace("RECEIVE MSG = {}", receivedMessage);

    TicketContainer.Entry ticketEntry =
        TicketContainer.instance.getTicketEntry(receivedMessage.principal);
    if (ticketEntry == null || StringUtils.isEmpty(ticketEntry.getTicket())) {
      LOGGER.debug("{} message: invalid ticket {}", op, receivedMessage.ticket);
      return;
    }
    if (!ticketEntry.getTicket().equals(receivedMessage.ticket)) {
      // Logged at debug level instead of raising an exception, so stale clients do not
      // pollute the logs. Only the rejected ticket is printed, never the expected one.
      LOGGER.debug("{} message: invalid ticket {} does not match the ticket of principal {}",
          op, receivedMessage.ticket, receivedMessage.principal);
      // Identity comparison keeps this safe when op is null.
      if (op != OP.PING) {
        conn.send(serializeMessage(new Message(OP.SESSION_LOGOUT).put("info",
            "Your ticket is invalid possibly due to server restart. Please login again.")));
      }
      return;
    }

    boolean allowAnonymous = zConf.isAnonymousAllowed();
    if (!allowAnonymous && "anonymous".equals(receivedMessage.principal)) {
      LOGGER.warn("Anonymous access not allowed.");
      return;
    }

    if (op == null) {
      // A switch on a null enum value throws NullPointerException; reject explicitly so the
      // sender is told what is wrong. Presumably op is null when the payload omits it or
      // names an operation this server does not know - verify against deserializeMessage.
      throw new IllegalArgumentException("Invalid message: missing or unknown operation");
    }

    if (Message.isDisabledForRunningNotes(op)) {
      boolean noteRunning = getNotebook().processNote((String) receivedMessage.get("noteId"),
          note -> note != null && note.isRunning());
      if (noteRunning) {
        // Caught below and reported to the sender as ERROR_INFO.
        throw new IllegalStateException(
            "Note is now running sequentially. Can not be performed: " + op);
      }
    }

    // First accepted message on this connection: register the connection for the principal.
    if (StringUtils.isEmpty(conn.getUser())) {
      connectionManager.addUserConnection(receivedMessage.principal, conn);
    }

    ServiceContext context = getServiceContext(ticketEntry);

    // Dispatch to the handler of the requested operation; unlisted operations are ignored.
    switch (op) {
      case LIST_NOTES:
        listNotesInfo(conn, context);
        break;
      case RELOAD_NOTES_FROM_REPO:
        broadcastReloadedNoteList(context);
        break;
      case GET_HOME_NOTE:
        getHomeNote(conn, context);
        break;
      case GET_NOTE:
        getNote(conn, context, receivedMessage);
        break;
      case RELOAD_NOTE:
        reloadNote(conn, context, receivedMessage);
        break;
      case NEW_NOTE:
        createNote(conn, context, receivedMessage);
        break;
      case DEL_NOTE:
        deleteNote(conn, context, receivedMessage);
        break;
      case REMOVE_FOLDER:
        removeFolder(conn, context, receivedMessage);
        break;
      case MOVE_NOTE_TO_TRASH:
        moveNoteToTrash(conn, context, receivedMessage);
        break;
      case MOVE_FOLDER_TO_TRASH:
        moveFolderToTrash(conn, context, receivedMessage);
        break;
      case EMPTY_TRASH:
        emptyTrash(conn, context);
        break;
      case RESTORE_FOLDER:
        restoreFolder(conn, context, receivedMessage);
        break;
      case RESTORE_NOTE:
        restoreNote(conn, context, receivedMessage);
        break;
      case RESTORE_ALL:
        restoreAll(conn, context, receivedMessage);
        break;
      case CLONE_NOTE:
        cloneNote(conn, context, receivedMessage);
        break;
      case IMPORT_NOTE:
        importNote(conn, context, receivedMessage);
        break;
      case CONVERT_NOTE_NBFORMAT:
        convertNote(conn, receivedMessage);
        break;
      case COMMIT_PARAGRAPH:
        updateParagraph(conn, context, receivedMessage);
        break;
      case RUN_PARAGRAPH:
        runParagraph(conn, context, receivedMessage);
        break;
      case PARAGRAPH_EXECUTED_BY_SPELL:
        broadcastSpellExecution(conn, context, receivedMessage);
        break;
      case RUN_ALL_PARAGRAPHS:
        runAllParagraphs(conn, context, receivedMessage);
        break;
      case CANCEL_PARAGRAPH:
        cancelParagraph(conn, context, receivedMessage);
        break;
      case MOVE_PARAGRAPH:
        moveParagraph(conn, context, receivedMessage);
        break;
      case INSERT_PARAGRAPH:
        insertParagraph(conn, context, receivedMessage);
        break;
      case COPY_PARAGRAPH:
        copyParagraph(conn, context, receivedMessage);
        break;
      case PARAGRAPH_REMOVE:
        removeParagraph(conn, context, receivedMessage);
        break;
      case PARAGRAPH_CLEAR_OUTPUT:
        clearParagraphOutput(conn, context, receivedMessage);
        break;
      case PARAGRAPH_CLEAR_ALL_OUTPUT:
        clearAllParagraphOutput(conn, context, receivedMessage);
        break;
      case NOTE_UPDATE:
        updateNote(conn, context, receivedMessage);
        break;
      case NOTE_RENAME:
        renameNote(conn, context, receivedMessage);
        break;
      case FOLDER_RENAME:
        renameFolder(conn, context, receivedMessage);
        break;
      case UPDATE_PERSONALIZED_MODE:
        updatePersonalizedMode(conn, context, receivedMessage);
        break;
      case COMPLETION:
        completion(conn, context, receivedMessage);
        break;
      case PING:
        break; //do nothing
      case ANGULAR_OBJECT_UPDATED:
        angularObjectUpdated(conn, context, receivedMessage);
        break;
      case ANGULAR_OBJECT_CLIENT_BIND:
        angularObjectClientBind(conn, receivedMessage);
        break;
      case ANGULAR_OBJECT_CLIENT_UNBIND:
        angularObjectClientUnbind(conn, receivedMessage);
        break;
      case LIST_CONFIGURATIONS:
        sendAllConfigurations(conn, context, receivedMessage);
        break;
      case CHECKPOINT_NOTE:
        checkpointNote(conn, context, receivedMessage);
        break;
      case LIST_REVISION_HISTORY:
        listRevisionHistory(conn, context, receivedMessage);
        break;
      case SET_NOTE_REVISION:
        setNoteRevision(conn, context, receivedMessage);
        break;
      case NOTE_REVISION:
        getNoteByRevision(conn, context, receivedMessage);
        break;
      case NOTE_REVISION_FOR_COMPARE:
        getNoteByRevisionForCompare(conn, context, receivedMessage);
        break;
      case LIST_NOTE_JOBS:
        unicastNoteJobInfo(conn, context, receivedMessage);
        break;
      case UNSUBSCRIBE_UPDATE_NOTE_JOBS:
        unsubscribeNoteJobInfo(conn);
        break;
      case GET_INTERPRETER_BINDINGS:
        getInterpreterBindings(conn, context, receivedMessage);
        break;
      case SAVE_INTERPRETER_BINDINGS:
        saveInterpreterBindings(conn, context, receivedMessage);
        break;
      case EDITOR_SETTING:
        getEditorSetting(conn, context, receivedMessage);
        break;
      case GET_INTERPRETER_SETTINGS:
        getInterpreterSettings(conn, context, receivedMessage);
        break;
      case WATCHER:
        connectionManager.switchConnectionToWatcher(conn);
        break;
      case SAVE_NOTE_FORMS:
        saveNoteForms(conn, context, receivedMessage);
        break;
      case REMOVE_NOTE_FORMS:
        removeNoteForms(conn, context, receivedMessage);
        break;
      case PATCH_PARAGRAPH:
        patchParagraph(conn, context, receivedMessage);
        break;
      default:
        break;
    }
  } catch (Exception e) {
    // NOTE(review): the raw payload is logged here and may contain the sender's ticket -
    // consider redacting it if error logs are readable by a wider audience.
    LOGGER.error("Can't handle message: {}", msg, e);
    // Some exceptions (e.g. NullPointerException) carry no message; fall back to the
    // exception's string form so the sender never receives an empty error.
    String info = e.getMessage() != null ? e.getMessage() : e.toString();
    try {
      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", info)));
    } catch (IOException iox) {
      LOGGER.error("Fail to send error info", iox);
    }
  }
}