专业的编程技术博客社区

网站首页 > 博客文章 正文

RabbitMQ Tutorial 之:远程过程调用(RPC)

baijin 2024-10-30 01:29:01 博客文章 9 ℃ 0 评论

译者:溪边静禅 出处:https://www.cnblogs.com/ramantic/p/8116349.html

原文来自 RabbitMQ 英文官网的教程(6.Remote procedure call - RPC),其示例代码采用了 .NET C# 语言。

在第二篇教程中,我们学习了如何使用工作队列在多个工作单元之间分配耗时的任务。

但是假如我们需要运行一个在远程电脑上的函数并等待其结果将会怎样呢?好吧,这将是一个完全不同的故事,这个模式被普遍认为叫远程过程调用或者简称 RPC

在本教程中我们即将使用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可伸缩的 RPC 服务器。由于我们并没有任何耗时的任务能拿来分配,那就创建一个返回斐波纳契数列的虚拟 RPC 服务吧。

客户端接口(Client interface)

为了说明如何使用 RPC 服务我们来创建一个简单的客户端类。我会公开一个名叫 call 的方法,该方法用以发送一个 RPC 请求并保持阻塞状态,直至接收到应答为止。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");

var response = rpcClient.Call("30");

Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

关于 RPC

尽管 RPC 是一个很常见的计算模式,也时常遭受批评。当程序员不知道针对 call 函数的调用是本地的还是很慢的 RPC 时就会出现问题,像这样的困惑往往会导致不可预测的系统(问题)以及徒增不必要的调试复杂性。与简化软件有所不同的是,误用 RPC 会导致难以维护的意大利面条式代码。

记住以上问题,并考虑以下建议:

  • 确保可以明显区分哪一个函数是调用本地的,哪一个是远程的。

  • 为系统编写文档,确保组件之间的依赖很明确。

  • 处理错误情形,当 RPC 服务端停机很长时间时,客户端会怎样应对?

当有疑问时先避免使用 RPC,如果可以,考虑使用一个异步管道 - 它类似于 RPC 的阻塞,会通过异步的方式将结果推送到下一个计算场景。

回调队列(Callback queue)

一般而言,基于 RabbitMQ 来使用 RPC 是很简单的,即客户端发送一个请求消息,然后服务端使用一个响应消息作为应答。为了能获得一个响应,我们需要在请求过程中发送一个“callback”队列地址。

var corrId = Guid.NewGuid().ToString();

var props = channel.CreateBasicProperties();

props.ReplyTo = replyQueueName;

props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);

// ... then code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1 协议会在消息中预定义包含有 14 个属性的集合,大部分的属性用得都比较少,除了以下几项之外:

  • deliveryMode:将消息标记为持久的(值为2),或者瞬时的(其他值),想必你在第二篇教程中还记得这个属性。

  • contentType:经常用来描述编码的 mime 类型,比如在常见的 JSON 编码中一个好的实践便是设置该属性为:application/json。

  • replyTo:通常用来为回调队列命名。

  • correlationId:用以将 RPC 响应与请求关联起来。

Correlation Id

在上面呈现的方法中我们建议为每一个 RPC 请求创建一个回调队列,不过这很低效,幸运的是我们有更好的办法 - 让我们为每一个客户端创建一个单独的回调。

这就又会出现一个问题,即在收到响应的队列中,并不清楚哪个请求隶属于该响应,这便是 correlationId 属性所用之处。我们将会对每一个请求设置 correlationId 为唯一值,然后,当我们在回调队列中接收到消息时会查看这个属性,在该属性的基础上,我们可以让请求与响应进行匹配。如果我们发现有未知的 correlationId 值,则可以放心地丢弃这些并不属于我们的请求的消息。

你可能会问,我们为什么应该在回调队列中忽略未知的消息,而不是(直接)返回错误?这可能是由于服务端存在竞态条件。尽管不太可能,但是针对一个请求,RPC 服务器很可能在发送完应答后中止,而不是在发送确认消息之前。如果确实发生,重启的 RPC 服务将再一次处理这个请求,这就是为什么我们在客户端需要优雅地处理重复的响应,以及应该(保持)理想地幂等性。

