查看原文
其他

如何让Kafka集群免受黑客攻击

2017-02-10 WB 大数据开放实验室

前情回顾

如《Transwarp如何让Hadoop集群免受黑客攻击》所介绍的,近期许多安全组织都检测到勒索软件正在攻击Hadoop集群,这些勒索攻击的攻击模式都较为相似,都是简单的利用相关产品的不安全配置,使攻击者有机可乘,进而对相关数据进行操作。根据shodan.io的统计结果显示,在中国有8300多个Hadoop集群的50070端口暴露在公网上,凸显了安全问题的严峻性。

概述

Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,通过Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都可以与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。

Kafka由于其安全机制的匮乏,导致它在数据敏感行业的部署存在严重的安全隐患。如果Kafka没有安全认证,那么黑客可以生成一个伪装的consumer,从而消费任何一个没有认证的producer的数据,从而造成数据泄露。当下越来越多的安全漏洞、数据泄露等问题的爆发,要求大数据平台供应商必须为Kafka提供安全的枷锁。

本文将简单介绍Kerberos认证协议,然后围绕Kafka,介绍其整体架构和关键概念,再深入分析Kafka架构中存在的安全问题,最后分享下Transwarp对于Kafka安全性所做的工作及其相应使用方法。

本文内容涉及Kerberos协议认证过程,建议阅读《Transwarp如何让Hadoop集群免受黑客攻击》中的相关部分进行了解。

Kafka的安全现状

下面,我们来了解下有关Kafka的几个基本概念:

  • Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。

  • Producer:向Topic发布消息的进程称为Producer。

  • Consumer:从Topic订阅消息的进程称为Consumer。

  • Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。

Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行rebalance。Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。

只要仔细分析Kafka框架,我们就会发现以下严重的安全问题:

  1. 网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。

  2. 网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。

  3. Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。

  4. Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。

随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。

如何让Kafka更安全?

基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:

  • 身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。

  • 权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。

基于Kerberos的身份机制如下图所示:

Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。

Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:

  1. Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出。

  2. Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)。

  3. Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。

ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。

Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。

另外,Kafka为特权用户,只有Kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为Kafka具有所有权限,其他用户不具有任何权限。

构建安全的Kafka服务

首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:

其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab。

认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:

public class SecureProducer extends Thread {

  private final kafka.javaapi.producer.Producer<Integer, String> producer;
  private final String topic;
  private final Properties props = new Properties();

  public SecureProducer(String topic) {
    AuthenticationManager.setAuthMethod("kerberos");
    AuthenticationManager.login("producer1", "/etc/producer1.keytab");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list",

"172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092");
    // Use random partitioner. Don't need the key type. Just set it to Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(

new ProducerConfig(props));
    this.topic = topic;
  }

Topic权限管理

Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:

 

其中

  • resetPermission(user, Permissions, topic) 为重置user对topic的权限。

  • grant(user, Permissions, topic) 为赋予user对topic权限。

  • revoke(user, Permissions, topic) 为取消user对topic权限。

  • isPermitted(user, Permissions, topic) 为检查user对topic是否具有指定权限。

调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeper。

kerberos模式

Kerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:

public class AuthzTest {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("authentication", "kerberos");
    props.setProperty("zookeeper.connect", "172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181");
    props.setProperty("principal", "kafka/host1@TDH");
    props.setProperty("keytab", "/usr/lib/kafka/config/kafka.keytab");
    ZKConfig config = new ZKConfig(props);
    AuthenticationManager.setAuthMethod(config.authentication());
    AuthenticationManager.login(config.principal(), config.keytab());
    AuthorizationManager authzManager = new AuthorizationManager(config);
    // reset permission READ and WRITE to ip 172.16.1.87 on topic test
    authzManager.resetPermission("172.16.1.87",
        new Permissions(Permissions.READ, Permissions.WRITE), "test");
    // grant permission WRITE to ip 172.16.1.87 on topic test
    authzManager.grant("172.16.1.87", new Permissions(Permissions.CREATE), "test");
    // revoke permission READ from ip 172.16.1.87 on topic test
    authzManager.revoke("172.16.1.87", new Permissions(Permissions.READ), "test");
    // commit the permission settings
    authzManager.commit();
    authzManager.close();
  }
}

ipaddress模式

ipaddress认证模式下,取消和赋予权限的操作如下所示:

public class AuthzTest {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("authentication", "ipaddress");
    props.setProperty("zookeeper.connect",
      "172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181");
    ZKConfig config = new ZKConfig(props);
    // new authorization manager
    AuthorizationManager authzManager = new AuthorizationManager(config);
    // reset permission READ and WRITE to ip 172.16.1.87 on topic test
    authzManager.resetPermission("172.16.1.87",
      new Permissions(Permissions.READ, Permissions.WRITE), "test");
    // grant permission WRITE to ip 172.16.1.87 on topic test
    authzManager.grant("172.16.1.87", new Permissions(Permissions.CREATE), "test");
    // revoke permission READ from ip 172.16.1.87 on topic test
    authzManager.revoke("172.16.1.87", new Permissions(Permissions.READ), "test");
    // commit the permission settings
    authzManager.commit();
    authzManager.close();
  }
}

总结

本文介绍了Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。其实,Transwarp对Hadoop安全性的贡献不仅只局限于Kafka,在长期的开发过程中,我们还一直在改进整个Hadoop & Spark生态系统在安全功能方面的缺陷,不断努力寻找可用性和安全性的平衡点,帮助集群抵抗恶意攻击,加强信息的安全防护,努力打造一个可靠易用的优秀数据平台。




大数据开放实验室由星环信息科技(上海)有限公司运营,专门致力于大数据技术的研究和传播。若转载请在文章开头明显注明“文章来源于微信订阅号——大数据开放实验室”,并保留作者和账号介绍。


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存