using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using BestHTTP;
using BestHTTP.WebSocket;
using System;
using UnityEngine.UI;
using System.Text;
using System.IO;
using Dist.SpringWebsocket;
using XLua;
using System.Text.RegularExpressions;
using Coolape;

/// <summary>
/// STOMP client running on top of a BestHTTP <see cref="WebSocket"/>.
/// Incoming frames are parsed as they arrive and queued; the queue is drained
/// from <c>Update</c> (one frame per Unity frame) through <c>Utl.doCallback</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments on the collections and on the
/// <c>headers</c> parameters (string keys, string header values, object
/// callbacks) were reconstructed from how the values are used; confirm them
/// against the callers of this class.
/// </remarks>
public class Client4Stomp : MonoBehaviour
{
    /// <summary>Most recently constructed instance.</summary>
    public static Client4Stomp self;

    /// <summary>WebSocket endpoint, e.g. "ws://localhost:8080/web1/websocket".</summary>
    public string url;

    public WebSocket webSocket;

    /// <summary>
    /// STOMP protocol versions this client accepts (sent in the CONNECT frame).
    /// </summary>
    public static string StompPrototypeVersion = "accept-version:1.1,1.0";

    /// <summary>
    /// Heart-beat header sent in the CONNECT frame.
    /// </summary>
    public static string HeartBeating = "heart-beat:10000,10000";

    /// <summary>
    /// Line feed, used when building STOMP frames.
    /// </summary>
    public static Char LF = Convert.ToChar(10);

    /// <summary>
    /// NUL character, terminates every STOMP frame.
    /// </summary>
    public static Char NULL = Convert.ToChar(0);

    /// <summary>
    /// Current connection type.
    /// </summary>
    public static string TYPE = "client";

    // Not used by the active code paths in this class; kept because it is public.
    public static MemoryStream receivedBuffer = new MemoryStream();

    /// <summary>When true, raw messages and parsed frames are written to the Unity log.</summary>
    public bool isDebug = false;

    private static int COUNTER = 0;
    private static string SubscriptionHeader = "subscription";
    private static string DestinationHeader = "destination";
    private static string ContentLengthHeader = "content-length";
    private static string CID = "dist-connect";

    // End-of-line characters that may legally surround frames (heart-beats, padding).
    private static readonly char[] EolChars = { Convert.ToChar(10), Convert.ToChar(13) };

    // Private lock target; locking on `this` would let outside code contend for the same monitor.
    private readonly object gate = new object();

    // Frames waiting to be handed to their callback from Update().
    private Queue<StompFrame> queueCallback = new Queue<StompFrame>();

    // Callback per subscription id (plus the CONNECT callback under CID).
    private Dictionary<string, object> callbacks = new Dictionary<string, object>();

    // Subscription id per destination.
    private Dictionary<string, string> subscribes = new Dictionary<string, string>();

    // Receives connection status frames (opened / closed / error).
    private object statusCallback;

    // Text received so far that has not yet formed a complete NUL-terminated frame.
    private StringBuilder inputSb = new StringBuilder();

    public Client4Stomp()
    {
        self = this;
    }

    /// <summary>
    /// Creates the WebSocket for <paramref name="url"/>, wires its events and starts connecting.
    /// </summary>
    /// <param name="url">WebSocket endpoint.</param>
    /// <param name="callback">Callback that receives connection status frames.</param>
    public void init(string url, object callback)
    {
        this.url = url;
        this.statusCallback = callback;
        this.callbacks = new Dictionary<string, object>();
        this.subscribes = new Dictionary<string, string>();
        // Drop any half-received frame left over from a previous connection.
        inputSb.Length = 0;

        webSocket = new WebSocket(new Uri(url));
        webSocket.OnOpen += OnOpen;
        webSocket.OnMessage += OnMessageReceived;
        webSocket.OnBinary += OnBinaryReceived;
        webSocket.OnError += OnError;
        webSocket.OnClosed += OnClosed;
        ConnectSocket();
    }

    /// <summary>
    /// Detaches all handlers from the socket and forgets it. Safe to call repeatedly.
    /// </summary>
    private void antiInit()
    {
        if (webSocket == null)
        {
            // Already torn down (e.g. OnError followed by OnClosed, or close() after a close event).
            return;
        }

        webSocket.OnOpen = null;
        webSocket.OnMessage = null;
        webSocket.OnBinary = null;
        webSocket.OnError = null;
        webSocket.OnClosed = null;
        webSocket = null;
    }

    /// <summary>Opens the socket created by <see cref="init"/>.</summary>
    public void ConnectSocket()
    {
        if (webSocket == null)
        {
            socket_Error(null, "webSocket == null");
            return;
        }

        webSocket.Open();
    }

