其他
干货 | 在字节跳动,一个更好的企业级SparkSQL Server这么做
SparkSQL是Spark生态系统中非常重要的组件。面向企业级服务时,SparkSQL存在易用性较差的问题,导致难满足日常的业务开发需求。本文将详细解读,如何通过构建SparkSQL服务器实现使用效率提升和使用门槛降低。
前言
标准的JDBC接口
Java.sql包下定义了使用Java访问存储介质的所有接口,但是并没有具体的实现,也就是说JavaEE里面仅仅定义了使用Java访问存储介质的标准流程,具体的实现需要依靠周边的第三方服务实现。
Class.forName("com.mysql.cj.jdbc.Driver");
Connection connection= DriverManager.getConnection(DB_URL,USER,PASS);
//操作
connection.close();
Hive 的JDBC实现
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws org.apache.thrift.TException;
public TCloseSessionResp CloseSession(TCloseSessionReq req) throws org.apache.thrift.TException;
public TGetInfoResp GetInfo(TGetInfoReq req) throws org.apache.thrift.TException;
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException;
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws org.apache.thrift.TException;
public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws org.apache.thrift.TException;
public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws org.apache.thrift.TException;
public TGetTablesResp GetTables(TGetTablesReq req) throws org.apache.thrift.TException;
public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws org.apache.thrift.TException;
public TGetColumnsResp GetColumns(TGetColumnsReq req) throws org.apache.thrift.TException;
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>version/version>
</dependency>
HiveStatement hiveStatement = (HiveStatement) connection.createStatement();
List<String> logs = hiveStatement.getQueryLog();
ResultSet rs = hiveStatement.executeQuery(sql);
while (rs.next()) {
//
}
构建SparkSQL服务器
支持JDBC接口,即通过Java 的JDBC标准进行访问,可以较好与周边生态进行集成且降低使用门槛。 兼容Hive协议,如果要支持JDBC接口,那么需要提供SparkSQL的JDBC Driver。目前,大数据领域Hive Server2提供的Hive-JDBC-Driver已经被广泛使用,从迁移成本来说最好的方式就是保持Hive的使用方式不变,只需要换个端口就行,也就是可以通过Hive的JDBC Driver直接访问SparkSQL服务器。 支持多租户,以及类似用户名+密码和Kerberos等常见的用户认证能力。 支持跨队列提交,同时支持在JDBC的参数里面配置Spark的相关作业参数,例如Driver Memory,Execute Number等。
public class SparkSQLThriftServer implements TCLIService.Iface {
@Override
public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
//Hive JDBC Driver在执行创建Connection的时候会调用此接口,在这里维护一个用户与Spark 作业的对应关系。
//来判断是需要复用一个已经存在的Spark作业,还是全新执行一次spark-submt。
//用户与是否需要spark-submit的关联关系均在这里实现。
//同时需要生成THandleIdentifier对象,并且和用户身份进行关联,后续其他方法调用均需要使用这个对象关联出用户的信息。
return null;
}
@Override
public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
//客户端调用connection.close()方法后会进入到这里,在这里进行用户状态的清除,同时需要基于用户的情况判断是否需要停止用来执行该用户SQL的Spark 作业引擎。
return null;
}
@Override
public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
//获取服务器的元数据信息,例如使用BI工具,在命令会列出所连接的服务的版本号等信息,均由此方法提供。
return null;
}
@Override
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
//执行SQL任务,这里传递过来的是用户在客户端提交的SQL作业,接收到用户SQL后,将该SQL发送给常驻的Spark作业,这个常驻的作业在OpenSession的时候已经确定。
return null;
}
@Override
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
//获取数据库支持的类型信息,使用BI工具,例如beeline的时候会调用到这里。
return null;
}
@Override
public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
//获取Catalog,使用BI工具,例如beeline的时候会调用到这里。
return null;
}
@Override
public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
//返回查询结果,基于THandleIdentifier对象查询到用户的SQL执行的情况,将请求转发至常驻的Spark 实例,获取结果。
//参数中通过TFetchResultsReq的getFetchType来区分是获取日志数据还是查询结果数据,getFetchType == 1为获取Log,为0是查询数据查询结果。
return null;
}
}
TThreadPoolServer.Args thriftArgs = new TThreadPoolServer.Args(serverTransport)
.processorFactory(new TProcessorFactory(this))
.transportFactory(new TSaslServerTransport.Factory())
.protocolFactory(new TBinaryProtocol.Factory())
.inputProtocolFactory(
new TBinaryProtocol.Factory(
true,
true,
10000,
10000
)
)
.requestTimeout(1000L)
.requestTimeoutUnit(TimeUnit.MILLISECONDS)
.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executorService);
thriftArgs
.executorService(
new ThreadPoolExecutor(
config.getMinWorkerThreads(),
config.getMaxWorkerThreads(),
config.getKeepAliveTime(),
TimeUnit.SECONDS, new SynchronousQueue<>()));
TThreadPoolServer server = new TThreadPoolServer(thriftArgs);
server.serve();
SparkSQL服务器的HA
[zk: localhost:2181(CONNECTED) 1] ls /hiveserver2\
[serverUri=127.0.01:10000;version=3.1.2;sequence=0000000000]
./bin/beeline -u "jdbc:hive2://127.0.01/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=自定义的节点;auth=LDAP" -n 用户名 -p密码
由于服务器的选择基于Connection级别的,也就是在Connection被生成新的之前,整个服务器的地址是不会发生变化的,在发生错误的时候服务端可以进行重试,进行地址的切换,因此HA的力度是在Connection级别而非请求级别。
对接生态工具
尾声
产品介绍
火山引擎 E-MapReduce
支持构建开源Hadoop生态的企业级大数据分析系统,完全兼容开源,提供 Hadoop、Spark、Hive、Flink集成和管理,帮助用户轻松完成企业大数据平台的构建,降低运维门槛,快速形成大数据分析能力。后台回复数字“3”了解产品
- End -
扫码进入官方交流群
群内定期进行干货分享
技术交流、福利放送
字节跳动数据平台