ELK之logstash

Filebeat是Beats家族的成员之一。Logstash和Filebeat都具有日志收集功能:Filebeat更轻量、占用资源更少,而Logstash具有filter功能,能够过滤和分析日志。常见的架构是由Filebeat采集日志并发送给Logstash,Logstash利用filter功能过滤、分析后,再存储到Elasticsearch中。

简介

Logstash是一个开源的数据收集引擎,具备实时管道处理能力。主要包括input(采集数据)、filter(过滤数据)、output(输出数据)三个部分,每个部分都有各自的插件可供配置:例如input可以读取文件、标准输入等;output可以输出到标准输出、es、redis等。

filter常用插件有grok(正则解析,如match等)、mutate(字段修改,包括:拼接赋值add_field、类型转换convert、字符串替换gsub等)、kv(键值对解析,如field_split、value_split等)。

安装与配置

/logstash/config/logstash.yml:主要用于控制logstash运行时的状态

/logstash/config/startup.options:logstash 运行相关参数

JAVACMD=/usr/bin/java   本地jdk
LS_HOME=/opt/logstash   logstash所在目录
LS_SETTINGS_DIR="${LS_HOME}/config"           默认logstash配置文件目录
LS_OPTS="--path.settings ${LS_SETTINGS_DIR}"  logstash启动命令参数 指定配置文件目录
LS_JAVA_OPTS=""  传递给JVM的额外启动参数
LS_PIDFILE=/var/run/logstash.pid logstash.pid所在目录
LS_USER=logstash    logstash启动用户
LS_GROUP=logstash logstash启动组
LS_GC_LOG_FILE=/var/log/logstash/gc.log logstash jvm gc日志路径
LS_OPEN_FILES=65534 logstash最多打开监控文件数量

bin/logstash -e 'input { stdin { } } output { stdout {}}'

