在实际的分布式系统或微服务架构中,我们经常遇到一种混合交互模式:客户端通过长连接同步等待结果,但后端的部分处理却是异步的,最终通过回调返回结果。这种场景下,中间的服务(B)必须充当“桥梁”,既要维持与客户端的同步长连接,又要调用下游服务(C)并等待其异步回调,最后将结果回传给客户端。本文将深入分析这一模式的核心问题,并给出在 ASP.NET Core 中基于 TaskCompletionSourceConcurrentDictionary 的成熟解决方案。

场景还原

假设我们有三个服务:

  • A:客户端,通过长连接(如 HTTP 长轮询、WebSocket 或普通的同步 HTTP 请求)向 B 发起请求,并同步等待结果。

  • B:中间服务,接收到 A 的请求后,需要调用下游服务 C 来执行实际任务。B 对 C 的调用是短连接(如普通的 HTTP 请求),但 C 的处理是异步的,处理完成后会通过回调接口通知 B。

  • C:下游服务,接收 B 的请求后异步处理,完成后回调 B。

交互流程如下:

text

A  --(长连接请求)-->  B  --(短请求)-->  C
                               |
                               |(异步处理)
                               v
                        C --(回调)--> B

关键矛盾在于:A 到 B 是同步等待,但 B 无法立即从 C 得到结果,因为 C 是异步回调的。因此 B 必须将 A 的请求“挂起”,直到 C 的回调到达,再将结果返回给 A。

核心思想:请求挂起 + 回调唤醒

解决这个问题的标准模式是:

  1. B 收到 A 的请求后,立即生成一个唯一的请求标识(RequestId)

  2. B 将 RequestId 传递给 C(例如在调用 C 的请求体中包含该 Id)。

  3. B 将当前请求的上下文(如线程或任务)挂起,等待后续唤醒

  4. C 处理完成后,通过回调接口将 RequestId 和结果返回给 B

  5. B 根据 RequestId 找到之前挂起的请求上下文,唤醒它,并将结果返回给 A

用流程图表示如下:

text

A ----请求----> B
                |
                | 生成 RequestId
                |
                +----调用C(RequestId)---->
                                          C
                                          |
                                          | 处理
                                          |
                       <----回调(RequestId,Result)
                |
                | 唤醒等待线程
                v
A <---返回结果--- B

ASP.NET Core 中的实现

在 .NET 平台,我们可以利用 TaskCompletionSourceConcurrentDictionary 优雅地实现请求挂起与唤醒。TaskCompletionSource 可以创建一个可手动控制完成状态的任务,ConcurrentDictionary 用于存储 RequestId 到等待任务的映射。

1. 定义等待请求的存储

首先,在 B 服务中定义一个静态的并发字典,用于存放所有尚未完成的请求:

csharp

public static class PendingRequests
{
    // Key: RequestId, Value: TaskCompletionSource 用于等待结果
    public static ConcurrentDictionary<string, TaskCompletionSource<string>> _pendingRequests 
        = new ConcurrentDictionary<string, TaskCompletionSource<string>>();
}

这里假设回调的结果类型为 string,你可以根据实际情况替换为具体的 DTO。

2. A 调 B 的接口(挂起请求)

B 提供一个 HTTP 接口供 A 调用。在该接口中,我们生成 RequestId,创建 TaskCompletionSource,存储到字典中,然后调用 C,最后等待结果:

csharp

[ApiController]
[Route("api/b")]
public class BController : ControllerBase
{
    private readonly IHttpClientFactory _httpClientFactory;

    public BController(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory;
    }

    [HttpPost("call")]
    public async Task<IActionResult> CallCAsync()
    {
        // 1. 生成唯一请求ID
        string requestId = Guid.NewGuid().ToString();

        // 2. 创建 TaskCompletionSource
        var tcs = new TaskCompletionSource<string>();

        // 3. 存入字典
        PendingRequests._pendingRequests.TryAdd(requestId, tcs);

        // 4. 调用 C 接口,传递 RequestId
        await CallCInterfaceAsync(requestId);

        // 5. 等待结果(挂起)
        var result = await tcs.Task;

        return Ok(result);
    }
}

注意:await tcs.Task 会使当前请求异步挂起,但不会阻塞线程,这正是 ASP.NET Core 异步编程的优势。

3. B 调 C 的接口

