up
This commit is contained in:
380
Assets/BestHTTP/ServerSentEvents/EventSourceResponse.cs
Normal file
380
Assets/BestHTTP/ServerSentEvents/EventSourceResponse.cs
Normal file
@@ -0,0 +1,380 @@
|
||||
#if !BESTHTTP_DISABLE_SERVERSENT_EVENTS && (!UNITY_WEBGL || UNITY_EDITOR)
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
|
||||
using System.Text;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace BestHTTP.ServerSentEvents
|
||||
{
|
||||
/// <summary>
|
||||
/// A low-level class to receive and parse an EventSource(http://www.w3.org/TR/eventsource/) stream.
|
||||
/// Higher level protocol representation is implemented in the EventSource class.
|
||||
/// </summary>
|
||||
public sealed class EventSourceResponse : HTTPResponse, IProtocol
|
||||
{
|
||||
public bool IsClosed { get; private set; }
|
||||
|
||||
#region Public Events
|
||||
|
||||
public Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> OnMessage;
|
||||
public Action<EventSourceResponse> OnClosed;
|
||||
|
||||
#endregion
|
||||
|
||||
#region Privates
|
||||
|
||||
/// <summary>
|
||||
/// Thread sync object
|
||||
/// </summary>
|
||||
private object FrameLock = new object();
|
||||
|
||||
/// <summary>
|
||||
/// Buffer for the read data.
|
||||
/// </summary>
|
||||
private byte[] LineBuffer = new byte[1024];
|
||||
|
||||
/// <summary>
|
||||
/// Buffer position.
|
||||
/// </summary>
|
||||
private int LineBufferPos = 0;
|
||||
|
||||
/// <summary>
|
||||
/// The currently receiving and parsing message
|
||||
/// </summary>
|
||||
private BestHTTP.ServerSentEvents.Message CurrentMessage;
|
||||
|
||||
/// <summary>
|
||||
/// Completed messages that waiting to be dispatched
|
||||
/// </summary>
|
||||
private List<BestHTTP.ServerSentEvents.Message> CompletedMessages = new List<BestHTTP.ServerSentEvents.Message>();
|
||||
|
||||
#endregion
|
||||
|
||||
public EventSourceResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
|
||||
:base(request, stream, isStreamed, isFromCache)
|
||||
{
|
||||
base.IsClosedManually = true;
|
||||
}
|
||||
|
||||
internal override bool Receive(int forceReadRawContentLength = -1, bool readPayloadData = true)
|
||||
{
|
||||
bool received = base.Receive(forceReadRawContentLength, false);
|
||||
|
||||
string contentType = this.GetFirstHeaderValue("content-type");
|
||||
base.IsUpgraded = received &&
|
||||
this.StatusCode == 200 &&
|
||||
!string.IsNullOrEmpty(contentType) &&
|
||||
contentType.ToLower().StartsWith("text/event-stream");
|
||||
|
||||
// If we didn't upgraded to the protocol we have to read all the sent payload because
|
||||
// next requests may read these datas as HTTP headers and will fail
|
||||
if (!IsUpgraded)
|
||||
ReadPayload(forceReadRawContentLength);
|
||||
|
||||
return received;
|
||||
}
|
||||
|
||||
internal void StartReceive()
|
||||
{
|
||||
if (IsUpgraded)
|
||||
{
|
||||
#if NETFX_CORE
|
||||
#pragma warning disable 4014
|
||||
Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
|
||||
#pragma warning restore 4014
|
||||
#else
|
||||
ThreadPool.QueueUserWorkItem(ReceiveThreadFunc);
|
||||
//new Thread(ReceiveThreadFunc)
|
||||
// .Start();
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
#region Private Threading Functions
|
||||
|
||||
private void ReceiveThreadFunc(object param)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (HasHeaderWithValue("transfer-encoding", "chunked"))
|
||||
ReadChunked(Stream);
|
||||
else
|
||||
ReadRaw(Stream, -1);
|
||||
}
|
||||
#if !NETFX_CORE
|
||||
catch (ThreadAbortException)
|
||||
{
|
||||
this.baseRequest.State = HTTPRequestStates.Aborted;
|
||||
}
|
||||
#endif
|
||||
catch (Exception e)
|
||||
{
|
||||
if (HTTPUpdateDelegator.IsCreated)
|
||||
{
|
||||
this.baseRequest.Exception = e;
|
||||
this.baseRequest.State = HTTPRequestStates.Error;
|
||||
}
|
||||
else
|
||||
this.baseRequest.State = HTTPRequestStates.Aborted;
|
||||
}
|
||||
finally
|
||||
{
|
||||
IsClosed = true;
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Read Implementations
|
||||
|
||||
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1
|
||||
private new void ReadChunked(Stream stream)
|
||||
{
|
||||
int chunkLength = ReadChunkLength(stream);
|
||||
byte[] buffer = new byte[chunkLength];
|
||||
|
||||
while (chunkLength != 0)
|
||||
{
|
||||
// To avoid more GC garbage we use only one buffer, and resize only if the next chunk doesn't fit.
|
||||
if (buffer.Length < chunkLength)
|
||||
Array.Resize<byte>(ref buffer, chunkLength);
|
||||
|
||||
int readBytes = 0;
|
||||
|
||||
// Fill up the buffer
|
||||
do
|
||||
{
|
||||
int bytes = stream.Read(buffer, readBytes, chunkLength - readBytes);
|
||||
if (bytes == 0)
|
||||
throw new Exception("The remote server closed the connection unexpectedly!");
|
||||
|
||||
readBytes += bytes;
|
||||
} while (readBytes < chunkLength);
|
||||
|
||||
FeedData(buffer, readBytes);
|
||||
|
||||
// Every chunk data has a trailing CRLF
|
||||
ReadTo(stream, LF);
|
||||
|
||||
// read the next chunk's length
|
||||
chunkLength = ReadChunkLength(stream);
|
||||
}
|
||||
|
||||
// Read the trailing headers or the CRLF
|
||||
ReadHeaders(stream);
|
||||
}
|
||||
|
||||
private new void ReadRaw(Stream stream, int contentLength)
|
||||
{
|
||||
byte[] buffer = new byte[1024];
|
||||
int bytes;
|
||||
|
||||
do
|
||||
{
|
||||
bytes = stream.Read(buffer, 0, buffer.Length);
|
||||
|
||||
FeedData(buffer, bytes);
|
||||
} while(bytes > 0);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Data Parsing
|
||||
|
||||
public void FeedData(byte[] buffer, int count)
|
||||
{
|
||||
if (count == -1)
|
||||
count = buffer.Length;
|
||||
|
||||
if (count == 0)
|
||||
return;
|
||||
|
||||
int newlineIdx;
|
||||
int pos = 0;
|
||||
|
||||
do {
|
||||
|
||||
newlineIdx = -1;
|
||||
int skipCount = 1; // to skip CR and/or LF
|
||||
|
||||
for (int i = pos; i < count && newlineIdx == -1; ++i)
|
||||
{
|
||||
// Lines must be separated by either a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, or a single U+000D CARRIAGE RETURN (CR) character.
|
||||
if (buffer[i] == HTTPResponse.CR)
|
||||
{
|
||||
if (i + 1 < count && buffer[i + 1] == HTTPResponse.LF)
|
||||
skipCount = 2;
|
||||
newlineIdx = i;
|
||||
}
|
||||
else if (buffer[i] == HTTPResponse.LF)
|
||||
newlineIdx = i;
|
||||
}
|
||||
|
||||
int copyIndex = newlineIdx == -1 ? count : newlineIdx;
|
||||
|
||||
if (LineBuffer.Length < LineBufferPos + (copyIndex - pos))
|
||||
Array.Resize<byte>(ref LineBuffer, LineBufferPos + (copyIndex - pos));
|
||||
|
||||
Array.Copy(buffer, pos, LineBuffer, LineBufferPos, copyIndex - pos);
|
||||
|
||||
LineBufferPos += copyIndex - pos;
|
||||
|
||||
if (newlineIdx == -1)
|
||||
return;
|
||||
|
||||
ParseLine(LineBuffer, LineBufferPos);
|
||||
|
||||
LineBufferPos = 0;
|
||||
//pos += newlineIdx + skipCount;
|
||||
pos = newlineIdx + skipCount;
|
||||
|
||||
}while(newlineIdx != -1 && pos < count);
|
||||
}
|
||||
|
||||
void ParseLine(byte[] buffer, int count)
|
||||
{
|
||||
// If the line is empty (a blank line) => Dispatch the event
|
||||
if (count == 0)
|
||||
{
|
||||
if (CurrentMessage != null)
|
||||
{
|
||||
lock (FrameLock)
|
||||
CompletedMessages.Add(CurrentMessage);
|
||||
CurrentMessage = null;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// If the line starts with a U+003A COLON character (:) => Ignore the line.
|
||||
if (buffer[0] == 0x3A)
|
||||
return;
|
||||
|
||||
//If the line contains a U+003A COLON character (:)
|
||||
int colonIdx = -1;
|
||||
for (int i = 0; i < count && colonIdx == -1; ++i)
|
||||
if (buffer[i] == 0x3A)
|
||||
colonIdx = i;
|
||||
|
||||
string field;
|
||||
string value;
|
||||
|
||||
if (colonIdx != -1)
|
||||
{
|
||||
// Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
|
||||
field = Encoding.UTF8.GetString(buffer, 0, colonIdx);
|
||||
|
||||
//Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. If value starts with a U+0020 SPACE character, remove it from value.
|
||||
if (colonIdx + 1 < count && buffer[colonIdx + 1] == 0x20)
|
||||
colonIdx++;
|
||||
|
||||
colonIdx++;
|
||||
|
||||
// discarded because it is not followed by a blank line
|
||||
if (colonIdx >= count)
|
||||
return;
|
||||
|
||||
value = Encoding.UTF8.GetString(buffer, colonIdx, count - colonIdx);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Otherwise, the string is not empty but does not contain a U+003A COLON character (:) =>
|
||||
// Process the field using the whole line as the field name, and the empty string as the field value.
|
||||
field = Encoding.UTF8.GetString(buffer, 0, count);
|
||||
value = string.Empty;
|
||||
}
|
||||
|
||||
if (CurrentMessage == null)
|
||||
CurrentMessage = new BestHTTP.ServerSentEvents.Message();
|
||||
|
||||
switch(field)
|
||||
{
|
||||
// If the field name is "id" => Set the last event ID buffer to the field value.
|
||||
case "id":
|
||||
CurrentMessage.Id = value;
|
||||
break;
|
||||
|
||||
// If the field name is "event" => Set the event type buffer to field value.
|
||||
case "event":
|
||||
CurrentMessage.Event = value;
|
||||
break;
|
||||
|
||||
// If the field name is "data" => Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer.
|
||||
case "data":
|
||||
// Append a new line if we already have some data. This way we can skip step 3.) in the EventSource's OnMessageReceived.
|
||||
// We do only null check, because empty string can be valid payload
|
||||
if (CurrentMessage.Data != null)
|
||||
CurrentMessage.Data += Environment.NewLine;
|
||||
|
||||
CurrentMessage.Data += value;
|
||||
break;
|
||||
|
||||
// If the field name is "retry" => If the field value consists of only ASCII digits, then interpret the field value as an integer in base ten,
|
||||
// and set the event stream's reconnection time to that integer. Otherwise, ignore the field.
|
||||
case "retry":
|
||||
int result;
|
||||
if (int.TryParse(value, out result))
|
||||
CurrentMessage.Retry = TimeSpan.FromMilliseconds(result);
|
||||
break;
|
||||
|
||||
// Otherwise: The field is ignored.
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
void IProtocol.HandleEvents()
|
||||
{
|
||||
lock(FrameLock)
|
||||
{
|
||||
// Send out messages.
|
||||
if (CompletedMessages.Count > 0)
|
||||
{
|
||||
if (OnMessage != null)
|
||||
for (int i = 0; i < CompletedMessages.Count; ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
OnMessage(this, CompletedMessages[i]);
|
||||
}
|
||||
catch(Exception ex)
|
||||
{
|
||||
HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnMessage", ex);
|
||||
}
|
||||
}
|
||||
|
||||
CompletedMessages.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
// We are closed
|
||||
if (IsClosed)
|
||||
{
|
||||
CompletedMessages.Clear();
|
||||
|
||||
if (OnClosed != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
OnClosed(this);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnClosed", ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
OnClosed = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user