2023年7月17日发(作者:)
使⽤ELK保存Syslog、Netflow⽇志和审计⽹络接⼝流量简介ELK是三个开源软件的缩写,分别表⽰:Elasticsearch , Logstash, Kibana , 它们都是开源软件。后来新增了⼀个Beats,它是⼀个轻量级的⽇志收集处理⼯具(Agent),Beats占⽤资源少,适合于在各个服务器上搜集⽇志后传输给Logstash,官⽅也推荐此⼯具。Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,⾃动发现,索引⾃动分⽚,索引副本机制,restful风格接⼝,多数据源,⾃动搜索负载等。Logstash是开源的数据收集引擎。它可以动态统⼀不同来源的数据,并将数据标准化到您选择的⽬标输出。它提供了⼤量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。Kibana是⼀个开源的分析与可视化平台,它可以为 Logstash 和 ElasticSearch 提供的⽇志分析友好的 Web 界⾯,可以帮助您汇总、分析和搜索重要数据⽇志。Beats是⼀个轻量级⽇志采集器,早期的ELK架构中使⽤Logstash收集、解析⽇志,但是Logstash对内存、cpu、io等资源消耗⽐较⾼。相⽐Logstash,Beats所占系统的CPU和内存⼏乎可以忽略不计。Beats集合有7个成员⼯具,其中Packetbeat是负责收集⽹络流量⽇志的。⼀、下载⼆、环境系统:openEuler 20.03 LTS SP2⽹卡1:ens33IP地址:172.25.53.160/24⽹卡2:ens37IP地址:⽆ 作为审计接⼝三、 安装Logstash、Elasticsearch、Kibana#
创建⽬录mkdir -p /opt/softs#
进⼊软件包⽬录cd /opt/softs#
上传安装包或者下载安装包wget /downloads/logstash/logstash-7.15.2-x86_t /downloads/elasticsearch/elasticsearch-7.15.2-x86_t /downloads/kibana/kibana-7.15.2-x86_#
安装logstashrpm -ivh logstash-7.15.2-x86_#
安装elasticsearchrpm -ivh elasticsearch-7.15.2-x86_#
安装kibanarpm -ivh kibana-7.15.2-x86_四、配置Elasticsearch#
移动数据⽬录mv /var/lib/elasticsearch /opt/#
修改配置⽂件vi /etc/elasticsearch/参考#
数据⽬录: /opt/elasticsearch#
⽇志⽬录: /var/log/elasticsearch#
集群名称: "cluster01"#
集群模式:单节点: "single-node"#
绑定IP为所有_host: 0.0.0.0#
开启安全管理d: true#
启动elasticsearch服务systemctl start e#
设置密码/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive内置⽤户名:elastic 超级⽤户apm_system APM监控⽤户kibana_system Kibana⽤户logstash_system Logstash⽤户beats_system Beats⽤户remote_monitoring_user 远程监控⽤户五、配置Logstash#
移动logstash⽬录mv /var/lib/logstash /opt/#
修改服务配置⽂件vi /etc/logstash/参考#
数据⽬录: /opt/logstash#
⽇志⽬录: /var/log/logstash#
创建⽇志配置⽂件touch /etc/logstash/conf.d/#
编辑⽇志配置⽂件vi /etc/logstash/conf.d/参考#
⽇志输⼊input { #
监听TCP UDP 1514端⼝接收syslog⽇志 syslog { type => syslog port => 1514 timezone => "Asia/Shanghai" } #
监听TCP UDP 2055端⼝接收netflow⽇志 syslog { type => netflow port => 2055 codec => netflow codec => netflow timezone => "Asia/Shanghai" }}#
⽇志处理过滤filter { #
判断⽇志输⼊部分类型为syslog的⽇志 if [type] == "syslog"{ # grok过滤插件
主要⽤来提取字段内的内容⽣成新字段
具体参考下⽂grok插件章节 grok { match =>{ "message" =>".*source-ip=%{IPV4:src_ip}.*source-port=%{POSINT:src_port}.*destination-ip=%{IPV4:dst_ip}.*destination-port=%{POSINT:dst_port}.*time=(?
主要⽤来刷新⽇志的时间戳字段
具体参考date插件章节 date{ match => [ "time", "MMM dd HH:mm:ss"] locale => "en" add_tag => "@timestamp" timezone => "Asia/Shanghai" } } #
判断⽇志输⼊部分类型为netflow的⽇志 if [type] == "netflow"{ # date过滤插件
主要⽤来刷新⽇志的时间戳字段
具体参考date插件章节 date{ match => [ "flowStartSeconds", "UNIX"] locale => "en" add_tag => "@timestamp" timezone => "Asia/Shanghai" } }}#
⽇志输出output { #
判断⽇志类型 syslog if [type] == "syslog"{ #
判断⽇志来源主机IP地址
只输出需要的来源主机
如:测试机 if [host] == "10.0.0.3" { #
输出到elasticsearch
elasticsearch {
# elasticsearch
协议
主机
端⼝ hosts => ["127.0.0.1:9200"] #
索引名 index => "syslog-%{[host]}-%{+}" # elasticsearch⽤户名 user => "elastic" # elasticsearch密码 password => "⽤户密码" } } } #
判断⽇志类型 netflow if [type] == "netflow"{ if [host] == "10.0.0.3" { elasticsearch {
hosts => ["127.0.0.1:9200"] index => "netflow-%{[host]}-%{+}" user => "elastic" password => "⽤户密码" } } } } #
⽇志输出到控制台(调试时取消注释使⽤) #stdout { # codec => rubydebug #}}grok插件例⼦原始数据:source-ip=192.168.0.1 source-port=12345表达式:source-ip=%{IPV4:src_ip}s+source-port=%{POSINT:src_port}会提取IP地址192.168.0.1放⼊src_ip字段会提取12345端⼝放⼊src_port字段其中IPV4和POSINT为预定义规则grok内置规则参考:USERNAME [a-zA-Z0-9._-]+USER %{USERNAME}INT (?:[+-]?(?:[0-9]+))BASE10NUM (?[+-]?(?:(?:[0-9]+(?:.[0-9]+)?)|(?:.[0-9]+)))NUMBER (?:%{BASE10NUM})BASE16NUM (?(?"(?>.|[^"]+)+"|""|(?>'(?>.|[^']+)+')|''|(?>`(?>.|[^`]+)+`)|``))UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}# NetworkingMAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})CISCOMAC (?:(?:[A-Fa-f0-9]{4}.){2}[A-Fa-f0-9]{4})WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:)))(%.+)?IPV4 (?/(?>[w_%!$@:.,-]+|.)*)+TTY (?:/dev/(pts|tty([pq])?)(w+)?/?(?:[0-9]+))WINPATH (?>[A-Za-z]+:|)(?:[^?*]*)+WINPATH (?>[A-Za-z]+:|)(?:[^?*]*)+URIPROTO [A-Za-z]+(+[A-Za-z+]+)?URIHOST %{IPORHOST}(?::%{POSINT:port})?# uripath comes loosely from RFC1738, but mostly from what Firefox# doesn't turn into %XXURIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_-]*)+#URIPARAM ?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?URIPARAM ?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?-[]]*URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?# Months: January, Feb, 3, 03, 12, DecemberMONTH b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)bMONTHNUM (?:0?[1-9]|1[0-2])MONTHNUM2 (?:0[1-9]|1[0-2])MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])# Days: Monday, Tue, Thu, DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)# Years?YEAR (?>dd){1,2}HOUR (?:2[0123]|[01]?[0-9])MINUTE (?:[0-5][0-9])# '60' is a leap second in most time standards and thus is (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])# datestamp is YYYY/MM/DD-HH:MM: (or something like it)DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))ISO8601_SECOND (?:%{SECOND}|60)TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?DATE %{DATE_US}|%{DATE_EU}DATESTAMP %{DATE}[- ]%{TIME}TZ (?:[PMCE][SD]T|UTC)DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}# Syslog Dates: Month Day HH:MM:SSSYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}PROG (?:[w._/%-]+)SYSLOGPROG %{PROG:program}(?:[%{POSINT:pid}])?SYSLOGHOST %{IPORHOST}SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}# ShortcutsQS %{QUOTEDSTRING}# Log formatsSYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?:
HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}# Log LevelsLOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)date插件date插件主要⽤来提取字段内的⽇期时间转换成相应的格式存⼊某些字段,缺省存⼊@timestamp字段。官⽅⽂档:date内置规则字符ISO8601UNIXUNIX_MSTAI64NyyyyyyMMMMMMMMMMdddHHHmmmsssSSSSSSZZZZZZwwwDeE,EE,EEEEEEE释义解析任何有效的ISO8601时间解析浮点或整形10位时间戳解析整形13位时间戳解析TAI64N时间4位年份2位年份1位或2位⽉份2位⽉份英⽂缩写⽉份完整的⽉份1位或2位⽇2位⽇1位或2位⼩时2位⼩时1位或2位分钟2位分钟1位或2位秒2位秒⼗分之⼀秒百分之⼀秒千分之⼀秒时区时区时区标识⼀年中的第⼏周2位⼀年中的第⼏周⼀年中的第⼏天星期⼏英⽂缩写星期⼏英⽂全称星期⼏例⼦2011-04-19T03:44:01.103Z13261491FebFebruary540400-0700-07:00Asia/Shanghai1012457SunTuesday#
测试配置⽂件是否有错误/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --_and_exit#
临时关闭防⽕墙systemctl stop firewalld#
取消/etc/logstash/conf.d/配置⽂件内输出部分的控制台输出注释#
以配置⽂件热加载的形式临时启动
测试是否输⼊输出⽇志/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --tic使⽤telnet或其他调试⼯具发送tcp或udp包到1514端⼝测试是否正常输⼊输出测试数据:<123>Dec 8 2021 06:10:48 USG6600E:vsys=public, protocol=6, source-ip=172.16.1.2, source-port=63354,destination-ip=172.16.1.1, destination-port=80, time=2021/12/8 14:10:48.#
成功解析后Ctrl+C关闭临时启动# /etc/logstash/conf.d/配置⽂件内输出部分的控制台输出注释掉#
启动logstash服务systemctl start e六、配置Kibana#
修改配置⽂件vi /etc/kibana/参考#
监听端⼝: 5601#
绑定: "0.0.0.0"# elasticsearch⽤户名me: "kibana_system"# elasticsearch密码rd: "⽤户密码"#
开启中⽂: "zh-CN"#
设置URL全称BaseUrl: "172.25.53.160:5601"#
启动服务systemctl start e七、配置防⽕墙#
开启防⽕墙systemctl start firewalld#
放⾏端⼝firewall-cmd --zone=public --add-port=1514/udp --permanentfirewall-cmd --zone=public --add-port=1514/tcp --permanentfirewall-cmd --zone=public --add-port=2055/udp --permanentfirewall-cmd --zone=public --add-port=2055/tcp --permanentfirewall-cmd --zone=public --add-port=9200/tcp --permanentfirewall-cmd --zone=public --add-port=9300/tcp --permanentfirewall-cmd --zone=public --add-port=5601/tcp --permanent#
重载防⽕墙firewall-cmd --reload如果需要从tcp 80端⼝访问kibana需要配置端⼝转发(1514转发到514同理)iptables防⽕墙端⼝转发# tcp80端⼝转发到5601iptables -t nat -A PREROUTING -i ens33 -p tcp --dport 80 -j REDIRECT --to-port 5601#
保存配置iptables-save⼋、登录Kibana浏览器访问 172.25.53.160:5601 输⼊⽤户名elastic密码******登录点击⾃⼰浏览使⽤telnet或其他调试⼯具发送tcp或udp包到1514端⼝测试是否正常输⼊输出测试数据:<123>Dec 8 2021 06:10:48 USG6600E:vsys=public, protocol=6, source-ip=172.16.1.2, source-port=63354,destination-ip=172.16.1.1, destination-port=80, time=2021/12/8 14:10:48.左侧菜单栏点击 Management - Stack Management左侧⼆级菜单点击 数据 - 索引管理可以看到已经⾃动创建索引syslog-xxxxxx由于是单节点部署不满⾜副本分⽚1所以修改副本分⽚为0所以状态为黄⾊点击 索引名 - 编辑 修改_of_replicas的值为0后点击保存 状态恢复为绿⾊点击左侧⼆级菜单 Kibana - 索引模式创建索引模式输⼊名称syslog-*匹配所有syslog⽇志时间戳字段选择 @timestamp点击创建索引模式⼀级菜单Analytics - Discover可以查看⽇志数据点击⼀级菜单 Observability - ⽇志点击右上⾓的设置按钮点击 使⽤Kibana索引模式选择之前创建的索引模式根据配置⽇志的列应⽤设置点击⼆级菜单 Logs - Stream可以查看、搜索⽇志九、审计⽹络接⼝流量9.1 安装Packetbeat#
创建⽬录mkdir -p /opt/softs#
进⼊软件包⽬录cd /opt/softs#
上传安装包或者下载安装包wget /downloads/beats/packetbeat/packetbeat-7.16.0-x86_#
安装packetbeatrpm -ivh packetbeat-7.16.0-x86_9.2 配置Packetbeat#
修改配置⽂件vi /etc/packetbeat/参考……#
配置审计⽹卡接⼝
可以是: ens37……#
对接kibana# =================================== Kibana =================================== host: "localhost:5601"……#
对接elasticsearch# ---------------------------- Elasticsearch Output ----------------------------csearch: # elasticsearch地址和端⼝ hosts: ["localhost:9200"] #
⽤户名 username: "elastic" #
密码 password: "⽤户密码"……#
启动packetbeat服务systemctl start e#
切换⾄packetbeat⽬录cd /usr/share/packetbeat/bin#
添加⾯板packetbeat setup --dashboards登录Kibana ⼀级菜单 Analytics - Dashboard 可以查看Packetbeat⾃带的⾯板[Packetbeat] Overview ECS⾯板⼗、设置开机⾃动启动#
开机⾃动启动elasticsearchsystemctl enable e#
开机⾃动启动logstashsystemctl enable e#
开机⾃动启动kibanasystemctl enable e#
开机⾃动启动packetbeatsystemctl enable e
发布者:admin,转转请注明出处:http://www.yc00.com/news/1689545286a264986.html
评论列表(0条)