winlogbeat监听windows日志到kafka、hdfs,不同层级取数据

xiaoxiao2021-02-28  88

只截取关键部分:

# https://go.es.io/WinlogbeatConfig winlogbeat.event_logs:   - name: Application     ignore_older: 72h   - name: Security   - name: System

接入kafka

#--------------------------kafka----------------------------------- output.kafka:   # initial brokers for reading cluster metadata   hosts: ["192.1.1.216:9092"]   topic: 'chao-beat0710'

kafka查询数据命令

bin/kafka-console-consumer.sh --zookeeper 192.1.1.216:2181 --topic chao-beat0710 --from-beginning

接入到hdfs

input{         kafka{                 zk_connect=>"192.1.1.216:2181"                 topic_id=>"chao-beat0710"         } } filter{         mutate{

#替换windows中获取message数据中的\n以及\t为空                 gsub => ["message","\n",""]                 gsub => ["message","\t",""]                 #获取key值的测试,可以通过map的值,比如..beat是map,则beat下面的字段是key,可以通过beat[name]去到字段值                 update => {"message" => "%{activity_id}--%{beat}--%{beat[hostname]}"}         } } output{         webhdfs{                 host => "192.1.1.151"                 port => 50070                 path => "/chao/hdfs/test/kafka3/data.txt"                 user => "lee"         }         stdout{                 codec => rubydebug         } }

数据格式例子

取出数据在hdfs展示

附带winlogbeat接入es,以及logstash方式

#-------------------------- Elasticsearch output ------------------------------ #output.elasticsearch:   # Array of hosts to connect to.   #hosts: ["localhost:9200"]   # Optional protocol and basic auth credentials.   #protocol: "https"   #username: "elastic"   #password: "changeme" #----------------------------- Logstash output -------------------------------- #output.logstash:   # The Logstash hosts   #hosts: ["localhost:5044"]

最后附带官方文档API(

packetbeat(用于监控网络流量)、filebeat(用于监听日志数据,可以替代logstash-input-file)、topbeat(用于搜集进程的信息、负载、内存、磁盘等数据)、winlogbeat(用于搜集windows事件日志) )

https://www.elastic.co/guide/en/beats/winlogbeat/current/winlogbeat-configuration-details.html

转载请注明原文地址: https://www.6miu.com/read-34290.html

最新回复(0)