    /// <summary>Sends raw text over the socket; queues an error status frame if there is no socket.</summary>
    public void SendSocket(string str)
    {
        if (webSocket == null)
        {
            socket_Error(null, "webSocket == null");
            return;
        }

        webSocket.Send(str);
    }

    /// <summary>Closes the socket; queues an error status frame if there is no socket.</summary>
    public void CloseSocket()
    {
        if (webSocket == null)
        {
            socket_Error(null, "webSocket == null");
            return;
        }

        webSocket.Close();
    }

    #region WebSocket Event Handlers

    /// <summary>
    /// Called when the web socket is open, and we are ready to send and receive data.
    /// </summary>
    void OnOpen(WebSocket ws)
    {
        Debug.Log("connected");
        socket_Opened(this);
    }

    /// <summary>
    /// Called when we received a text message from the server.
    /// </summary>
    void OnMessageReceived(WebSocket ws, string message)
    {
        if (isDebug)
        {
            Debug.Log(message);
        }

        socket_MessageReceived(ws, message);
    }

    void OnBinaryReceived(WebSocket ws, byte[] bytes)
    {
        //TODO:
    }

    /// <summary>
    /// Called when the web socket closed.
    /// </summary>
    void OnClosed(WebSocket ws, UInt16 code, string message)
    {
        Debug.Log(message);
        antiInit();
        socket_Closed(this);
    }

    /// <summary>
    /// Called when an error occurred on client side.
    /// </summary>
    void OnError(WebSocket ws, Exception ex)
    {
        string errorMsg = string.Empty;
#if !UNITY_WEBGL || UNITY_EDITOR
        if (ws != null && ws.InternalRequest != null && ws.InternalRequest.Response != null)
        {
            errorMsg = string.Format("Status Code from Server: {0} and Message: {1}",
                ws.InternalRequest.Response.StatusCode,
                ws.InternalRequest.Response.Message);
        }
#endif
        if (string.IsNullOrEmpty(errorMsg) && ex != null)
        {
            // No HTTP response to describe the failure: fall back to the exception text
            // instead of reporting an empty message.
            errorMsg = ex.Message;
        }

        Debug.Log(errorMsg);
        antiInit();
        socket_Error(this, errorMsg);
    }

    #endregion

    //==========================================================================================================
    // STOMP commands
    //==========================================================================================================

    /// <summary>
    /// Sends a CONNECT frame. Does nothing when the socket is missing or not open.
    /// </summary>
    /// <param name="headers">Extra headers to send; may be null.</param>
    /// <param name="callback">Callback that receives the CONNECTED frame.</param>
    public void Connect(Dictionary<string, string> headers, object callback)
    {
        if (webSocket == null || !webSocket.IsOpen)
        {
            return;
        }

        this.callbacks[CID] = callback;

        StringBuilder data = new StringBuilder();
        data.Append(StompCommandEnum.CONNECT.ToString()).Append(LF);
        AppendHeaders(data, headers);
        data.Append(StompPrototypeVersion).Append(LF)
            .Append(HeartBeating).Append(LF)
            .Append(LF)
            .Append(NULL);
        SendSocket(data.ToString());
    }

    /// <summary>Sends <paramref name="content"/> to <paramref name="destination"/> without extra headers.</summary>
    public void Send(string destination, string content)
    {
        this.Send(destination, new Dictionary<string, string>(), content);
    }

    /// <summary>Sends a SEND frame with optional extra headers.</summary>
    /// <param name="destination">STOMP destination.</param>
    /// <param name="headers">Extra headers; may be null.</param>
    /// <param name="content">Frame body; null is sent as an empty body.</param>
    public void Send(string destination, Dictionary<string, string> headers, string content)
    {
        StringBuilder data = new StringBuilder();
        data.Append(StompCommandEnum.SEND.ToString()).Append(LF);
        AppendHeaders(data, headers);
        AppendSendTail(data, destination, content);
        SendSocket(data.ToString());
    }

    /// <summary>Sends a SEND frame whose extra headers come from a Lua table.</summary>
    /// <param name="destination">STOMP destination.</param>
    /// <param name="headers">Lua table of header name to value; may be null.</param>
    /// <param name="content">Frame body; null is sent as an empty body.</param>
    public void Send(string destination, LuaTable headers, string content)
    {
        StringBuilder data = new StringBuilder();
        data.Append(StompCommandEnum.SEND.ToString()).Append(LF);
        if (headers != null)
        {
            headers.ForEach<string, object>((key, val) =>
            {
                data.Append(key).Append(':').Append(val.ToString()).Append(LF);
            });
        }

        AppendSendTail(data, destination, content);
        SendSocket(data.ToString());
    }

