从头开发一个 RPC 是种怎样的体验?
【CSDN 编者按】对于开发人员来说,调用远程服务就像是调用本地服务一样便捷。尤其是在微服务盛行的今天,了解RPC的原理过程是十分有必要的。
以下为译文:
计算机之间的通信方式多种多样,其中最常用的一种方法是远程过程调用(Remote Procedure Call,即RPC)。该协议允许一台计算机调用另一个计算机上的程序,就像调用本地程序一样,并负责所有传输和通信。
假设我们需要在一台计算机上编写一些数学程序,并且有一个判断数字是否为质数的程序或函数。在使用这个函数的时候,我们只需传递数字进去,就可以获得答案。这个函数保存在我们的计算机上。
很多时候,程序保存在本地非常方便调用,而且由于这些程序与我们其余的代码在一起,因此调用的时候几乎不会产生延迟。
但是,在有些情况下,将这些程序保留在本地也不见得是好事。有时,我们需要在拥有大量核心和内存的计算机上运行这些程序,这样它就可以检查非常大的数字。但这也不是什么难事,我们可以将主程序也放到大型计算机上运行,即使其余的程序可能并没有这种需求,质数查找函数也可以自由利用计算机上的资源。如果我们想让其他程序重用质数查找函数,该怎么办?我们可以将其转换成一个库,然后在各个程序之间共享,但是每一台运行质数查找库的计算机,都需要大量的内存资源。
如果我们将质数查找函数单独放在一台计算机上,然后在需要检查数字时与该计算机对话,怎么样呢?如此一来,我们就只需提高质数查找函数所在的计算机的性能,而且其他计算机上程序也可以共享这个函数。
这种方式的缺点是更加复杂。计算机可能会出现故障,网络也有可能出问题,而且我们还需要担心数据的来回传递。如果你只想编写一个简单的数学程序,那么可能无需担心网络状况,也不用考虑如何重新发送丢失的数据包,甚至不用担心如何查找运行质数查找函数的计算机。如果你的工作是编写最佳质数查找程序,那么你可能并不关心如何监听请求或检查已关闭的套接字。
这时就可以考虑远程过程调用。我们可以将计算机间通信的复杂性包装起来,然后在通信的任意一侧建立一个简单的接口(stub)。对于编写数学程序的人来说,看上去就像在调用同一台计算机上的函数;而对于编写质数查找程序的人来说,看上去就像是自己的函数被调用了。如果我们将中间部分抽象化,那么两侧都可以专心做好自己的细节,同时仍然可以享受将计算拆分到多台计算机的优势。
RPC调用的主要工作就是处理中间部分。它的一部分必须存在数学程序的计算机上,负责接受并打包参数,然后发送到另一台计算机。此外,在收到响应后,还需要解析响应,并传递回去。而质数查找函数计算机则必须等待请求,解析参数,然后将其传递给函数,此外,还需要获取结果,将其打包,然后再返回结果。这里的关键之处是数学程序和质数查找程序间,以及它们的stub之间都有一个清晰的接口。
更多详细信息,请参见 Andrew D. Birrell和Bruce Jay Nelson1 于1981年发表的论文《Implementing Remote Procedure Calls》。
从头编写RPC
下面,我们来试试看能不能编写一个RPC。
首先,我们来编写基本的数学程序。为了简单起见,我们编写一个命令行工具,接受输入,然后检查是否为质数。它有一个单独的方法is_prime,处理实际的检查。
// basic_math_program.c
#include <stdio.h>
#include <stdbool.h>
// Basic prime checker. This uses the 6k+-1 optimization
// (see https://en.wikipedia.org/wiki/Primality_test)
bool is_prime(int number) {
// Check first for 2 or 3
if (number == 2 || number == 3) {
return true;
}
// Check for 1 or easy modulos
if (number == 1 || number % 2 == 0 || number % 3 == 0) {
return false;
}
// Now check all the numbers up to sqrt(number)
int i = 5;
while (i * i <= number) {
// If we've found something (or something + 2) that divides it evenly, it's not
// prime.
if (number % i == 0 || number % (i + 2) == 0) {
return false;
}
i += 6;
}
return true;
}
int main(void) {
// Prompt the user to enter a number.
printf("Please enter a number: ");
// Read the user's number. Assume they're entering a valid number.
int input_number;
scanf("%d", &input_number);
// Check if it's prime
if (is_prime(input_number)) {
printf("%d is prime\n", input_number);
} else {
printf("%d is not prime\n", input_number);
}
return 0;
}
这段代码有一些潜在的问题,我们没有处理极端情况。但这里只是为了说明,无伤大雅。
目前一切顺利。下面,我们将代码拆分成多个文件,is_prime 可供同一台计算机上的程序重用。首先,我们为 is_prime 创建一个单独的库:
// is_prime.h
#ifndef IS_PRIME_H
#define IS_PRIME_H
#include <stdbool.h>
bool is_prime(int number);
#endif
// is_prime.c
#include "is_prime.h"
// Basic prime checker. This uses the 6k+-1 optimization
// (see https://en.wikipedia.org/wiki/Primality_test)
bool is_prime(int number) {
// Check first for 2 or 3
if (number == 2 || number == 3) {
return true;
}
// Check for 1 or easy modulos
if (number == 1 || number % 2 == 0 || number % 3 == 0) {
return false;
}
// Now check all the numbers up to sqrt(number)
int i = 5;
while (i * i <= number) {
// If we've found something (or something + 2) that divides it evenly, it's not
// prime.
if (number % i == 0 || number % (i + 2) == 0) {
return false;
}
i += 6;
}
return true;
}
下面,从主程序中调用:
// basic_math_program_refactored.c
#include <stdio.h>
#include <stdbool.h>
#include "is_prime.h"
int main(void) {
// Prompt the user to enter a number.
printf("Please enter a number: ");
// Read the user's number. Assume they're entering a valid number.
int input_number;
scanf("%d", &input_number);
// Check if it's prime
if (is_prime(input_number)) {
printf("%d is prime\n", input_number);
} else {
printf("%d is not prime\n", input_number);
}
return 0;
}
再试试,运行正常!当然,你也可以加一些测试:
下面,我们需要将这个函数放到其他计算机上。我们需要编写的功能包括:
调用程序的 stub:
打包参数
传输参数
接受结果
解析结果
被调用的 stub:
接受参数
解析参数
调用函数
打包结果
传输结果
我们的示例非常简单,因为我们只需要打包并发送一个 int 参数,然后接收一个字节的结果。对于调用程序的库,我们需要打包数据、创建套接字、连接到主机(暂定 localhost)、发送数据、等待结果、解析,然后返回。调用程序库的头文件如下所示:
// client/is_prime_rpc_client.h
#ifndef IS_PRIME_RPC_CLIENT_H
#define IS_PRIME_RPC_CLIENT_H
#include <stdbool.h>
bool is_prime_rpc(int number);
#endif
可能有些读者已经发现了,实际上这个接口与上面的函数库一模一样,但关键就在于此!因为调用程序只需要关注业务逻辑,无需关心其他一切。但实现就稍复杂:
// client/is_prime_rpc_client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#define SERVERPORT "5005" // The port the server will be listening on.
#define SERVER "localhost" // Assume localhost for now
#include "is_prime_rpc_client.h"
// Packs an int. We need to convert it from host order to network order.
int pack(int input) {
return htons(input);
}
// Gets the IPv4 or IPv6 sockaddr.
void *get_in_addr(struct sockaddr *sa) {
if (sa->sa_family == AF_INET) {
return &(((struct sockaddr_in*)sa)->sin_addr);
} else {
return &(((struct sockaddr_in6*)sa)->sin6_addr);
}
}
// Gets a socket to connect with.
int get_socket() {
int sockfd;
struct addrinfo hints, *server_info, *p;
int number_of_bytes;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; // We want to use TCP to ensure it gets there
int return_value = getaddrinfo(SERVER, SERVERPORT, &hints, &server_info);
if (return_value != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(return_value));
exit(1);
}
// We end up with a linked-list of addresses, and we want to connect to the
// first one we can
for (p = server_info; p != NULL; p = p->ai_next) {
// Try to make a socket with this one.
if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
// Something went wrong getting this socket, so we can try the next one.
perror("client: socket");
continue;
}
// Try to connect to that socket.
if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
// If something went wrong connecting to this socket, we can close it and
// move on to the next one.
close(sockfd);
perror("client: connect");
continue;
}
// If we've made it this far, we have a valid socket and can stop iterating
// through.
break;
}
// If we haven't gotten a valid sockaddr here, that means we can't connect.
if (p == NULL) {
fprintf(stderr, "client: failed to connect\n");
exit(2);
}
// Otherwise, we're good.
return sockfd;
}
// Client side library for the is_prime RPC.
bool is_prime_rpc(int number) {
// First, we need to pack the data, ensuring that it's sent across the
// network in the right format.
int packed_number = pack(number);
// Now, we can grab a socket we can use to connect see how we can connect
int sockfd = get_socket();
// Send just the packed number.
if (send(sockfd, &packed_number, sizeof packed_number, 0) == -1) {
perror("send");
close(sockfd);
exit(0);
}
// Now, wait to receive the answer.
int buf[1]; // Just receiving a single byte back that represents a boolean.
int bytes_received = recv(sockfd, &buf, 1, 0);
if (bytes_received == -1) {
perror("recv");
exit(1);
}
// Since we just have the one byte, we don't really need to do anything while
// unpacking it, since one byte in reverse order is still just a byte.
bool result = buf[0];
// All done! Close the socket and return the result.
close(sockfd);
return result;
}
如前所述,这段代码需要打包参数、连接到服务器、发送数据、接收数据、解析,并返回结果。我们的示例相对很简单,因为我们只需要确保数字的字节顺序符合网络字节顺序。
接下来,我们需要在服务器上运行被调用的库。它需要调用我们前面编写的 is_prime 库:
// server/is_prime_rpc_server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include "is_prime.h"
#define SERVERPORT "5005" // The port the server will be listening on.
// Gets the IPv4 or IPv6 sockaddr.
void *get_in_addr(struct sockaddr *sa) {
if (sa->sa_family == AF_INET) {
return &(((struct sockaddr_in*)sa)->sin_addr);
} else {
return &(((struct sockaddr_in6*)sa)->sin6_addr);
}
}
// Unpacks an int. We need to convert it from network order to our host order.
int unpack(int packed_input) {
return ntohs(packed_input);
}
// Gets a socket to listen with.
int get_and_bind_socket() {
int sockfd;
struct addrinfo hints, *server_info, *p;
int number_of_bytes;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; // We want to use TCP to ensure it gets there
hints.ai_flags = AI_PASSIVE; // Just use the server's IP.
int return_value = getaddrinfo(NULL, SERVERPORT, &hints, &server_info);
if (return_value != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(return_value));
exit(1);
}
// We end up with a linked-list of addresses, and we want to connect to the
// first one we can
for (p = server_info; p != NULL; p = p->ai_next) {
// Try to make a socket with this one.
if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
// Something went wrong getting this socket, so we can try the next one.
perror("server: socket");
continue;
}
// We want to be able to reuse this, so we can set the socket option.
int yes = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
// Try to bind that socket.
if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
// If something went wrong binding this socket, we can close it and
// move on to the next one.
close(sockfd);
perror("server: bind");
continue;
}
// If we've made it this far, we have a valid socket and can stop iterating
// through.
break;
}
// If we haven't gotten a valid sockaddr here, that means we can't connect.
if (p == NULL) {
fprintf(stderr, "server: failed to bind\n");
exit(2);
}
// Otherwise, we're good.
return sockfd;
}
int main(void) {
int sockfd = get_and_bind_socket();
// We want to listen forever on this socket
if (listen(sockfd, /*backlog=*/1) == -1) {
perror("listen");
exit(1);
}
printf("Server waiting for connections.\n");
struct sockaddr their_addr; // Address information of the client
socklen_t sin_size;
int new_fd;
while(1) {
sin_size = sizeof their_addr;
new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
if (new_fd == -1) {
perror("accept");
continue;
}
// Once we've accepted an incoming request, we can read from it into a buffer.
int buffer;
int bytes_received = recv(new_fd, &buffer, sizeof buffer, 0);
if (bytes_received == -1) {
perror("recv");
continue;
}
// We need to unpack the received data.
int number = unpack(buffer);
printf("Received a request: is %d prime?\n", number);
// Now, we can finally call the is_prime library!
bool number_is_prime = is_prime(number);
printf("Sending response: %s\n", number_is_prime ? "true" : "false");
// Note that we don't have to pack a single byte.
// We can now send it back.
if (send(new_fd, &number_is_prime, sizeof number_is_prime, 0) == -1) {
perror("send");
}
close(new_fd);
}
}
最后,我们更新一下我们的主函数,使用新的RPC库调用:
// client/basic_math_program_distributed.c
#include <stdio.h>
#include <stdbool.h>
#include "is_prime_rpc_client.h"
int main(void) {
// Prompt the user to enter a number.
printf("Please enter a number: ");
// Read the user's number. Assume they're entering a valid number.
int input_number;
scanf("%d", &input_number);
// Check if it's prime, but now via the RPC library
if (is_prime_rpc(input_number)) {
printf("%d is prime\n", input_number);
} else {
printf("%d is not prime\n", input_number);
}
return 0;
}
这个 RPC 实际的运行情况如下:
现在运行服务器,就可以运行客户端将质数检查的工作分布到其他计算机上运行!现在,程序调用 is_prime_rpc 时,所有网络业务都在后台进行。我们已经成功分发了计算,客户端实际上是在远程调用程序。
示例有待改进的方面
本文中的实现只是一个示例,虽然实现了一些功能,但只是一个玩具。真正的框架(例如 gRPC3)要复杂得多。我们的实现需要改进的方面包括:
可发现性:在上述示例中,我们我们假定服务器在 localhost 上运行。RPC 库怎么知道将 RPC 发送到哪里呢?我们需要通过某种方式来发现可以处理此 RPC 调用的服务器在哪里。
RPC 的类型:我们的的服务器非常简单,只需处理一个 RPC 调用。如果我们希望服务器提供两个不同的RPC服务,比如 is_prime 和get_factors,那么该怎么办?我们需要一种方法来区分发送到服务器的两种请求。
打包:打包整数很容易,打包一个字节更容易。如果我们需要发送一个复杂的数据结构,该怎么办?如果我们需要为了节省带宽而压缩数据,又该怎么办?
自动生成代码:我们肯定不希望每次编写新的 RPC,都需要手动编写所有的打包和网络处理代码。理想情况下,我们只需定义一个接口,然后其余的接口都由计算机自动完成,并自动提供 stub。这里,我们需要考虑协议缓冲区等。
多种语言:按照上面的思路,如果我们能够自动生成 stub,那么就可以考虑支持多种语言,如此一来,跨服务和跨语言的通信也只需调用一个函数。
错误和超时处理:如果 RPC 失败怎么办?如果网络出现故障,服务器停止运行,wifi 掉线,该怎么办?我们需要考虑超时处理。
版本控制:假设上述所有功能已全部实现,但你想修改某个正在多台计算机上运行的 RPC,那么该怎么办?
其他有关服务器的注意事项:线程、阻塞、多路复用、安全性、加密、授权等等。
计算机科学就是要站在巨人的肩膀上,很多库已经为我们完成了大量工作。
原文链接:https://alexanderell.is/posts/rpc-from-scratch/
声明:本文由CSDN翻译,转载请注明来源。