466 lines
14 KiB
C#
466 lines
14 KiB
C#
using System.Collections;
|
|
using System.Collections.Generic;
|
|
using UnityEngine;
|
|
using BestHTTP;
|
|
using BestHTTP.WebSocket;
|
|
using System;
|
|
using UnityEngine.UI;
|
|
using System.Text;
|
|
using System.IO;
|
|
using Dist.SpringWebsocket;
|
|
using XLua;
|
|
using System.Text.RegularExpressions;
|
|
using Coolape;
|
|
|
|
/// <summary>
/// STOMP client that runs over a BestHTTP <see cref="WebSocket"/>.
/// Socket events and parsed STOMP frames are queued and handed to their callbacks from
/// <see cref="Update"/>, one frame per Unity update tick, through <c>Utl.doCallback</c>.
/// </summary>
public class Client4Stomp : MonoBehaviour
{
    /// <summary>The most recently constructed instance (assigned in the constructor).</summary>
    public static Client4Stomp self;

    /// <summary>WebSocket endpoint, e.g. ws://localhost:8080/web1/websocket. Set by <see cref="init"/>.</summary>
    public string url;

    /// <summary>The underlying socket; null before <see cref="init"/> and after the connection is torn down.</summary>
    public WebSocket webSocket;

    /// <summary>
    /// STOMP protocol versions offered in the CONNECT frame.
    /// </summary>
    public static string StompPrototypeVersion = "accept-version:1.1,1.0";

    /// <summary>
    /// Heart-beat header sent in the CONNECT frame.
    /// </summary>
    public static string HeartBeating = "heart-beat:10000,10000";

    /// <summary>
    /// Line feed, used when building and parsing STOMP frames.
    /// </summary>
    public static Char LF = Convert.ToChar(10);

    /// <summary>
    /// NUL character, the STOMP frame terminator.
    /// </summary>
    public static Char NULL = Convert.ToChar(0);

    /// <summary>
    /// Current connection type.
    /// </summary>
    public static string TYPE = "client";

    // Not used by this class; kept because it is part of the public surface.
    public static MemoryStream receivedBuffer = new MemoryStream();

    // Frames (status notifications and server frames) waiting to be delivered from Update().
    Queue<StompFrame> queueCallback = new Queue<StompFrame>();

    // Source of the numeric suffix of generated subscription ids ("sub-N").
    private static int COUNTER = 0;

    private const string SubscriptionHeader = "subscription";
    private const string DestinationHeader = "destination";
    private const string ContentLengthHeader = "content-length";

    // Key under which the Connect() callback is stored in `callbacks`.
    private const string CID = "dist-connect";

    // Subscription id (or CID) -> callback object passed to Utl.doCallback.
    private Dictionary<string, object> callbacks = new Dictionary<string, object>();

    // Destination -> subscription id.
    private Dictionary<string, string> subscribes = new Dictionary<string, string>();

    // Callback that receives the OPENSERVER / SERVERCLOSED / SERVERERROR notifications.
    private object statusCallback;

    // Private gate for the subscription tables (replaces locking on `this`).
    private readonly object subscriptionLock = new object();

    // Text received so far that does not yet contain a complete frame.
    StringBuilder inputSb = new StringBuilder();

    /// <summary>When true, received messages, frames and socket errors are logged.</summary>
    public bool isDebug = false;

    public Client4Stomp()
    {
        self = this;
    }

    /// <summary>
    /// Creates a socket for <paramref name="url"/>, hooks up the event handlers and starts connecting.
    /// Any socket left over from an earlier call is closed and detached first, and all
    /// subscription state and buffered input are reset.
    /// </summary>
    /// <param name="url">WebSocket URL; must be accepted by <see cref="Uri"/>.</param>
    /// <param name="callback">Callback object that receives connection status frames.</param>
    public void init(string url, object callback)
    {
        // Without this, a second init() would leave the old socket's handlers feeding the same buffer.
        close();
        antiInit();

        this.url = url;
        this.statusCallback = callback;
        this.callbacks = new Dictionary<string, object>();
        this.subscribes = new Dictionary<string, string>();
        // A partial frame from the previous connection must not be glued onto the new stream.
        inputSb.Length = 0;

        webSocket = new WebSocket(new Uri(url));
        webSocket.OnOpen += OnOpen;
        webSocket.OnMessage += OnMessageReceived;
        webSocket.OnBinary += OnBinaryReceived;
        webSocket.OnError += OnError;
        webSocket.OnClosed += OnClosed;
        ConnectSocket();
    }

