一、介绍
Logstash 是一个开源的服务器端数据处理管道,能够从多个来源动态地采集数据,转换数据,然后传输数据到指定的地方,不受格式或复杂度的影响。就像一根连接的管道,数据从一端到另一端,期间我们还可以在中间加上一层过滤网对数据进行转换过滤。Logstash最重要的便是输入、过滤转换以及输出了,即输入→过滤器→输出。
输入:采集各种格式、大小和来源的数据。数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择,可以同时从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
过滤转换:实时解析和转换数据。在数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值。
输出:选择存储库导出数据。虽然 Elasticsearch 是推荐首选输出方向,但它并非唯一选择。Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方。
Logstash 采用可插拔框架,拥有 200 多个插件。您可以将不同的输入选择、过滤器和输出选择混合搭配、精心安排,让它们在管道中和谐地运行。
Logstash常用于日志采集,一般和Elasticsearch、Kibana一起作为日志收集器使用。
二、支持的插件
请阅读官方文档:
• 输入插件:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
• 输出插件:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
• 过滤器插件:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
• 编解码器插件:https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
三、安装使用
Logstash需要java环境,自行安装。
[root@web01 ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-x86_64.rpm
[root@web01 ~]# rpm -vih logstash-7.10.0-x86_64.rpm
[root@web01 ~]# vim /etc/profile
export PATH=/usr/share/logstash/bin/:$PATH
[root@web01 ~]# source /etc/profile
[root@web01 ~]# logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 7.10.0
下面我们来简单使用下,例如以下示例使用最简单的标准输入插件和标准输出插件:
[root@web01 ~]# logstash -e 'input { stdin {} } output { stdout {} }'
......
[INFO ] 2020-12-08 08:16:52.387 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
hello # 随便输入些字符
{ # Logstash的输出信息
"message" => "hello",
"host" => "web01",
"@version" => "1",
"@timestamp" => 2020-12-08T13:17:14.864Z
}
收集标准输入的数据到文件中:
[root@web01 ~]# logstash -e 'input { stdin {} } output { file { path => "/tmp/1.txt" } }'
......
hello
[INFO ] 2020-12-08 08:31:50.398 [[main]>worker0] file - Opening file {:path=>"/tmp/1.txt"}
world
[root@web01 ~]# cat /tmp/1.txt
{"@version":"1","@timestamp":"2020-12-08T13:31:50.162Z","message":"hello","host":"web01"}
{"@version":"1","@timestamp":"2020-12-08T13:31:54.500Z","message":"world","host":"web01"}
收集标准输入的数据到Elasticsearch中:
[root@web01 ~]# logstash -e 'input { stdin {} } output { elasticsearch { hosts => "10.0.0.5:9200" index => "test" document_id => 1 } }'
......
hello world
[root@web01 ~]# curl -XGET "10.0.0.5:9200/test/_source/1?pretty"
{
"message" : "hello world",
"@version" : "1",
"@timestamp" : "2020-12-09T02:35:08.304Z",
"host" : "web01"
}
四、配置文件结构
以上示例都是使用 -e 选项以命令行给定的配置参数启动Logstash实例,在实际中我们一般使用 -f 选项从特定文件或目录中加载配置参数来启动实例。下面我们来具体了解下Logstash配置文件的结构:
# 注释
input {
...
}
filter {
...
}
output {
...
}
每个部分都包含一个或多个插件的配置选项。如果指定多个过滤器,则会按照它们在配置文件中出现的顺序进行应用。插件的配置包含插件名称,后跟该插件的一组设置,可以配置的设置因插件类型而异,关于每个插件的信息,可以从前文列出的插件相关的官方文档获得。
例如上文收集标准输入的数据到Elasticsearch中的示例使用配置文件方式启动实例:
[root@web01 logstash]# vim stdin-es.yml
# 收集标准输入数据到es中
input {
stdin {}
}
output {
elasticsearch{
hosts => "10.0.0.5:9200"
index => "test"
document_id => 1
}
}
[root@web01 logstash]# logstash -f stdin-es.yml
五、值类型
插件可以要求设置的值是某种类型,例如boolean、list或hash。支持以下值类型:
1、数组
例如:
users => [ {id => 1, name => bob}, {id => 2, name => jane} ]
2、列表
例如:
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]
3、布尔
布尔值必须为true或false。请注意,true和false关键字没有用引号引起来。
例如:
ssl_enable => true
4、字节数
字节字段是代表有效字节单位的字符串字段,这是在插件选项中声明特定大小的便捷方法。
SI(k MGTPEZY)和Binary(Ki Mi Gi Ti Pi Ei Zi Yi)单元均受支持。
该字段不区分大小写,并且接受值和单位之间的空格。如果未指定单位,则整数字符串表示字节数。
例如:
my_bytes => "1113" #1113字节
my_bytes => "10MiB" #10485760字节
my_bytes => "100kib" #102400字节
my_bytes => "180 mb" #180000000字节
5、编码器(codec)
编码器是用来表示数据的Logstash codec的名称,编解码器可用于输入和输出。
输入编解码器,可以在数据输入之前解码数据。输出编解码器,可以在数据离开输出之前对数据进行编码。
使用输入或输出编解码器是一种便捷方法,可以无需在Logstash管道中使用单独的过滤器。
可用编解码器列表可在前文提到的编码器插件官方文档页面上找到。
例如:
codec => "json"
6、Hash
hash是格式中指定的键值对的集合("field1" => "value1")。请注意,多个键值条目由空格而不是逗号分隔。
例如:
match => {
"field1" => "value1"
"field2" => "value2"
...
}
# 或者作为一行,条目之间没有逗号相隔
match => { "field1" => "value1" "field2" => "value2" }
7、Number
数字必须是有效的数值(浮点数或整数)。
例如:
port => 33
8、Password
密码是一个不会被记录或打印的具有单个值的字符串。
例如:
my_password => "password"
9、URI
URI可以是任何内容,从完整的URL(如http://elastic.co/)到简单的标识符(如foobar)。
如果URI包含诸如http://user:pass@example.net之类的密码,则URI的密码部分将不会被记录或打印。
例如:
my_uri => "http://foo:bar@example.net"
10、PATH
路径是代表有效操作系统路径的字符串。
例如:
my_path => "/tmp/logstash"
11、String
字符串必须是单个字符序列。请注意,字符串值用双引号或单引号引起来。
12、转义字符
默认情况下,不启用转义字符。如果您希望在带引号的字符串使用转义字符,需要在你的logstash.yml文件中设置"config.support_escapes: true"。
支持的有:\r(回车)、\n(换行)、\t(tab)、\\(反斜杠)、\"(双引号)、\'(单引号)。
例如:
name => "Hello world"
name => 'It\'s a beautiful day'
六、字段
所有事件都有属性。例如,一个apache访问日志将具有状态码(200或者404)、请求路径(“ /"或者"index.html")、HTTP动词(GET或者POST)、客户端IP地址等内容,Logstash将这些属性称为“字段”。
Logstash中的某些配置选项需要存在字段才能运行。但是输入生成事件,此时输入块中还不存在要用到的字段。由于这些配置依赖于事件和字段,因此这些配置选项仅在过滤器和输出块内起作用,像字段引用、sprintf格式和条件语句这些需要字段的在输入块中都是不起作用的。
能够按名称引用一个字段或字段集合通常很有用。调用字段的基本语法为 “[fieldname]”,如果引用的是顶级字段,则可以省略[],而只需使用即可fieldname。要引用嵌套字段,请指定该字段的完整路径:[top-level field][nested field]。
以下是一个json格式的文件,它有五个顶级字段(agent、ip、request、response、ua)和三个嵌套字段(status、bytes、os)。
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
"ip": "192.168.24.44",
"request": "/index.html",
"response": {
"status": 200,
"bytes": 52353
},
"ua": {
"os": "Windows 7"
}
}
[root@web01 logstash]# vim /tmp/field.json
{"agent":"Mozilla/5.0 (compatible; MSIE 9.0)","ip":"192.168.24.44","request":"/index.html","response":{"status":200,"bytes":52353},"ua":{"os":"Windows 7"}}
我们对它进行字段引用测试。
[root@web01 logstash]# cat field.yml
input {
file {
path => "/tmp/field.json"
sincedb_clean_after => "1s"
start_position => "beginning"
codec => "json"
}
}
output {
file {
path => "/tmp/apache.%{[response][status]}"
}
}
[root@web01 logstash]# logstash -f field.yml
......
[INFO ] 2020-12-09 05:03:25.600 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2020-12-09 05:03:26.163 [[main]>worker1] file - Opening file {:path=>"/tmp/apache.200"}
[root@web01 logstash]# ll /tmp/apache.200
-rw-r--r-- 1 root root 251 Dec 9 05:03 /tmp/apache.200
上例涉及到的 file 插件配置选项解释:(建议看官方文档:file输入插件)
1、start_position
可选值:beginning,end。默认值:end
选择Logstash最初从哪个位置开始读取文件,在开头还是结尾。默认行为将文件视为实时流,因此从结尾开始。如果您有要导入的旧数据,请将其设置为Beginning。
此选项只作用于“第一次接触”的情况,即文件是新的以前没有收集,即在Logstash读取的sincedb文件中没有记录当前位置的文件。如果以前已经看过文件,则此选项无效,并且将使用sincedb文件中记录的位置。
也就是说如果不指定上例的sincedb_clean_after为1的配置的话,停止当前logstash实例以相同的配置重新启动实例,是不会有任何输入输出的,因为之前已经收集输出过了,已经记录在logstash的sincedb文件中了。
2、sincedb_clean_after
值类型是数字或string_duration类型,此设置的默认值为"2 weeks"。
如果指定了一个数字,那么它将被解释为天,并且可以是十进制,例如0.5是12小时。
现在,sincedb记录具有与其关联的最后一个活动时间戳记。如果在过去N天内未在跟踪的文件中检测到任何更改,则它的sincedb跟踪记录将过期,并且不会保留。
因为测试,我上面配置为1s,则表示这个跟踪记录会在1s过期,这就意味着sincedb文件没有记录它了,认为它是新文件,这样我们就可以重复测试上述示例了。
3、codec
值类型为编解码器。默认值为 "plain"
用于输入数据的编解码器。输入编解码器是在数据进入输入之前对其进行解码的一种便捷方法,而无需在Logstash管道中使用单独的过滤器。
在以上示例中,我们需要将输入数据转换成json格式以生成字段。
例如,我们还可以将@timestamp字段中的时间戳转换为字符串来进行引用。
1、输出文件名字引用时间戳
output {
file {
path => "/tmp/1_%{+yyyy.MM.dd.HH.mm.ss}"
}
}
2、es索引名字引用时间戳
output {
elasticsearch {
hosts => ["10.0.0.5:9200"]
index => "2_%{+yyyy-MM-dd}"
}
}
注:y:年、M:月、d:天、H:小时、m:分、s:秒。注意默认输出时间为UTC
七、条件语句
有时,我们只想在特定条件下过滤或输出事件,为此,我们可以使用条件。Logstash中的条件语句的使用与在编程语言中相似,条件语句支持if、else if、else,并且可以被嵌套。
条件语法为:
if EXPRESSION {
...
} else if EXPRESSION {
...
} else {
...
}
支持的比较运算符:
• 关系运算符: ==, !=, <, >, <=, >=
• 正则: =~, !~ (对照左侧的字符串值检查右侧的模式)
• 包含: in, not in
支持的布尔运算符为:and, or, nand, xor
支持的一元运算符为:!(取反)
表达式(EXPRESSION)可以包含其他表达式,可以使用取反!,也可以用括号将它们分组(...)。
例如下例,如果action字段的值为login,则下面条件将使用mutate过滤器删除secret字段:
filter {
if [action] == "login" {
mutate { remove_field => "secret" }
}
}
我们也可以在单个条件中指定多个表达式:
output {
# Send production errors to pagerduty
if [loglevel] == "ERROR" and [deployment] == "production" {
pagerduty {
...
}
}
}
您可以使用in运算符来测试字段是否包含特定的字符串、键或者列表元素。请注意,in的语义可能会根据目标类型而有所不同。例如,当应用于字符串时,in表示“是”。当应用于集合类型时,in表示“集合包含确切值”。
filter {
if [foo] in [foobar] {
mutate { add_tag => "field in field" }
}
if [foo] in "foo" {
mutate { add_tag => "field in string" }
}
if "hello" in [greeting] {
mutate { add_tag => "string in field" }
}
if [foo] in ["hello", "world", "foo"] {
mutate { add_tag => "field in list" }
}
if [missing] in [alsomissing] {
mutate { add_tag => "shouldnotexist" }
}
if !("foo" in ["hello", "world"]) {
mutate { add_tag => "shouldexist" }
}
}
我们可以用上述同样的方式使用not in。例如下例,可以使用not in仅在grok成功时将事件输出到Elasticsearch中:
output {
if "_grokparsefailure" not in [tags] {
elasticsearch { ... }
}
}
您还可以使用条件语句检查特定字段的存在,但是目前尚无办法区分不存在的字段和只是false的字段。该表达式在以下情况下 if [foo] 返回 false:[foo] 在事件中不存在、[foo] 存在于事件中但为false、[foo] 存在于事件中但为null。
下例中,我们使用type(所有输入插件均支持)给输入处理的所有事件添加一个字段,然后对type进行if条件判断,来实现收集多个日志到对应的多个索引中。
input {
file {
type => "messages_log"
path => "/var/log/messages"
}
file {
type => "secure_log"
path => "/var/log/secure"
}
}
output {
if [type] == "messages_log" {
elasticsearch {
hosts => ["10.0.0.5:9200"]
index => "messages_log_%{+YYYY-MM-dd}"
}
}
if [type] == "secure_log" {
elasticsearch {
hosts => ["10.0.0.5:9200"]
index => "secure_log_%{+YYYY-MM-dd}"
}
}
}
八、@metadata
在Logstash 1.5及更高版本中,有一个名为@metadata的特殊的字段。@metadata在输出时,其内容不属于任何事件,因此非常适合用于条件、字段引用、sprintf格式扩展和构建事件字段。
以下配置文件将收集标准输入的事件,输入的内容将成为事件的message字段,使用mutate过滤器块为事件添加一些字段,其中一些是嵌套在该事件的@metadata字段中的。
[root@web01 logstash]# vim metadata.yml
input { stdin { } }
filter {
mutate { add_field => { "show" => "This data will be in the output" } }
mutate { add_field => { "[@metadata][test]" => "Hello" } }
mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}
output {
if [@metadata][test] == "Hello" {
stdout { codec => rubydebug }
}
}
让我们看下运行结果:
[root@web01 logstash]# logstash -f metadata.yml
metadata test
{
"@version" => "1",
"message" => "metadata test",
"host" => "web01",
"@timestamp" => 2020-12-10T02:40:33.945Z,
"show" => "This data will be in the output"
}
输入的“metadata test”成为message字段内容,条件语句成功判断了嵌套在该@metadata字段中的test字段的内容。但是输出未显示名为为@metadata字段及其内容。
rubydebug编解码器允许你显示@metadata字段的内容,但是你需要添加一个配置:metadata => true
stdout { codec => rubydebug { metadata => true } }
让我们来看下更改后的输出:
[root@web01 logstash]# logstash -f metadata.yml
metadata test
{
"@timestamp" => 2020-12-10T02:46:59.355Z,
"@metadata" => {
"no_show" => "This data will not be in the output",
"test" => "Hello"
},
"host" => "web01",
"@version" => "1",
"show" => "This data will be in the output",
"message" => "metadata test"
}
现在我们可以看到@metadata字段及其子字段了,注意只有rubydebug编解码器允许您显示@metadata字段的内容。当你需要临时字段但不希望其出现在最终输出中时,可以使用该字段。
此字段最常见的用例之一可能是使用 date 过滤器来具有临时时间戳。
该配置文件已经简化,使用的是Apache和Nginx Web服务器通用的时间戳格式。过去,您必须先删除timestamp字段,然后再使用它来覆盖该@timestamp 字段。现在因为在date过滤器中进行转换后而不必删除“时间戳”字段,所以对于@metadata字段,这不再是必需的:
[root@web01 logstash]# vim date.yml
input { stdin { } }
filter {
grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}
output {
stdout { codec => rubydebug }
}
请注意,此配置在过滤器grok中将提取的日期放入了[@metadata][timestamp]字段中。让我们为该配置提供一个示例日期字符串,然后看看结果如何:注意默认转换为UTC,而不是CST。
[root@web01 logstash]# logstash -f date.yml
10/Dec/2020:11:28:14 +0800
{
"@version" => "1",
"@timestamp" => 2020-12-10T03:28:14.000Z,
"message" => "10/Dec/2020:11:28:14 +0800",
"host" => "web01"
}
10.0.0.8 - - [10/Dec/2020:11:28:14 +0800] "GET / HTTP/1.1" 500 2699 "-" "curl/7.29.0" "-"
{
"@version" => "1",
"@timestamp" => 2020-12-10T03:28:14.000Z,
"message" => "10.0.0.8 - - [10/Dec/2020:11:28:14 +0800] \"GET / HTTP/1.1\" 500 2699 \"-\" \"curl/7.29.0\" \"-\"",
"host" => "web01"
}
参考文章:
https://www.elastic.co/guide/en/logstash/current/configuration.html
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html