2023年7月17日发(作者:)
ELK技术栈-Logstash的详细使⽤本⽂作者:罗海鹏,叩丁狼⾼级讲师。原创⽂章,转载请注明出处。前⾔在第九章节中,我们已经安装好Logstash组件了,并且启动实例测试它的数据输⼊和输出,但是⽤的是最简单的控制台标准输⼊和标准输出,那这节我们就来深⼊的学习Logstash的详细使⽤。常⽤启动参数我们在上⼀节中演⽰了启动Logstash的实例,其中我们启动的时候给Logstash脚本传⼊了-e的参数,但实际上,Logstash的启动参数有很多,我们来看⼀下各个启动参数的作⽤:-e #⽴即启动实例,例如:./logstash -e "input {stdin {}} output {stdout {}}"-f #指定启动实例的配置⽂件,例如:./logstash -f config/-t #测试配置⽂件的正确性,例如:./logstash -f config/ -t-l #指定⽇志⽂件名称,例如:./logstash -f config/ -l logs/-w #指定filter线程数量,不指定默认是5,例如:./logstash-f config/ -w 8配置⽂件语法⽂件结构我们刚刚知道,启动参数可以指定⼀个配置⽂件,那么接下来就有必要来了解⼀下配置⽂件的结构:Logstash通过{}来定义区域,区域内可以定义插件,⼀个区域内可以定义多个插件,如下:input { #标准输⼊源插件 stdin { } #普通⽂件源插件 file { path => ["/var/log/*.log", "/var/log/message"] .... } ......}filter { #grok过滤插件 grok { match => ["message", "%{HTTPDATE:logdate}"] ..... } #date过滤插件 date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] ..... } .....}output { stdout { } elasticsearch { hosts => ["127.0.0.1:9200"] .... } .....}我们先⼤概了解⼀下配置⽂件的结构,接下来我们再详细看这些插件的配置。数据类型Logstash配置⽂件⽀持的数据类型有:1、Boolean,例如:ssl_enable => true2、Number,例如:port => 333、String,例如:name => “Hello world”4、hash,例如:options => {key1 => “value1”, key2 => “value2”}5、array,例如:match => [“datetime”, “UNIX”, “ISO8601”]字段引⽤Logstash数据流中的数据被称之为Event对象,Event以JSON结构构成,Event的属性被称之为字段,如果你想在配置⽂件中引⽤这些字段,只需要把字段的名字写在中括号[]⾥就⾏了,如[type],对于嵌套字段每层字段名称都写在[]⾥就可以了,⽐如:[tags][type];除此之外,对于Logstash的arrag类型⽀持下标与倒序下表,如:[tags][type][0]和[tags][type][-1]以下的内容就是⼀个Event对象:{ "message" => "hello logstash", "@version" => "1", "@timestamp" => 2018-08-13T17:32:01.122Z, "host" => "omain"}条件判断Logstash⽀持下⾯的操作符:1、==(等于), !=(不等于), <(⼩于), >(⼤于), <=(⼩于等于), >=(⼤于等于)2、=~(匹配正则), !~(不匹配正则)3、in(包含), not in(不包含)4、and(与), or(或), nand(⾮与), xor(⾮或)5、()(复合表达式), !()(对复合表达式结果取反)例如以下的条件判断:if "_grokparsefailure" not in [tags] {}
else if [status] !~ /^2dd/ or ( [url] == "/" nand [geoip][city] != "beijing" ) {}
else {}环境变量引⽤Logstash⽀持引⽤系统环境变量,环境变量不存在时可以设置默认值,例如:export TCP_PORT=12345input { tcp { port => "${TCP_PORT:54321}" }}常⽤输⼊插件在第九章中,我们已经使⽤是标准输⼊,以键盘的输⼊数据作为Logstash数据源,但实际上我们也知道,Logstash的数据源有很多,每种数据源都有相应的配置,在Logstash中,这些数据源的相应配置称为插件,我们常⽤的输⼊插件有:file、jdbc、redis、tcp、syslog,这些输⼊插件会监听数据源的数据,如果新增数据,将数据封装成Event进程处理或者传递,更多的输⼊插件⼤家可以看Logstash官⽹,接下来我们以file和jdbc两个输⼊插件作为例⼦,来学习输⼊插件的使⽤,其他输⼊插件使⽤起来⼤同⼩异,⼤家⾃⾏扩展。file输⼊插件读取⽂件插件主要⽤来抓取⽂件的数据变化信息,以此作为Logstash的数据源。配置⽰例:input{file { path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning"}}output{stdout{}}常⽤参数参数名称数据类型arraystringstringnumberstringnumbernumberstringhashstringnumberar默认值描述path⽆⽤于匹配被监控的⽂件,如”/var/logs/*.log”或者 “/var/log/message”,必须使⽤绝对路径type⽆Event的type字段,如果采⽤elasticsearch做store,在默认情况下将作为elasticsearch的typesincedb_path“$HOME/.sincedb*”⽂件读取记录,必须指定⼀个⽂件⽽不是⽬录,⽂件中保存没个被监控的⽂件等当前inode和byteoffsetsincedb_write_interval15间隔多少秒写⼀次sincedb⽂件start_position“end”值为“beginning”和“end”,从⽂件等开头还是结尾读取⽂件内容,默认是结尾,如果需要导⼊⽂件中的⽼数据,可以设置为“beginning”,该选项只在第⼀次启动logstash时有效,如果⽂件已经存在于sincedb的记录内,则此配置⽆效stat_interval1间隔多少秒检查⼀下⽂件是否被修改,加⼤此参数将降低系统负载,但是增加了发现新⽇志的间隔时间close_older3600设置⽂件多少秒内没有更新就关掉对⽂件的监听codec“plain”输⼊数据之后对数据进⾏解码add_field{}⽤于向Event中添加字段delimiter“n”⽂件内容的⾏分隔符,默认按照换⾏进⾏Event对象封装,也就是说⼀⾏数据就⼀个Event对象discover_interval15间隔多少秒查看⼀下path匹配的路径下是否有新⽂件产⽣exclude参数名称idra数y据类st型ringnumberarray⽆默认值⽆path匹配的⽂件中指定例外,如:path => “/var/log/“;exclude =>”*.gz”描述区分两个相同类型的插件,⽐如两个filterignore_older⽆忽略历史修改,如果设置3600秒,logstash只会发现⼀⼩时内被修改过的⽂件,⼀⼩时之前修改的⽂件的变化不会被读取,如果再次修改该⽂件,所有的变化都会被读取,默认被禁⽤tags⽆可以在Event中增加标签,以便于在后续的处理流程中使⽤jdbc输⼊插件该插件可以使⽤jdbc把关系型数据库的数据作为Logstash的数据源配置⽰例:input {jdbc { jdbc_driver_library => "/opt/logstash/" jdbc_driver_class => "" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "mysql" jdbc_password => "123456" parameters => { "favorite_artist" => "Beethoven" } schedule => "* * * * *" statement => "SELECT * from songs where artist = :favorite_artist"}}output{stdout{}}常⽤参数(空 = 同上)参数名称数据类型stringstringstring默认值描述jdbc_driver_library⽆jdbc连接驱动的jar包位置jdbc_driver_classjdbc_connection_string⽆jdbc驱动类⽆数据库的连接地址jdbc_us参数名er称st数ri据n类g型stringbooleannumberstringhashbooleanstringbooleanhsahstringboole默⽆认值数据库的⽤户名描述jdbc_password⽆数据库的密码jdbc_paging_enabledfalse开启分页jdbc_page_size100000每页查询多少条数据statement⽆执⾏的SQL查询语句parameters{}设置SQL查询语句的参数use_column_valuefalse是否需要在程序中使⽤查询出来的列tracking_column⽆需要在程序中使⽤哪个查询出来的列clean_runfalse是否应该保留先前的运⾏状态colums_charset{}特定列的字符编码。此选项将覆盖指定列的原来charset配置schedule⽆定时执⾏数据读取的任务,使⽤cornexpression表达式:分、时、天、⽉、年,全部为*默认含义为每分钟执⾏任务,该cornexpression表达式于Linux系统的crontab使⽤⽅式⼀样,如果不会使⽤cornexpression表达式,可以查看crontab相关⽂档。不设置的话,只执⾏⼀次数据的读取lowercase_column_natru是否需要把查询出来的列名转换成⼩写,也就是说即使列名是驼峰的,也会转成全部都是⼩写的,默认为转⼩写,如果不需要,则设置为falsemes参数名称record_last_runa数n据类b型ooleanstringe默认值true是否记录数据库中最后⼀条数据的位置描述last_run_metadata_pathadd_fieldcodecidtagstype⽆记录数据库中最后⼀条数据的位置信息存放路径常⽤过滤插件丰富的过滤器插件的是 logstash威⼒如此强⼤的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进⾏不同的数据处理⽅式。我们常⽤的过滤插件有:grok、date、geoip、mutate、json、Split、ruby,更多的过滤插件⼤家可以看Logstash官⽹,接下来我们以grok、date和geoip这3个过滤插件作为例⼦,来学习过滤插件的使⽤,其他过滤插件使⽤起来⼤同⼩异,⼤家⾃⾏扩展。grok正则插件grok正则捕获是Logstash中将⾮结构化数据解析成结构化数据以便于查询的最好⼯具,⾮常适合解析system log,web log, database log等任意的 log⽂件。内置正则表达式调⽤grok提供100多个常⽤正则表达式可供使⽤,这100多个正则表达式定义在logstash/vendor/bundle/jruby/x.x/gems/logstash-patterns-core-xxx/patterns/grok-patterns⽂件中,想要灵活的匹配各种数据,那么必须查看该⽂件,⼤概了解grok提供了什么内置的正则表达式。调⽤它们的语法如下:%{SYNTAX:SEMANTIC}SYNTAX:表⽰内置的正则表达式的名称SEMANTIC:表⽰在Event中创建该字段名来存储匹配到的值例如:输⼊的数据内容为“[debug] 127.0.0.1 - test log content”,我们想提取127.0.0.1这个IP地址,那么可以使⽤以下语法来匹配:%{IP:client},将获得“client: 127.0.0.1”的结果,该结果将成为Event的⼀个新的字段和字段值;如果你在捕获数据时想进⾏数据类型转换可以使⽤%{NUMBER:num:int}这种语法,默认情况下,所有的返回结果都是string类型,当前Logstash所⽀持的转换类型仅有“int”和“float”;⾃定义表达式调⽤与预定义表达式相同,你也可以将⾃定义的表达式配置到Logstash中,然后就可以像于定义的表达式⼀样使⽤;以下是操作步骤说明:1、在Logstash根⽬录下创建⽂件夹“patterns”,在“patterns”⽂件夹中创建⽂件“extra”(⽂件名称⽆所谓,可⾃⼰选择有意义的⽂件名称);2、在⽂件“extra”中添加表达式,格式:patternName regexp,名称与表达式之间⽤空格隔开即可,例如:POSTFIX_QUEUEID [0-9A-F]{10,11}3、使⽤⾃定义的表达式时需要在grok插件配置“patterns_dir”属性,属性值为extra⽂件所在的⽬录。配置⽰例⽇志⽂件每⾏内容为:55.3.244.1 GET / 15824 0.043 message-id:BEF25A72965grok表达式:表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}配置⽂件内容:input {file { path => "/var/log/"}}filter {grok { patterns_dir => ["./patterns"] match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} message-id:%{POSTFIX_QUEUEID:q}}output{ stdout{}}输出结果:client: 55.3.244.1method: GETrequest: /tes: 15824duration: 0.043queueid: BEF25A72965⽰例解析1、/var/log/⽂件每⼀⾏的格式为55.3.244.1 GET / 15824 0.043 message-id:BEF25A72965,到时候会把每⼀⾏数据封装成⼀个Event。2、使⽤grok过滤插件处理该⽂件,match为匹配Event中的message,message就是该⽂件的⼀⾏数据,⽐如55.3.244.1 GET /
15824 0.043 message-id:BEF25A72965。3、匹配message的内容使⽤%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} message-id:%{POSTFIX_QUEUEID:queueid}表达式,把⼀条message拆成client、method、request、bytes、duration、queueid这6个字段,并添加到Event中。4、其中%{POSTFIX_QUEUEID:queueid}为⾃定义表达式的调⽤,其他5个是grok内置的表达式调⽤,如果需要使⽤⾃定义表达式,则需要在grok插件配置patterns_dir属性,属性值数据类型为array,就是⾃定义表达式定义的⽂件所在的⽬录。5、在表达式中,特殊字符需要使⽤来转义,⽐如-,"",[]这些特殊字符,所以我们上⾯实例中的message-id数据在⽤表达式匹配的时候是使⽤了message-id去匹配了。常⽤参数(空 = 同上)参数名称数据类型arrayarraystring{}默认值描述matchpatterns_dirpatterns_files_globadd_fieldadd_tagidbreak_on_matchkeep_empty_capturesclean_runcolums_charsetoverwriteperiodic_flushremove_fieldremove_tagtag_on_failuretag_on_timeouttimeout_millis设置pattern数组指定⾃定义的pattern⽂件存放⽬录,Logstash在启动时会读取⽂件夹内所有⽂件,但前提是这些⽂件的内容是按照grok语法语法来定义的表达式变量⽤于匹配patterns_dir中的⽂件[]“*”booleanbooleanbooleanhsaharraybooleanarrayarrayarraystringnumbertruematch字段存在多个pattern时,当第⼀个匹配成功后结束后⾯的匹配,如果想匹配所有的pattern,将此参数设置为false如果为true,捕获失败的字段奖设置为空值falsefalse是否应该保留先前的运⾏状态{}特定列的字符编码。此选项将覆盖指定列的原来charset配置覆盖字段内容: match=> { “message” => “%{SYSLOGBASE} %{DATA:message}” } overwrite=> [ “message” ]定期调⽤filter的flush⽅法[]false[]从Event中删除指定字段: remove_field=> [ “fieldname” ][][“_grokparsefailure”]“_groktimeout”删除“tags”中的值: remove_tag=> [ “tagname” ]当没有匹配成功时,将此array添加到“tags”字段内当匹配超时时,将此字符串内容添加到“tags”字段内30000设置单个match到超时时间,单位:毫秒,如果设置为0,则不启⽤超时设置date时间处理插件在讲date插件的使⽤前,我们需要先讲解⼀下Logstash的时间记录⽅式。在Logstash产⽣了⼀个Event对象的时候,会给该Event设置⼀个时间,字段为“@timestamp”,同时,我们的⽇志内容⼀般也会有时间,但是这两个时间是不⼀样的,因为⽇志内容的时间是该⽇志打印出来的时间,⽽“@timestamp”字段的时间是input插件接收到了⼀条数据并创建Event的时间,所有⼀般来说的话“@timestamp”的时间要⽐⽇志内容的时间晚⼀点,因为Logstash监控数据变化,数据输⼊,创建Event导致的时间延迟。这两个时间都可以使⽤,具体要根据⾃⼰的需求来定。但是不管“@timestamp”字段的时间还是⽇志内容中的时间,其时间格式⼀般都不是我们想要的,所以我们就需要使⽤date插件,把时间格式转成我们想要的格式,⽐如:⽐如将Apr 17 09:32:01(MMM dd HH:mm:ss)转换为04-17 09:32:01 (MM-dd HH:mm:ss)。配置⽰例:filter { date { match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"] }
}常⽤参数(空 = 同上)参数名称数据类型默认值描述add_fieldadd_tagperiodic_flushidremove_fieldremove_tagremove_tagtag_on_failurematcharray时间字段匹配,可⾃定多种格式,直到匹配到或者匹配结束,格式:[ field,formats… ],如:match=> [ “logdate”, “MMM dd yyyy HH:mm:ss”, “MMM d yyyy HH:mm:ss”,“ISO8601” ][]targettimezonestringstring“@timestamp”⽆指定match匹配并且转换为date类型的存储位置(字段),默认覆盖到“@timestamp”指定时间格式化的时区geoip插件配置⽰例:filter { if [remote_ip] !~ "^127.|^192.168.|^172.1[6-9].|^172.2[0-9].|^172.3[01].|^10." { geoip { source => "remote_ip" database => "/usr/share/GeoIP/" } }}⽰例解析由于geoip使⽤的ip数据库不能匹配私⽹地址,所以在使⽤geoip插件前,先判断⼀下ip地址是否为私⽹ip,如果不是,这使⽤geoip插件。source为指定⼀个ip地址,为其查询归属地等信息,remote_ip为Event中存储ip地址的⾃定义字段,database为IP地址数据库所在的位置,不指定的话,使⽤geoip插件内置的GeoLite2-City默认数据库。常⽤参数(空 = 同上)参数名称数据类型默认值描述add_fieldadd_tagperiodic_flushidremove_fieldremove_tagremove_tagstringsource⽆要通过geoip插件查询的IP地址所在的字段tag_on_failurearray[“_geoip_lookup_failure”]如果ip地址查询不到地理位置信息,这在标签中添加该值cache_sizenumber1000由于geoip查询很耗时间,所以默认情况下,查询过⼀次的ip地址会缓存起来,⽬前geoip没有内存数据驱逐策略,设置的缓存⽤完了就不能在缓存新的数据了,所以缓存需要设置⼀个合理值,太⼩查询性能增加不明显,太⼤就很占⽤服务器的内存targe参数t名称st数ri据n类g型stringstring“geoi默认值p”把IP地址匹配到的信息放到该字段中描述database⽆使⽤的Maxmind数据库⽂件的路径。 如果不指定,则默认数据库是GeoLite2-City。 GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind⽀持的免费数据库。 GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind⽀持的商业数据库。default_database_type“City”该字段的值只能接受”City”和”ASN”,GeoLite2数据库下细分为GeoLite2-City,GeoLite2-ASN和GeoLite2-Country,geoip插件内嵌了GeoLite2-City和GeoLite2-ASN,默认使⽤GeoLite2-City,如果需要使⽤GeoLite2-ASN,则需设置该字段并设置为ASNfieldsarray⽆要包含在Event中的geoip字段的信息(每个数据库的信息都不⼀样)。 默认情况下,所有信息都会列出。对于内置的GeoLite2-City数据库,可以使⽤以下信息:city_name, continent_code,country_code2, country_code3, country_name, dma_code, ip, latitude, longitude,postal_code, region_name 和 timezone常⽤输出插件经过以上的学习,我们已经学习了Logstash三⼤主件中的其中两个了,分别是input和filter,那现在我们就来学习最后⼀个组件:output。每个数据流经过input和filter后,最终要由output输出,以上的内容我们都是使⽤最简单的标准输出:stdout,把数据输出到显⽰器,但实际上stdout只是Logstash的其中⼀个输出插件⽽已,它的常⽤输出插件有:Elasticsearch,Redis,File,TCP等等,更多的输出插件⼤家可以看Logstash官⽹,接下来我们以Elasticsearch和Redis输出插件作为例⼦,来学习输出插件的使⽤,其他过滤插件使⽤起来⼤同⼩异,⼤家⾃⾏扩展。elasticsearch输出插件⽤于将Event信息写⼊到Elasticsearch中,官⽅推荐插件,ELK技术栈必备插件。配置⽰例output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "logstash-%{type}-%{+}" document_type => "%{type}" }}常⽤参数(空 = 同上)参数名称add_fieldadd_tagperiodic_flushflush_sizeidremove_field数据类型默认值描述remove_tag参数名称codecenable_metrichostsindexdocument_typedocument_iduserpasswordparent数据类型默认值描述arraystringstringstringstringstringstringnumbernumberstringstringstringbooleanbooleanhashnumber⽆[⽆][⽆][⽆][⽆][⽆]“nil”1000100⽆⽆⽆falsetrue⽆60elasticsearch服务器集群所在的地址⽂档索引库名称⽂档类型,不指定的话,类型名为“doc”⽂档id,不指定的话,由elasticsearch⾃动⽣成elasticsearch⽤户名elasticsearch集群访问密码为⽂档⼦节点指定⽗节点的idpool_maxpool_max_per_routeproxytemplatetemplate_nametemplate_overwritemanage_templateparameterstimeoutelasticsearch最⼤连接数每个“endpoint”的最⼤连接数代理URL设置⾃定义的⽂档映射模版存放路径设置使⽤的默版名称是否始终覆盖现有模版是否启⽤elasticsearch模版,Logstash⾃带⼀个模版,但是只有名称匹配“logstash-*”的索引才会应⽤该默版添加到elasticsearch URL后⾯的参数键值对⽹络超时时间redis输出插件⽤于将Event写⼊Redis中进⾏缓存,由于Redis数据库是先把数据存在内存的,所以效率会⾮常⾼,是⼀个常⽤的logstash输出插件配置⽰例output { redis { host => ["127.0.0.1"] port => 6379 data_type => "list" key => "logstash-list" }}常⽤参数(空 = 同上)参数名称数据类型默认值描述add_fieldadd_tagperiodic_flushflush_sizeidremove_fieldremove_tagcodecenable_metrichostsarraynumbernumberstringstringstringbooleannumber[“127.0.0.1”]redis服务列表,如果配置多个,将随机选择⼀个,如果当前的redis服务不可⽤,将选择下⼀个port6379⽂档索引库名称db0使⽤的redis数据库编号,默认使⽤0号数据库passworddata_typekey⽆redis的密码存储在redis中的数据类型,只能使⽤“list”和“channel”这两个值。如果使⽤“list”,将采⽤“RPUSH”操作,如果是“channel”,将采⽤“PUBLISH”操作给redis设置⼀个key,来存储收集到的数据库⽆⽆batchfalse是否启⽤redis的batch模式,仅在data_type=”list”时有效batch_events50batch⼤⼩,batch达到此⼤⼩时执⾏“RPUSH”后记那到这⾥,我们已经介绍了Logstash配置⽂件的语法和常⽤插件的配置⽅式了,这些常⽤的插件都是使⽤频率⾮常⾼的,所有我们后⾯需要来做⼀个Logstash的实战案例,综合运⽤我们这章所学的内容。
发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1689545193a264972.html
评论列表(0条)