    /// <summary>
    /// Detaches every handler from the current socket and forgets it. Does nothing when there is no socket.
    /// </summary>
    private void antiInit()
    {
        if (webSocket == null)
        {
            return;
        }
        webSocket.OnOpen = null;
        webSocket.OnMessage = null;
        webSocket.OnBinary = null;
        webSocket.OnError = null;
        webSocket.OnClosed = null;
        webSocket = null;
    }

    /// <summary>Opens the socket created by <see cref="init"/>; reports an error if there is none.</summary>
    public void ConnectSocket()
    {
        if (webSocket == null)
        {
            socket_Error(null, "webSocket == null");
            return;
        }
        webSocket.Open();
    }

    /// <summary>Sends raw text over the socket; reports an error if there is no socket.</summary>
    /// <param name="str">Text to send, normally a complete STOMP frame.</param>
    public void SendSocket(string str)
    {
        if (webSocket == null)
        {
            socket_Error(null, "webSocket == null");
            return;
        }
        webSocket.Send(str);
    }

    /// <summary>Closes the socket; reports an error if there is no socket.</summary>
    public void CloseSocket()
    {
        if (webSocket == null)
        {
            socket_Error(null, "webSocket == null");
            return;
        }
        webSocket.Close();
    }

    #region WebSocket Event Handlers

    /// <summary>
    /// Called when the web socket is open, and we are ready to send and receive data
    /// </summary>
    void OnOpen(WebSocket ws)
    {
        Debug.Log("connected");
        socket_Opened(this);
    }

    /// <summary>
    /// Called when we received a text message from the server
    /// </summary>
    void OnMessageReceived(WebSocket ws, string message)
    {
        if (isDebug)
        {
            Debug.Log(message);
        }
        socket_MessageReceived(ws, message);
    }

    /// <summary>
    /// Called when a binary message arrives. Binary messages are currently ignored.
    /// </summary>
    void OnBinaryReceived(WebSocket ws, byte[] bytes)
    {
        //TODO:
    }

    /// <summary>
    /// Called when the web socket closed
    /// </summary>
    void OnClosed(WebSocket ws, UInt16 code, string message)
    {
        Debug.Log(message);
        antiInit();
        socket_Closed(this);
    }

    /// <summary>
    /// Called when an error occurred on client side
    /// </summary>
    void OnError(WebSocket ws, Exception ex)
    {
        string errorMsg = string.Empty;
#if !UNITY_WEBGL || UNITY_EDITOR
        if (ws != null && ws.InternalRequest != null && ws.InternalRequest.Response != null)
        {
            errorMsg = string.Format("Status Code from Server: {0} and Message: {1}", ws.InternalRequest.Response.StatusCode, ws.InternalRequest.Response.Message);
        }
#endif
        // Fall back to the exception text so the status callback is not handed an empty message.
        if (string.IsNullOrEmpty(errorMsg) && ex != null)
        {
            errorMsg = ex.Message;
        }
        Debug.Log(errorMsg);
        antiInit();
        socket_Error(this, errorMsg);
    }

    #endregion

    //==========================================================================================================
    // STOMP commands
    //==========================================================================================================

    /// <summary>
    /// Sends a STOMP CONNECT frame. Does nothing when the socket is missing or not open.
    /// </summary>
    /// <param name="headers">Extra headers for the frame; may be null.</param>
    /// <param name="callback">Callback object that receives the CONNECTED frame.</param>
    public void Connect(Dictionary<string, string> headers, object callback)
    {
        if (webSocket == null || !webSocket.IsOpen)
        {
            if (isDebug)
            {
                Debug.LogWarning("Connect ignored: webSocket is not open");
            }
            return;
        }

        this.callbacks[CID] = callback;
        StringBuilder data = new StringBuilder();
        data.Append(StompCommandEnum.CONNECT.ToString()).Append(LF);
        AppendHeaders(data, headers);
        data.Append(StompPrototypeVersion).Append(LF)
            .Append(HeartBeating).Append(LF).Append(LF).Append(NULL);
        SendSocket(data.ToString());
    }

