connect method
void
connect()
Implementation
/// Connects to the MQTT broker as the current visitor and starts routing
/// inbound messages.
///
/// Returns immediately when [isConnected] already reports a connection.
/// Otherwise it fires a [ConnectionEventBus] carrying
/// [BytedeskConstants.CONNECTION_STATUS_CONNECTING], reloads the visitor
/// identity from [SpUtil], builds a new [MqttServerClient] (WebSocket on port
/// 443 when [BytedeskConstants.isWebSocketWss] is set, otherwise
/// `mqttHost`/`mqttPort`) and attempts to connect.
///
/// An [Exception] raised while connecting is logged and followed by a
/// `disconnect()` call on the client; it is not rethrown.
void connect() async {
  if (isConnected()) {
    debugPrint('mqtt is connected, return');
    return;
  }
  debugPrint('connecting mqtt....');
  bytedeskEventBus.fire(
      ConnectionEventBus(BytedeskConstants.CONNECTION_STATUS_CONNECTING));

  // Reload the visitor identity from local storage on every attempt.
  currentUid = SpUtil.getString(BytedeskConstants.VISITOR_UID);
  nickname = SpUtil.getString(BytedeskConstants.VISITOR_NICKNAME);
  avatar = SpUtil.getString(BytedeskConstants.VISITOR_AVATAR);
  orgUid = SpUtil.getString(BytedeskConstants.VISITOR_ORGUID);
  client = BytedeskUtils.client();
  deviceUid = SpUtil.getString(BytedeskConstants.VISITOR_DEVICEUID);
  // Keep a non-nullable local so the client id can be used without `!`.
  final id = "$currentUid/$client/$deviceUid";
  clientId = id;
  messageProvider = MessageProvider();
  debugPrint('mqtt clientId: $clientId');

  // Note: web must be checked first, otherwise running on web fails with:
  //   Unsupported operation: Platform._operatingSystem
  // The browser client is currently disabled:
  // if (BytedeskUtils.isWeb) {
  //   if (BytedeskConstants.isDebug) {
  //     mqttClient = MqttBrowserClient.withPort(
  //         'ws://127.0.0.1/websocket', clientId!, BytedeskConstants.mqttPort);
  //   } else {
  //     mqttClient = MqttBrowserClient.withPort(
  //         BytedeskConstants.webSocketWssUrl, clientId!, 443);
  //   }
  // } else { ... }
  final MqttServerClient mqtt;
  if (BytedeskConstants.isWebSocketWss) {
    debugPrint('isWebSocketWss mqttClient connecting....');
    // You can also supply your own websocket protocol list or disable this
    // feature using the websocketProtocols setter; the vast majority of
    // brokers support the client default list. Mosquitto needs the single
    // default setting.
    // mqttClient.websocketProtocols = MqttClientBytedeskConstants.protocolsSingleDefault;
    mqtt = MqttServerClient(BytedeskConstants.webSocketWssUrl, id)
      ..useWebSocket = true
      ..port = 443;
  } else {
    debugPrint('isTcp mqttClient connecting....');
    mqtt = MqttServerClient(BytedeskConstants.mqttHost, id)
      ..port = BytedeskConstants.mqttPort
      ..secure = BytedeskConstants.isSecure;
  }
  mqttClient = mqtt;

  // Create a connection message to use or use the default one. The default
  // one sets the client identifier, any supplied username/password, the
  // default keepalive interval (60s) and clean session.
  final connMessage = MqttConnectMessage()
      .withClientIdentifier(id)
      .authenticateAs(currentUid, '');
  // .keepAliveFor(keepAlivePeriod); // Must agree with the keep alive set below or not set
  // Client-side will settings are disabled; the server pushes a unified
  // content format instead.
  // .withWillTopic('protobuf/lastWill/mqtt') // If you set this you must set a will message
  // .withWillMessage('My Will message')
  // .startClean() // Non persistent session for testing
  // .withWillQos(MqttQos.atLeastOnce);

  mqtt
    // Enable protocol version 3.1.1, otherwise the clientId is limited to a
    // maximum length of 23.
    ..setProtocolV311()
    // Set logging on if needed, defaults to off.
    ..logging(on: BytedeskConstants.isDebug)
    // A keep alive value other than the default (60s) must be set here.
    ..keepAlivePeriod = keepAlivePeriod
    ..autoReconnect = true // FIXME:
    ..onAutoReconnect = _onAutoReconnect // FIXME:
    ..onDisconnected = _onDisconnected
    ..onConnected = _onConnected
    ..onSubscribed = _onSubscribed
    ..onUnsubscribed = _onUnSubscribed
    ..onSubscribeFail = _onSubscribeFailed
    // Called whenever a ping response (pong) is received from the broker.
    ..pongCallback = _onPong;

  debugPrint('mqttClient connecting....');
  mqtt.connectionMessage = connMessage;

  // Connect the client; any errors here are communicated by raising the
  // appropriate exception. In some circumstances the broker will just
  // disconnect us, see the spec about this; we however will never send
  // malformed messages.
  try {
    await mqttClient?.connect();
  } on Exception catch (e) {
    debugPrint('mqttClient exception - $e');
    mqttClient?.disconnect();
  }

  // Read the field again (not the local) so the listener is only attached to
  // whatever client is current once the connect attempt has finished.
  mqttClient?.published?.listen(_handlePublishedMessage);
}

