in src/main/java/com/alibaba/cloudapi/sdk/client/WebSocketApiClient.java [155:336]
/**
 * Opens the WebSocket connection.
 *
 * <p>Creates the connect latch if none exists, lazily builds the listener on the first call
 * (later calls reuse the same listener instance) and then asks the HTTP client for a new
 * WebSocket using {@code connectRequest}.
 *
 * <p>The listener:
 * <ul>
 *   <li>on open: stores the socket, marks the status as connected, sends the register
 *       command for {@code deviceId} and releases the connect latch;</li>
 *   <li>on message: handles the signalling commands (heart beat, overflow, connection runs
 *       out, register fail/success, notify) and otherwise treats the text as a JSON API
 *       response that is dispatched through {@code callbackManager};</li>
 *   <li>on closed: clears the stored socket and reconnects;</li>
 *   <li>on failure: reports the failure to {@code apiWebSocketListner} and reconnects after
 *       a short pause for connect-level and socket-level errors.</li>
 * </ul>
 */
public void connect() {
    if (null == connectLatch.getObj()) {
        connectLatch.setObj(new CountDownLatch(1));
    }

    if (null == webSocketListener) {
        webSocketListener = new WebSocketListener() {

            /** Shared by all JSON frames so a mapper is not rebuilt for every message. */
            private final ObjectMapper objectMapper = new ObjectMapper();

            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                webSocketRef.setObj(webSocket);
                status = WebSocketConnectStatus.CONNECTED;

                // A fresh latch is installed before the register command goes out; it is
                // counted down when the register response is handled in onMessage.
                registerLatch.setObj(new CountDownLatch(1));
                String registerCommand = SdkConstant.CLOUDAPI_COMMAND_REGISTER_REQUEST + "#" + deviceId;
                webSocketRef.getObj().send(registerCommand);

                if (null != connectLatch.getObj()) {
                    connectLatch.getObj().countDown();
                }
            }

            @Override
            public void onMessage(WebSocket webSocket, String text) {
                if (null == text || text.isEmpty()) {
                    return;
                }

                // All prefixed signalling commands require more than two characters.
                final boolean hasCommandLength = text.length() > 2;

                if (hasCommandLength && text.startsWith(SdkConstant.CLOUDAPI_COMMAND_HEART_BEAT_RESPONSE)) {
                    // The text after the first three characters is compared with the stored
                    // credential; a mismatch triggers a new register command.
                    if (!connectionCredential.equalsIgnoreCase(text.substring(3))) {
                        reSendRegister();
                    }
                    return;
                }

                // Overflow reported by the server, or the server says goodbye: close in both cases.
                if (SdkConstant.CLOUDAPI_COMMAND_OVER_FLOW_BY_SECOND.equalsIgnoreCase(text)
                        || SdkConstant.CLOUDAPI_COMMAND_CONNECTION_RUNS_OUT.equalsIgnoreCase(text)) {
                    close();
                    return;
                }

                if (hasCommandLength && text.startsWith(SdkConstant.CLOUDAPI_COMMAND_REGISTER_FAIL_REQUEST)) {
                    registerCommandSuccess.setObj(false);
                    // NOTE(review): assumes the frame is "<command>#<error message>"; a frame
                    // without a second '#'-separated field would throw here - confirm with server.
                    String[] failParts = text.split("#");
                    errorMessage.setObj(failParts[1]);
                    if (null != registerLatch.getObj()) {
                        registerLatch.getObj().countDown();
                    }
                    if (null != heartBeatManager) {
                        heartBeatManager.stop();
                    }
                    return;
                }

                if (hasCommandLength && text.startsWith(SdkConstant.CLOUDAPI_COMMAND_REGISTER_SUCCESS_RESPONSE)) {
                    registerCommandSuccess.setObj(true);
                    // NOTE(review): assumes the frame is "<command>#<credential>#<heart beat interval>"
                    // with a numeric interval - confirm with server.
                    String[] successParts = text.split("#");
                    connectionCredential = successParts[1];
                    heartBeatInterval = Integer.parseInt(successParts[2]);
                    if (null != registerLatch.getObj()) {
                        registerLatch.getObj().countDown();
                    }

                    // Replace any previous heart beat manager with one using the new interval.
                    if (null != heartBeatManager) {
                        heartBeatManager.stop();
                    }
                    heartBeatManager = new HeartBeatManager(instance, heartBeatInterval);
                    heartbeatThread = new Thread(heartBeatManager);
                    heartbeatThread.start();

                    if (isRegister) {
                        reSendRegister();
                    }
                    return;
                }

                if (hasCommandLength && text.startsWith(SdkConstant.CLOUDAPI_COMMAND_NOTIFY_REQUEST)) {
                    String message = text.substring(3);
                    apiWebSocketListner.onNotify(message);
                    // Acknowledge the notification only while a connected socket is available.
                    if (status == WebSocketConnectStatus.CONNECTED && webSocketRef.getObj() != null) {
                        webSocketRef.getObj().send(SdkConstant.CLOUDAPI_COMMAND_NOTIFY_RESPONSE);
                    }
                    return;
                }

                // Compatibility with signalling commands of future versions: any non-JSON text
                // whose fourth character is '#' is ignored. The length must exceed three so
                // that reading index 3 cannot run past the end of a three-character frame.
                if (text.length() > 3 && !text.startsWith("{") && text.charAt(3) == '#') {
                    return;
                }

                // Anything else is handled as a JSON API response.
                try {
                    JsonNode jsonNode = objectMapper.readValue(text, JsonNode.class);
                    ApiResponse response = new ApiResponse(jsonNode);
                    String seqStr = response.getFirstHeaderValue(SdkConstant.CLOUDAPI_X_CA_SEQ);
                    Integer seq = Integer.parseInt(seqStr);

                    ApiContext context = callbackManager.getContext(seq);
                    // The context must be checked before it is dereferenced; an unknown
                    // sequence simply skips the post-send hook.
                    if (null != context) {
                        WebSocketApiType type = context.getRequest().getWebSocketApiType();
                        if (type != WebSocketApiType.COMMON) {
                            postSendWebsocketCommandApi(type, response);
                        }
                    }
                    callbackManager.callback(seq, response);
                } catch (Exception ex) {
                    apiWebSocketListner.onFailure(ex, new ApiResponse(508, "Call back occue error", ex));
                }
            }

            @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                webSocketRef.setObj(null);
                reconnect();
            }

            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                try {
                    ApiResponse apiResponse;
                    if (null != response) {
                        apiResponse = new ApiResponse(response.code());
                        apiResponse.setMessage(response.message());
                    } else {
                        apiResponse = new ApiResponse(505);
                        apiResponse.setMessage("WebSocket inner failed");
                    }
                    apiResponse.setEx(new SdkException(t));
                    apiWebSocketListner.onFailure(t, apiResponse);

                    // instanceof is false for null, so no separate null check on t is needed.
                    if (t instanceof ConnectException
                            || t instanceof SocketTimeoutException
                            || t instanceof UnknownHostException) {
                        // Could not connect: release the connect latch, then retry.
                        if (null != connectLatch.getObj()) {
                            connectLatch.getObj().countDown();
                        }
                        pauseBeforeReconnect();
                        reconnect();
                    } else if (t instanceof SocketException || t instanceof EOFException) {
                        // Passively disconnected: make sure a connect latch exists, then retry.
                        if (null == connectLatch.getObj()) {
                            connectLatch.setObj(new CountDownLatch(1));
                        }
                        pauseBeforeReconnect();
                        reconnect();
                    }
                } catch (Exception ex) {
                    apiWebSocketListner.onFailure(ex, new ApiResponse(507, "Failure block", ex));
                }
            }

            /**
             * Sleeps 500 ms before a reconnect attempt. If the thread is interrupted while
             * sleeping, the interrupt flag is restored instead of being swallowed.
             */
            private void pauseBeforeReconnect() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    // Establish the connection.
    client.newWebSocket(connectRequest, webSocketListener);
}