B 通过 HttpClient 调用 C 的接口,将 RequestId 作为参数传递:

csharp

private async Task CallCInterfaceAsync(string requestId)
{
    var client = _httpClientFactory.CreateClient();
    var data = new { requestId = requestId };
    await client.PostAsJsonAsync("http://c-service/api/c/do", data);
}

4. C 回调 B 的接口

C 处理完成后,通过回调接口将 RequestId 和结果返回给 B。B 接收到回调后,从字典中取出对应的 TaskCompletionSource,设置结果,唤醒等待的任务:

csharp

[HttpPost("callback")]
public IActionResult Callback([FromBody] CallbackModel model)
{
    if (PendingRequests._pendingRequests.TryRemove(model.RequestId, out var tcs))
    {
        // 设置结果,唤醒等待的请求
        tcs.SetResult(model.Result);
    }
    else
    {
        // 可能已经超时或被移除,记录日志
    }

    return Ok();
}

public class CallbackModel
{
    public string RequestId { get; set; }
    public string Result { get; set; }
}

必须考虑的超时问题

A 的长连接不能无限期等待,否则会消耗服务端资源,甚至导致连接泄漏。因此我们必须为每个挂起的请求设置超时机制。

常见的做法是使用 Task.WhenAny 在等待结果的同时,启动一个超时任务。当超时任务先完成时,认为请求超时,从字典中移除记录,并返回 504 状态码给客户端。

csharp

[HttpPost("call")]
public async Task<IActionResult> CallCWithTimeoutAsync()
{
    string requestId = Guid.NewGuid().ToString();
    var tcs = new TaskCompletionSource<string>();
    PendingRequests._pendingRequests.TryAdd(requestId, tcs);

    await CallCInterfaceAsync(requestId);

    // 设置超时时间(例如 60 秒)
    TimeSpan timeout = TimeSpan.FromSeconds(60);
    var timeoutTask = Task.Delay(timeout);

    // 等待实际任务和超时任务中的任意一个完成
    var completedTask = await Task.WhenAny(tcs.Task, timeoutTask);

    if (completedTask == timeoutTask)
    {
        // 超时:移除字典中的记录,防止内存泄漏,并返回超时响应
        PendingRequests._pendingRequests.TryRemove(requestId, out _);
        return StatusCode(504, "Request timeout");
    }

    // 正常得到结果
    return Ok(await tcs.Task);
}

如果希望在超时时还能取消对 C 的调用(如果 C 支持取消),可以引入 CancellationToken,但这通常需要更复杂的协调,可根据实际场景决定。

其他注意事项

  1. 线程安全ConcurrentDictionary 保证了添加和移除的线程安全,但在回调中设置 TaskCompletionSource 时要注意:如果设置结果时,等待的任务已经被移除(如超时),TryRemove 会返回 false,此时不应再调用 SetResult

  2. 幂等性与重试:C 的回调可能由于网络问题重复发送,B 应该对重复的回调做幂等处理(例如根据 RequestId 判断是否已处理)。

  3. 资源清理:除了超时移除,还应在请求完成后(正常或异常)从字典中移除记录,确保没有内存泄漏。上面的代码在 SetResult 时已经通过 TryRemove 移除了,如果任务被取消或出现异常,也要保证移除。

  4. 异常处理:如果 C 处理失败,可以通过回调传递错误信息,或者让 B 通过其他方式感知失败(如 C 的回调中包含错误标志)。在 B 端,可以调用 tcs.SetException 来使等待的任务抛出异常,从而将错误返回给 A。

总结

本文介绍了一种经典的“同步请求 + 异步回调”桥接模式,并通过 ASP.NET Core 中的 TaskCompletionSourceConcurrentDictionary 给出了完整实现。该模式的核心在于:

  • 使用唯一 RequestId 关联请求与回调。

  • 使用 TaskCompletionSource 挂起请求,实现非阻塞等待。

  • 使用 ConcurrentDictionary 维护待处理请求的映射。

  • 引入超时机制,防止资源泄漏。

这种模式广泛适用于需要同步等待异步处理结果的场景,例如支付回调、任务调度、实时数据查询等。掌握它,可以帮助你在复杂的分布式交互中游刃有余。

希望本文对你有所帮助。如果你有更好的实现或遇到其他问题,欢迎在评论区讨论交流。