在这里寻求帮助的呼声很高,当我尝试使用Logstash将MySQL值转换为嵌套的Elasticsearch字段时,出现以下错误。
{"exception"=>"expecting List or Map, found class org.logstash.bivalues.StringBiValue", "backtrace"=>["org.logstash.Accessors.newCollectionException(Accessors.java:195)"
使用以下配置文件:
input { jdbc { jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/data" jdbc_user => "username" jdbc_password => "password" statement => "SELECT id, suggestions, address_count FROM `suggestions` WHERE id <= 100" jdbc_paging_enabled => "true" jdbc_page_size => "50000" } } filter { mutate { rename => { 'address_count' => '[suggestions][payload][count]' } } } output { elasticsearch { hosts => [ "localhost:9200" ] index => "dev_suggestions" document_type => "address" } }
但是,如果我将address_count重命名为我的映射中尚未存在的字段,那么它可以正常工作,并且可以正确地将值添加为嵌套属性 ,我已经尝试了索引中的其他字段,而不仅仅是建议.payloads.address_count和我遇到了同样的问题, 它仅在未在映射中定义字段时才有效。
这使我有些头疼,如果有人可以帮助我解决这个问题,我将非常感激,因为我已经花了最后48个小时将自己的头撞在桌子上!
我最初以为我可以对MySQL查询执行以下操作:
SELECT id, suggestion, '[suggestions][payload][count]' FROM `suggestions` WHERE id <= 100
然后我也尝试了
SELECT id, suggestion, 'suggestions.payload.count' FROM `suggestions` WHERE id <= 100
两者都无法使用后面的选项插入值,从而导致一个错误,即字段不能包含点。
最后是映射:
{ "mappings": { "address": { "properties": { "suggestions": { "type": "completion", "payloads" : true } } } } }
感谢Val- 以及与我本人处于相同情况的未来用户,他们需要使用Logstash将MySQL数据转换为嵌套的Elasticsearch对象, 这是使用Logstash 5和Elasticsearch 2的可行解决方案。
input { jdbc { jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/data" jdbc_user => "username" jdbc_password => "password" statement => "SELECT addrid, suggestion, address_count FROM `suggestions` WHERE id <= 20" jdbc_paging_enabled => "true" jdbc_page_size => "50000" } } filter { ruby { code => " event.set('[suggestions][input]', event.get('suggestion')) event.set('[suggestions][payload][address_count]', event.get('address_count')) event.set('[v][payload][id]', event.get('addrid')) " remove_field => [ 'suggestion', 'address_count', 'addrid' ] } } output { elasticsearch { hosts => [ "localhost:9200" ] index => "dev_suggestions" document_type => "address" } }
我认为您需要以不同的方式进行。首先,我将suggestionsSQL查询中的字段重命名为其他名称,然后根据suggestions从SQL查询中获得的值来构建对象。
suggestions
statement => "SELECT id, suggestion, address_count FROM `suggestions` WHERE id <= 100"
然后,您可以使用ruby过滤器(并删除一个过滤器mutate)来构建您的suggestions字段,如下所示:
ruby
mutate
Logstash 2.x代码:
ruby { code => " event['suggestions']['input'] = event['suggestion'] event['suggestions']['payload']['count'] = event['address_count'] " remove_field => [ 'suggestion', 'address_count' ] }
Logstash 5.x代码:
ruby { code => " event.set('[suggestions][input]', event.get('suggestion')) event.set('[suggestions][payload][count]', event.get('address_count')) " remove_field => [ 'suggestion', 'address_count' ] }
PS:所有这些都假定您正在使用ES 2.x,因为该payload字段在ES 5.x中已消失
payload