配置文件
# Read events typed on the console (standard input).
input {
  stdin { }
}
# Filter stage: parse an nginx access-log line, enrich it with user data
# (MySQL lookup + internal HTTP API), normalise timestamps and finally drop
# every helper field that should not reach the output.
filter {
    # Split the raw access-log line into named fields.
    # NOTE: the field name "mothod" (sic) is kept unchanged because it is part
    # of the emitted event schema.
    grok {
        match => {
            "message" => "%{IP:log_ip} \- %{DATA} \[%{HTTPDATE:time}\] \"%{DATA:mothod} %{DATA:initial_url} %{DATA:http_v}\" %{DATA:status} %{DATA:body_bytes} \"%{DATA:referer}\" \"%{DATA:user_agent}\" \"%{DATA:forward}\" \"%{DATA:cookie}\""
        }
    }

    # Extract the digits that follow the URL-encoded marker uid":" in the cookie.
    grok {
        match => {
            "cookie" => "(?<uid>(?<=uid%22%3A%22)([0-9]+))"
        }
    }

    # Strip the "amp;" left behind by HTML-escaped "&amp;" separators.
    mutate {
        gsub => ["initial_url", "amp;", ""]
    }

    # Capture everything after the "?" as the raw query string.
    grok {
        match => {
            "initial_url" => "(?<url_para>(?<=\?)(.*))"
        }
    }

    mutate {
        convert => ["uid", "integer"]
    }

    # ---- User enrichment (only for events that carry a uid) ----------------
    if [uid] {
        # Look the user up in MySQL.
        # Connection flags (per the original author): useSSL=true for version
        # compatibility; tinyInt1isBit=false so tinyint(1) columns are not
        # converted to true/false.
        jdbc_streaming {
            jdbc_driver_library => "/usr/local/...../mysql-connector-java-8.0.12.jar"
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
            jdbc_connection_string => "jdbc:mysql://IP:port/db_name?useSSL=true&tinyInt1isBit=false"
            jdbc_user => "zcdb_php"
            # Never commit the real password: it is resolved at startup from
            # the Logstash keystore or the JDBC_PASSWORD environment variable.
            jdbc_password => "${JDBC_PASSWORD}"
            statement => "SELECT  sex,lasttime,email,name,login_phone,vip,userlevel,userid FROM tb_user WHERE id=:id"
            parameters => { id => "[uid]" }
            target => "userlist"
            # Serialise the first returned row so it can be re-parsed below.
            add_field => { "userinfo_json" => "%{[userlist][0]}" }
        }

        # Turn the serialised row into a structured [userinfo] object.
        if [userinfo_json] {
            json {
                source => "userinfo_json"
                target => "userinfo"
            }
        }

        # Convert the epoch-seconds lasttime into a timestamp, shift it by
        # 8 hours, then split it into date (last_a) and time (last_b) parts;
        # the parts are re-joined further down.
        if [userinfo][lasttime] {
            date {
                match => ["[userinfo][lasttime]", "UNIX"]
                target => "[userinfo][lasttime]"
            }
            ruby {
                code => "event.set('[userinfo][lasttime]', event.get('[userinfo][lasttime]').time.localtime + 8*60*60)"
            }
            grok {
                match => {
                    "[userinfo][lasttime]" => "%{DATA:last_a}T%{DATA:last_b}\."
                }
            }
        }
    }

    # ---- Log timestamp -------------------------------------------------------
    date {
        match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
        target => "log_time"
    }
    # Only post-process log_time when the date filter actually produced it;
    # otherwise the ruby code would raise on a nil value.
    if [log_time] {
        ruby {
            code => "event.set('[log_time]', event.get('[log_time]').time.localtime + 8*60*60)"
        }
        grok {
            match => {
                "log_time" => "%{DATA:log_a}T%{DATA:log_b}\."
            }
        }
    }

    # ---- Full URL ------------------------------------------------------------
    # Prefix the request path with the site origin; fall back to the bare
    # origin when no path was parsed.
    if [initial_url] {
        mutate {
            add_field => {
                "user_url" => "https://www.iphouse.cn%{initial_url}"
            }
        }
    } else {
        mutate {
            add_field => {
                "user_url" => "https://www.iphouse.cn"
            }
        }
    }

    # Remove fields now, so that the add_field calls further down create fresh
    # values instead of appending to existing ones.
    mutate {
        remove_field => [
            "log", "ecs", "ip", "agent", "host", "input", "@version", "fields", "os",
            "[userinfo][lasttime]", "[userinfo][regtime]", "[userinfo][stsrttime]", "[userinfo][endtime]",
            "log_time"
        ]
    }

    # ---- Query-string parameters ---------------------------------------------
    # Parse key=value pairs (separated by "&" or "?") into the [kv] object.
    if [url_para] {
        kv {
            source => "user_url"
            field_split => "&?"
            value_split => "="
            target => "kv"
        }
    }
    # Promote the lawyerid query parameter to a top-level field.
    # (The original condition referenced "[kv][anyou  ]", a field name with
    # trailing spaces that can never exist.)
    if [kv][lawyerid] {
        mutate {
            add_field => {
                "lawyerid" => "%{[kv][lawyerid]}"
            }
        }
    }

    # Split a comma-separated list, e.g. 1,2,3 => [1,2,3].
    if [ip_type] {
        mutate {
            split => { "ip_type" => "," }
        }
    }

    if [publish_time] {
        # Derive publish_date (YYYY-MM-DD) from publish_time.
        ruby {
            code => "event.set('publish_date', event.get('publish_time').time.localtime.strftime('%Y-%m-%d'))"
        }
        # Reformat publish_time: 2023-04-13T08:09:42.000Z ==> 2023-04-13 08:09:42
        ruby {
            code => "event.set('publish_time', event.get('publish_time').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
        }
    }

    # Resolve ids found in the query string to names via the internal API.
    # NOTE(review): the original listed [kv][lawyerid] twice; [kv][anyouid]
    # may have been intended for one of them - confirm before adding it.
    if [url_para] {
        if [kv][lawyerid] or [kv][judgeid] or [kv][firmid] or [kv][fayuanid] or [kv][litigantid] {
            http {
                url => "http://api.iphouse.local/log/nginx/get_to_name?%{[url_para]}"
                verb => "GET"
                request_timeout => 10
                target_body => "get_name_str"
            }
        }
    }

    # Parse the User-Agent header into structured fields (in place).
    useragent {
        source => "user_agent"
        target => "user_agent"
    }

    # Optional: drop log lines that carry none of the ids of interest.
    # if ![uid] and ![anyouid] and ![fayuanid] and ![judgeid] and ![lawyerid] and ![firmid] and ![litigantid] {
    #     drop {}
    # }

    # Drop noise / sensitive user columns and expose the OS name at top level.
    mutate {
        remove_field => [
            "tags", "p", "t", "orderby", "message", "messages", "server_name", "time", "userlist",
            "[userinfo][coupon]", "[userinfo][language]", "[userinfo][phone]", "[userinfo][uc_key]", "[userinfo][password]", "[userinfo][passwd]",
            "coas"
        ]

        add_field => {
            "os" => "%{[user_agent][os_name]}"
        }
    }

    # ---- Re-assemble the human-readable timestamps -----------------------------
    if [uid] and [last_a] and [last_b] {
        mutate {
            add_field => {
                "[userinfo][lasttime]" => "%{last_a} %{last_b}"
            }
        }
    }
    if [log_a] and [log_b] {
        mutate {
            add_field => {
                "log_time" => "%{log_a} %{log_b}"
            }
        }
    }

    # URL-decode every field so percent-encoded text (e.g. Chinese) is readable.
    urldecode {
        all_fields => true
    }

    # Final clean-up of helper and intermediate fields.
    mutate {
        remove_field => [
            "last_a", "last_b", "reg_a", "reg_b", "start_a", "start_b", "end_a", "end_b", "log_a", "log_b",
            "[userinfo][ipstart]", "[userinfo][IPend]",
            "cookie", "@timestamp", "rawdata", "parties", "courts", "judges", "lawyers",
            "user_agent", "headers", "access_token", "keyword", "user_info", "userinfo_json",
            "panjueshijian", "get_name_str", "get_name_json", "kv"
        ]
    }

}
# Print every processed event to the console (standard output).
output {
  stdout { }
}

