Files
tianrunCRM/Assets/trCRM/DistSpringWebsocketClient/Client4Stomp.cs

463 lines
14 KiB
C#
Raw Normal View History

2020-07-09 08:50:24 +08:00
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using BestHTTP;
using BestHTTP.WebSocket;
using System;
using UnityEngine.UI;
using System.Text;
using System.IO;
using Dist.SpringWebsocket;
using XLua;
using System.Text.RegularExpressions;
using Coolape;
public class Client4Stomp : MonoBehaviour
{
public static Client4Stomp self;
//public string url = "ws://localhost:8080/web1/websocket";
public string url;
public WebSocket webSocket;
/// <summary>
/// 支持的stomp协议版本
/// </summary>
public static string StompPrototypeVersion = "accept-version:1.1,1.0";
/// <summary>
/// 心跳时间
/// </summary>
public static string HeartBeating = "heart-beat:10000,10000";
/// <summary>
/// 换行符,用于构造 Stomp 消息包
/// </summary>
public static Char LF = Convert.ToChar(10);
/// <summary>
/// 空字符,用于构造 Stomp 消息包
/// </summary>
public static Char NULL = Convert.ToChar(0);
/// <summary>
/// 当前连接类型
/// </summary>
public static string TYPE = "client";
public static MemoryStream receivedBuffer = new MemoryStream();
Queue<StompFrame> queueCallback = new Queue<StompFrame>();
private static int COUNTER = 0;
private static string SubscriptionHeader = "subscription";
private static string DestinationHeader = "destination";
private static string ContentLengthHeader = "content-length";
private static string CID = "dist-connect";
private Dictionary<string, object> callbacks;
private Dictionary<string, string> subscribes;
private object statusCallback;
public bool isDebug = false;
public Client4Stomp()
{
self = this;
}
public void init(string url, object callback)
{
this.url = url;
this.statusCallback = callback;
this.callbacks = new Dictionary<string, object>();
this.subscribes = new Dictionary<string, string>();
webSocket = new WebSocket(new Uri(url));
webSocket.OnOpen += OnOpen;
webSocket.OnMessage += OnMessageReceived;
webSocket.OnBinary += OnBinaryReceived;
webSocket.OnError += OnError;
webSocket.OnClosed += OnClosed;
ConnectSocket();
}
private void antiInit()
{
webSocket.OnOpen = null;
webSocket.OnMessage = null;
webSocket.OnBinary = null;
webSocket.OnError = null;
webSocket.OnClosed = null;
webSocket = null;
}
public void ConnectSocket()
{
webSocket.Open();
}
public void SendSocket(string str)
{
if(webSocket == null)
{
socket_Error(null, "webSocket == null");
return;
}
webSocket.Send(str);
}
public void CloseSocket()
{
if (webSocket == null)
{
socket_Error(null, "webSocket == null");
return;
}
webSocket.Close();
}
#region WebSocket Event Handlers
/// <summary>
/// Called when the web socket is open, and we are ready to send and receive data
/// </summary>
void OnOpen(WebSocket ws)
{
Debug.Log("connected");
socket_Opened(this);
}
/// <summary>
/// Called when we received a text message from the server
/// </summary>
void OnMessageReceived(WebSocket ws, string message)
{
Debug.Log(message);
socket_MessageReceived(ws, message);
}
void OnBinaryReceived(WebSocket ws, byte[] bytes)
{
//TODO:
}
/// <summary>
/// Called when the web socket closed
/// </summary>
void OnClosed(WebSocket ws, UInt16 code, string message)
{
Debug.Log(message);
antiInit();
queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERCLOSED, "与服务器断开连接", null, statusCallback));
}
/// <summary>
/// Called when an error occured on client side
/// </summary>
void OnError(WebSocket ws, Exception ex)
{
string errorMsg = string.Empty;
#if !UNITY_WEBGL || UNITY_EDITOR
if (ws.InternalRequest.Response != null)
errorMsg = string.Format("Status Code from Server: {0} and Message: {1}", ws.InternalRequest.Response.StatusCode, ws.InternalRequest.Response.Message);
#endif
Debug.Log(errorMsg);
antiInit();
socket_Error(this, errorMsg);
}
#endregion
//==========================================================================================================
//==========================================================================================================
//==========================================================================================================
public void Connect(Dictionary<string, string> headers, object callback)
{
if (webSocket != null && webSocket.IsOpen)
{
this.callbacks[CID] = callback;
string data = StompCommandEnum.CONNECT.ToString() + LF;
if (headers != null)
{
foreach (string key in headers.Keys)
{
data += key + ":" + headers[key] + LF;
}
}
data += StompPrototypeVersion + LF
+ HeartBeating + LF + LF + NULL;
SendSocket(data);
}
}
public void Send(string destination, string content)
{
this.Send(destination, new Dictionary<string, string>(), content);
}
public void Send(string destination, Dictionary<string, string> headers, string content)
{
string data = StompCommandEnum.SEND.ToString() + LF;
if (headers != null)
{
foreach (string key in headers.Keys)
{
data += key + ":" + headers[key] + LF;
}
}
data += DestinationHeader + ":" + destination + LF
+ ContentLengthHeader + ":" + GetByteCount(content) + LF + LF
+ content + NULL;
SendSocket(data);
}
public void Send(string destination, LuaTable headers, string content)
{
string data = StompCommandEnum.SEND.ToString() + LF;
if (headers != null)
{
headers.ForEach<string, object>((key, val) =>
{
data += key + ":" + val.ToString() + LF;
});
}
data += DestinationHeader + ":" + destination + LF
+ ContentLengthHeader + ":" + GetByteCount(content) + LF + LF
+ content + NULL;
SendSocket(data);
}
public void Subscribe(string destination, object callback)
{
this.Subscribe(destination, new Dictionary<string, string>(), callback);
}
public void Subscribe(string destination, Dictionary<string, string> headers, object callback)
{
lock (this)
{
if (!this.subscribes.ContainsKey(destination))
{
string id = "sub-" + COUNTER++;
this.callbacks.Add(id, callback);
this.subscribes.Add(destination, id);
string data = StompCommandEnum.SUBSCRIBE.ToString() + LF + "id:" + id + LF;
foreach (string key in headers.Keys)
{
data += key + ":" + headers[key] + LF;
}
data += DestinationHeader + ":" + destination + LF + LF + NULL;
SendSocket(data);
}
}
}
public void UnSubscribe(string destination)
{
if (this.subscribes.ContainsKey(destination))
{
SendSocket(StompCommandEnum.UNSUBSCRIBE.ToString() + LF + "id:" + this.subscribes[destination] + LF + LF + NULL);
this.callbacks.Remove(this.subscribes[destination]);
this.subscribes.Remove(destination);
}
}
public void DisConnect()
{
//ArrayList list = new ArrayList();
//list.AddRange(subscribes.Keys);
//foreach (var key in list)
//{
// UnSubscribe(key.ToString());
//}
//this.callbacks.Clear();
//this.subscribes.Clear();
SendSocket(StompCommandEnum.DISCONNECT.ToString() + LF + LF + NULL);
}
public bool IsSubscribed(string destination)
{
return this.subscribes.ContainsKey(destination);
}
private int GetByteCount(string content)
{
return Regex.Split(Uri.EscapeUriString(content), "%..|.").Length - 1;
}
private StompFrame TransformResultFrame(string content)
{
lock (this)
{
StompFrame frame = new StompFrame();
//string[] matches = Regex.Split(content, "" + NULL + LF + "*");
//foreach (var line in matches)
//{
//if (line.Length > 0)
//{
this.HandleSingleLine(content, frame);
// }
//}
return frame;
}
}
private void HandleSingleLine(string line, StompFrame frame)
{
int divider = line.IndexOf("" + LF + LF);
if (divider >= 0)
{
string[] headerLines = Regex.Split(line.Substring(0, divider), "" + LF);
frame.Code = (StatusCodeEnum)Enum.Parse(typeof(StatusCodeEnum), headerLines[0]);
for (int i = 1; i < headerLines.Length; i++)
{
int index = headerLines[i].IndexOf(":");
string key = headerLines[i].Substring(0, index);
string value = headerLines[i].Substring(index + 1);
frame.AddHeader(Regex.Replace(key, @"^\s+|\s+$", ""), Regex.Replace(value, @"^\s+|\s+$", ""));
}
frame.Content = line.Substring(divider + 2);
}
}
private void socket_Error(object sender, string errMsg)
{
if (isDebug)
{
Debug.LogWarning("socket_Error==" + errMsg);
}
//close();
//this.connected = false;
queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERERROR, errMsg, null, statusCallback));
}
void socket_Closed(object sender)
{
if (isDebug)
{
Debug.LogWarning("socket_Closed");
}
//this.socket.Dispose();
queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERCLOSED, "与服务器断开连接", null, statusCallback));
}
private void socket_Opened(object sender)
{
queueCallback.Enqueue(new StompFrame(StatusCodeEnum.OPENSERVER, "成功连接到服务器", null, statusCallback));
}
private byte[] getBytes(string message)
{
byte[] buffer = Encoding.Default.GetBytes(message);
return buffer;
}
//public static int __maxLen = 1024 * 1024;
//byte[] tmpBuffer = new byte[__maxLen];
StringBuilder inputSb = new StringBuilder();
void socket_MessageReceived(object sender, string msg)
{
//byte[] bytes = getBytes(str);
//receivedBuffer.Write(bytes, 0, bytes.Length);
//int totalLen = (int)(receivedBuffer.Length);
//receivedBuffer.Position = 0;
//receivedBuffer.Read(tmpBuffer, 0, totalLen);
//receivedBuffer.Position = totalLen;
//string msg = Encoding.UTF8.GetString(tmpBuffer, 0, totalLen);
//if (isDebug)
//{
// Debug.LogWarning("=======取得的总数据===========" + totalLen);
// Debug.LogWarning(msg);
//}
inputSb.Append(msg);
while (true)
{
int index = inputSb.ToString().IndexOf(NULL);
if (index < 0)
{
//说明数据包还不完整
break;
}
string content = inputSb.ToString().Substring(0, index);
inputSb.Remove(0, index + 1);
if (isDebug)
{
Debug.LogWarning("=======frame===========");
Debug.LogWarning(content);
}
StompFrame frame = this.TransformResultFrame(content);
object callback;
switch (frame.Code)
{
case StatusCodeEnum.CONNECTED:
callbacks.TryGetValue(CID, out callback);
frame.callback = callback;
queueCallback.Enqueue(frame);
break;
case StatusCodeEnum.MESSAGE:
callbacks.TryGetValue(frame.GetHeader(SubscriptionHeader), out callback);
frame.callback = callback;
queueCallback.Enqueue(frame);
break;
case StatusCodeEnum.ERROR:
CloseSocket();
Debug.LogWarning(frame.Code);
break;
default:
Debug.LogError(frame.Code);
break;
}
}
// left msg proc
/*
msg = inputSb.ToString();
inputSb.Clear();
int leftLen = 0;
byte[] leftBytes = null;
if (!string.IsNullOrEmpty(msg))
{
leftBytes = Encoding.UTF8.GetBytes(msg);
leftLen = leftBytes.Length;
}
if (isDebug)
{
Debug.LogWarning("left len====" + leftLen);
}
if (totalLen != leftLen)
{
if (leftLen > 0)
{
receivedBuffer.Position = 0;
receivedBuffer.Write(leftBytes, 0, leftLen);
receivedBuffer.Position = leftLen;
}
else
{
receivedBuffer.Position = 0;
receivedBuffer.SetLength(0);
}
}
*/
}
//===============================================
StompFrame _stompframe;
private void Update()
{
if (queueCallback.Count > 0)
{
_stompframe = queueCallback.Dequeue();
if (_stompframe != null)
{
Utl.doCallback(_stompframe.callback, _stompframe);
}
}
}
public void close()
{
if (webSocket != null && webSocket.IsOpen)
{
DisConnect();
webSocket.Close();
antiInit();
}
}
private void OnDestroy()
{
close();
}
}