    /// <summary>Subscribes to <paramref name="destination"/> without extra headers.</summary>
    public void Subscribe(string destination, object callback)
    {
        this.Subscribe(destination, new Dictionary<string, string>(), callback);
    }

    /// <summary>
    /// Sends a SUBSCRIBE frame for <paramref name="destination"/> unless it is already subscribed.
    /// </summary>
    /// <param name="destination">STOMP destination.</param>
    /// <param name="headers">Extra headers; may be null.</param>
    /// <param name="callback">Callback that receives MESSAGE frames for this subscription.</param>
    public void Subscribe(string destination, Dictionary<string, string> headers, object callback)
    {
        lock (gate)
        {
            if (this.subscribes.ContainsKey(destination))
            {
                return;
            }

            string id = "sub-" + COUNTER++;
            this.callbacks.Add(id, callback);
            this.subscribes.Add(destination, id);

            StringBuilder data = new StringBuilder();
            data.Append(StompCommandEnum.SUBSCRIBE.ToString()).Append(LF)
                .Append("id:").Append(id).Append(LF);
            AppendHeaders(data, headers);
            data.Append(DestinationHeader).Append(':').Append(destination).Append(LF)
                .Append(LF)
                .Append(NULL);
            SendSocket(data.ToString());
        }
    }

    /// <summary>Sends UNSUBSCRIBE for <paramref name="destination"/> and forgets its callback, if subscribed.</summary>
    public void UnSubscribe(string destination)
    {
        lock (gate)
        {
            string id;
            if (!this.subscribes.TryGetValue(destination, out id))
            {
                return;
            }

            SendSocket(StompCommandEnum.UNSUBSCRIBE.ToString() + LF + "id:" + id + LF + LF + NULL);
            this.callbacks.Remove(id);
            this.subscribes.Remove(destination);
        }
    }

    /// <summary>Sends a DISCONNECT frame. Subscriptions and callbacks are left registered.</summary>
    public void DisConnect()
    {
        SendSocket(StompCommandEnum.DISCONNECT.ToString() + LF + LF + NULL);
    }

    /// <summary>Returns true when <paramref name="destination"/> currently has a subscription id.</summary>
    public bool IsSubscribed(string destination)
    {
        return this.subscribes.ContainsKey(destination);
    }

    /// <summary>Appends each entry of <paramref name="headers"/> as a "key:value" line; null is ignored.</summary>
    private static void AppendHeaders(StringBuilder data, Dictionary<string, string> headers)
    {
        if (headers == null)
        {
            return;
        }

        foreach (KeyValuePair<string, string> header in headers)
        {
            data.Append(header.Key).Append(':').Append(header.Value).Append(LF);
        }
    }

    /// <summary>Appends the destination and content-length headers, the blank line, the body and the NUL terminator.</summary>
    private void AppendSendTail(StringBuilder data, string destination, string content)
    {
        data.Append(DestinationHeader).Append(':').Append(destination).Append(LF)
            .Append(ContentLengthHeader).Append(':').Append(GetByteCount(content)).Append(LF)
            .Append(LF)
            .Append(content)
            .Append(NULL);
    }

    /// <summary>
    /// Number of bytes <paramref name="content"/> occupies when encoded as UTF-8
    /// (the value sent in the content-length header). Null or empty yields 0.
    /// </summary>
    private int GetByteCount(string content)
    {
        if (string.IsNullOrEmpty(content))
        {
            return 0;
        }

        return Encoding.UTF8.GetByteCount(content);
    }

    /// <summary>Parses the text of one frame (without its NUL terminator) into a <see cref="StompFrame"/>.</summary>
    private StompFrame TransformResultFrame(string content)
    {
        lock (gate)
        {
            StompFrame frame = new StompFrame();
            this.HandleSingleLine(content, frame);
            return frame;
        }
    }

    /// <summary>
    /// Fills <paramref name="frame"/> from raw frame text: command line, headers, then the body
    /// after the first blank line. Text without a blank line, or with a command that is not a
    /// <see cref="StatusCodeEnum"/> name, leaves the frame untouched.
    /// </summary>
    private void HandleSingleLine(string line, StompFrame frame)
    {
        int divider = line.IndexOf("" + LF + LF, StringComparison.Ordinal);
        if (divider < 0)
        {
            return;
        }

        string[] headerLines = line.Substring(0, divider).Split(LF);
        string command = headerLines[0].Trim();
        if (!Enum.IsDefined(typeof(StatusCodeEnum), command))
        {
            // Unknown or empty command: do not throw out of the socket callback.
            Debug.LogWarning("Unknown STOMP command: " + command);
            return;
        }

        frame.Code = (StatusCodeEnum)Enum.Parse(typeof(StatusCodeEnum), command);
        for (int i = 1; i < headerLines.Length; i++)
        {
            int index = headerLines[i].IndexOf(':');
            if (index < 0)
            {
                // Not a "key:value" line; skip it rather than fail the whole frame.
                continue;
            }

            string key = headerLines[i].Substring(0, index);
            string value = headerLines[i].Substring(index + 1);
            frame.AddHeader(key.Trim(), value.Trim());
        }

        // Skip the two line feeds that separate headers from body.
        frame.Content = line.Substring(divider + 2);
    }

