submodule对gitea不管用,所以直接拉了一份拉格兰
This commit is contained in:
@@ -0,0 +1,152 @@
|
||||
using System.Net.Sockets;
|
||||
using System.Net.WebSockets;
|
||||
using Lagrange.OneBot.Core.Network.Service;
|
||||
using Lagrange.OneBot.Core.Operation;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network;
|
||||
|
||||
public sealed partial class LagrangeWebSvcCollection(IServiceProvider services, IConfiguration config, ILogger<LagrangeWebSvcCollection> logger)
|
||||
: IHostedService
|
||||
{
|
||||
private const string Tag = nameof(LagrangeWebSvcCollection);
|
||||
|
||||
private readonly List<(IServiceScope, ILagrangeWebService)> _webServices = [];
|
||||
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var implsSection = config.GetSection("Implementations");
|
||||
if (implsSection.Exists())
|
||||
{
|
||||
Log.LogMultiConnection(logger, Tag);
|
||||
}
|
||||
else
|
||||
{
|
||||
implsSection = config.GetSection("Implementation");
|
||||
if (!implsSection.Exists())
|
||||
{
|
||||
Log.LogNoConnection(logger, Tag);
|
||||
return;
|
||||
}
|
||||
|
||||
Log.LogSingleConnection(logger, Tag);
|
||||
}
|
||||
|
||||
var operationSvc = services.GetRequiredService<OperationService>();
|
||||
foreach (var section in implsSection.GetChildren())
|
||||
{
|
||||
if (section["Enable"] == "false") continue;
|
||||
var scope = services.CreateScope();
|
||||
var serviceProvider = scope.ServiceProvider;
|
||||
|
||||
var factory = serviceProvider.GetRequiredService<ILagrangeWebServiceFactory>();
|
||||
factory.SetConfig(section);
|
||||
|
||||
if (factory.Create() is not { } webService) continue;
|
||||
webService.OnMessageReceived += async (_, args) =>
|
||||
{
|
||||
if (await operationSvc.HandleOperation(args) is { } result)
|
||||
{
|
||||
try
|
||||
{
|
||||
await webService.SendJsonAsync(result, args.Identifier, cancellationToken);
|
||||
}
|
||||
catch (WebSocketException e) when (e.InnerException is HttpRequestException)
|
||||
{
|
||||
// ignore due to connection failed
|
||||
}
|
||||
catch (WebSocketException e) when (e.InnerException is SocketException)
|
||||
{
|
||||
// ignore due to connection closed
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogWebServiceSendFailed(logger, e, Tag);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
await webService.StartAsync(cancellationToken);
|
||||
_webServices.Add((scope, webService));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogWebServiceStartFailed(logger, e, Tag);
|
||||
scope.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
foreach (var (scope, service) in _webServices)
|
||||
{
|
||||
try
|
||||
{
|
||||
await service.StopAsync(cancellationToken);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogWebServiceStopFailed(logger, e, Tag);
|
||||
}
|
||||
finally
|
||||
{
|
||||
scope.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendJsonAsync<T>(T json, string? identifier = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
foreach (var (_, service) in _webServices)
|
||||
{
|
||||
try
|
||||
{
|
||||
var vt = service.SendJsonAsync(json, identifier, cancellationToken);
|
||||
if (!vt.IsCompletedSuccessfully)
|
||||
{
|
||||
var t = vt.AsTask();
|
||||
await t.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
|
||||
}
|
||||
}
|
||||
catch (WebSocketException e) when (e.InnerException is HttpRequestException)
|
||||
{
|
||||
// ignore due to connection failed
|
||||
}
|
||||
catch (WebSocketException e) when (e.InnerException is SocketException)
|
||||
{
|
||||
// ignore due to connection closed
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogWebServiceSendFailed(logger, e, Tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static partial class Log
|
||||
{
|
||||
[LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "[{tag}]: Multi Connection has been configured")]
|
||||
public static partial void LogMultiConnection(ILogger logger, string tag);
|
||||
|
||||
[LoggerMessage(EventId = 2, Level = LogLevel.Information, Message = "[{tag}]: Single Connection has been configured")]
|
||||
public static partial void LogSingleConnection(ILogger logger, string tag);
|
||||
|
||||
[LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "[{Tag}]: No implementation has been configured")]
|
||||
public static partial void LogNoConnection(ILogger logger, string tag);
|
||||
|
||||
[LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "[{Tag}]: WebService start failed.")]
|
||||
public static partial void LogWebServiceStartFailed(ILogger logger, Exception e, string tag);
|
||||
|
||||
[LoggerMessage(EventId = 5, Level = LogLevel.Warning, Message = "[{Tag}]: WebService stop failed.")]
|
||||
public static partial void LogWebServiceStopFailed(ILogger logger, Exception e, string tag);
|
||||
|
||||
[LoggerMessage(EventId = 6, Level = LogLevel.Warning, Message = "[{Tag}]: WebService send message failed.")]
|
||||
public static partial void LogWebServiceSendFailed(ILogger logger, Exception e, string tag);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
namespace Lagrange.OneBot.Core.Network;
|
||||
|
||||
public class MsgRecvEventArgs(string data, string? identifier = null) : EventArgs
|
||||
{
|
||||
public string Data { get; init; } = data;
|
||||
|
||||
public string? Identifier { get; init; } = identifier;
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
namespace Lagrange.OneBot.Core.Network.Options;
|
||||
|
||||
public class ForwardWSServiceOptions : WSServiceOptions;
|
||||
@@ -0,0 +1,12 @@
|
||||
namespace Lagrange.OneBot.Core.Network.Options;
|
||||
|
||||
public sealed class HttpPostServiceOptions : HttpServiceOptions
|
||||
{
|
||||
public string Suffix { get; set; } = "";
|
||||
|
||||
public uint HeartBeatInterval { get; set; } = 5000;
|
||||
|
||||
public bool HeartBeatEnable { get; set; } = true;
|
||||
|
||||
public string Secret { get; set; } = string.Empty;
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
namespace Lagrange.OneBot.Core.Network.Options;
|
||||
|
||||
public class HttpServiceOptions
|
||||
{
|
||||
public string Host { get; set; } = "";
|
||||
|
||||
public uint Port { get; set; }
|
||||
|
||||
public string? AccessToken { get; set; }
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
namespace Lagrange.OneBot.Core.Network.Options;
|
||||
|
||||
public sealed class ReverseWSServiceOptions : WSServiceOptions
|
||||
{
|
||||
public string Suffix { get; set; } = "";
|
||||
|
||||
public string ApiSuffix { get; set; } = "";
|
||||
|
||||
public string EventSuffix { get; set; } = "";
|
||||
|
||||
public bool UseUniversalClient { get; set; } = true;
|
||||
|
||||
public bool IgnoreSslCertificate { get; set; } = false;
|
||||
|
||||
public uint ReconnectInterval { get; set; } = 5000;
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
namespace Lagrange.OneBot.Core.Network.Options;
|
||||
|
||||
public abstract class WSServiceOptions
|
||||
{
|
||||
public string Host { get; set; } = "";
|
||||
|
||||
public uint Port { get; set; }
|
||||
|
||||
public uint HeartBeatInterval { get; set; } = 5000; // by default 5000
|
||||
|
||||
public bool HeartBeatEnable { get; set; } = true;
|
||||
|
||||
public string? AccessToken { get; set; }
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service
|
||||
{
|
||||
public class DefaultLagrangeWebServiceFactory(IServiceProvider services) : LagrangeWebServiceFactory(services)
|
||||
{
|
||||
public override ILagrangeWebService? Create()
|
||||
{
|
||||
var config = _config ?? throw new InvalidOperationException("Configuration must be provided");
|
||||
string? type = config["Type"] ?? (config as ConfigurationSection)?.Key;
|
||||
if (!string.IsNullOrEmpty(type))
|
||||
{
|
||||
return type switch
|
||||
{
|
||||
"ReverseWebSocket" => Create<ReverseWSService>(config),
|
||||
"ForwardWebSocket" => Create<ForwardWSService>(config),
|
||||
"HttpPost" => Create<HttpPostService>(config),
|
||||
"Http" => Create<HttpService>(config),
|
||||
_ => null
|
||||
};
|
||||
}
|
||||
|
||||
var rws = config.GetSection("ReverseWebSocket");
|
||||
if (rws.Exists()) return Create<ReverseWSService>(rws);
|
||||
|
||||
var fws = config.GetSection("ForwardWebSocket");
|
||||
if (fws.Exists()) return Create<ForwardWSService>(fws);
|
||||
|
||||
var rh = config.GetSection("HttpPost");
|
||||
if (rh.Exists()) return Create<HttpPostService>(rh);
|
||||
|
||||
var fh = config.GetSection("Http");
|
||||
if (fh.Exists()) return Create<HttpService>(fh);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected ILagrangeWebService? Create<TService>(IConfiguration config) where TService : ILagrangeWebService
|
||||
{
|
||||
var factory = _services.GetService<ILagrangeWebServiceFactory<TService>>();
|
||||
if (factory == null) return null;
|
||||
|
||||
factory.SetConfig(config);
|
||||
return factory.Create();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,489 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using System.Net;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Lagrange.Core;
|
||||
using Lagrange.OneBot.Core.Entity.Meta;
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public partial class ForwardWSService(ILogger<ForwardWSService> logger, IOptionsSnapshot<ForwardWSServiceOptions> options, BotContext botContext) : BackgroundService, ILagrangeWebService
|
||||
{
|
||||
#region Initialization
|
||||
|
||||
private readonly ForwardWSServiceOptions _options = options.Value;
|
||||
|
||||
#endregion
|
||||
|
||||
#region Lifecycle
|
||||
private readonly HttpListener _listener = new();
|
||||
|
||||
public override Task StartAsync(CancellationToken token)
|
||||
{
|
||||
string host = _options.Host == "0.0.0.0" ? "*" : _options.Host;
|
||||
|
||||
// First start the HttpListener
|
||||
_listener.Prefixes.Add($"http://{host}:{_options.Port}/");
|
||||
_listener.Start();
|
||||
|
||||
foreach (string prefix in _listener.Prefixes) Log.LogServerStarted(logger, prefix);
|
||||
|
||||
// then obtain the HttpListenerContext
|
||||
return base.StartAsync(token);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
while (true) // Looping to Retrieve and Handle HttpListenerContext
|
||||
{
|
||||
_ = HandleHttpListenerContext(await _listener.GetContextAsync().WaitAsync(token), token);
|
||||
|
||||
token.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
catch (Exception e) when (e is not OperationCanceledException)
|
||||
{
|
||||
Log.LogWaitConnectException(logger, e);
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken token)
|
||||
{
|
||||
// Get the task of all currently connected tcs before stopping it
|
||||
var tasks = _connections.Values.Select(c => c.Tcs.Task).ToArray();
|
||||
|
||||
// Stop obtaining the HttpListenerContext first
|
||||
await base.StopAsync(token);
|
||||
|
||||
// Wait for the connection task to stop
|
||||
await Task.WhenAll(tasks).WaitAsync(token);
|
||||
|
||||
// then stop the HttpListener
|
||||
_listener.Stop();
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Connect
|
||||
private readonly ConcurrentDictionary<string, ConnectionContext> _connections = [];
|
||||
|
||||
private async Task HandleHttpListenerContext(HttpListenerContext httpContext, CancellationToken token)
|
||||
{
|
||||
// Generating an identifier for this context
|
||||
string identifier = Guid.NewGuid().ToString();
|
||||
|
||||
var response = httpContext.Response;
|
||||
|
||||
try
|
||||
{
|
||||
Log.LogConnect(logger, identifier);
|
||||
|
||||
// Validating AccessToken
|
||||
if (!ValidatingAccessToken(httpContext))
|
||||
{
|
||||
Log.LogValidatingAccessTokenFail(logger, identifier);
|
||||
|
||||
response.StatusCode = (int)HttpStatusCode.Forbidden;
|
||||
response.Close();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Validating whether it is a WebSocket request
|
||||
if (!httpContext.Request.IsWebSocketRequest)
|
||||
{
|
||||
Log.LogNotWebSocketRequest(logger, identifier);
|
||||
|
||||
response.StatusCode = (int)HttpStatusCode.BadRequest;
|
||||
response.Close();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Upgrade to WebSocket
|
||||
var wsContext = await httpContext.AcceptWebSocketAsync(null).WaitAsync(token);
|
||||
|
||||
// Building and store ConnectionContext
|
||||
var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
|
||||
_connections.TryAdd(identifier, new(httpContext, wsContext, cts));
|
||||
|
||||
string path = wsContext.RequestUri.LocalPath;
|
||||
bool isApi = path is "/api" or "/api/";
|
||||
bool isEvent = path is "/event" or "/event/";
|
||||
|
||||
// Only API interfaces do not require sending heartbeats
|
||||
if (!isApi)
|
||||
{
|
||||
// Send ConnectLifecycleMetaEvent
|
||||
await SendJsonAsync(new OneBotLifecycle(botContext.BotUin, "connect"), identifier, token);
|
||||
|
||||
if (_options is { HeartBeatEnable: true, HeartBeatInterval: > 0 })
|
||||
{
|
||||
_ = HeartbeatAsyncLoop(identifier, cts.Token);
|
||||
}
|
||||
}
|
||||
|
||||
// The Event interface does not need to receive messages
|
||||
// but still needs to receive Close messages to close the connection
|
||||
_ = isEvent
|
||||
? WaitCloseAsyncLoop(identifier, cts.Token)
|
||||
: ReceiveAsyncLoop(identifier, cts.Token); // The Universal interface requires receiving messages
|
||||
}
|
||||
catch (Exception e) when (e is not OperationCanceledException)
|
||||
{
|
||||
Log.LogHandleHttpListenerContextException(logger, identifier, e);
|
||||
|
||||
// Attempt to send a 500 response code
|
||||
try
|
||||
{
|
||||
response.StatusCode = (int)HttpStatusCode.InternalServerError;
|
||||
response.Close();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private bool ValidatingAccessToken(HttpListenerContext httpContext)
|
||||
{
|
||||
// If AccessToken is not configured
|
||||
// then allow access unconditionally
|
||||
if (string.IsNullOrEmpty(_options.AccessToken)) return true;
|
||||
|
||||
string? token = null;
|
||||
|
||||
// Retrieve the Authorization request header
|
||||
string? authorization = httpContext.Request.Headers["Authorization"];
|
||||
// If the Authorization request header is not present
|
||||
// retrieve the access_token from the QueryString
|
||||
if (authorization == null) token = httpContext.Request.QueryString["access_token"];
|
||||
// If the Authorization authentication method is Bearer
|
||||
// then retrieve the AccessToken
|
||||
else if (authorization.StartsWith("Bearer ")) token = authorization["Bearer ".Length..];
|
||||
|
||||
return token == _options.AccessToken;
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Receive
|
||||
public event EventHandler<MsgRecvEventArgs>? OnMessageReceived;
|
||||
|
||||
public async Task ReceiveAsyncLoop(string identifier, CancellationToken token)
|
||||
{
|
||||
if (!_connections.TryGetValue(identifier, out ConnectionContext? connection)) return;
|
||||
|
||||
try
|
||||
{
|
||||
byte[] buffer = new byte[1024];
|
||||
while (true)
|
||||
{
|
||||
int received = 0;
|
||||
while (true)
|
||||
{
|
||||
var resultTask = connection.WsContext.WebSocket.ReceiveAsync(buffer.AsMemory(received), default);
|
||||
|
||||
var result = !resultTask.IsCompleted ?
|
||||
await resultTask.AsTask().WaitAsync(token) :
|
||||
resultTask.Result;
|
||||
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
await DisconnectAsync(identifier, WebSocketCloseStatus.NormalClosure, token);
|
||||
return;
|
||||
}
|
||||
|
||||
received += result.Count;
|
||||
|
||||
if (result.EndOfMessage) break;
|
||||
|
||||
if (received == buffer.Length) Array.Resize(ref buffer, buffer.Length << 1);
|
||||
|
||||
token.ThrowIfCancellationRequested();
|
||||
}
|
||||
string message = Encoding.UTF8.GetString(buffer.AsSpan(0, received));
|
||||
|
||||
Log.LogReceive(logger, identifier, message);
|
||||
|
||||
OnMessageReceived?.Invoke(this, new(message, identifier));
|
||||
|
||||
token.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
bool isCanceled = e is OperationCanceledException;
|
||||
|
||||
if (!isCanceled) Log.LogReceiveException(logger, identifier, e);
|
||||
|
||||
var status = WebSocketCloseStatus.NormalClosure;
|
||||
var t = default(CancellationToken);
|
||||
if (!isCanceled)
|
||||
{
|
||||
status = WebSocketCloseStatus.InternalServerError;
|
||||
t = token;
|
||||
}
|
||||
|
||||
await DisconnectAsync(identifier, status, t);
|
||||
|
||||
if (token.IsCancellationRequested) throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Cts.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task WaitCloseAsyncLoop(string identifier, CancellationToken token)
|
||||
{
|
||||
if (!_connections.TryGetValue(identifier, out ConnectionContext? connection)) return;
|
||||
|
||||
try
|
||||
{
|
||||
byte[] buffer = new byte[1024];
|
||||
while (true)
|
||||
{
|
||||
ValueTask<ValueWebSocketReceiveResult> resultTask = connection.WsContext.WebSocket
|
||||
.ReceiveAsync(buffer.AsMemory(), default);
|
||||
|
||||
ValueWebSocketReceiveResult result = !resultTask.IsCompleted ?
|
||||
await resultTask.AsTask().WaitAsync(token) :
|
||||
resultTask.Result;
|
||||
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
await DisconnectAsync(identifier, WebSocketCloseStatus.NormalClosure, token);
|
||||
return;
|
||||
}
|
||||
|
||||
token.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
bool isCanceled = e is OperationCanceledException;
|
||||
|
||||
if (!isCanceled) Log.LogWaitCloseException(logger, identifier, e);
|
||||
|
||||
var status = WebSocketCloseStatus.NormalClosure;
|
||||
var t = default(CancellationToken);
|
||||
if (!isCanceled)
|
||||
{
|
||||
status = WebSocketCloseStatus.InternalServerError;
|
||||
t = token;
|
||||
}
|
||||
|
||||
await DisconnectAsync(identifier, status, t);
|
||||
|
||||
if (token.IsCancellationRequested) throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Cts.Cancel();
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Heartbeat
|
||||
public async Task HeartbeatAsyncLoop(string identifier, CancellationToken token)
|
||||
{
|
||||
if (!_connections.TryGetValue(identifier, out ConnectionContext? connection)) return;
|
||||
|
||||
Stopwatch sw = new();
|
||||
TimeSpan interval = TimeSpan.FromMilliseconds(_options.HeartBeatInterval);
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
sw.Start();
|
||||
var heartbeat = new OneBotHeartBeat(botContext.BotUin, (int)_options.HeartBeatInterval, new OneBotStatus(true, true));
|
||||
await SendJsonAsync(heartbeat, identifier, token);
|
||||
sw.Stop();
|
||||
|
||||
// Implementing precise intervals by subtracting Stopwatch's timing from configured intervals
|
||||
var waitingTime = interval - sw.Elapsed;
|
||||
if (waitingTime >= TimeSpan.Zero) await Task.Delay(waitingTime, token);
|
||||
|
||||
sw.Reset();
|
||||
|
||||
token.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
bool isCanceled = e is OperationCanceledException;
|
||||
|
||||
if (!isCanceled) Log.LogHeartbeatException(logger, identifier, e);
|
||||
|
||||
var status = WebSocketCloseStatus.NormalClosure;
|
||||
var t = default(CancellationToken);
|
||||
if (!isCanceled)
|
||||
{
|
||||
status = WebSocketCloseStatus.InternalServerError;
|
||||
t = token;
|
||||
}
|
||||
|
||||
await DisconnectAsync(identifier, status, t);
|
||||
|
||||
if (token.IsCancellationRequested) throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Cts.Cancel();
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Send
|
||||
public async ValueTask SendJsonAsync<T>(T json, string? identifier = null, CancellationToken token = default)
|
||||
{
|
||||
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(json);
|
||||
if (identifier != null)
|
||||
{
|
||||
await SendBytesAsync(payload, identifier, token);
|
||||
}
|
||||
else
|
||||
{
|
||||
await Task.WhenAll(_connections
|
||||
.Where(c =>
|
||||
{
|
||||
string path = c.Value.WsContext.RequestUri.LocalPath;
|
||||
return path != "/api" && path != "/api/";
|
||||
})
|
||||
.Select(c => SendBytesAsync(payload, c.Key, token))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendBytesAsync(byte[] payload, string identifier, CancellationToken token)
|
||||
{
|
||||
if (!_connections.TryGetValue(identifier, out ConnectionContext? connection)) return;
|
||||
|
||||
await connection.SendSemaphoreSlim.WaitAsync(token);
|
||||
|
||||
try
|
||||
{
|
||||
Log.LogSend(logger, identifier, payload);
|
||||
await connection.WsContext.WebSocket.SendAsync(payload.AsMemory(), WebSocketMessageType.Text, true, token);
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.SendSemaphoreSlim.Release();
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Disconnect
|
||||
private async Task DisconnectAsync(string identifier, WebSocketCloseStatus status, CancellationToken token)
|
||||
{
|
||||
if (!_connections.TryRemove(identifier, out ConnectionContext? connection)) return;
|
||||
|
||||
try
|
||||
{
|
||||
await connection.WsContext.WebSocket
|
||||
.CloseAsync(status, null, token)
|
||||
.WaitAsync(TimeSpan.FromSeconds(5), token);
|
||||
|
||||
connection.HttpContext.Response.Close();
|
||||
}
|
||||
catch (Exception e) when (e is not OperationCanceledException)
|
||||
{
|
||||
Log.LogDisconnectException(logger, identifier, e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Log.LogDisconnect(logger, identifier);
|
||||
|
||||
connection.Tcs.SetResult();
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region ConnectionContext
|
||||
public class ConnectionContext(HttpListenerContext httpContext, WebSocketContext wsContext, CancellationTokenSource cts)
|
||||
{
|
||||
public HttpListenerContext HttpContext { get; } = httpContext;
|
||||
public WebSocketContext WsContext { get; } = wsContext;
|
||||
public SemaphoreSlim SendSemaphoreSlim { get; } = new(1);
|
||||
public CancellationTokenSource Cts { get; } = cts;
|
||||
public TaskCompletionSource Tcs { get; } = new();
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Log
|
||||
public static partial class Log
|
||||
{
|
||||
#region Normal
|
||||
[LoggerMessage(EventId = 10, Level = LogLevel.Information, Message = "The server is started at {prefix}")]
|
||||
public static partial void LogServerStarted(ILogger logger, string prefix);
|
||||
|
||||
[LoggerMessage(EventId = 11, Level = LogLevel.Information, Message = "Connect({identifier})")]
|
||||
public static partial void LogConnect(ILogger logger, string identifier);
|
||||
|
||||
public static void LogReceive(ILogger logger, string identifier, string payload)
|
||||
{
|
||||
if (!logger.IsEnabled(LogLevel.Trace)) return;
|
||||
|
||||
if (payload.Length > 1024) payload = $"{payload.AsSpan(0, 1024)} ...{payload.Length - 1024} bytes";
|
||||
|
||||
InnerLogReceive(logger, identifier, payload);
|
||||
}
|
||||
|
||||
[LoggerMessage(EventId = 12, Level = LogLevel.Trace, Message = "Receive({identifier}) {payload}", SkipEnabledCheck = true)]
|
||||
private static partial void InnerLogReceive(ILogger logger, string identifier, string payload);
|
||||
|
||||
public static void LogSend(ILogger logger, string identifier, byte[] payload)
|
||||
{
|
||||
if (!logger.IsEnabled(LogLevel.Trace)) return;
|
||||
|
||||
string payloadString = Encoding.UTF8.GetString(payload);
|
||||
|
||||
if (payload.Length > 1024) payloadString = $"{payloadString.AsSpan(0, 1024)} ...{payloadString.Length - 1024} bytes";
|
||||
|
||||
InnerLogSend(logger, identifier, payloadString);
|
||||
}
|
||||
|
||||
[LoggerMessage(EventId = 13, Level = LogLevel.Trace, Message = "Send({identifier}) {payload}", SkipEnabledCheck = true)]
|
||||
private static partial void InnerLogSend(ILogger logger, string identifier, string payload);
|
||||
|
||||
[LoggerMessage(EventId = 14, Level = LogLevel.Information, Message = "Disconnect({identifier})")]
|
||||
public static partial void LogDisconnect(ILogger logger, string identifier);
|
||||
#endregion
|
||||
|
||||
#region Exception
|
||||
[LoggerMessage(EventId = 992, Level = LogLevel.Error, Message = "LogDisconnectException({identifier})")]
|
||||
public static partial void LogDisconnectException(ILogger logger, string identifier, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = 993, Level = LogLevel.Error, Message = "LogHeartbeatException({identifier})")]
|
||||
public static partial void LogHeartbeatException(ILogger logger, string identifier, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = 994, Level = LogLevel.Error, Message = "WaitCloseException({identifier})")]
|
||||
public static partial void LogWaitCloseException(ILogger logger, string identifier, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = 995, Level = LogLevel.Error, Message = "ReceiveException({identifier})")]
|
||||
public static partial void LogReceiveException(ILogger logger, string identifier, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = 996, Level = LogLevel.Warning, Message = "NotWebSocketRequest({identifier})")]
|
||||
public static partial void LogNotWebSocketRequest(ILogger logger, string identifier);
|
||||
|
||||
[LoggerMessage(EventId = 997, Level = LogLevel.Warning, Message = "ValidatingAccessTokenFail({identifier})")]
|
||||
public static partial void LogValidatingAccessTokenFail(ILogger logger, string identifier);
|
||||
|
||||
[LoggerMessage(EventId = 998, Level = LogLevel.Critical, Message = "HandleHttpListenerContextException({identifier})")]
|
||||
public static partial void LogHandleHttpListenerContextException(ILogger logger, string identifier, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = 999, Level = LogLevel.Critical, Message = "WaitConnectException")]
|
||||
public static partial void LogWaitConnectException(ILogger logger, Exception e);
|
||||
#endregion
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public sealed class ForwardWSServiceFactory(IServiceProvider services) : LagrangeWebServiceFactory(services), ILagrangeWebServiceFactory<ForwardWSService>
|
||||
{
|
||||
public override ILagrangeWebService Create()
|
||||
{
|
||||
var config = _config ?? throw new InvalidOperationException("Configuration must be provided");
|
||||
var options = _services.GetRequiredService<IOptionsSnapshot<ForwardWSServiceOptions>>();
|
||||
config.Bind(options.Value);
|
||||
|
||||
return _services.GetRequiredService<ForwardWSService>();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
using System.Diagnostics;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Lagrange.Core;
|
||||
using Lagrange.OneBot.Core.Entity.Action;
|
||||
using Lagrange.OneBot.Core.Entity.Meta;
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public partial class HttpPostService(IOptionsSnapshot<HttpPostServiceOptions> options, ILogger<HttpPostService> logger, BotContext context)
|
||||
: BackgroundService, ILagrangeWebService
|
||||
{
|
||||
private const string Tag = nameof(HttpPostService);
|
||||
|
||||
public event EventHandler<MsgRecvEventArgs>? OnMessageReceived { add { } remove { } }
|
||||
|
||||
private readonly HttpPostServiceOptions _options = options.Value;
|
||||
|
||||
private readonly ILogger _logger = logger;
|
||||
|
||||
private Uri? _url;
|
||||
|
||||
private static readonly HttpClient _client = new();
|
||||
|
||||
private HMACSHA1? _sha1;
|
||||
|
||||
private string ComputeSHA1(string data)
|
||||
{
|
||||
byte[] hash = _sha1!.ComputeHash(Encoding.UTF8.GetBytes(data));
|
||||
return Convert.ToHexString(hash).ToLower();
|
||||
}
|
||||
|
||||
public async ValueTask SendJsonAsync<T>(T payload, string? identifier, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_url is null) throw new InvalidOperationException("Reverse HTTP service was not running");
|
||||
|
||||
if (payload is OneBotResult) return; // ignore api result
|
||||
|
||||
var json = JsonSerializer.Serialize(payload);
|
||||
Log.LogSendingData(_logger, Tag, _url.ToString(), json);
|
||||
using var request = new HttpRequestMessage(HttpMethod.Post, _url)
|
||||
{
|
||||
Headers = { { "X-Self-ID", context.BotUin.ToString() } },
|
||||
Content = new StringContent(json, Encoding.UTF8, "application/json")
|
||||
};
|
||||
|
||||
if (_sha1 is not null)
|
||||
{
|
||||
request.Headers.Add("X-Signature", $"sha1={ComputeSHA1(json)}");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _client.SendAsync(request, cancellationToken);
|
||||
}
|
||||
catch (HttpRequestException ex)
|
||||
{
|
||||
Log.LogPostFailed(_logger, ex, Tag, _url.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(_options.Secret))
|
||||
_sha1 = new HMACSHA1(Encoding.UTF8.GetBytes(_options.Secret));
|
||||
|
||||
string urlstr = $"{_options.Host}:{_options.Port}{_options.Suffix}";
|
||||
if (!_options.Host.StartsWith("http://") && !_options.Host.StartsWith("https://"))
|
||||
{
|
||||
urlstr = "http://" + urlstr;
|
||||
}
|
||||
|
||||
if (!Uri.TryCreate(urlstr, UriKind.Absolute, out _url))
|
||||
{
|
||||
Log.LogInvalidUrl(_logger, Tag, urlstr);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var lifecycle = new OneBotLifecycle(context.BotUin, "connect");
|
||||
await SendJsonAsync(lifecycle, null, stoppingToken);
|
||||
if (_options.HeartBeatEnable && _options.HeartBeatInterval > 0)
|
||||
{
|
||||
await HeartbeatLoop(stoppingToken);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HeartbeatLoop(CancellationToken token)
|
||||
{
|
||||
var interval = TimeSpan.FromMilliseconds(_options.HeartBeatInterval);
|
||||
Stopwatch sw = new();
|
||||
|
||||
while (true)
|
||||
{
|
||||
var status = new OneBotStatus(true, true);
|
||||
var heartBeat = new OneBotHeartBeat(context.BotUin, (int)_options.HeartBeatInterval, status);
|
||||
|
||||
sw.Start();
|
||||
await SendJsonAsync(heartBeat, null, token);
|
||||
sw.Stop();
|
||||
|
||||
// Implementing precise intervals by subtracting Stopwatch's timing from configured intervals
|
||||
var waitingTime = interval - sw.Elapsed;
|
||||
if (waitingTime >= TimeSpan.Zero)
|
||||
{
|
||||
await Task.Delay(waitingTime, token);
|
||||
}
|
||||
sw.Reset();
|
||||
}
|
||||
}
|
||||
|
||||
private static partial class Log
|
||||
{
|
||||
private enum EventIds
|
||||
{
|
||||
SendingData = 1,
|
||||
|
||||
PostFailed = 1001,
|
||||
InvalidUrl
|
||||
}
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.SendingData, Level = LogLevel.Trace, Message = "[{tag}] Send to {url}: {data}")]
|
||||
public static partial void LogSendingData(ILogger logger, string tag, string url, string data);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.PostFailed, Level = LogLevel.Error, Message = "[{tag}] Post to {url} failed")]
|
||||
public static partial void LogPostFailed(ILogger logger, Exception ex, string tag, string url);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.InvalidUrl, Level = LogLevel.Error, Message = "[{tag}] Invalid configuration was detected, url: {url}")]
|
||||
public static partial void LogInvalidUrl(ILogger logger, string tag, string url);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public sealed class HttpPostServiceFactory(IServiceProvider services) : LagrangeWebServiceFactory(services), ILagrangeWebServiceFactory<HttpPostService>
|
||||
{
|
||||
public override ILagrangeWebService Create()
|
||||
{
|
||||
var config = _config ?? throw new InvalidOperationException("Configuration must be provided");
|
||||
var options = _services.GetRequiredService<IOptionsSnapshot<HttpPostServiceOptions>>();
|
||||
config.Bind(options.Value);
|
||||
|
||||
return _services.GetRequiredService<HttpPostService>();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,273 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Text.Json;
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public sealed partial class HttpService(
|
||||
IOptionsSnapshot<HttpServiceOptions> options,
|
||||
ILogger<HttpService> logger
|
||||
) : BackgroundService, ILagrangeWebService
|
||||
{
|
||||
public event EventHandler<MsgRecvEventArgs>? OnMessageReceived;
|
||||
|
||||
private readonly HttpServiceOptions _options = options.Value;
|
||||
|
||||
private readonly ILogger _logger = logger;
|
||||
|
||||
private readonly HttpListener _listener = new();
|
||||
|
||||
private readonly ConcurrentDictionary<string, HttpListenerResponse> _responses = new();
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken token)
|
||||
{
|
||||
string prefix = $"http://{_options.Host}:{_options.Port}/";
|
||||
|
||||
try
|
||||
{
|
||||
_listener.Prefixes.Add(prefix);
|
||||
_listener.Start();
|
||||
Log.LogStarted(_logger, prefix);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogStartFailed(_logger, e);
|
||||
return;
|
||||
}
|
||||
|
||||
await ReceiveLoop(token);
|
||||
|
||||
if (_listener.IsListening)
|
||||
{
|
||||
try
|
||||
{
|
||||
_listener.Close();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogCloseFailed(_logger, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReceiveLoop(CancellationToken token)
|
||||
{
|
||||
while (_listener.IsListening && !token.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var context = await _listener.GetContextAsync().WaitAsync(token);
|
||||
_ = HandleRequestAsync(context, token);
|
||||
}
|
||||
catch (OperationCanceledException) when (token.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogGetContextError(_logger, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleRequestAsync(HttpListenerContext context, CancellationToken token = default)
|
||||
{
|
||||
var request = context.Request;
|
||||
var response = context.Response; // no using cause we might need to use it in SendJsonAsync
|
||||
var query = request.QueryString; // avoid creating a new nvc every get
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
string identifier = Guid.NewGuid().ToString();
|
||||
Log.LogRequest(_logger, identifier, request.RemoteEndPoint.ToString());
|
||||
|
||||
if (!string.IsNullOrEmpty(_options.AccessToken))
|
||||
{
|
||||
var authorization = request.Headers.Get("Authorization") ??
|
||||
(query["access_token"] is { } accessToken ? $"Bearer {accessToken}" : null);
|
||||
if (authorization is null)
|
||||
{
|
||||
Log.LogAuthFailed(_logger, identifier);
|
||||
response.StatusCode = (int)HttpStatusCode.Unauthorized;
|
||||
response.Headers.Add("WWW-Authenticate", "Bearer");
|
||||
response.Close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (authorization != $"Bearer {_options.AccessToken}")
|
||||
{
|
||||
Log.LogAuthFailed(_logger, identifier);
|
||||
response.StatusCode = (int)HttpStatusCode.Forbidden;
|
||||
response.Close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
var action = request.Url!.AbsolutePath[1..];
|
||||
string payload;
|
||||
|
||||
switch (request.HttpMethod)
|
||||
{
|
||||
case "GET":
|
||||
{
|
||||
var @params = query.AllKeys
|
||||
.OfType<string>()
|
||||
.ToDictionary(key => key, key => query[key]);
|
||||
Log.LogReceived(_logger, identifier, request.Url.Query);
|
||||
payload = JsonSerializer.Serialize(new { action, @params });
|
||||
break;
|
||||
}
|
||||
case "POST":
|
||||
{
|
||||
if (!MediaTypeHeaderValue.TryParse(request.ContentType, out var mediaType))
|
||||
{
|
||||
Log.LogCannotParseMediaType(_logger, request.ContentType ?? string.Empty);
|
||||
response.StatusCode = (int)HttpStatusCode.NotAcceptable;
|
||||
response.Close();
|
||||
return;
|
||||
}
|
||||
|
||||
switch (mediaType.MediaType)
|
||||
{
|
||||
case "application/json":
|
||||
{
|
||||
|
||||
using var reader = new StreamReader(request.InputStream);
|
||||
var body = await reader.ReadToEndAsync(token);
|
||||
Log.LogReceived(_logger, identifier, body);
|
||||
payload = $"{{\"action\":\"{action}\",\"params\":{body}}}";
|
||||
break;
|
||||
}
|
||||
case "application/x-www-form-urlencoded":
|
||||
{
|
||||
using var reader = new StreamReader(request.InputStream);
|
||||
var body = await reader.ReadToEndAsync(token);
|
||||
Log.LogReceived(_logger, identifier, body);
|
||||
var @params = body.Split('&')
|
||||
.Where(pair => !string.IsNullOrEmpty(pair))
|
||||
.Select(pair => pair.Split('=', 2))
|
||||
.ToDictionary(pair => pair[0], pair => Uri.UnescapeDataString(pair[1]));
|
||||
payload = JsonSerializer.Serialize(new { action, @params });
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
Log.LogUnsupportedContentType(_logger, request.ContentType ?? string.Empty);
|
||||
response.StatusCode = (int)HttpStatusCode.NotAcceptable; // make them happy
|
||||
response.Close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
Log.LogUnsupportedMethod(_logger, request.HttpMethod);
|
||||
response.StatusCode = (int)HttpStatusCode.MethodNotAllowed;
|
||||
response.Close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Log.LogReceived(_logger, identifier, payload);
|
||||
_responses.TryAdd(identifier, response);
|
||||
OnMessageReceived?.Invoke(this, new MsgRecvEventArgs(payload, identifier));
|
||||
}
|
||||
catch (OperationCanceledException) when (token.IsCancellationRequested)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogHandleError(_logger, e);
|
||||
response.StatusCode = (int)HttpStatusCode.InternalServerError;
|
||||
response.Close();
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask SendJsonAsync<T>(T json, string? identifier = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (identifier is null) return;
|
||||
|
||||
string payload = JsonSerializer.Serialize(json);
|
||||
Log.LogSend(_logger, identifier, payload);
|
||||
|
||||
if (_responses.TryRemove(identifier, out var response))
|
||||
{
|
||||
response.ContentType = "application/json";
|
||||
response.ContentLength64 = System.Text.Encoding.UTF8.GetByteCount(payload);
|
||||
await using (var writer = new StreamWriter(response.OutputStream))
|
||||
{
|
||||
await writer.WriteAsync(payload);
|
||||
}
|
||||
|
||||
response.Close();
|
||||
}
|
||||
}
|
||||
|
||||
private static partial class Log
|
||||
{
|
||||
private enum EventIds
|
||||
{
|
||||
Started = 1,
|
||||
Request,
|
||||
Received,
|
||||
Send,
|
||||
|
||||
StartFailed = 1001,
|
||||
CloseFailed,
|
||||
GetContextError,
|
||||
HandleError,
|
||||
AuthFailed,
|
||||
CannotParseMediaType,
|
||||
UnsupportedContentType,
|
||||
UnsupportedMethod,
|
||||
}
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.Started, Level = LogLevel.Information, Message = "HttpService started at {prefix}")]
|
||||
public static partial void LogStarted(ILogger logger, string prefix);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.Request, Level = LogLevel.Information, Message = "Request(Conn: {identifier} from {ip})")]
|
||||
public static partial void LogRequest(ILogger logger, string identifier, string ip);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.Received, Level = LogLevel.Information, Message = "Receive(Conn: {identifier}: {s})")]
|
||||
public static partial void LogReceived(ILogger logger, string identifier, string s);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.Send, Level = LogLevel.Trace, Message = "Send(Conn: {identifier}: {s})")]
|
||||
public static partial void LogSend(ILogger logger, string identifier, string s);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.CannotParseMediaType, Level = LogLevel.Warning, Message = "Cannot parse media type: {mediaType}")]
|
||||
public static partial void LogCannotParseMediaType(ILogger logger, string mediaType);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.AuthFailed, Level = LogLevel.Warning, Message = "Conn: {identifier} auth failed")]
|
||||
public static partial void LogAuthFailed(ILogger logger, string identifier);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.UnsupportedContentType, Level = LogLevel.Warning, Message = "Unsupported content type: {contentType}")]
|
||||
public static partial void LogUnsupportedContentType(ILogger logger, string contentType);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.UnsupportedMethod, Level = LogLevel.Warning, Message = "Unsupported method: {method}")]
|
||||
public static partial void LogUnsupportedMethod(ILogger logger, string method);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.HandleError, Level = LogLevel.Warning,
|
||||
Message = "An error occurred while handling the request")]
|
||||
public static partial void LogHandleError(ILogger logger, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.GetContextError, Level = LogLevel.Warning,
|
||||
Message = "An error occurred while getting the context")]
|
||||
public static partial void LogGetContextError(ILogger logger, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.CloseFailed, Level = LogLevel.Warning, Message = "Failed to gracefully close the listener")]
|
||||
public static partial void LogCloseFailed(ILogger logger, Exception e);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.StartFailed, Level = LogLevel.Error,
|
||||
Message = "An error occurred while starting the listener")]
|
||||
public static partial void LogStartFailed(ILogger logger, Exception e);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public sealed class HttpServiceFactory(IServiceProvider services) : LagrangeWebServiceFactory(services), ILagrangeWebServiceFactory<HttpService>
|
||||
{
|
||||
public override ILagrangeWebService Create()
|
||||
{
|
||||
var config = _config ?? throw new InvalidOperationException("Configuration must be provided");
|
||||
var options = _services.GetRequiredService<IOptionsSnapshot<HttpServiceOptions>>();
|
||||
config.Bind(options.Value);
|
||||
|
||||
return _services.GetRequiredService<HttpService>();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public interface ILagrangeWebService : IHostedService
|
||||
{
|
||||
public event EventHandler<MsgRecvEventArgs> OnMessageReceived;
|
||||
|
||||
public ValueTask SendJsonAsync<T>(T json, string? identifier = null, CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service
|
||||
{
|
||||
public interface ILagrangeWebServiceFactory
|
||||
{
|
||||
void SetConfig(IConfiguration config);
|
||||
|
||||
ILagrangeWebService? Create();
|
||||
}
|
||||
|
||||
public interface ILagrangeWebServiceFactory<TService> : ILagrangeWebServiceFactory where TService : ILagrangeWebService
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public abstract class LagrangeWebServiceFactory(IServiceProvider services) : ILagrangeWebServiceFactory
|
||||
{
|
||||
protected readonly IServiceProvider _services = services;
|
||||
|
||||
protected IConfiguration? _config;
|
||||
|
||||
public void SetConfig(IConfiguration config) => _config = config;
|
||||
|
||||
public abstract ILagrangeWebService? Create();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,300 @@
|
||||
using System.Diagnostics;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Lagrange.Core;
|
||||
using Lagrange.OneBot.Core.Entity.Action;
|
||||
using Lagrange.OneBot.Core.Entity.Meta;
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public partial class ReverseWSService(IOptionsSnapshot<ReverseWSServiceOptions> options, ILogger<ReverseWSService> logger, BotContext context)
|
||||
: BackgroundService, ILagrangeWebService
|
||||
{
|
||||
private const string Tag = nameof(ReverseWSService);
|
||||
|
||||
private string _urlStr = string.Empty;
|
||||
private readonly string _identifier = Guid.NewGuid().ToString();
|
||||
|
||||
public event EventHandler<MsgRecvEventArgs>? OnMessageReceived;
|
||||
|
||||
private readonly ReverseWSServiceOptions _options = options.Value;
|
||||
|
||||
private readonly ILogger _logger = logger;
|
||||
|
||||
protected ConnectionContext? ConnCtx;
|
||||
|
||||
private readonly SemaphoreSlim semaphore = new(1, 1);
|
||||
|
||||
protected abstract class ConnectionContext(Task connectTask) : IDisposable
|
||||
{
|
||||
public readonly Task ConnectTask = connectTask;
|
||||
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
public CancellationToken Token => _cts.Token;
|
||||
|
||||
public void Dispose() => _cts.Cancel();
|
||||
}
|
||||
|
||||
protected sealed class GeneralConnectionContext(ClientWebSocket apiWebSocket, ClientWebSocket eventWebSocket, Task connectTask) : ConnectionContext(connectTask)
|
||||
{
|
||||
public readonly ClientWebSocket ApiWebSocket = apiWebSocket;
|
||||
public readonly ClientWebSocket EventWebSocket = eventWebSocket;
|
||||
}
|
||||
|
||||
protected sealed class UniversalConnectionContext(ClientWebSocket webSocket, Task connectTask) : ConnectionContext(connectTask)
|
||||
{
|
||||
public readonly ClientWebSocket WebSocket = webSocket;
|
||||
}
|
||||
|
||||
public ValueTask SendJsonAsync<T>(T payload, string? identifier, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var ctx = ConnCtx ?? throw new InvalidOperationException("Reverse webSocket service was not running");
|
||||
var ws = ctx switch
|
||||
{
|
||||
UniversalConnectionContext universalCtx => universalCtx.WebSocket,
|
||||
GeneralConnectionContext generalCtx => payload is OneBotResult ? generalCtx.ApiWebSocket : generalCtx.EventWebSocket,
|
||||
_ => throw new InvalidOperationException("The connection context is not supported")
|
||||
};
|
||||
var connTask = ctx.ConnectTask;
|
||||
|
||||
return !connTask.IsCompletedSuccessfully
|
||||
? SendJsonAsync(ws, connTask, payload, ctx.Token)
|
||||
: SendJsonAsync(ws, payload, ctx.Token);
|
||||
}
|
||||
|
||||
protected async ValueTask SendJsonAsync<T>(ClientWebSocket ws, Task connectTask, T payload, CancellationToken token)
|
||||
{
|
||||
await connectTask;
|
||||
await SendJsonAsync(ws, payload, token);
|
||||
}
|
||||
|
||||
protected async ValueTask SendJsonAsync<T>(ClientWebSocket ws, T payload, CancellationToken token)
|
||||
{
|
||||
var json = JsonSerializer.Serialize(payload);
|
||||
var buffer = Encoding.UTF8.GetBytes(json);
|
||||
Log.LogSendingData(_logger, Tag, _identifier, json);
|
||||
await semaphore.WaitAsync(token);
|
||||
try
|
||||
{
|
||||
await ws.SendAsync(buffer.AsMemory(), WebSocketMessageType.Text, true, token);
|
||||
}
|
||||
finally
|
||||
{
|
||||
semaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
protected ClientWebSocket CreateDefaultWebSocket(string role)
|
||||
{
|
||||
var ws = new ClientWebSocket();
|
||||
ws.Options.SetRequestHeader("X-Client-Role", role);
|
||||
ws.Options.SetRequestHeader("X-Self-ID", context.BotUin.ToString());
|
||||
ws.Options.SetRequestHeader("User-Agent", Constant.OneBotImpl);
|
||||
if (options.Value.IgnoreSslCertificate) ws.Options.RemoteCertificateValidationCallback = (_, _, _, _) => true;
|
||||
|
||||
if (_options.AccessToken != null) ws.Options.SetRequestHeader("Authorization", $"Bearer {_options.AccessToken}");
|
||||
ws.Options.KeepAliveInterval = Timeout.InfiniteTimeSpan;
|
||||
return ws;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_urlStr = $"{_options.Host}:{_options.Port}{_options.Suffix}";
|
||||
if (!_options.Host.StartsWith("ws://") && !_options.Host.StartsWith("wss://"))
|
||||
{
|
||||
_urlStr = "ws://" + _urlStr;
|
||||
}
|
||||
string apiurlstr = $"{_urlStr}{_options.ApiSuffix}";
|
||||
string eventurlstr = $"{_urlStr}{_options.EventSuffix}";
|
||||
|
||||
if (!Uri.TryCreate(_urlStr, UriKind.Absolute, out var url))
|
||||
{
|
||||
Log.LogInvalidUrl(_logger, Tag, _urlStr);
|
||||
return;
|
||||
}
|
||||
if (!Uri.TryCreate(apiurlstr, UriKind.Absolute, out var apiUrl))
|
||||
{
|
||||
Log.LogInvalidUrl(_logger, Tag, apiurlstr);
|
||||
return;
|
||||
}
|
||||
if (!Uri.TryCreate(eventurlstr, UriKind.Absolute, out var eventUrl))
|
||||
{
|
||||
Log.LogInvalidUrl(_logger, Tag, eventurlstr);
|
||||
return;
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_options.UseUniversalClient)
|
||||
{
|
||||
using var ws = CreateDefaultWebSocket("Universal");
|
||||
var connTask = ws.ConnectAsync(url, stoppingToken);
|
||||
using var connCtx = new UniversalConnectionContext(ws, connTask);
|
||||
ConnCtx = connCtx;
|
||||
await connTask;
|
||||
|
||||
Log.LogConnected(_logger, Tag, _identifier, _urlStr);
|
||||
var lifecycle = new OneBotLifecycle(context.BotUin, "connect");
|
||||
await SendJsonAsync(ws, lifecycle, stoppingToken);
|
||||
|
||||
var recvTask = ReceiveLoop(ws, stoppingToken);
|
||||
if (_options.HeartBeatEnable && _options.HeartBeatInterval > 0)
|
||||
{
|
||||
var heartbeatTask = HeartbeatLoop(ws, stoppingToken);
|
||||
await Task.WhenAll(recvTask, heartbeatTask);
|
||||
}
|
||||
else
|
||||
{
|
||||
await recvTask;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
using var wsApi = CreateDefaultWebSocket("Api");
|
||||
var apiConnTask = wsApi.ConnectAsync(apiUrl, stoppingToken);
|
||||
|
||||
using var wsEvent = CreateDefaultWebSocket("Event");
|
||||
var eventConnTask = wsEvent.ConnectAsync(eventUrl, stoppingToken);
|
||||
|
||||
var connTask = Task.WhenAll(apiConnTask, eventConnTask);
|
||||
ConnCtx = new GeneralConnectionContext(wsApi, wsEvent, connTask);
|
||||
|
||||
await connTask;
|
||||
|
||||
var lifecycle = new OneBotLifecycle(context.BotUin, "connect");
|
||||
await SendJsonAsync(wsEvent, lifecycle, stoppingToken);
|
||||
|
||||
var recvTask = ReceiveLoop(wsApi, stoppingToken);
|
||||
if (_options.HeartBeatEnable && _options.HeartBeatInterval > 0)
|
||||
{
|
||||
var heartbeatTask = HeartbeatLoop(wsEvent, stoppingToken);
|
||||
await Task.WhenAll(recvTask, heartbeatTask);
|
||||
}
|
||||
else
|
||||
{
|
||||
await recvTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
ConnCtx = null;
|
||||
break;
|
||||
}
|
||||
catch (WebSocketException e) when (e.InnerException is HttpRequestException)
|
||||
{
|
||||
Log.LogClientReconnect(_logger, Tag, _identifier, _options.ReconnectInterval);
|
||||
var interval = TimeSpan.FromMilliseconds(_options.ReconnectInterval);
|
||||
await Task.Delay(interval, stoppingToken);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.LogClientDisconnected(_logger, e, Tag, _identifier);
|
||||
var interval = TimeSpan.FromMilliseconds(_options.ReconnectInterval);
|
||||
await Task.Delay(interval, stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReceiveLoop(ClientWebSocket ws, CancellationToken token)
|
||||
{
|
||||
var buffer = new byte[1024];
|
||||
while (true)
|
||||
{
|
||||
int received = 0;
|
||||
while (true)
|
||||
{
|
||||
var result = await ws.ReceiveAsync(buffer.AsMemory(received), token);
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", token);
|
||||
break;
|
||||
}
|
||||
|
||||
received += result.Count;
|
||||
if (result.EndOfMessage) break;
|
||||
|
||||
if (received == buffer.Length) Array.Resize(ref buffer, received << 1);
|
||||
}
|
||||
string text = Encoding.UTF8.GetString(buffer, 0, received);
|
||||
Log.LogDataReceived(_logger, Tag, _identifier, text);
|
||||
OnMessageReceived?.Invoke(this, new MsgRecvEventArgs(text)); // Handle user handlers error?
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HeartbeatLoop(ClientWebSocket ws, CancellationToken token)
|
||||
{
|
||||
var interval = TimeSpan.FromMilliseconds(_options.HeartBeatInterval);
|
||||
Stopwatch sw = new();
|
||||
|
||||
while (true)
|
||||
{
|
||||
var status = new OneBotStatus(true, true);
|
||||
var heartBeat = new OneBotHeartBeat(context.BotUin, (int)_options.HeartBeatInterval, status);
|
||||
|
||||
sw.Start();
|
||||
await SendJsonAsync(ws, heartBeat, token);
|
||||
sw.Stop();
|
||||
|
||||
// Implementing precise intervals by subtracting Stopwatch's timing from configured intervals
|
||||
var waitingTime = interval - sw.Elapsed;
|
||||
if (waitingTime >= TimeSpan.Zero)
|
||||
{
|
||||
await Task.Delay(waitingTime, token);
|
||||
}
|
||||
sw.Reset();
|
||||
}
|
||||
}
|
||||
|
||||
private static partial class Log
|
||||
{
|
||||
private enum EventIds
|
||||
{
|
||||
Connected = 1,
|
||||
SendingData,
|
||||
DataReceived,
|
||||
|
||||
ClientDisconnected = 1001,
|
||||
ClientReconnect,
|
||||
InvalidUrl
|
||||
}
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.Connected, Level = LogLevel.Trace, Message = "[{tag}] Connect({identifier}): {url}")]
|
||||
public static partial void LogConnected(ILogger logger, string tag, string identifier, string url);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.SendingData, Level = LogLevel.Trace, Message = "[{tag}] Send({identifier}): {data}")]
|
||||
public static partial void LogSendingData(ILogger logger, string tag, string identifier, string data);
|
||||
|
||||
public static void LogDataReceived(ILogger logger, string tag, string identifier, string data)
|
||||
{
|
||||
if (logger.IsEnabled(LogLevel.Trace))
|
||||
{
|
||||
if (data.Length > 1024)
|
||||
{
|
||||
data = string.Concat(data.AsSpan(0, 1024), "...", (data.Length - 1024).ToString(), "bytes");
|
||||
}
|
||||
InternalLogDataReceived(logger, tag, identifier, data);
|
||||
}
|
||||
}
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.DataReceived, Level = LogLevel.Trace, Message = "[{tag}] Receive({identifier}): {data}", SkipEnabledCheck = true)]
|
||||
private static partial void InternalLogDataReceived(ILogger logger, string tag, string identifier, string data);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.ClientDisconnected, Level = LogLevel.Warning, Message = "[{tag}] Disconnect({identifier})")]
|
||||
public static partial void LogClientDisconnected(ILogger logger, Exception e, string tag, string identifier);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.ClientReconnect, Level = LogLevel.Information, Message = "[{tag}] Reconnecting {identifier} at interval of {interval}")]
|
||||
public static partial void LogClientReconnect(ILogger logger, string tag, string identifier, uint interval);
|
||||
|
||||
[LoggerMessage(EventId = (int)EventIds.InvalidUrl, Level = LogLevel.Error, Message = "[{tag}] Invalid configuration was detected, url: {url}")]
|
||||
public static partial void LogInvalidUrl(ILogger logger, string tag, string url);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
using Lagrange.OneBot.Core.Network.Options;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Lagrange.OneBot.Core.Network.Service;
|
||||
|
||||
public sealed class ReverseWSServiceFactory(IServiceProvider services) : LagrangeWebServiceFactory(services), ILagrangeWebServiceFactory<ReverseWSService>
|
||||
{
|
||||
public override ILagrangeWebService Create()
|
||||
{
|
||||
var config = _config ?? throw new InvalidOperationException("Configuration must be provided");
|
||||
var options = _services.GetRequiredService<IOptionsSnapshot<ReverseWSServiceOptions>>();
|
||||
config.Bind(options.Value);
|
||||
|
||||
return _services.GetRequiredService<ReverseWSService>();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user