    /// <summary>Sends <paramref name="content"/> to <paramref name="destination"/> with no extra headers.</summary>
    public void Send(string destination, string content)
    {
        this.Send(destination, new Dictionary<string, string>(), content);
    }

    /// <summary>Sends a STOMP SEND frame.</summary>
    /// <param name="destination">Value of the destination header.</param>
    /// <param name="headers">Extra headers; may be null.</param>
    /// <param name="content">Frame body; null is sent as an empty body.</param>
    public void Send(string destination, Dictionary<string, string> headers, string content)
    {
        StringBuilder data = new StringBuilder();
        data.Append(StompCommandEnum.SEND.ToString()).Append(LF);
        AppendHeaders(data, headers);
        AppendSendTail(data, destination, content);
        SendSocket(data.ToString());
    }

    /// <summary>Sends a STOMP SEND frame whose extra headers come from a Lua table.</summary>
    /// <param name="destination">Value of the destination header.</param>
    /// <param name="headers">Lua table of header name to value; may be null.</param>
    /// <param name="content">Frame body; null is sent as an empty body.</param>
    public void Send(string destination, LuaTable headers, string content)
    {
        StringBuilder data = new StringBuilder();
        data.Append(StompCommandEnum.SEND.ToString()).Append(LF);
        if (headers != null)
        {
            headers.ForEach<string, object>((key, val) =>
            {
                data.Append(key).Append(':').Append(val).Append(LF);
            });
        }
        AppendSendTail(data, destination, content);
        SendSocket(data.ToString());
    }

    /// <summary>Subscribes to <paramref name="destination"/> with no extra headers.</summary>
    public void Subscribe(string destination, object callback)
    {
        this.Subscribe(destination, new Dictionary<string, string>(), callback);
    }

    /// <summary>
    /// Sends a STOMP SUBSCRIBE frame and records the callback for the new subscription.
    /// Does nothing if the destination is already subscribed; reports an error, without
    /// recording anything, if there is no socket.
    /// </summary>
    /// <param name="destination">Destination to subscribe to.</param>
    /// <param name="headers">Extra headers; may be null.</param>
    /// <param name="callback">Callback object that receives MESSAGE frames for this subscription.</param>
    public void Subscribe(string destination, Dictionary<string, string> headers, object callback)
    {
        lock (subscriptionLock)
        {
            if (this.subscribes.ContainsKey(destination))
            {
                return;
            }
            if (webSocket == null)
            {
                // Recording a subscription that was never sent would block every later retry.
                socket_Error(null, "webSocket == null");
                return;
            }

            string id = "sub-" + COUNTER++;
            this.callbacks.Add(id, callback);
            this.subscribes.Add(destination, id);

            StringBuilder data = new StringBuilder();
            data.Append(StompCommandEnum.SUBSCRIBE.ToString()).Append(LF)
                .Append("id:").Append(id).Append(LF);
            AppendHeaders(data, headers);
            data.Append(DestinationHeader).Append(':').Append(destination)
                .Append(LF).Append(LF).Append(NULL);
            SendSocket(data.ToString());
        }
    }

    /// <summary>
    /// Sends a STOMP UNSUBSCRIBE frame for <paramref name="destination"/> and forgets its callback.
    /// Does nothing if the destination is not subscribed.
    /// </summary>
    public void UnSubscribe(string destination)
    {
        lock (subscriptionLock)
        {
            string id;
            if (!this.subscribes.TryGetValue(destination, out id))
            {
                return;
            }
            SendSocket(StompCommandEnum.UNSUBSCRIBE.ToString() + LF + "id:" + id + LF + LF + NULL);
            this.callbacks.Remove(id);
            this.subscribes.Remove(destination);
        }
    }