    /// <summary>Queues a SERVERERROR status frame for the status callback.</summary>
    private void socket_Error(object sender, string errMsg)
    {
        if (isDebug)
        {
            Debug.LogWarning("socket_Error==" + errMsg);
        }

        queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERERROR, errMsg, null, statusCallback));
    }

    /// <summary>Queues a SERVERCLOSED status frame for the status callback.</summary>
    void socket_Closed(object sender)
    {
        if (isDebug)
        {
            Debug.LogWarning("socket_Closed");
        }

        queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERCLOSED, "与服务器断开连接", null, statusCallback));
    }

    /// <summary>Queues an OPENSERVER status frame for the status callback.</summary>
    private void socket_Opened(object sender)
    {
        queueCallback.Enqueue(new StompFrame(StatusCodeEnum.OPENSERVER, "成功连接到服务器", null, statusCallback));
    }

    // Not called by the active code paths in this class.
    private byte[] getBytes(string message)
    {
        byte[] buffer = Encoding.Default.GetBytes(message);
        return buffer;
    }

    /// <summary>
    /// Appends <paramref name="msg"/> to the receive buffer, then extracts and dispatches every
    /// complete (NUL-terminated) frame. Incomplete trailing data stays buffered for the next call.
    /// </summary>
    void socket_MessageReceived(object sender, string msg)
    {
        inputSb.Append(msg);
        while (true)
        {
            string buffered = inputSb.ToString();
            int index = buffered.IndexOf(NULL);
            if (index < 0)
            {
                // The frame is not complete yet. If all we hold is end-of-line padding
                // (a heart-beat), discard it so it cannot pile up in the buffer.
                if (buffered.Trim(EolChars).Length == 0)
                {
                    inputSb.Length = 0;
                }
                break;
            }

            // Remove the frame from the buffer before handling it, so a failure while
            // handling cannot cause the same frame to be processed again.
            inputSb.Remove(0, index + 1);

            // End-of-line characters between frames (heart-beats, optional padding after
            // the NUL) would otherwise be read as an empty command line.
            string content = buffered.Substring(0, index).TrimStart(EolChars);
            if (content.Length == 0)
            {
                continue;
            }

            if (isDebug)
            {
                Debug.LogWarning("=======frame===========");
                Debug.LogWarning(content);
            }

            StompFrame frame = this.TransformResultFrame(content);
            object callback = null;
            switch (frame.Code)
            {
                case StatusCodeEnum.CONNECTED:
                    callbacks.TryGetValue(CID, out callback);
                    frame.callback = callback;
                    queueCallback.Enqueue(frame);
                    break;
                case StatusCodeEnum.MESSAGE:
                    string subscriptionId = frame.GetHeader(SubscriptionHeader);
                    if (subscriptionId != null)
                    {
                        // A null key would make TryGetValue throw.
                        callbacks.TryGetValue(subscriptionId, out callback);
                    }
                    frame.callback = callback;
                    queueCallback.Enqueue(frame);
                    break;
                case StatusCodeEnum.ERROR:
                    CloseSocket();
                    Debug.LogWarning(frame.Code);
                    break;
                default:
                    Debug.LogError(frame.Code);
                    break;
            }
        }
    }

    //===============================================

    /// <summary>Hands at most one queued frame per Unity frame to its callback.</summary>
    private void Update()
    {
        if (queueCallback.Count > 0)
        {
            StompFrame frame = queueCallback.Dequeue();
            if (frame != null)
            {
                Utl.doCallback(frame.callback, frame);
            }
        }
    }

    /// <summary>
    /// Sends DISCONNECT and closes the socket when it is open, then detaches the handlers
    /// in every case so a socket that is still connecting cannot call back later.
    /// </summary>
    public void close()
    {
        if (webSocket == null)
        {
            return;
        }

        if (webSocket.IsOpen)
        {
            DisConnect();
            // DisConnect reaches SendSocket only; re-check in case a handler cleared the socket.
            if (webSocket != null)
            {
                webSocket.Close();
            }
        }

        antiInit();
    }

    private void OnDestroy()
    {
        close();
    }
}