2023年6月27日发(作者:)
开源流量分析zeek(⼜名pro)⼊侵检测系统-流量分析其他同类项⽬Suricata、Snort
Zeek (Bro) 是⼀款⼤名⿍⿍的开源⽹络安全分析⼯具。通过 Zeek 可以监测⽹络流量中的可疑活动,通过 Zeek 的脚本可以实现灵活的分析功能,可是实现多种协议的开相机⽤的分析。本⽂主要是将 Zeek 结合被动扫描器的⼀些实践的介绍,以及 Zeek 部署的踩过的⼀些坑。安装Zeek 的安装还是⽐较简单的,笔者主要是在 Mac 上以及 Linux 上安装。这两个操作系统的安装⽅式还是⽐较类似的。对于 Linux ⽽⾔,需要安装⼀些依赖包:sudo yum install cmake make gcc gcc-c++ flex bison libpcap-devel openssl-devel python-devel swig zlib-devel这⾥我有遇到⼀个问题就是可能你的 Redhat 镜像源⾥⾯没有包含 libpcap-devel,因为这个包在可选的范围内,⽽内⽹的服务器⼜没有互联⽹连接。可以通过⼿⼯下载相应版本的 libpcap 以及 libpcap-devel 即可。Mac 上需要的依赖更少⼀点,⾸先需要确保安装了 xcode-select,如果没有安装,可以通过 xcode-select --install 来进⾏安装。Mac 上只需要安装依赖 cmake, swig, openssl, bison 即可,可以通过 Homebrew 来进⾏安装。依赖包安装完毕之后就可以安装 Zeek,其实是可以通过包管理⼯具来进⾏安装的,不过这⾥我推荐使⽤基于源码的安装⽅式,安装⽐较简单⽽且还容易排查问题。从 Zeek 的 Github 即可下载源码包,⽬前我安装的是 3.0.0 版本,注意⼀点是,如果使⽤最新的版本,可能需要 7.0 以上版本的 cmake,因为需要 C++17 的语⾔特性。⽽⼀般镜像源默认的 cmake 版本是4+版本,所以如果你的服务器也⽆法上互联⽹,建议可以安装 3.0.0 版本。./configure & make & make install安装使⽤上⾯的命令就可以了,不过 make 的时间还是⽐较长的,这个取决于你机器的性能,不过⼀般安装还是需要半个⼩时到⼀个⼩时,这也是因为 C++ 编译速度⽐较慢的原因。集群安装集群安装的⽅式和单机的⽅式不太⼀样。之前在测试环境使⽤的都是单机模式,集群则可以管理多个实例,后来我也尝试了通过集群的⽅式来进⾏安装。如果需要配置集群的话,建议安装 PF_RING,PF_RING 可以加速⽹络包的速度。对于 Zeek 集群上的每个 worker 都是需要安装 PF_RING,但只需要在 manager 上安装 Zeek 就可以了,可以通过 zeekctl 在其它 worker 上安装 Zeek。不过需要确保可以通过ssh 到其它 woker 机器上,可以通过公钥的形式来实现,将 manager 的公钥放到其它 worker 的 authorized_keys 中。PF_RING 的安装步骤相对来说多了⼀些,但也是按照说明安装即可。和上⾯的单机安装⽅式不同的是集群安装的⽅式的时候,安装 Zeek需要配置前缀。安装 PF_RINGtar xvzf PF_ PF_RING-5.6.2/userland/lib./configure --prefix=/opt/pfringmake installcd ../libpcap./configure --prefix=/opt/pfringmake installcd ../tcpdump-4.1.1./configure --prefix=/opt/pfringmake installcd ../../kernelmakemake installmodprobe pf_ring enable_tx_capture=0 min_num_slots=32768安装 Zeek./configure --with-pcap=/opt/pfringmake
make install确保 Zeek 正确关联到了 PF_RING 中的 libpcap 库中ldd /usr/local/zeek/bin/zeek | grep pcap .1 => /opt/pfring/lib/.1 (0x00007fa6d7d24000)接着就是通过 PF_RING 来进⾏ Zeekctl 的配置,Zeek 的安装路径⼀般都在 /usr/local/zeek。通过 /usr/local/zeek/etc/ 来进⾏集群结点的配置,在集群配置中,manager, proxy 以及 worker 是必须的,如果不设置 logger,默认将 manager 作为 logger。[worker-1]type=workerhost=10.0.0.50interface=eth0lb_method=pf_ringlb_procs=10pin_cpus=2,3,4,5,6,7,8,9,10,11接下来只需要通过 zeekctl install 就会在其它实例上来进⾏安装了。如果安装过程中出现了问题,可以通过 zeekctl diag woker-1 来排查具体的原因。Zeek 结合被动扫描器的玩法上⾯讲的都是 Zeek 的安装,下⾯聊⼀下 Zeek 和被动扫描器的结合。被动扫描器的效果往往取决于流量的质量和数量,在我们的实际实践中,发现通过 Zeek 获取的流量占我们被动扫描器测试流量的绝⼤⼀部分。Zeek 对于 http 解析的⽇志都会存储在 /usr/local/zeek/logs中。如果 Zeek 是启动状态,那么 的路径会在 /usr/local/zeel/logs/current 中,⽽历史⽇志则会被打包。如果使⽤ Zeek 去捕获流量的时候,⽇志往往会占很⼤的存储,所以要记得修改 Zeek ⽇志的存储路径,否则很容易就把系统盘塞满。通过脚本⾃定义 中其实已经包含了丰富的字段,常见的⼀些字段如下:# ts uid orig_h orig_p resp_h resp_p1311627961.8 HSH4uV8KVJg 192.168.1.100 52303 192.150.187.43 80不过⾥⾯还有⼀些信息是缺失的,⽐如⼀些 http 请求头以及 POST 请求的请求体,为了添加这些字段,可以通过⾃定义 Zeek 脚本来实现,Zeek 脚本的能⼒真的⾮常强⼤,通过脚本其实有很多更⾼级的玩法。添加请求头@load base/protocols/http/mainmodule HTTP;export { redef record Info += { header_host: string &log &optional; header_accept: string &log &optional; header_accept_charset: string &log &optional; header_accept_encoding: string &log &optional; header_accept_language: string &log &optional; header_accept_ranges: string &log &optional; header_authorization: string &log &optional; header_connection: string &log &optional; header_cookie: string &log &optional; header_content_length: string &log &optional; header_content_type: string &log &optional; };}event http_header(c: connection, is_orig: bool, name: string, value: string) &priority=3 { if ( ! c?$http ) return; if ( is_orig ) { if ( log_client_header_names ) { switch ( name ) { case "HOST": c$http$header_host = value; break; case "ACCEPT": c$http$header_accept = value; break; case "ACCEPT-CHARSET": c$http$header_accept_charset = value; break; case "ACCEPT-ENCODING": c$http$header_accept_encoding = value; break; case "ACCEPT-LANGUAGE": c$http$header_accept_language = value; break; case "ACCEPT-RANGES": c$http$header_accept_ranges = value; break; case "AUTHORIZATION": c$http$header_authorization = value; break; case "CONNECTION": c$http$header_connection = value; break; case "COOKIE": c$http$header_cookie = value; break; case "CONTENT-LENGTH": c$http$header_content_length = value; break; case "CONTENT-TYPE": c$http$header_content_type = value; break; } } } }添加 POST 请求体export { ## The length of POST bodies to extract. const http_post_body_length = 200 &redef;}redef record HTTP::Info += { postdata: string &log &optional;};event log_post_bodies(f: fa_file, data: string) { for ( cid in f$conns ) { local c: connection = f$conns[cid]; if ( ! c$http?$postdata ) c$http$postdata = ""; # If we are already above the captured size here, just return. if ( |c$http$postdata| > http_post_body_length ) return; c$http$postdata = c$http$postdata + data; if ( |c$http$postdata| > http_post_body_length ) { c$http$postdata = c$http$postdata[0:http_post_body_length] + "..."; } } }event file_over_new_connection(f: fa_file, c: connection, is_orig: bool) { if ( is_orig && c?$http && c$http?$method && c$http$method == "POST" ) { Files::add_analyzer(f, Files::ANALYZER_DATA_EVENT, [$stream_event=log_post_bodies]); } }通过上述的脚本就可以添加⼀些请求头以及 POST 请求的请求体,完整的脚本可以参考 。脚本编写完毕,需要通过 zeekctl 部署才能⽣效,步骤也⾮常简单。mv http-custom /usr/local/bro/share/bro/base/protocolsecho '@load base/protocols/http-custom' >> /usr/local/bro/share/bro/site/kctl deploy对于被动扫描器,我们⽬前的⽅案是通过 Filebeat 去采集⽇志然后输出给 Logstash 做处理,处理完毕之后再输出到 Kafka。Filebeat 加 Logstash 适⽤于多种场景,在⽇常的各种⽇志采集场景都能派上⽤场。通过 Logstash 可以完成⽇志灵活的处理,因为Logstash ⾥⾯包含了各种丰富的插件,⼏乎可以完成对于⽇志的任何操作。⽐如为了保证 POST 请求体保证传输的正确性,可以通过base64 来进⾏编码。通过 可以遍历地实现字段的编码或者解码。通过 filter 中的 mutate 插件可以增加字段或者删除字段。base64 { field => "postdata" action => "encode" }通过这种⽅案还有⼀个优势就是我们还可以将我们的⽇志输出到别的地⽅,⽐如 es,这个也可以⽅便后续排查⽇志问题。不过我在后⾯⼜发现了⼀种新的⽅案,可以通过 Zeek 的插件,将 直接输出到 Kafka,这个⽅案的优点主要是更⾼效,同时也节省了⼀些成本,毕竟 Logstash 需要的机器性能还是⽐较⼤的。对于这个⽅案主要是两个问题,第⼀个问题是⾸先需要处理好⽇志的格式,这样保证后续处理地便利性;第⼆个问题是如何将⽇志直接从 Zeek 输出到 Kafka。其实我是先解决了第⼀个问题再解决第⼆个问题的,因为第⼆个问题的处理的⽅式更灵活,得益于 Zeek 脚本的便利性,肯定是可以实现的。 是 Apache 官⽅的⼀个 Bro 的插件,不过因为 Zeek3.0.0 是可以兼容的,所以这个插件是可以使⽤的。这个插件有两种安装⽅式,⼀种是通过 bro-pkg (Bro 的官⽅包管理⼯具)来进⾏安装,另外⼀种则是通过⼿⼯安装。由于⽹络的原因,我更推荐使⽤⼿⼯安装的⽅式,我尝试通过 bro-pkg 的⽅式来进⾏安装,速度特别慢。安装 librdkafkacurl -L /edenhill/librdkafka/archive/ | tar xvzcd librdkafka-0.11.5/./configure --enable-saslmakesudo make install安装插件./configure --bro-dist=$BRO_SRCmakesudo make install这⾥有⼀个坑就是安装⽂档根本就没有说 $BRO_SRC 是哪个路径,所以安装的时候总是报错,后来才弄清楚这个路径其实就是当初 Zeek解压后的路径,即 Zeek 安装包的路径。验证结果zeek -N Apache::KafkaApache::Kafka - Writes logs to Kafka (dynamic, version 0.3)接着就是将 http 的⽇志进⾏处理,因为在原始的 中有还多字段是我们并不需要的。在研究了官⽅⽂档之后,可以通过 可以定义⼀个新的⽇志⽂件,可以拷贝其它的⽇志输出到新的⽂件,可以⾃定义字段,⽅式⽐较灵活。另外还可以通过 Writer 可以将⽇志写⼊到sqlite 数据库中。不过,这⾥我们主要是通过插件将⽇志写⼊到 Kafka。我们的⽬标是获取 中的部分字段,所以可以通过 Filters 来实现⽇志⽂件的复制并且对⽇志字段进⾏过滤,基于 KafkaWriter 将⽇志⽂件直接写⼊到 Kafka 中。为了定义 Filter,在 /usr/local/zeek/share/zeek/base/protocols/http/ 的 zeek_init 函数中进⾏定义:event zeek_init() &priority=5 { Log::create_stream(HTTP::LOG, [$columns=Info, $ev=log_http, $path="http"]); Analyzer::register_for_ports(Analyzer::ANALYZER_HTTP, ports); local filter: Log::Filter = [ $name="kafka-http", $include=set("host","_p","uri"), $writer=Log::WRITER_KAFKAWRITER ]; Log::add_filter(HTTP::LOG, filter); }另外,记得在 /usr/local/zeek/share/zeek/site/ 中定义 Kafka 的 topic 和 Broker:redef Kafka::topic_name = "bro-test";redef Kafka::kafka_conf = table( [""] = "127.0.0.1:9092");最后记得使⽤ zeekctl deploy 重新部署⼀下,这样脚本就⽣效了,⽇志就可以直接写⼊到 Kafka 中,⼤⼤提⾼效率。总结其实 Zeek 有很多⾼级玩法,你完全可以将 Zeek 改造成⼀个 IDS 产品。Zeek 脚本的强⼤能⼒赋予其⽆限的可能性,⽐如在流量中发现sql 注⼊。本⽂主要就是就 Zeek 的安装部署以及结合被动扫描器的⼀些⽤法的介绍。后续如果更进⼀步地探索,会做更多的分享。
发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1687866533a52125.html
评论列表(0条)