    /// <summary>
    /// Sends a STOMP DISCONNECT frame. Recorded subscriptions and callbacks are left untouched.
    /// </summary>
    public void DisConnect()
    {
        SendSocket(StompCommandEnum.DISCONNECT.ToString() + LF + LF + NULL);
    }

    /// <summary>Returns true if <paramref name="destination"/> has a recorded subscription.</summary>
    public bool IsSubscribed(string destination)
    {
        lock (subscriptionLock)
        {
            return this.subscribes.ContainsKey(destination);
        }
    }

    /// <summary>Appends one "name:value" line per entry; a null dictionary adds nothing.</summary>
    private static void AppendHeaders(StringBuilder frame, Dictionary<string, string> headers)
    {
        if (headers == null)
        {
            return;
        }
        foreach (KeyValuePair<string, string> header in headers)
        {
            frame.Append(header.Key).Append(':').Append(header.Value).Append(LF);
        }
    }

    /// <summary>
    /// Appends the part shared by every SEND frame: destination and content-length headers,
    /// the blank line, the body and the NUL terminator.
    /// </summary>
    private void AppendSendTail(StringBuilder frame, string destination, string content)
    {
        frame.Append(DestinationHeader).Append(':').Append(destination).Append(LF)
            .Append(ContentLengthHeader).Append(':').Append(GetByteCount(content)).Append(LF).Append(LF)
            .Append(content).Append(NULL);
    }

    /// <summary>Returns the UTF-8 encoded length of <paramref name="content"/> in bytes; 0 for null or empty.</summary>
    private int GetByteCount(string content)
    {
        return string.IsNullOrEmpty(content) ? 0 : Encoding.UTF8.GetByteCount(content);
    }

    /// <summary>
    /// Parses the text of one frame (without its NUL terminator).
    /// </summary>
    /// <returns>The parsed frame, or null when the text is not a recognisable frame.</returns>
    private StompFrame TransformResultFrame(string content)
    {
        StompFrame frame = new StompFrame();
        return this.HandleSingleLine(content, frame) ? frame : null;
    }

    /// <summary>
    /// Fills <paramref name="frame"/> from frame text: the command line, the "name:value"
    /// header lines, then the body after the first blank line.
    /// </summary>
    /// <returns>
    /// False when there is no blank line or the command is not a <c>StatusCodeEnum</c> name;
    /// <paramref name="frame"/> is left unmodified in that case.
    /// </returns>
    private bool HandleSingleLine(string line, StompFrame frame)
    {
        int divider = line.IndexOf(new string(LF, 2), StringComparison.Ordinal);
        if (divider < 0)
        {
            return false;
        }

        string[] headerLines = line.Substring(0, divider).Split(LF);
        string command = headerLines[0].Trim();
        // Enum.Parse throws on unknown names, so check first.
        if (!Enum.IsDefined(typeof(StatusCodeEnum), command))
        {
            return false;
        }
        frame.Code = (StatusCodeEnum)Enum.Parse(typeof(StatusCodeEnum), command);

        for (int i = 1; i < headerLines.Length; i++)
        {
            int separator = headerLines[i].IndexOf(':');
            if (separator < 0)
            {
                // Not a "name:value" line; skip it rather than fail the whole frame.
                continue;
            }
            frame.AddHeader(headerLines[i].Substring(0, separator).Trim(),
                            headerLines[i].Substring(separator + 1).Trim());
        }
        frame.Content = line.Substring(divider + 2);
        return true;
    }

