企业网络安全之漏洞扫描
一次性付费进群,长期免费索取教程,没有付费教程。
教程列表见微信公众号底部菜单
进微信群回复公众号:微信群;QQ群:460500587
微信公众号:计算机与网络安全
ID:Computer-network
漏洞扫描、入侵感知和应急响应是技术维度日常工作中最重要的3个部分。
漏洞是指缺少安全措施或采用的安全措施有缺陷,可能会被攻击者利用,对企业的信息资产的安全造成损害。漏洞扫描就是利用扫描器发现漏洞的过程。
企业的安全工程师在业务上线前或对公司所有资产进行周期性地例行扫描,可以在被攻击者发现可利用的漏洞之前发现并修复,将可能带来的损害减少到最低,对企业信息安全来说有积极主动、防患于未然的作用。外部黑客渗透前需要踩点,在得知域名及IP等有限的信息逐步进行尝试、还需要绕过ACL、IDS、WAF等防御措施。企业内部扫描可以直接拿到所有服务器的资产列表、所有Web的域名及在每个IDC部署扫描服务器,所以内部扫描更加方便、更全面。即便是这种信息不对称的情况下,外部的黑客或白帽子总有办法找到漏洞,所以各大互联网公司纷纷都建立了自己的应急响应中心(俗称SRC),并给报漏洞的白帽子发丰厚的奖励。(注:当下的SRC已经不只奖励漏洞,同时还奖励举报的危害线索)可以说内部扫描可以发现99%以上的漏洞,剩余的则需要建立应急响应中心借助广大白帽子的力量(众测,类似众筹的一种形式)一起消灭掉,比如接收外部白帽子提供的漏洞和威胁情报信息并进行奖励。
一、漏洞扫描的种类
1、按漏洞类型分类
● ACL扫描
● 弱口令扫描
● 系统及应用服务漏洞扫描
(1)ACL扫描
ACL扫描是用来按一定的周期监视公司服务器及网络的ACL的,比如无需对外开放的端口或IP是否暴露在了公网中。ACL扫描器的作用如下:
1)安全部门可以根据扫描报告督促网络管理员和系统管理员关闭暴露在公网中的高危服务,避免重要的服务因放在公网中被入侵的风险。
2)等某些应用或某些版本的应用发现新漏洞时,安全部门可以快速从数据库中查到存在漏洞的服务及版本,直接报到业务部门去修复。
ACL扫描的周期至少为一天一次,对于不同规模服务器的企业可以采用以下的方式:
1)对于服务器数量较少的公司,可以直接用nmap扫描,并将扫描出来的IP、端口、应用服务名、应用版本、时间等信息存放到数据库中。
2)对于服务器数量很多的公司,可以用Masscan扫描出所有的端口信息,然后再用nmap去识别端口的协议及应用程序版本信息,可以根据实际情况部署扫描服务器的数量、形成分布式的架构,加快扫描速度。
(2)弱口令扫描
管理员因疏忽大意或安全意识薄弱给网络设备、服务器或应用使用了默认的和简单的口令,这种弱口令的设备挂在公网上后很快就被黑客或蠕虫扫描到并快速渗透。常见的扫描器如Nessus、x-scan、h-scan、Hydra都具备弱口令扫描的功能,其中hydra支持的服务列表如下:
asterisk cisco cisco-enable cvs ftp ftps http[s]-{head|get}http[s]-{get|post}-form http-proxy http-proxy-urlenum
icq imap[s]irc ldap2[s]ldap3[-{cram|digest}md5][s]mssql mysql(v4)nntp oracle-listener oracle-sid pcanywhere
pcnfs pop3[s]postgres rdp redis rexec rlogin rsh s7-300 sip smb smtp[s]smtp-enum snmp
socks5 ssh sshkey svn teamspeak telnet[s]vmauthd vnc xmpp
以下为一个Python调用hydra扫描SSH弱口令的脚本,扫描结束后会将结果写到一个文本文件中:
import os
import re
import glob
import datetime
# import subprocess
import torndb
from bs4 import BeautifulSoup
# honeypot white db
db_info = dict(
hostname='127.0.0.1',
database='honeypot',
username='xxxxx',
password='xxxxxx
)
# Nmap scan class
# ------------------------------------------------------------------------------
class NmapScaner(object):
def __init__(self, ip_list, exclude_file):
self.dir = '/data1/ssh_scan'
self.ip_list = "%s/%s" % (self.dir, ip_list)
self.exclude_file = exclude_file
self.report = "%s/report_%s.xml" % (self.dir, str(datetime.datetime.now())[:10])
self.ssh_list = "%s/report_%s.ssh" % (self.dir, str(datetime.datetime.now())[:10])
self.cmd = '/usr/bin/nmap -iL %s --excludefile %s -p 22,8022 -oX %s -n --open -vv' % (
self.ip_list, self.exclude_file, self.report
)
self.hosts = []
def start(self):
print self.cmd
os.system(self.cmd)
# p = subprocess.Popen(self.cmd, shell=True)
# p.wait()
def result(self):
print "report: ", self.report, type(self.report)
p1 = r'<address addr="(.+?)" addrtype=".*"></address>'
r1 = re.compile(p1)
p2 = r'<port portid="(.+?)" protocol=".*">'
r2 = re.compile(p2)
soup = BeautifulSoup(open(self.report).read())
results = soup.find_all("host")
for item in results:
host = dict()
host["ports"] = list()
ret_ip = r1.findall(str(item.address))
if ret_ip:
ip = ret_ip[0]
host["ip"] = ip
for port in item.ports:
ret_port = r2.findall(str(port))
if ret_port:
host["ports"].append(ret_port[0])
self.hosts.append(host)
# print hosts
f = open(self.ssh_list, "w")
for item in self.hosts:
ports = item.get('ports')
ip = item.get('ip')
if ip in CONST_HONEYPOT:
continue
for port in ports:
f.write('%s:%s\n' % (ip, port))
return self.ssh_list
# Crack ssh passwd
# ------------------------------------------------------------------------------
class CrackSSH(object):
def __init__(self, ssh_filename):
self.ssh_file = ssh_filename
self.dir = '/data1/ssh_scan'
self.dir1 = '/data1/ssh_scan/scan_ssh'
self.hydra = '/usr/local/bin/hydra'
def prepare(self):
cmd = 'rm -f %s/x*' % self.dir1
print cmd
os.system(cmd)
cmd = "/usr/bin/killall -9 hydra"
os.system(cmd)
# ret = subprocess.call(cmd, shell=True)
# print ret
os.environ = '/data1/ssh_scan'
cmd = 'cd %s;/usr/bin/split -l 2000 %s' % (self.dir1, self.ssh_file)
# print cmd
# ret = subprocess.call(cmd, shell=True)
# print ret
os.system(cmd)
os.environ = None
def scan(self):
search = r'%s/x*' % self.dir1
# print search, type(search)
iplist = glob.glob(search)
# print iplist
for ip in iplist:
cmd = '%s -vV -L %s/user.txt -P %s/password.txt -M %s ssh -o
%s.log -t 4 -w 10 -e nsr >> %s.log &' % \
(self.hydra, self.dir, self.dir, ip, ip, self.ssh_file)
# print cmd
os.system(cmd)
# Honeypot white list
# ------------------------------------------------------------------------------
class HoneyWhite(object):
def __init__(self):
self.db = torndb.Connection(
host=db_info.get('hostname'), database=db_info.get('database'),
user=db_info.get('username'), password=db_info.get('password')
)
self.whiteList = []
def result(self):
sql = "select * from honeypotip"
ret = self.db.query(sql)
for item in ret:
self.whiteList.append(item.get('ip').strip())
return self.whiteList
# Main Function
# ------------------------------------------------------------------------------
if __name__ == '__main__':
# get honeypot white list
honey_white = HoneyWhite()
CONST_HONEYPOT = honey_white.result()
print "Honeypot white list\n", CONST_HONEYPOT
start_time = datetime.datetime.now()
ip_list_file = 'ip_list_test.txt'
exclude_file = "exclude_file.txt"
h_exclude_file = open(exclude_file, "w")
for ip in CONST_HONEYPOT:
h_exclude_file.write("%s\n" % ip)
h_exclude_file.close()
nmap_scanner = NmapScaner(ip_list_file, exclude_file)
nmap_scanner.start()
ssh_file = nmap_scanner.result()
end_time = datetime.datetime.now()
use_time = (end_time - start_time).seconds / 60.0
print "Start Time:%s, End Time:%s, Cost %s minutes" % (start_time, end_time, use_time)
# start to crack ssh weak password
crack_ssh = CrackSSH(ssh_file)
crack_ssh.prepare()
crack_ssh.scan()
(3)系统及应用服务漏洞扫描
常见的系统及应用服务漏洞扫描器有Nessus及开源的openVAS,当服务器数量巨大时,需要部署多台Nessus服务器以集群模式进行扫描。
实践方法如下:
1)用程序调用Nessus的接口,将Nessus的漏洞扫描做成周期性任务,每天对全部服务器进行一次安全扫描,并将扫描结果入库,按漏洞级别进行分级。
2)程序自动建立工单并提交到业务部门进行修复,修复好后再转到安全部门确认,形成一个良性的闭环(如果您所在的公司没有工单系统,则至少需要建立一个漏洞管理系统代替)。
Nessus官方提供的REST API接口的GitHub地址为:https://github.com/tenable/nessrest。
(4)Web漏洞扫描
● Acunetix Web Vulnerability Scanner(AWVS)
● IBM Rational AppScan
● sqlmap
● w3af
● arachni
● Zed Attack Proxy
以上几款扫描器中,前2款是商业软件,后几款是免费开源的。
实践方法如下:
网站较少的公司。安全工程师手工用扫描器进行Web漏洞扫描即可,但至少要使用2款以上扫描器进行交叉确认,避免因某款扫描器漏报导致漏洞没扫到而被外界黑客利用的情况发生。一般建议AWVS必用,再配合zap或arachni进行确认。
网站较多的公司。大中型的互联网公司有成千上万个大大小小的网站,安全工程师人肉利用扫描工具进行扫描已经不现实了,需要自研扫描工具,实现自动化、批量化的漏洞扫描。常见的一个自动化Web漏洞扫描器的架构图1所示。
图1 自动化Web安全检测平台
2、按扫描器行为分类
根据扫描器是否主动发包的行为,可将扫描器分为以下几种:
1)主动扫描
2)半被动扫描器
3)全被动扫描器
(1)主动扫描
常规的扫描器都是主动发包,然后根据返回的包判断目标设备是否存在漏洞。对于Web扫描器来说,是先将URL爬出来,然后再在该URL中各个可以输入参数的地方测试注入、XSS等负载。常用的AWVS、Sqlmap、Nessus等都是主动扫描器。
(2)半被动扫描
其实该类扫描器还是属于主动扫描器,区别是URL的获取途径不是爬虫,而是以下几种方式。
1)通过Access log获取URL
例如将用户或QA访问站点的Access log去重后进行扫描。
2)通过流量镜像的方式获取URL
通过旁路镜像得到全流量URL,去重后进行扫描。对于比较大规模的Web资源扫描,可以通过Storm流式计算平台将来自分光的全流量URL库rewrite替换,去重归一,验证真实性后作为扫描器的输入源,由消息队列推送至分布式扫描器中。以下为一个利用WAF log、爬虫结果及流量镜像中的URL作为输入源的扫描器的架构如图2所示。
图2 URL库作为扫描器输入源
3)HTTP代理式的扫描器
这种方式常被QA或渗透测试人员使用,在浏览器设置一个代理,然后去访问网站的页面,每访问一个URL就会被放到后台去扫描,基本的框架代码如下:
class ProxyHandler(tornado.web.RequestHandler):
SUPPORTED_METHODS = ['GET', 'POST', 'CONNECT']
@tornado.web.asynchronous
def get(self):
url_info = dict(
method=
self.request.method,
url=self.request.uri
)
self.request_info = None
def handle_response
(response):
if (response.error and not
isinstance(response.error, tornado.httpclient.HTTPError)):
self.set_status(500)
self.write('Internal server error:\n' + str(response.error))
else:
self.set_status(response.code)
for header in ('Date', 'Cache-Control', 'Server','Content-Type', 'Location'):
v = response.headers.get(header)
if v:
self.set_header(header, v)
v = response.headers.get_list('Set-Cookie')
if v:
for i in v:
self.add_header('Set-Cookie', i)
if response.body:
self.write(response.body)
# Insert http request and response into mongodb
if self.application.scan:
url = url_info.get('url')
url_filter = UrlFilter(url)
if url_filter.filter():
http_info = HttpInfo(url_info, self.request_info, response)
values = http_info.get_info()
mongodb = Mongodb(db_info)
mongodb.insert(values)
self.finish()
body = self.request.body
self.request_info = self.request
if not body:
body = None
try:
fetch_request(
self.request.uri, handle_response,
method=
self.request.method, body=body,
headers=self.request.headers, follow_redirects=False,
allow_nonstandard_methods=True)
except tornado.httpclient.HTTPError as e:
if hasattr(e, 'response') and e.response:
handle_response(e.response)
else:
self.set_status(500)
self.write('Internal server error:\n' + str(e))
self.finish()
完整的demo代码请参考:https://github.com/netxfly/passive_scan。
4)vpn式的扫描器
与前一种类似,不过该种扫描需要播入一个特定的VPN中,在VPN服务器中会设置一个透明代理,将80和443端口的数据转发到透明代理中,之后测试者每访问一个URL也会放到后台去扫描,以下的golang代码就实现了一个透明代理:
package main
import (
"flag"
"fmt"
"github.com/netxfly/Transparent-Proxy-Scanner/hyperfox/proxy"
"github.com/netxfly/Transparent-Proxy-Scanner/hyperfox/tools/capture"
"strings"
// "github.com/netxfly/Transparent-Proxy-Scanner/hyperfox/tools/logger"
"github.com/toolkits/slice"
"log"
"net/http"
"net/url"
"os"
"time"
"upper.io/db"
"upper.io/db/mongo"
)
const version = "0.9"
const (
defaultAddress = `0.0.0.0`
defaultPort = uint(3129)
defaultSSLPort = uint(3128)
)
const (
Host = "127.0.0.1"
Port = "27017"
User = "xsec"
Password = "x@xsec.io"
Database = "passive_scan"
)
var settings = mongo.ConnectionURL{
Address: db.Host(Host), // MongoDB hostname.
Database: Database, // Database name.
User: User, // Optional user name.
Password: Password, // Optional user password.
}
var (
flagAddress = flag.String("l", defaultAddress, "Bind address.")
flagPort = flag.Uint("p", defaultPort, "Port to bind to, default is 3129")
flagSSLPort = flag.Uint("s", defaultSSLPort, "Port to bind to (SSL mode),
default is 3128.")
flagSSLCertFile = flag.String("c", "", "Path to root CA certificate.")
flagSSLKeyFile = flag.String("k", "", "Path to root CA key.")
)
var (
sess db.Database
col db.Collection
)
var (
static_resource []string = []string{"js", "css", "jpg", "gif", "png", "exe",
"zip", "rar", "ico",
"gz", "7z", "tgz", "bmp", "pdf", "avi", "mp3", "mp4", "htm", "html", "shtml"}
)
// dbsetup sets up the database.
func dbsetup() error {
var err error
// Attemping to establish a connection to the database.
sess, err = db.Open(mongo.Adapter, settings)
fmt.Println(sess)
if err != nil {
log.Fatalf("db.Open(): %q\n", err)
}
// Pointing to the "http_info" table.
col, err = sess.Collection("http_info")
return nil
}
// filter function
func filter(content_type string, raw_url string) bool {
ret := false
if strings.Contains(content_type, "text/plain") || strings.Contains
(content_type, "application/x-gzip") {
url_parsed, _ := url.Parse(raw_url)
path := url_parsed.Path
t := strings.Split(path[1:], ".")
suffix := t[len(t)-1]
if !slice.ContainsString(static_resource, suffix) {
ret = true
}
}
return ret
}
// Parses flags and initializes Hyperfox tool.
func main() {
var err error
var sslEnabled bool
// Parsing command line flags.
flag.Parse()
// Opening database.
if err = dbsetup(); err != nil {
log.Fatalf("db: %q", err)
}
// Remember to close the database session.
defer sess.Close()
// Is SSL enabled?
if *flagSSLPort > 0 && *flagSSLCertFile != "" {
sslEnabled = true
}
// User requested SSL mode.
if sslEnabled {
if *flagSSLCertFile == "" {
flag.Usage()
log.Fatal(ErrMissingSSLCert)
}
if *flagSSLKeyFile == "" {
flag.Usage()
log.Fatal(ErrMissingSSLKey)
}
os.Setenv(proxy.EnvSSLCert, *flagSSLCertFile)
os.Setenv(proxy.EnvSSLKey, *flagSSLKeyFile)
}
// Creatig proxy.
p := proxy.NewProxy()
// Attaching logger.
// p.AddLogger(logger.Stdout{})
// Attaching capture tool.
res := make(chan capture.Response, 256)
p.AddBodyWriteCloser(capture.New(res))
// Saving captured data with a goroutine.
go func() {
for {
select {
case r := <-res:
if filter(r.ContentType, r.URL) {
// fmt.Println(r.Method, r.URL, r.ContentType)
if _, err := col.Append(r); err != nil {
log.Printf(ErrDatabaseError.Error(), err)
}
}
}
}
}()
cerr := make(chan error)
// Starting proxy servers.
go func() {
if err := p.Start(fmt.Sprintf("%s:%d", *flagAddress, *flagPort)); err != nil {
cerr <- err
}
}()
if sslEnabled {
go func() {
if err := p.StartTLS(fmt.Sprintf("%s:%d", *flagAddress, *flagSSLPort)); err != nil {
cerr <- err
}
}()
}
err = <-cerr
log.Fatalf(ErrBindFailed.Error(), err)
}
完整的实现代码请参考GitHub:https://github.com/netxfly/Transparent-Proxy-Scanner。
(3)全被动扫描
部署方式上类似于IDS,不主动爬URL,而是对B/S & C/S双向交互的数据流进行扫描以期发现漏洞,全被动扫描的特点如下:
1)不需要联网,不会主动爬取URL,不会主动发出任何数据包。
2)更关注漏洞感知,而不是入侵行为。
何时需要被动扫描?在日常的安全管理中,经常有一些业务部门会引发安全管理之痛,例如:
● 业务部门没有经过安全部门的安全扫描和评估就擅自上线,结果上线的服务器或站点被入侵了。
● 业务上线时确实经过安全扫描和评审环节,当时证明是安全的,但在业务运营的过程中,经过几次更新,把安全漏洞更新到生产环境中了。
● 部分业务因人员更换或离职变成了无人管理的业务,也有可能资产不在公司的资产列表中,因长期无人维护被攻击者钻了空子。
有了被动式扫描器后,可以对这些处于灰色地带的资产进行防护。
二、如何应对大规模的资产扫描
近年来云计算和大数据很火,不少厂商的安全部门也纷纷引入了大数据及分布式运算,比如安全日志分析、通过大数据反爬虫、用流量镜像中的URL进行Web扫描、分布式扫描器等。普通的扫描方式在数万或几十万台服务器的环境下会遇到以下问题:
1)单台或数台扫描器仍不足以覆盖海量IDC,完成全网扫描需要很多资源。
2)大量的并发扫描占用网络带宽,高峰时影响用户体验,执行深度检测可能会使应用或服务直接宕掉。
3)大量的误报以及中低风险漏洞会使人工解读和后续整理难上加难。
因此海量IDC规模下漏洞扫描需要寻求高效的方式,总体思路是减少工作量,有几个方法:
1)简化漏洞评估链,减少需要扫描的任务。
2)减少漏洞扫描的网络开销和被检查者的性能损耗。
3)减少漏洞扫描的种类。
4)减少手工确认的工作量。
在实践中,需要从以下几方面进行优化:
1)不做全网的漏洞扫描,先做端口扫描,这样做的前提是访问控制和纵深防御做到位,利用ACL大幅减少攻击面,把需要漏洞扫描的端口减少到22、80、443等,开的端口少了,全网全协议漏洞扫描就缩减为全网几个关键应用的扫描。
2)做好高危端口监控,防止“计划外”应用的滥用,这样漏洞扫描这件事就瘦身为端口监控加关键应用扫描。
3)在系统和应用扫描上,不完全依赖于网络扫描器,可同时借助于本机agent扫描,类似心脏滴血的漏洞与其从网络上去获取扫漏洞,不如在本地获取OpenSSL的版本更简单。服务器OS本地的agent除了可以扫系统型漏洞,还可以扫本地配置型漏洞,相对来说本地获取的信息比网络获取更准确,但代价是会消耗服务器资源,所以这块需要尽可能地减少性能损耗。
除了极个别大公司,对于绝大多数企业而言,自研扫描器比商业产品或成熟的开源产品扫描能力更强的可能性是不高的,但是单机扫描又严重影响效率,所以对于业务有一定规模但安全团队又没能力自制扫描器的公司,可以考虑将现有的扫描器改成分布式的,如下所示:
● 对于Web漏洞扫描,可以通过任务队列的方式将扫描任务发给awvs、arachni、w3af等扫描器,改成分布式扫描器。
● 对于服务器及网络漏洞扫描,可以多部署几台Nessus扫描器,并配置为集群模式,调用API进行大规模扫描。
三、结语
在攻防愈演愈烈的现代,光有基于静态漏洞规则和带fuzz功能的扫描还是会有许多漏洞覆盖不到,安全部门需要采取基于数据的扫描,比如结合社工库等。其次需要建立应急响应中心,让广大白帽子也参与到漏洞的挖掘与发现中,尽可能多地把暴露在外面的漏洞消灭掉。
大型互联网公司在安全实践上可能采取一些精简手段,只做某些事情,前提是他们已经做了另外一些看不见的防御措施,在某处精简必然是因为在其他地方削减了攻击面,并且有多层次的防御机制做互补,也就是说他们的方案往往是针对自身特点的,不是一个完全意义上的通用方案,如果我们的安全建设尚未到达那个阶段,直接和安全走在前沿的大型互联网公司看齐,采用人家“高大上”的安全解决方案时,很有可能会发生刻舟求剑的笑话,需要实事求是,根据实际情况逐步地建设所在机构的安全体系。
微信公众号:计算机与网络安全
ID:Computer-network