Logstash filter插件开发

简介

Logstash是一个具有实时管线能力的开源数据收集引擎。在ELK Stack中,通常选择更轻量级的Filebeat收集日志,然后将日志输出到Logstash进行加工处理,再将处理后的日志输出到指定的目标(ElasticSearch,Kafka等)当中。

Logstash事件的处理管线是inputs → filters → outputs,三个阶段都可以自定义插件,本文主要介绍如何开发自定义需求最多的filter插件。

Logstash的安装就不详细介绍了,下载传送门:https://www.elastic.co/downloads/logstash

前置准备

我们使用docker来讲解如何给logstash定制一个filter插件,首先,我们下载logstash官方最新版本的镜像。下载方式: docker pull logstash,下载好后我们就来Run这个镜像,命令:docker run -it logstash bash,这样我们就进入到logstash容器中了。接下来我们就安装下两个基本的软件包,一个vim,一个rsyslog,命令如下:apt-get update && apt-get -y install vim rsyslog && /etc/init.d/rsysylog start。到此我们的准备工作就完毕了。

生成filter插件

cd到Logstash的跟目录,使用bin/logstash-plugin生成filter插件模板,如下:

1
2
3
4
cd /usr/share/logstash
mkdir -p vendor/localgems
bin/logstash-plugin generate --type filter --name test --path vendor/localgems
#vendor/localgems 可修改为你自己的路径

查看filter插件的目录结构,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
root@c11787c4cef3:/usr/share/logstash/vendor/localgems# tree .
.
└── logstash-filter-test
├── CHANGELOG.md
├── CONTRIBUTORS
├── DEVELOPER.md
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── lib
│   └── logstash
│   └── filters
│   └── test.rb
├── logstash-filter-test.gemspec
├── spec
│   ├── filters
│   │   └── test_spec.rb
│   └── spec_helper.rb
└── test.conf

6 directories, 12 files

filter插件初探

Logstash插件是用ruby写的,查看logstash-filter-test/lib/logstash/filters/test.rb文件,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# logstash依赖于UTF-8编码,需要在插件代码开始处添加
# encoding: utf-8

#引入了插件必备的包
require "logstash/filters/base"
require "logstash/namespace"

# 插件继承自Base基类,并配置插件的使用名称
class LogStash::Filters::Test < LogStash::Filters::Base

# 插件名称,在Logstash配置的filter块中使用
#filter{
# test{
# source => "message"
# }
#}
config_name "test"

# 插件参数配置
# source是插件test的可选参数,默认值是"Hello World!"。
config :source, :validate => :string, :default => "Hello World!"
# 下面是参数的通用配置代码
# config :variable_name, :validate => :variable_type, :default => "Default value", :required => boolean, :deprecated => boolean, :obsolete => string
# :variable_name:参数名称
# :validate:验证参数类型,如:string, :password, :boolean, :number, :array, :hash, :path等
# :required:是否必须配置
# :default:默认值
# :deprecated:是否废弃
# :obsolete:声明该配置不再使用,通常提供升级方案

public
def register
# 方法相当于初始化方法,不需要手动调用,可以在这个方法里面调用配置变量,如@message,也可以初始化自己的实例变量。
end

public
def filter(event)
# 方法是插件的数据处理逻辑,其中event变量封装了数据流,可以通过接口访问event中的内容,具体参见 https://www.elastic.co/guide/en/logstash/5.1/event-api.html。
if (source = event.get(@source))
datas = source.split("|")
event.set("data", datas[0])
event.set("data2", datas[1])
event.set("user", "xp")
end

#这个方法用于保证Logstash的配置`add_field`, `remove_field`, `add_tag`和`remove_tag`会被正确执行。
filter_matched(event)
end # def filter
end # class LogStash::Filters::Test

查看logstash-filter-test/logstash-filter-test.gemspec文件,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Gem::Specification.new do |s|
s.name = 'logstash-filter-test'
s.version = '0.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = '描述这个插件的概要'
s.description = '这个插件的详细说明'
s.homepage = 'http://icyxp.github.io'
s.authors = ['icyboy']
s.email = 'icyboy@me.com'
s.require_paths = ['lib']

# Files
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" }

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0"
s.add_development_dependency 'logstash-devutils'
end

在Logstash中配置定制的插件

cd到Logstash根目录下cd /usr/shar/logstash,在Gemfile末尾添加以下配置:

1
gem "logstash-filter-test", :path => "vendor/localgems/logstash-filter-test"

启动Logstash

先编写一个配置文件test.conf,这里我使用了rsyslog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#来源rsyslog
input{
file{
path => "/var/log/messages"
}
}
#或者可以设置为终端输入
#input{
# stdin{
#
# }
#}
filter{
# 定制的filter插件
test{
source => "message"
}
}
#输出到终端
output{
stdout{
codec => rubydebug
}
}

运行起来logstash -f test.conf,然后往rsyslog里写日志你就可以看下如下情况就说明ok了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#logger "测试一下"
{
"path" => "/var/log/messages",
"@timestamp" => 2017-08-16T15:27:46.606Z,
"data" => "Aug 16 15:27:45 c11787c4cef3 root: 测试一下",
"data2" => nil,
"@version" => "1",
"host" => "c11787c4cef3",
"message" => "Aug 16 15:27:45 c11787c4cef3 root: 测试一下",
"user" => "xp"
}

#logger "测试一下|from by icyboy"
{
"path" => "/var/log/messages",
"@timestamp" => 2017-08-16T15:28:17.661Z,
"data" => "Aug 16 15:28:16 c11787c4cef3 root: 测试一下",
"data2" => "from by icyboy",
"@version" => "1",
"host" => "c11787c4cef3",
"message" => "Aug 16 15:28:16 c11787c4cef3 root: 测试一下|from by icyboy",
"user" => "xp"
}

-------------本文结束感谢您的阅读-------------