    /// <summary>Queues a SERVERERROR status frame carrying <paramref name="errMsg"/>.</summary>
    private void socket_Error(object sender, string errMsg)
    {
        if (isDebug)
        {
            Debug.LogWarning("socket_Error==" + errMsg);
        }
        queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERERROR, errMsg, null, statusCallback));
    }

    /// <summary>Queues a SERVERCLOSED status frame.</summary>
    void socket_Closed(object sender)
    {
        if (isDebug)
        {
            Debug.LogWarning("socket_Closed");
        }
        queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERCLOSED, "与服务器断开连接", null, statusCallback));
    }

    /// <summary>Queues an OPENSERVER status frame.</summary>
    private void socket_Opened(object sender)
    {
        queueCallback.Enqueue(new StompFrame(StatusCodeEnum.OPENSERVER, "成功连接到服务器", null, statusCallback));
    }

    /// <summary>Encodes <paramref name="message"/> as UTF-8, the encoding STOMP frames use.</summary>
    private byte[] getBytes(string message)
    {
        return Encoding.UTF8.GetBytes(message);
    }

    /// <summary>
    /// Appends received text to the input buffer, extracts every complete NUL-terminated frame
    /// and dispatches it. Text after the last terminator stays buffered for the next call.
    /// </summary>
    void socket_MessageReceived(object sender, string msg)
    {
        inputSb.Append(msg);
        string pending = inputSb.ToString();
        List<string> frames = new List<string>();
        int cursor = 0;
        while (true)
        {
            // Line ends between frames are heart-beats or optional padding after a terminator;
            // left in place they would become an empty command line for the next frame.
            while (cursor < pending.Length && (pending[cursor] == LF || pending[cursor] == '\r'))
            {
                cursor++;
            }
            int terminator = pending.IndexOf(NULL, cursor);
            if (terminator < 0)
            {
                // The remaining text is not a complete frame yet.
                break;
            }
            if (terminator > cursor)
            {
                frames.Add(pending.Substring(cursor, terminator - cursor));
            }
            cursor = terminator + 1;
        }

        // Commit the consumed prefix before dispatching, so a failure while handling one frame
        // cannot cause the same text to be parsed again on the next message.
        inputSb.Length = 0;
        inputSb.Append(pending, cursor, pending.Length - cursor);

        foreach (string content in frames)
        {
            DispatchFrame(content);
        }
    }

    /// <summary>
    /// Parses one frame and routes it: CONNECTED and MESSAGE frames are queued with their
    /// callback, ERROR closes the socket, anything else is logged.
    /// </summary>
    private void DispatchFrame(string content)
    {
        if (isDebug)
        {
            Debug.LogWarning("=======frame===========");
            Debug.LogWarning(content);
        }

        StompFrame frame = this.TransformResultFrame(content);
        if (frame == null)
        {
            Debug.LogError("Discarding malformed STOMP frame: " + content);
            return;
        }

        object callback = null;
        switch (frame.Code)
        {
            case StatusCodeEnum.CONNECTED:
                callbacks.TryGetValue(CID, out callback);
                frame.callback = callback;
                queueCallback.Enqueue(frame);
                break;
            case StatusCodeEnum.MESSAGE:
                string subscriptionId = frame.GetHeader(SubscriptionHeader);
                // Dictionary.TryGetValue throws on a null key.
                if (subscriptionId != null)
                {
                    callbacks.TryGetValue(subscriptionId, out callback);
                }
                frame.callback = callback;
                queueCallback.Enqueue(frame);
                break;
            case StatusCodeEnum.ERROR:
                CloseSocket();
                Debug.LogWarning(frame.Code);
                break;
            default:
                Debug.LogError(frame.Code);
                break;
        }
    }

    //===============================================

    /// <summary>
    /// Delivers at most one queued frame per update tick to its callback.
    /// </summary>
    private void Update()
    {
        if (queueCallback.Count == 0)
        {
            return;
        }
        StompFrame frame = queueCallback.Dequeue();
        if (frame != null)
        {
            Utl.doCallback(frame.callback, frame);
        }
    }

    /// <summary>
    /// If the socket is open: sends DISCONNECT, closes the socket and detaches its handlers.
    /// Otherwise does nothing.
    /// </summary>
    public void close()
    {
        if (webSocket != null && webSocket.IsOpen)
        {
            DisConnect();
            webSocket.Close();
            antiInit();
        }
    }

    private void OnDestroy()
    {
        close();
        // close() skips a socket that is not open yet; detach it so it cannot call back into this component.
        antiInit();
        if (ReferenceEquals(self, this))
        {
            self = null;
        }
    }
}