/// Decodes one published MQTT frame and routes it by sender and type.
///
/// Messages the routing helpers mark as displayable are inserted through
/// [messageProvider] and announced with a [ReceiveMessageEventBus]. A payload
/// that fails protobuf decoding is logged and dropped instead of escaping the
/// stream listener as an unhandled asynchronous error.
void _handlePublishedMessage(MqttPublishMessage messageBinary) {
  protomsg.Message messageProto;
  try {
    messageProto = protomsg.Message.fromBuffer(messageBinary.payload.message);
  } on Exception catch (e) {
    debugPrint('mqttClient failed to decode message - $e');
    return;
  }
  debugPrint(
      'receive message uid: ${messageProto.uid}, ${messageProto.type}, ${messageProto.content}');

  // Thread thread = Thread.fromProto(messageProto);
  final message = Message.fromProto(messageProto);

  final shouldStore = message.isSend()
      ? _routeOwnMessage(message, messageProto)
      : _routeReceivedMessage(message);
  if (!shouldStore) {
    return;
  }

  messageProvider?.insert(message);
  // Notify the UI so it can show the chat record.
  bytedeskEventBus.fire(ReceiveMessageEventBus(message));
  // bytedeskEventBus.fire(ReceiveThreadEventBus(thread));
  // Playing a sound / vibrating on receive is implemented outside the SDK
  // (moved to the demo):
  // if (BytedeskKefu.getPlayAudioOnReceiveMessage()! && !message.isSend()) {
  //   SystemSound.play(SystemSoundType.click);
  // }
  // if (BytedeskKefu.getVibrateOnReceiveMessage()! && !message.isSend()) {
  //   vibrate();
  // }
}

/// Handles a message that was not sent by the current user.
///
/// Returns whether the caller should store [message] and broadcast it.
bool _routeReceivedMessage(Message message) {
  switch (message.type) {
    case BytedeskConstants.MESSAGE_TYPE_READ:
    case BytedeskConstants.MESSAGE_TYPE_DELIVERED:
      // Receipt message.
      updateMessageStatus(message);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_TYPING:
    case BytedeskConstants.MESSAGE_TYPE_PROCESSING:
      // The other party is typing.
      handleTypingMessage(message);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_PREVIEW:
      // Preview of the message the other party is composing.
      handlePreviewMessage(message);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_FAQ_UP:
    case BytedeskConstants.MESSAGE_TYPE_FAQ_DOWN:
    case BytedeskConstants.MESSAGE_TYPE_ROBOT_UP:
    case BytedeskConstants.MESSAGE_TYPE_ROBOT_DOWN:
    case BytedeskConstants.MESSAGE_TYPE_RATE_SUBMIT:
    case BytedeskConstants.MESSAGE_TYPE_RATE_CANCEL:
      // The visitor submitted or cancelled a rating.
      updateMessageStatus(message);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_STREAM:
      // handleTypingMessage(currentThread, thread, messageProtobuf.getType());
      return true;
    case BytedeskConstants.MESSAGE_TYPE_TRANSFER:
      // Transfer.
      debugPrint("transfer message");
      // handleTransferMessage(message, thread);
      return true;
    case BytedeskConstants.MESSAGE_TYPE_TRANSFER_ACCEPT:
      // Transfer accepted.
      debugPrint("transfer accept message");
      // handleTransferAcceptMessage(message, thread);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_TRANSFER_REJECT:
      // Transfer rejected.
      debugPrint("transfer reject message");
      // handleTransferRejectMessage(message, thread);
      return false;
    default:
      Vibrate.feedback(FeedbackType.success);
      // Sending a delivery receipt to the server is currently disabled:
      // if (BytedeskUtils.shouldSendReceipt(messageProto.type)) {
      //   sendReceiptReceivedMessage(messageProto.uid, thread);
      // }
      return true;
  }
}

/// Handles a message sent by the current user that arrived on the stream.
///
/// Returns whether the caller should store [message] and broadcast it.
/// [messageProto] supplies the uid passed to [updateMessageSuccess].
bool _routeOwnMessage(Message message, protomsg.Message messageProto) {
  switch (message.type) {
    case BytedeskConstants.MESSAGE_TYPE_READ:
    case BytedeskConstants.MESSAGE_TYPE_DELIVERED:
      // Receipt for a message sent by the current user.
      updateMessageStatus(message);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_TYPING:
    case BytedeskConstants.MESSAGE_TYPE_PROCESSING:
      // The current user's own typing notification: nothing to do.
      return false;
    case BytedeskConstants.MESSAGE_TYPE_PREVIEW:
      return false;
    case BytedeskConstants.MESSAGE_TYPE_TRANSFER:
      // Transfer.
      debugPrint("transfer message");
      // handleTransferMessage(message, thread);
      return true;
    case BytedeskConstants.MESSAGE_TYPE_TRANSFER_ACCEPT:
      // Transfer accepted.
      debugPrint("transfer accept message");
      // handleTransferAcceptMessage(message, thread);
      return false;
    case BytedeskConstants.MESSAGE_TYPE_TRANSFER_REJECT:
      // Transfer rejected.
      debugPrint("transfer reject message");
      // handleTransferRejectMessage(message, thread);
      return false;
    default:
      // The server returned the user's own message: the send succeeded.
      updateMessageSuccess(messageProto.uid);
      return true;
  }
}