input {
stdin{
}
jdbc {
#索引的类型
type => "tag"
# 数据库
jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
# 用户名密码
jdbc_user => "root"
jdbc_password => "testpwd"
# jar包的位置
jdbc_driver_library => "/usr/share/logstash/data/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "config-mysql/sql/tag.sql"
#statement => "SELECT * from tbl WHERE utime > :sql_last_value"
use_column_value => true
tracking_column => "utime"
record_last_run => true
schedule => "* * * * *"
jdbc_default_timezone => "Asia/Shanghai"
}
jdbc {
#索引的类型
type => "tag2"
# 数据库
jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
# 用户名密码
jdbc_user => "root"
jdbc_password => "testpwd"
# jar包的位置
jdbc_driver_library => "/usr/share/logstash/data/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "config-mysql/sql/index2_tag2.sql"
#statement => "SELECT * from tbl WHERE utime > :sql_last_value"
use_column_value => true
tracking_column => "utime"
record_last_run => true
schedule => "* * * * *"
jdbc_default_timezone => "Asia/Shanghai"
}
}
filter {
mutate {
remove_field => ["@timestamp","@version","create_time","update_time"]
}
}
output {
if[type]=="tag"{
elasticsearch {
hosts => "localhost:9200"
# index名
index => "recommend"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
user => elastic
password => "elastic@test.com"
"document_type" => "%{type}"
doc_as_upsert => true
action => "update"
}
}
if[type]=="tag2"{
elasticsearch {
hosts => "localhost:9200"
# index名
index => "index2"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
user => elastic
password => "elastic@test.com"
"document_type" => "%{type}"
doc_as_upsert => true
action => "update"
}
}
stdout {
codec => json_lines
}
}