总结

RPC 会像如下这样运作:

  • 当客户端启动时,它将创建一个匿名的独有回调队列。

  • 针对一个 RPC 请求,客户端会发送一个基于两个属性的消息:一个是指向回调队列的 replyTo,另一个是为每一个请求标记唯一值的 correlationId。

  • 请求将发送至 rpc_queue 队列。

  • RPC 工作单元(或者叫服务端)会在队列中持续等待请求。当请求出现时,RPC 将完成工作,同时使用来自 replyTo 字段(所指代)的队列来发送携带着结果的消息返回至客户端。

  • 客户端在回调队列上等待着数据,当一个消息出现时,客户端会检查 correlationId 属性,如果该值与当前请求的值相匹配,则把响应返回给应用程序。

融合一起

斐波纳契任务(函数)

private static int fib(int n){

if (n == 0 || n == 1) return n;

return fib(n - 1) + fib(n - 2);

}

我们声明了斐波纳契函数,并假定只(允许)输入正整数。(不要期望输入过大的数字,因为很可能这个递归实现会非常慢)

针对我们的 RPC 服务端,RPCServer.cs 类文件的代码看起来如下:

服务端的代码是相当简单的。

  • 像往常一样,我们先建立连接、信道以及声明队列。

  • 我们可能想运行不只一个服务端处理程序,为了能通过多台服务器平均地分担负载,我们需要设定 channel.basicQos 中 prefetchCount 的值。

  • 我们使用 basicConsume 来访问队列,然后注册一个递送程序,在这个程序中我们执行工作并返回响应。

针对我们的 RPC 客户端,RPCClient.cs 类文件的代码如下:

客户端的代码稍微多一些:

  • 我们建立连接和信道,以及针对答复(响应)声明一个独有的“callback”队列。

  • 我们订阅这个“callback”队列,以便可以接收到 RPC 响应。

  • 我们的 call 方法将发起一个实际的 RPC 请求。

  • 在此,我们首先生成一个唯一的 correlationId 编号并保存好它,因为 while 循环会使用该值来捕获匹配的响应。

  • 接下来,我们发布请求消息,它包含了两个属性:replyTo 和 correlationId。

  • 此时,我们可以稍微等待一下直到指定的响应到来。

  • while 循环所做的事情非常简单,对于每一个响应消息,它都会检查 correlationId 是否为我们正在寻找的那一个,如果是就保存该响应。

  • 最终,我们将响应返回给用户。

客户端请求

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");

var response = rpcClient.Call("30");

Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

现在是时候来看一下 RPCClient.cs 和 RPCServer.cs 完整的示例代码了(包含了基本的异常处理)。

像往常一下创建(可参考第一篇):

我们的 RPC 服务已经就绪,现可以开启服务端:

cd RPCServer

dotnet run

# => [x] Awaiting RPC requests

运行客户端来请求一个斐波纳契数:

cd RPCClient

dotnet run

# => [x] Requesting fib(30)

目前所呈现的设计不仅仅是 RPC 服务的可能实现,而且还有一些重要优点:

  • 如果 RPC 服务很慢,你可以通过运行另一个来横向扩展,也就是尝试在新的控制台中运行第二个 RPCServer。

  • 在客户端,RPC 只能发送和接收一条消息,必需像 queueDeclare 那样进行非同步式调用。因此,RPC 客户端只需要单次请求的一次网络往返。

我们的代码仍然很简单,也并没有尝试去解决更复杂(但很重要的)问题,比如就像:

  • 如果服务端没有运行,那么客户端将如何应对?

  • 客户端针对 RPC 是否应该有某种超时(应对措施)?

  • 如果服务端出现故障并引发异常,它是否应该转发给客户端?

  • 在处理之前防备无效的传入消息(比如检查边界和类型)。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表