简介
Logstash是一个开源的数据收集引擎,具备实时管道(pipeline)处理能力。主要包括input(采集数据)、filter(过滤数据)、output(输出数据)三个部分。它们都有各自的插件可以进行配置,例如input可以读取文件、标准输入等;output可以标准输出,或发送到es、redis等。
filter常用插件有grok(即正则解析,包括:match等)、mutate(即字段修改,包括:拼接赋值add_field、类型转换convert、字符串替换gsub等)、kv(键值对解析器,包括:field_split、value_split等)
安装与配置
/logstash/config/logstash.yml:主要用于控制logstash运行时的状态
/logstash/config/startup.options:logstash 运行相关参数
JAVACMD=/usr/bin/java 本地jdk LS_HOME=/opt/logstash logstash所在目录 LS_SETTINGS_DIR="${LS_HOME}/config" 默认logstash配置文件目录 LS_OPTS="--path.settings ${LS_SETTINGS_DIR}" logstash启动命令参数 指定配置文件目录 LS_JAVA_OPTS="" 传给JVM的额外参数(如堆大小等),并非jdk目录 LS_PIDFILE=/var/run/logstash.pid logstash.pid所在目录 LS_USER=logstash logstash启动用户 LS_GROUP=logstash logstash启动组 LS_GC_LOG_FILE=/var/log/logstash/gc.log logstash jvm gc日志路径 LS_OPEN_FILES=65534 logstash进程可打开的最大文件(描述符)数量
bin/logstash -e 'input { stdin { } } output { stdout {}}'
配置文件
# Read events from standard input (the console).
input {
  stdin { }
}
# Filter stage: parses an nginx access-log line, enriches it with the user
# profile looked up by the uid found in the cookie, normalizes the time
# fields, extracts the URL query parameters, and finally drops helper fields.
filter {
  # Split the raw access-log line into named fields.
  grok {
    match => {
      "message" => "%{IP:log_ip} \- %{DATA} \[%{HTTPDATE:time}\] \"%{DATA:mothod} %{DATA:initial_url} %{DATA:http_v}\" %{DATA:status} %{DATA:body_bytes} \"%{DATA:referer}\" \"%{DATA:user_agent}\" \"%{DATA:forward}\" \"%{DATA:cookie}\""
    }
  }

  # Pull the numeric uid out of the URL-encoded cookie (the digits after `uid":"`).
  grok {
    match => {
      "cookie" => "(?<uid>(?<=uid%22%3A%22)([0-9]+))"
    }
  }

  # Strip the "amp;" left over from HTML-escaped "&amp;" in the request URL.
  mutate {
    gsub => ["initial_url", "amp;", ""]
  }

  # Everything after the "?" becomes url_para (the raw query string).
  grok {
    match => {
      "initial_url" => "(?<url_para>(?<=\?)(.*))"
    }
  }

  mutate {
    convert => ["uid", "integer"]
  }

  # User profile lookup and last-login time handling (only when a uid was found).
  if [uid] {
    # Look up the user profile.
    # useSSL=true: version compatibility;
    # tinyInt1isBit=false: keeps tinyint(1) columns from being converted to true/false.
    # NOTE(review): the database password is hard-coded below; consider moving it
    # to the Logstash keystore or an environment variable and rotating it.
    jdbc_streaming {
      jdbc_driver_library => "/usr/local/...../mysql-connector-java-8.0.12.jar"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://IP:port/db_name?useSSL=true&tinyInt1isBit=false"
      jdbc_user => "zcdb_php"
      jdbc_password => "K6E267CCe7799d1a"
      statement => "SELECT sex,lasttime,email,name,login_phone,vip,userlevel,userid FROM tb_user WHERE id=:id"
      parameters => {id=>"[uid]"}
      target => "userlist"
      add_field => { "userinfo_json" => "%{[userlist][0]}" }
    }

    # Format user profile: start
    if [userinfo_json] {
      # Expand the first result row into the structured field "userinfo".
      json {
        source => "userinfo_json"
        target => "userinfo"
      }
    }

    if [userinfo][lasttime] {
      # lasttime is parsed as a UNIX epoch value and stored back as a timestamp.
      date {
        match => ["[userinfo][lasttime]", "UNIX"]
        target => "[userinfo][lasttime]"
      }
      # Shift the timestamp by +8 hours (presumably to display it as UTC+8 - confirm).
      ruby {
        code => "event.set('[userinfo][lasttime]', event.get('[userinfo][lasttime]').time.localtime + 8*60*60)"
      }
      # Split "<date>T<time>.<fraction>" into last_a (date) and last_b (time).
      grok {
        match => {
          "[userinfo][lasttime]" => "%{DATA:last_a}T%{DATA:last_b}\."
        }
      }
    }
    # Format user profile: end
  }

  # Log time handling: parse the nginx timestamp into log_time.
  date {
    match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "log_time"
  }
  # Shift log_time by +8 hours, same as for lasttime above.
  ruby {
    code => "event.set('[log_time]', event.get('[log_time]').time.localtime + 8*60*60)"
  }
  # Split "<date>T<time>.<fraction>" into log_a (date) and log_b (time).
  grok {
    match => {
      "log_time" => "%{DATA:log_a}T%{DATA:log_b}\."
    }
  }

  # Build the full URL; the kv filter below reads the parameters from it.
  if [initial_url] {
    mutate {
      add_field => {
        "user_url" => "https://www.iphouse.cn%{initial_url}"
      }
    }
  } else {
    mutate {
      add_field => {
        "user_url" => "https://www.iphouse.cn"
      }
    }
  }

  # Remove fields now so that the add_field calls further down do not append
  # to an already existing value ([userinfo][lasttime] and log_time are
  # re-created later from last_a/last_b and log_a/log_b).
  mutate {
    remove_field => ["log","ecs","ip","agent","host","input","@version", "fields","os", "[userinfo][lasttime]","[userinfo][regtime]","[userinfo][stsrttime]","[userinfo][endtime]","log_time"]
  }

  # Parse the query parameters into [kv][...] when a query string exists.
  if [url_para] {
    kv {
      source => "user_url"
      field_split => "&?"
      value_split => "="
      target => "kv"
    }
  }

  # The field reference must not contain stray whitespace: "[kv][anyou ]"
  # would test a field literally named "anyou " and never match.
  if [kv][anyou] {
    mutate {
      add_field => {
        "lawyerid" => "%{[kv][anyou]}"
      }
    }
  }

  # Split a comma-separated value: 1,2,3 => [1,2,3]
  if [ip_type] {
    mutate {
      split => { "ip_type" => "," }
    }
  }

  # Time formatting: derive publish_date (%Y-%m-%d) from publish_time.
  if [publish_time] {
    ruby {
      code => "event.set('publish_date', event.get('publish_time').time.localtime.strftime('%Y-%m-%d'))"
    }
  }

  # Time formatting: 2023-04-13T08:09:42.000Z ==> 2023-04-13 08:09:42
  if [publish_time] {
    ruby {
      code => "event.set('publish_time', event.get('publish_time').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
  }

  # When any of the id parameters is present, resolve the ids to names through
  # the internal HTTP API; the response body is stored in get_name_str.
  if [url_para] {
    if [kv][lawyerid] or [kv][judgeid] or [kv][firmid] or [kv][fayuanid] or [kv][litigantid] {
      http {
        url => "http://api.iphouse.local/log/nginx/get_to_name?%{[url_para]}"
        verb => "GET"
        request_timeout => 10
        target_body => "get_name_str"
      }
    }
  }

  # Parse the User-Agent string; the parsed structure replaces the raw string.
  useragent {
    source => "user_agent"
    target => "user_agent"
  }

  # Filtering: drop useless log lines (currently disabled) ------------------
  # if ![uid] and ![anyouid] and ![fayuanid] and ![judgeid] and ![lawyerid] and ![firmid] and ![litigantid] {
  #   drop {}
  # }

  # Drop intermediate / sensitive fields and expose the OS name as "os".
  mutate {
    remove_field => [
      "tags","p","t","orderby","message","messages","server_name","time","userlist",
      "[userinfo][coupon]","[userinfo][language]","[userinfo][phone]","[userinfo][uc_key]","[userinfo][password]","[userinfo][passwd]",
      "coas"
    ]
    add_field => {
      "os" => "%{[user_agent][os_name]}"
    }
  }

  # Time handling: rebuild the time fields as "<date> <time>" strings.
  if [uid] {
    if [last_a] and [last_b] {
      mutate {
        add_field => {
          "[userinfo][lasttime]" => "%{last_a} %{last_b}"
        }
      }
    }
  }
  if [log_a] and [log_b] {
    mutate {
      add_field => {
        "log_time" => "%{log_a} %{log_b}"
      }
    }
  }

  # URL-decode every field so percent-encoded (e.g. Chinese) text is readable.
  urldecode {
    all_fields => true
  }

  # Finally remove the helper fields that are no longer needed.
  mutate {
    remove_field => ["last_a","last_b","reg_a","reg_b","start_a","start_b","end_a","end_b","log_a","log_b","[userinfo][ipstart]","[userinfo][IPend]","cookie" ,"@timestamp" ,"rawdata" ,"parties","courts","judges","lawyers" ,"user_agent" ,"headers" ,"access_token" ,"keyword" ,"user_info","userinfo_json","panjueshijian" ,"get_name_str","get_name_json" ,"kv" ]
  }
}
# Print every processed event to standard output (the console).
output {
  stdout { }
}
启动
- bin/logstash -e 'input { stdin { } } output { stdout {}}'
- -e :使用给定的字符串作为配置数据,语法与配置文件相同。
- bin/logstash -f '/tmp/{one,two,three}'
- bin/logstash -f config/nginx_log.conf
- -f :从指定的文件或目录加载Logstash配置
输出结果
### 成功提示
...
The stdin plugin is now waiting for input:
[2021-03-30T07:34:39,189][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-03-30T07:34:40,560][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
### 日志内容
125.82.162.15 - - [04/Jan/2021:03:00:01 +0800] "GET /cases/list.html?lawyerid=337&p=2&rawdata=%282018%29%E7%B2%A4%E6%B0%91%E7%94%B313390%E5%8F%B7 HTTP/1.1" 200 461 "https://www.iphouse.cn/cases/detail/4von0m87qg2rle74321jx1p395dwzeky.html?keyword=%282019%29%E5%B7%9D01%E6%B0%91%E5%88%9D3080%E5%8F%B7" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-" "2uid%22%3A%228%22%7D"
### 返回结果
{
"log_ip" => "125.82.162.15",
"mothod" => "GET",
"http_v" => " HTTP/1.1",
"lawyerid" => "337",
"referer" => "https://www.iphouse.cn/cases/detail/4von0m87qg2rle74321jx1p395dwzeky.html?keyword=(2019)川01民初3080号",
"uid" => 8,
"body_bytes" => "461",
"userinfo" => {
"userlevel" => 2,
"userid" => 0,
"login_phone" => 0,
"name" => nil,
"vip" => 2,
"lasttime" => "2018-11-08 14:12:25",
"sex" => 2,
"email" => nil
},
"initial_url" => "/cases/list.html?lawyerid=337&p=2&rawdata=(2018)粤民申13390号",
"status" => "200",
"os" => "Windows",
"forward" => "-",
"log_time" => "2021-01-04 03:00:01",
"url_para" => "lawyerid=337&p=2&rawdata=(2018)粤民申13390号",
"user_url" => "https://www.iphouse.cn/cases/list.html?lawyerid=337&p=2&rawdata=(2018)粤民申13390号"
}
其他配置
# 添加参数
input {
  stdin {
    # Codec used to decode the incoming data.
    codec => "plain"
    # Value stored in the event's "type" field.
    type => "std"
    # Tags attached to every event from this input.
    tags => ["add"]
    # Extra field added to every event from this input.
    add_field => { "key" => "value" }
  }
}
# 指定beats
input {
  # Beats input on port 5044.
  beats { port => 5044 }
}
# 输出es
output {
  elasticsearch {
    hosts    => ["127.0.0.1:9333"]
    index    => "logstash-%{+YYYY.MM}"
    user     => "elastic"
    password => "password"

    # Disable template management:
    # manage_template => false

    # Use a specific template:
    # template => "/usr/local/logstash-7.11.1/template/rpt_nginx_log.json"
    # template_name => "rpt_nginx_log"

    # Overwrite an existing template:
    # template_overwrite => true
  }
}
-
插件安装
-
bin/logstash-plugin list 查看插件列表
-
bin/logstash-plugin install logstash-filter-alter 安装 alter
-
-
由于logstash每次启动较慢且容易报错,可以配置热加载或先测试配置文件。例如:bin/logstash -f xxx.conf --config.test_and_exit
- --config.test_and_exit : 测试配置文件并退出(仅测试不启动)
- --config.reload.automatic : 热加载
- (修改文件不需要重启,可在config/logstash.yml中配置!input是stdin时无效需手动重启)
- (logstash.yml对格式要求严格,开启后需顶格!)
-
其他事项
- 自定义与变量有冲突