启动

  • bin/logstash -e 'input { stdin { } } output { stdout {}}'
    • -e :使用给定的字符串作为配置数据,语法与配置文件相同。
  • bin/logstash -f '/tmp/{one,two,three}'
  • bin/logstash -f config/nginx_log.conf
    • -f :从指定的文件或目录加载Logstash配置。
输出结果
### 成功提示
...
The stdin plugin is now waiting for input:
[2021-03-30T07:34:39,189][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-03-30T07:34:40,560][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}

### 日志内容
125.82.162.15 - - [04/Jan/2021:03:00:01 +0800] "GET /cases/list.html?lawyerid=337&amp;p=2&rawdata=%282018%29%E7%B2%A4%E6%B0%91%E7%94%B313390%E5%8F%B7   HTTP/1.1" 200 461 "https://www.iphouse.cn/cases/detail/4von0m87qg2rle74321jx1p395dwzeky.html?keyword=%282019%29%E5%B7%9D01%E6%B0%91%E5%88%9D3080%E5%8F%B7" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-" "2uid%22%3A%228%22%7D"    

### 返回结果
{
         "log_ip" => "125.82.162.15",
         "mothod" => "GET",
         "http_v" => "  HTTP/1.1",
       "lawyerid" => "337",
        "referer" => "https://www.iphouse.cn/cases/detail/4von0m87qg2rle74321jx1p395dwzeky.html?keyword=(2019)川01民初3080号",
            "uid" => 8,
     "body_bytes" => "461",
       "userinfo" => {
          "userlevel" => 2,
             "userid" => 0,
        "login_phone" => 0,
               "name" => nil,
                "vip" => 2,
           "lasttime" => "2018-11-08 14:12:25",
                "sex" => 2,
              "email" => nil
    },
    "initial_url" => "/cases/list.html?lawyerid=337&p=2&rawdata=(2018)粤民申13390号",
         "status" => "200",
             "os" => "Windows",
        "forward" => "-",
       "log_time" => "2021-01-04 03:00:01",
       "url_para" => "lawyerid=337&p=2&rawdata=(2018)粤民申13390号",
       "user_url" => "https://www.iphouse.cn/cases/list.html?lawyerid=337&p=2&rawdata=(2018)粤民申13390号"
}
其他配置
# stdin input with extra options ("#" is the comment marker in Logstash
# configuration files; "//" is not valid there).
input {
  stdin {
    # Attach a static key/value pair to every event.
    add_field => { "key" => "value" }
    codec => "plain"
    tags => ["add"]
    type => "std"
  }
}

# Receive events from Beats shippers (e.g. Filebeat) on port 5044.
input {
  beats {
    port => 5044
  }
}

# Send events to Elasticsearch.
output {
    elasticsearch {
        hosts => ["127.0.0.1:9333"]
        user => "elastic"
        password => "password"
        # Index name carries a year.month suffix (one index per month).
        index => "logstash-%{+YYYY.MM}"

        # Disable automatic template creation:
        # manage_template => false
        # Use a specific index template:
        # template => "/usr/local/logstash-7.11.1/template/rpt_nginx_log.json"
        # template_name => "rpt_nginx_log"
        # Overwrite an existing template of the same name:
        # template_overwrite => true
    }
}
  • 插件安装

    • bin/logstash-plugin list 查看插件列表

    • bin/logstash-plugin install logstash-filter-alter 安装 alter

  • 由于logstash每次启动较慢且容易报错,可以配置热加载,或先测试配置文件等。例如:bin/logstash -f xxx.conf --config.test_and_exit

    • --config.test_and_exit : 测试配置文件并退出(仅测试不启动)
    • --config.reload.automatic : 热加载
      • (修改文件不需要重启,也可在config/logstash.yml中配置;input是stdin时无效,需手动重启)
      • (logstash.yml对格式要求严格,开启该配置项时需顶格书写!)
  • 其他事项

    • 自定义与变量有冲突
Licensed under 京ICP备17003353号-3