SpringBoot整合Thrift,从入门到实战(附源码)
什么是Thrift
Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。它被当作一个远程过程调用(RPC)框架来使用,是由Facebook为“大规模跨语言服务开发”而开发的。它通过一个代码生成引擎联合了一个软件栈,来创建不同程度的、无缝的跨平台高效服务,可以使用C#
、C++
(基于POSIX兼容系统)、Cappuccino
、Cocoa
、Delphi
、Erlang
、Go
、Haskell
、Java
、Node.js
、OCaml
、Perl
、PHP
、Python
、Ruby
和Smalltalk
。虽然它以前是由Facebook开发的,但它现在是Apache软件基金会的开源项目了。
架构
Thrift包含一套完整的栈来创建客户端和服务端程序。顶层部分是由Thrift定义生成的代码。而服务则由这个文件客户端和处理器代码生成。在生成的代码里会创建不同于内建类型的数据结构,并将其作为结果发送。协议和传输层是运行时库的一部分。有了Thrift,就可以定义一个服务或改变通讯和传输协议,而无需重新编译代码。除了客户端部分之外,Thrift还包括服务器基础设施来集成协议和传输,如阻塞、非阻塞及多线程服务器。栈中作为I/O基础的部分对于不同的语言则有不同的实现,如下官方示意图:
支持的通讯协议
TBinaryProtocol
一种简单的二进制格式,简单,但没有为空间效率而优化。比文本协议处理起来更快,但更难于调试。TCompactProtocol
更紧凑的二进制格式,处理起来通常同样高效。TDebugProtocol
一种人类可读的文本格式,用来协助调试。TDenseProtocol
与TCompactProtocol类似,将传输数据的元信息剥离。TJSONProtocol
使用JSON对数据编码。TSimpleJSONProtocol
一种只写协议,它不能被Thrift解析,因为它使用JSON时丢弃了元数据。适合用脚本语言来解析
支持的传输协议
TFileTransport
该传输协议会写文件。TFramedTransport
当使用一个非阻塞服务器时,要求使用这个传输协议。它按帧来发送数据,其中每一帧的开头是长度信息。TMemoryTransport
使用存储器映射输入输出。(Java的实现使用了一个简单的ByteArrayOutputStream。)TSocket
使用阻塞的套接字I/O来传输。TZlibTransport
用zlib执行压缩。用于连接另一个传输协议。
支持的服务模型
TNonblockingServer
一个多线程服务器,它使用非阻塞I/O(Java的实现使用了NIO通道)。TFramedTransport必须跟这个服务器配套使用。TSimpleServer
一个单线程服务器,它使用标准的阻塞I/O。测试时很有用。TThreadPoolServer
一个多线程服务器,它使用标准的阻塞I/OTHsHaServer
YHsHa引入了线程池去处理(需要使用TFramedTransport数据传输方式),其模型把读写任务放到线程池去处理;Half-sync/Half-async(半同步半异步)的处理模式;Half-sync是在处理IO时间上(sccept/read/writr io),Half-async用于handler对RPC的同步处理
Thrift的优点
跟一些替代选择,比如SOAP相比,跨语言序列化的代价更低,因为它使用二进制格式。 它有一个又瘦又干净的库,没有编码框架, 没有XML配置
文件。绑定感觉很自然。例如,Java使用java.util.ArrayList;C++使用std::vectorstd::string。 应用层通讯格式与序列化层通讯格式是完全分离的。它们都可以独立修改。 预定义的序列化格式包括:二进制格式、对HTTP友好的格式,以及紧凑的二进制格式。 兼作跨语言文件序列化。 协议使用软版本号机制软件版本管理[需要解释]。Thrift不要求一个中心化的和显式的版本号机制,例如主版本号/次版本号。松耦合的团队可以轻松地控制RPC调用的演进。 没有构建依赖也不含非标准化的软件。不存在不兼容的软件许可证混用的情况
SpringBoot整合Thrift
为什么会出现RPC框架
高级语言越来越丰富,同时学习的成本越来越低,从而出现编程语言的 百花齐放
,同时,每种语言都有其在特定场景下的优势性,因此一个企业的系统架构可能存在这各种各样的编程语言发挥着光和热。因此跨语言间的通讯必须存在一个桥梁;微服务的盛行,使得服务逐渐模块化,功能化;单个服务仅仅实现某个特定的功能或者模块,因此使得服务间的调用变得常见且频繁; 网络的发展速度快于了硬件的发展速度,使得服务承载负载压力越来越大;因此高性能且快速响应的服务调用成了必须去面对的问题 传统的Http请求能面对跨语言的问题,但是性能远远无法达到高并发的要求 多个服务会使用到相同的数据,比如用户;因此就需要将起模块化,统一对外提供服务
常见的RPC框架集成套路
编写接口代码生成文件(用于生成客户端服务端代码) 使用开源框架提供的工具,生成各种语言下的客户端、服务器源代码 基于服务器端源代码启动并配置业务代码 客户端基于生成的客户端代码,向远端服务器获取对应的数据; 整个过程相当于于调用一个本地方法
开撸
示例源码下载 : https://gitee.com/pengfeilu/thrift-demo
官网下载代码生成工具
官网地址 : https://thrift.apache.org/
本示例模拟的业务场景
thrift client : 用于对外提供接口,getStudentByName(根据名字获取学生信息)、save(保存学生信息) thrift server: 真正getStudentByName和save操作的服务提供者 APP : 服务使用者最终实现的基础效果:
编写接口代码生成文件
thrift支持的数据类型 当一个RPC框架支持的语言越多,那么他支持的数据类型就会越少,为什么?因为支持数据必须是所有支持都有的,因此语言越多,交集就越少,以下是thrift支持的数据类型:
bool
: A boolean value (true or false)byte
: An 8-bit signed integeri16
: A 16-bit signed integeri32
: A 32-bit signed integeri64
: A 64-bit signed integerdouble
: A 64-bit floating point numberstring
: A text string encoded using UTF-8 encoding编写脚本 创建student.thrift 注:如果在Idea里面创建一个thrift文件的话,工具会提醒你安装插件,安装之后,编写脚本会有响应的提醒;可惜,我使用的2018.03的idea貌似有bug,插件无法正常适应;试了其他几个版本均可以正常使用;
//这里指明了代码生成之后,所处的文件路径
namespace java com.lupf.thriftserver.shriftcode
namespace py py.com.lupf.thriftserver.shriftcode
//将shrift的数据类型格式转换为java习惯的格式
typedef i16 short
typedef i32 int
typedef i64 long
typedef string String
typedef bool boolean
//定义学生对象
struct Student {
1:optional String name,
2:optional int age,
3:optional String address
}
//定义数据异常
exception DataException {
//optional 可选 非必传
1:optional int code,
2:optional String message,
3:optional String dateTime
}
//定义操作学生的服务
service StudentService {
//根据名称获取学生信息 返回一个学生对象 抛出DataException异常
//required 必传项
Student getStudentByName(1:required String name) throws (1:DataException dataException),
//保存一个学生信息 无返回 抛出DataException异常
void save(1:required Student student) throws (1:DataException dataException)
}生成源代码 语法:
thrift --gen <language> <Thrift filename>
注:不要以任何的理由去修改这部分自动生成的代码,一方面很复杂,另一方面,再次生成的时候,你所调整的修改将会丢失 文件说明 DataException.java
: 异常对象Student.java
: 交互的数据模型对象StudentService.java
: 服务对象,提供了接口,客户端连接相关方法
thrift-server服务
通过Idea创建一个基础的
Maven项目
pom文件中添加thrift资源库
<!--thrift 相关资源-->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.12.0</version>
</dependency>将上面生成的代码拷贝至项目中
实现业务逻辑 创建一个Server服务,实现StudentService.Iface接口,同时这个接口添加
@Service
注解,将其交由Spring管理/**
* 服务端具体的操作的实现
*/
@Service
public class MyServerServiceImpl implements StudentService.Iface {
@Override
public Student getStudentByName(String name) throws DataException, TException {
System.out.println("服务端收到客户端获取用户名:" + name + "信息");
Student student = new Student();
student.setName(name);
student.setAge(100);
student.setAddress("深圳");
//模拟耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("模拟获取成功并返回:" + student);
return student;
}
@Override
public void save(Student student) throws DataException, TException {
System.out.println("服务端收到客户端请求保存学生信息:" + student);
//模拟耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("模拟保存成功!!!");
}
}配置文件添加端口及线程池基础配置
server:
thrift:
port: 8899 #监听的端口
min-thread-pool: 100 #线程池最小线程数
max-thread-pool: 1000 #线程池最大线程数创建服务对象 这里使用的是半同步半异步的
THsHaServer
,代码本身不多,大部分都是注释;同时将代码交由Spring管理@Component
public class ThriftServer2 {
//监听的端口
@Value("${server.thrift.port}")
private Integer port;
//线程池最小线程数
@Value("${server.thrift.min-thread-pool}")
private Integer minThreadPool;
//线程池最大线程数
@Value("${server.thrift.max-thread-pool}")
private Integer maxThreadPool;
//业务服务对象
@Autowired
MyServerServiceImpl myServerService;
public void start() {
try {
//thrift支持的scoker有很多种
//非阻塞的socker
TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
//THsHaServer 一个高可用的server
//minWorkerThreads 最小的工作线程2
//maxWorkerThreads 最大的工作线程4
//如果这里Args不使用executorService指定线程池的话,创建THsHaServer会创建一个默认的LinkedBlockingQueue
THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(minThreadPool).maxWorkerThreads(maxThreadPool);
//可以自定义指定线程池
//ExecutorService pool = Executors.newFixedThreadPool(minThreadPool);
//arg.executorService(pool);
//Processor处理区 用于处理业务逻辑
//泛型就是实现的业务
StudentService.Processor<MyServerServiceImpl> processor = new StudentService.Processor<>(myServerService);
//---------------thrift传输协议------------------------------
//1. TBinaryProtocol 二进制传输协议
//2. TCompactProtocol 压缩协议 他是基于TBinaryProtocol二进制协议在进一步的压缩,使得体积更小
//3. TJSONProtocol Json格式传输协议
//4. TSimpleJSONProtocol 简单JSON只写协议,生成的文件很容易通过脚本语言解析,实际开发中很少使用
//5. TDebugProtocol 简单易懂的可读协议,调试的时候用于方便追踪传输过程中的数据
//-----------------------------------------------------------
//设置工厂
//协议工厂 TCompactProtocol压缩工厂 二进制压缩协议
arg.protocolFactory(new TCompactProtocol.Factory());
//---------------thrift传输格式------------------------------
//---------------thrift数据传输方式------------------------------
//1. TSocker 阻塞式Scoker 相当于Java中的ServerSocket
//2. TFrameTransport 以frame为单位进行数据传输,非阻塞式服务中使用
//3. TFileTransport 以文件的形式进行传输
//4. TMemoryTransport 将内存用于IO,Java实现的时候内部实际上是使用了简单的ByteArrayOutputStream
//5. TZlibTransport 使用zlib进行压缩,与其他传世方式联合使用;java当前无实现所以无法使用
//传输工厂 更加底层的概念
arg.transportFactory(new TFramedTransport.Factory());
//arg.transportFactory(new TTransportFactory());
//---------------thrift数据传输方式------------------------------
//设置处理器(Processor)工厂
arg.processorFactory(new TProcessorFactory(processor));
//---------------thrift支持的服务模型------------------------------
//1.TSimpleServer 简单的单线程服务模型,用于测试
//2.TThreadPoolServer 多线程服务模型,使用的标准的阻塞式IO;运用了线程池,当线程池不够时会创建新的线程,当线程池出现大量空闲线程,线程池会对线程进行回收
//3.TNonBlockingServer 多线程服务模型,使用非阻塞式IO(需要使用TFramedTransport数据传输方式)
//4.THsHaServer YHsHa引入了线程池去处理(需要使用TFramedTransport数据传输方式),其模型把读写任务放到线程池去处理;Half-sync/Half-async(半同步半异步)的处理模式;Half-sync是在处理IO时间上(sccept/read/writr io),Half-async用于handler对RPC的同步处理
//----------------------------
//根据参数实例化server
//半同步半异步的server
TServer server = new THsHaServer(arg);
//---------------thrift支持的服务模型------------------------------
System.out.println("shrift server started; port:" + port);
//启动server
// 异步非阻塞的死循环
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}启动类中添加
Server
的启动配置@SpringBootApplication
public class ThriftServerApplication {
public static void main(String[] args) {
SpringApplication.run(ThriftServerApplication.class, args);
}
/**
* 向Spring注册一个Bean对象
* initMethod = "start" 表示会执行名为start的方法
* start方法执行之后,就会阻塞接受客户端的请求
*
* @return
*/
@Bean(initMethod = "start")
public ThriftServer2 init() {
ThriftServer2 thriftServer = new ThriftServer2();
return thriftServer;
}
}启动项目 到此,服务端的项目就已经配置完成了,启动服务出现下图剪头所指向的日志即可;
thrift-client客户端服务
项目创建 由于这个项目是一个中间服务,是thrift-server的客户端,是APP的服务端;因为要向APP提供接口,因此这里就创建一个
基础的Web服务
pom文件中添加thrift资源库
<!--thrift 相关资源-->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.12.0</version>
</dependency>将上面生成的代码拷贝至项目中
配置服务端信息
server:
port: 8877 #当前服务监听的端口,给APP端提供服务的接口
thrift:
host: 'localhost' #远端thrift服务IP
port: 8899 #远端thrift服务端口创建一个远端连接对象 ThriftClient.java
public class ThriftClient {
@Setter
private String host;
@Setter
private Integer port;
private TTransport tTransport;
private TProtocol tProtocol;
private StudentService.Client client;
private void init() {
tTransport = new TFramedTransport(new TSocket(host, port), 600);
//协议对象 这里使用协议对象需要和服务器的一致
tProtocol = new TCompactProtocol(tTransport);
client = new StudentService.Client(tProtocol);
}
public StudentService.Client getService() {
return client;
}
public void open() throws TTransportException {
if (null != tTransport && !tTransport.isOpen())
tTransport.open();
}
public void close() {
if (null != tTransport && tTransport.isOpen())
tTransport.close();
}
}创建一个连接自动注入帮助类 该类用于将ThriftClient注入到Spring容器,交由Spring管理,同时每一个request请求都创建一个新的ThriftClient连接;
@Configuration
public class ThriftClientConfig {
//服务端的地址
@Value("${server.thrift.host}")
private String host;
//服务端的端口
@Value("${server.thrift.port}")
private Integer port;
//初始化Bean的时候调用对象里面的init方法
@Bean(initMethod = "init")
//每次请求实例化一个新的ThriftClient连接对象
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public ThriftClient init() {
ThriftClient thriftClient = new ThriftClient();
thriftClient.setHost(host);
thriftClient.setPort(port);
return thriftClient;
}
}定义调用远端服务的service接口
public interface StudentServiceInf {
//根据名称获取学生信息
Student getStudentByName(String name);
//保存学生信息
void save(Student student);
}添加Service的具体实现
@Service
public class StudentServiceImpl implements StudentServiceInf {
@Autowired
ThriftClient thriftClient;
@Override
public Student getStudentByName(String name) {
try {
thriftClient.open();
System.out.println("客户端请求用户名为:" + name + "的数据");
Student student = thriftClient.getService().getStudentByName(name);
System.out.println("获取成功!!!服务端返回的对象:" + student);
return student;
} catch (Exception e) {
e.printStackTrace();
} finally {
thriftClient.close();
}
return null;
}
@Override
public void save(Student student) {
try {
thriftClient.open();
System.out.println("客户端请求保存对象:" + student);
thriftClient.getService().save(student);
System.out.println("保存成功!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
thriftClient.close();
}
}
}创建前端Controller对象 用于对移动端提供接口
@RestController
@RequestMapping("thrift")
public class StudentController {
@Autowired
StudentServiceInf studentService;
@GetMapping("get")
public Student getStudeByName(String name) {
return studentService.getStudentByName(name);
}
@GetMapping("save")
public Student save() {
//直接模拟前端传递的数据
Student student = new Student();
student.setName("AAA");
student.setAge(10);
student.setAddress("BBB");
//调用保存服务
studentService.save(student);
return student;
}
}启动服务
测试
Thrift客户端连接池
上面的示例中,每次前端有请求上来的时候,thrift-client都是与thrift-server创建了一个新的连接,用完就直接关闭掉;Thrift框架本身是没有提供客户端的连接池的,但是,实际使用中,如果每次建立一个socket连接用完就释放掉,再次使用再次建立连接的话,其实很浪费性能;因此,连接池就是提升性能的一种很好的手段;一个Service建立一个连接之后,放在一个池子里面,需要使用的时候就直接拿出来用,用完再还回去;如果一定时间内一直没有人使用,就把资源释放掉;以上是连接池的一个基本思路;Apache Commons Pool为我们提供了一个基础的框架,使得我们能够很轻松、快速的就实现一个连接池的功能;下面我们就来通过Apache Commons Pool实现一个thrift客户端的连接池。
引入资源
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
连接池编码
创建一个客户端的连接对象 TTSocket.java
public class TTSocket {
//thrift socket对象
private TSocket tSocket;
// 传输对象
private TTransport tTransport;
// 协议对象
private TProtocol tProtocol;
// 服务客户端对象
private StudentService.Client client;
/**
* 构造方法初始化各个连接对象
*
* @param host server的地址
* @param port server的端口
*/
public TTSocket(String host, Integer port) {
tSocket = new TSocket(host, port);
tTransport = new TFramedTransport(tSocket, 600);
//协议对象 这里使用协议对象需要和服务器的一致
tProtocol = new TCompactProtocol(tTransport);
client = new StudentService.Client(tProtocol);
}
/**
* 获取服务客户端对象
*
* @return
*/
public StudentService.Client getService() {
return client;
}
/**
* 打开通道
*
* @throws TTransportException
*/
public void open() throws TTransportException {
if (null != tTransport && !tTransport.isOpen())
tTransport.open();
}
/**
* 关闭通道
*/
public void close() {
if (null != tTransport && tTransport.isOpen())
tTransport.close();
}
/**
* 判断通道是否是正常打开状态
*
* @return
*/
public boolean isOpen() {
Socket socket = tSocket.getSocket();
if (socket.isConnected() && !socket.isClosed())
return true;
return false;
}
}创建一个工厂对象
public class ThriftClientConnectPoolFactory {
/**
* 对象池
*/
public GenericObjectPool pool;
/**
* 实例化池工厂帮助类
*
* @param config
* @param ip
* @param port
*/
public ThriftClientConnectPoolFactory(GenericObjectPool.Config config, String ip, int port) {
ConnectionFactory factory = new ConnectionFactory(ip, port);
//实例化池对象
this.pool = new GenericObjectPool(factory, config);
//设置获取对象前校验对象是否可以
this.pool.setTestOnBorrow(true);
}
/**
* 在池中获取一个空闲的对象
* 如果没有空闲且池子没满,就会调用makeObject创建一个新的对象
* 如果满了,就会阻塞等待,直到有空闲对象或者超时
*
* @return
* @throws Exception
*/
public TTSocket getConnect() throws Exception {
return (TTSocket) pool.borrowObject();
}
/**
* 将对象从池中移除
*
* @param ttSocket
*/
public void invalidateObject(TTSocket ttSocket) {
try {
pool.invalidateObject(ttSocket);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 将一个用完的对象返还给对象池
*
* @param ttSocket
*/
public void returnConnection(TTSocket ttSocket) {
try {
pool.returnObject(ttSocket);
} catch (Exception e) {
if (ttSocket != null) {
try {
ttSocket.close();
} catch (Exception ex) {
//
}
}
}
}
/**
* 池里面保存的对象工厂
*/
class ConnectionFactory extends BasePoolableObjectFactory {
//远端地址
private String host;
//端口
private Integer port;
/**
* 构造方法初始化地址及端口
*
* @param ip
* @param port
*/
public ConnectionFactory(String ip, int port) {
this.host = ip;
this.port = port;
}
/**
* 创建一个对象
*
* @return
* @throws Exception
*/
@Override
public Object makeObject() throws Exception {
// 实例化一个自定义的一个thrift 对象
TTSocket ttSocket = new TTSocket(host, port);
// 打开通道
ttSocket.open();
return ttSocket;
}
/**
* 销毁对象
*
* @param obj
*/
public void destroyObject(Object obj) {
try {
if (obj instanceof TTSocket) {
//尝试关闭连接
((TTSocket) obj).close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 校验对象是否可用
* 通过 pool.setTestOnBorrow(boolean testOnBorrow) 设置
* 设置为true这会在调用pool.borrowObject()获取对象之前调用这个方法用于校验对象是否可用
*
* @param obj 待校验的对象
* @return
*/
public boolean validateObject(Object obj) {
if (obj instanceof TTSocket) {
TTSocket socket = ((TTSocket) obj);
if (!socket.isOpen()) {
return false;
}
return true;
}
return false;
}
}
}修改ThriftClientConfig,将池工厂对象通过Spring管理
@Configuration
public class ThriftClientConfig {
//服务端的地址
@Value("${server.thrift.host}")
private String host;
//服务端的端口
@Value("${server.thrift.port}")
private Integer port;
//初始化Bean的时候调用对象里面的init方法
@Bean(initMethod = "init")
//每次请求实例化一个新的ThriftClient连接对象
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public ThriftClient init() {
ThriftClient thriftClient = new ThriftClient();
thriftClient.setHost(host);
thriftClient.setPort(port);
return thriftClient;
}
@Bean
public ThriftClientConnectPoolFactory ThriftClientPool() {
//对象池的配置对象
//这里测试就直接使用默认的配置
//可以通过config 设置对应的参数
//参数说明见 http://commons.apache.org/proper/commons-pool/api-1.6/org/apache/commons/pool/impl/GenericObjectPool.html
GenericObjectPool.Config config = new GenericObjectPool.Config();
//创建一个池工厂对象
ThriftClientConnectPoolFactory thriftClientConnectPoolFactory = new ThriftClientConnectPoolFactory(config, host, port);
//交由Spring管理
return thriftClientConnectPoolFactory;
}
}使用示例
@Service
public class StudentServiceImpl implements StudentServiceInf {
//每次创建一个新的连接的工具类
//@Autowired
//ThriftClient thriftClient;
/**
* 连接池
*/
@Autowired
ThriftClientConnectPoolFactory thriftClientPool;
@Override
public Student getStudentByName(String name) {
TTSocket ttSocket = null;
try {
//通过对象池,获取一个服务客户端连接对象
ttSocket = thriftClientPool.getConnect();
System.out.println(ttSocket);
System.out.println("客户端请求用户名为:" + name + "的数据");
//调用远端对象
Student student = ttSocket.getService().getStudentByName(name);
System.out.println("获取成功!!!服务端返回的对象:" + student);
//用完之后把对象还给对象池
thriftClientPool.returnConnection(ttSocket);
return student;
} catch (Exception e) {
e.printStackTrace();
//出现异常则将当前对象从池子移除
thriftClientPool.invalidateObject(ttSocket);
} finally {
}
return null;
}
@Override
public void save(Student student) {
TTSocket ttSocket = null;
try {
ttSocket = thriftClientPool.getConnect();
System.out.println("客户端请求保存对象:" + student);
ttSocket.getService().save(student);
System.out.println("保存成功!!!");
thriftClientPool.returnConnection(ttSocket);
} catch (Exception e) {
e.printStackTrace();
thriftClientPool.returnConnection(ttSocket);
} finally {
}
}
}测试
测试两个并发的情况下,如图所示的两个对象具体复用了
END