在实际的分布式系统或微服务架构中,我们经常遇到一种混合交互模式:客户端通过长连接同步等待结果,但后端的部分处理却是异步的,最终通过回调返回结果。这种场景下,中间的服务(B)必须充当“桥梁”,既要维持与客户端的同步长连接,又要调用下游服务(C)并等待其异步回调,最后将结果回传给客户端。本文将深入分析这一模式的核心问题,并给出在 ASP.NET Core 中基于 TaskCompletionSource 和 ConcurrentDictionary 的成熟解决方案。
场景还原
假设我们有三个服务:
A:客户端,通过长连接(如 HTTP 长轮询、WebSocket 或普通的同步 HTTP 请求)向 B 发起请求,并同步等待结果。
B:中间服务,接收到 A 的请求后,需要调用下游服务 C 来执行实际任务。B 对 C 的调用是短连接(如普通的 HTTP 请求),但 C 的处理是异步的,处理完成后会通过回调接口通知 B。
C:下游服务,接收 B 的请求后异步处理,完成后回调 B。
交互流程如下:
text
A --(长连接请求)--> B --(短请求)--> C
|
|(异步处理)
v
C --(回调)--> B关键矛盾在于:A 到 B 是同步等待,但 B 无法立即从 C 得到结果,因为 C 是异步回调的。因此 B 必须将 A 的请求“挂起”,直到 C 的回调到达,再将结果返回给 A。
核心思想:请求挂起 + 回调唤醒
解决这个问题的标准模式是:
B 收到 A 的请求后,立即生成一个唯一的请求标识(RequestId)。
B 将 RequestId 传递给 C(例如在调用 C 的请求体中包含该 Id)。
B 将当前请求的上下文(如线程或任务)挂起,等待后续唤醒。
C 处理完成后,通过回调接口将 RequestId 和结果返回给 B。
B 根据 RequestId 找到之前挂起的请求上下文,唤醒它,并将结果返回给 A。
用流程图表示如下:
text
A ----请求----> B
|
| 生成 RequestId
|
+----调用C(RequestId)---->
C
|
| 处理
|
<----回调(RequestId,Result)
|
| 唤醒等待线程
v
A <---返回结果--- B在 ASP.NET Core 中的实现
在 .NET 平台,我们可以利用 TaskCompletionSource 和 ConcurrentDictionary 优雅地实现请求挂起与唤醒。TaskCompletionSource 可以创建一个可手动控制完成状态的任务,ConcurrentDictionary 用于存储 RequestId 到等待任务的映射。
1. 定义等待请求的存储
首先,在 B 服务中定义一个静态的并发字典,用于存放所有尚未完成的请求:
csharp
public static class PendingRequests
{
// Key: RequestId, Value: TaskCompletionSource 用于等待结果
public static ConcurrentDictionary<string, TaskCompletionSource<string>> _pendingRequests
= new ConcurrentDictionary<string, TaskCompletionSource<string>>();
}这里假设回调的结果类型为 string,你可以根据实际情况替换为具体的 DTO。
2. A 调 B 的接口(挂起请求)
B 提供一个 HTTP 接口供 A 调用。在该接口中,我们生成 RequestId,创建 TaskCompletionSource,存储到字典中,然后调用 C,最后等待结果:
csharp
[ApiController]
[Route("api/b")]
public class BController : ControllerBase
{
private readonly IHttpClientFactory _httpClientFactory;
public BController(IHttpClientFactory httpClientFactory)
{
_httpClientFactory = httpClientFactory;
}
[HttpPost("call")]
public async Task<IActionResult> CallCAsync()
{
// 1. 生成唯一请求ID
string requestId = Guid.NewGuid().ToString();
// 2. 创建 TaskCompletionSource
var tcs = new TaskCompletionSource<string>();
// 3. 存入字典
PendingRequests._pendingRequests.TryAdd(requestId, tcs);
// 4. 调用 C 接口,传递 RequestId
await CallCInterfaceAsync(requestId);
// 5. 等待结果(挂起)
var result = await tcs.Task;
return Ok(result);
}
}注意:await tcs.Task 会使当前请求异步挂起,但不会阻塞线程,这正是 ASP.NET Core 异步编程的优势。
3. B 调 C 的接口
B 通过 HttpClient 调用 C 的接口,将 RequestId 作为参数传递:
csharp
private async Task CallCInterfaceAsync(string requestId)
{
var client = _httpClientFactory.CreateClient();
var data = new { requestId = requestId };
await client.PostAsJsonAsync("http://c-service/api/c/do", data);
}4. C 回调 B 的接口
C 处理完成后,通过回调接口将 RequestId 和结果返回给 B。B 接收到回调后,从字典中取出对应的 TaskCompletionSource,设置结果,唤醒等待的任务:
csharp
[HttpPost("callback")]
public IActionResult Callback([FromBody] CallbackModel model)
{
if (PendingRequests._pendingRequests.TryRemove(model.RequestId, out var tcs))
{
// 设置结果,唤醒等待的请求
tcs.SetResult(model.Result);
}
else
{
// 可能已经超时或被移除,记录日志
}
return Ok();
}
public class CallbackModel
{
public string RequestId { get; set; }
public string Result { get; set; }
}必须考虑的超时问题
A 的长连接不能无限期等待,否则会消耗服务端资源,甚至导致连接泄漏。因此我们必须为每个挂起的请求设置超时机制。
常见的做法是使用 Task.WhenAny 在等待结果的同时,启动一个超时任务。当超时任务先完成时,认为请求超时,从字典中移除记录,并返回 504 状态码给客户端。
csharp
[HttpPost("call")]
public async Task<IActionResult> CallCWithTimeoutAsync()
{
string requestId = Guid.NewGuid().ToString();
var tcs = new TaskCompletionSource<string>();
PendingRequests._pendingRequests.TryAdd(requestId, tcs);
await CallCInterfaceAsync(requestId);
// 设置超时时间(例如 60 秒)
TimeSpan timeout = TimeSpan.FromSeconds(60);
var timeoutTask = Task.Delay(timeout);
// 等待实际任务和超时任务中的任意一个完成
var completedTask = await Task.WhenAny(tcs.Task, timeoutTask);
if (completedTask == timeoutTask)
{
// 超时:移除字典中的记录,防止内存泄漏,并返回超时响应
PendingRequests._pendingRequests.TryRemove(requestId, out _);
return StatusCode(504, "Request timeout");
}
// 正常得到结果
return Ok(await tcs.Task);
}如果希望在超时时还能取消对 C 的调用(如果 C 支持取消),可以引入 CancellationToken,但这通常需要更复杂的协调,可根据实际场景决定。
其他注意事项
线程安全:
ConcurrentDictionary保证了添加和移除的线程安全,但在回调中设置TaskCompletionSource时要注意:如果设置结果时,等待的任务已经被移除(如超时),TryRemove会返回 false,此时不应再调用SetResult。幂等性与重试:C 的回调可能由于网络问题重复发送,B 应该对重复的回调做幂等处理(例如根据 RequestId 判断是否已处理)。
资源清理:除了超时移除,还应在请求完成后(正常或异常)从字典中移除记录,确保没有内存泄漏。上面的代码在
SetResult时已经通过TryRemove移除了,如果任务被取消或出现异常,也要保证移除。异常处理:如果 C 处理失败,可以通过回调传递错误信息,或者让 B 通过其他方式感知失败(如 C 的回调中包含错误标志)。在 B 端,可以调用
tcs.SetException来使等待的任务抛出异常,从而将错误返回给 A。
总结
本文介绍了一种经典的“同步请求 + 异步回调”桥接模式,并通过 ASP.NET Core 中的 TaskCompletionSource 和 ConcurrentDictionary 给出了完整实现。该模式的核心在于:
使用唯一 RequestId 关联请求与回调。
使用
TaskCompletionSource挂起请求,实现非阻塞等待。使用
ConcurrentDictionary维护待处理请求的映射。引入超时机制,防止资源泄漏。
这种模式广泛适用于需要同步等待异步处理结果的场景,例如支付回调、任务调度、实时数据查询等。掌握它,可以帮助你在复杂的分布式交互中游刃有余。
希望本文对你有所帮助。如果你有更好的实现或遇到其他问题,欢迎在评论区讨论交流。