python - Scripting in logstash


Is it possible to do Python scripting in Logstash? I can import CSV data into Elasticsearch using Logstash, but I need to use the update API instead of indexing the rows.

Here is the sample CSV file:

vi /tmp/head.txt

"home","home-66497273a5a83c99","spice xlife 350, 3.5inch android, bit.ly/1vszj","919359000000","hmshop","916265100000","2016-05-18 08:41:49"
"home","home-26497273a5a83c99","spice xlife 350, 3.5inch android, bit.ly/1v1","919359000001","hmshop","916265100000","2016-05-18 18:41:49"
"home","home-36497273a5a83c99","spice xlife 350, 3.5inch android, bit.ly/szj1","919359000001","hmshop","916265100000","2016-05-18 12:41:49"
"home","home-46497273a5a83c99","spice xlife 350, 3.5inch android, bit.ly/1","919359000000","hmshop","916265100000","2016-05-18 14:41:49"
"home","home-56497273a5a83c99","spice xlife 350, 3.5inch android, bit.ly/1vszj1xc","919359000000","hmshop","916265100000","2016-05-18 16:41:49"

Here is the Logstash config file:

vi logstash.conf

input {
    file {
        path => "/tmp/head.txt"
        type => "csv"
        start_position => "beginning"
    }
}
filter {
    csv {
        columns => ["user", "messageid", "message", "destination", "code", "mobile", "mytimestamp"]
        separator => ","
    }
}
output {
    elasticsearch {
        action => "index"
        hosts => ["172.17.0.1"]
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
    }
}

I have confirmed that the above configuration works as expected, and the 5 records are stored as 5 separate documents.

Here is the docker command:

docker run -d -v "/tmp/logstash.conf":/usr/local/logstash/config/logstash.conf -v /tmp/:/tmp/ logstash -f /usr/local/logstash/config/logstash.conf 

The problem is that I need to merge documents based on the destination number, and the destination should become the id of the document. There are rows with the same destination. For example, the document with _id: 919359000001 should contain both of the following records as nested objects.

"user": "home", "messageid": "home-26497273a5a83c99", "message": "spice xlife 350, 3.5inch android, bit.ly/1v1", "code": "hmshop", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49" "user": "home", "messageid" "home-36497273a5a83c99", "message": "spice xlife 350, 3.5inch android, bit.ly/szj1", "code": "hmshop", "mobile": "916265100000", "mytimestamp": "2016-05-18 12:41:49" 

Elasticsearch correctly converts the CSV data to JSON as shown above. What I need is to reformat the statements to take advantage of scripting with the update API. The following code works correctly:

POST /test_index/doc/_bulk
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "home", "messageid": "home-26497273a5a83c99", "message": "spice xlife 350, 3.5inch android, bit.ly/1v1", "code": "hmshop", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}}, "upsert": {"parent" : [{"user": "home", "messageid": "home-26497273a5a83c99", "message": "spice xlife 350, 3.5inch android, bit.ly/1v1", "code": "hmshop", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}] }}
{ "update" : { "_id" : "919359000001"} }
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "home", "messageid": "home-36497273a5a83c99", "message": "spice xlife 350, 3.5inch android, bit.ly/1v13343", "code": "hmshop", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}}, "upsert": {"parent" : [{"user": "home", "messageid": "home-36497273a5a83c99", "message": "spice xlife 350, 3.5inch android, bit.ly/1v13343", "code": "hmshop", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}] }}
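If that bulk body is saved to a file, it can be submitted with curl the same way as in the Python section further down, e.g. (bulk.json is a hypothetical file name; the host is the one from the config above):

curl -s -XPOST 172.17.0.1:9200/test_index/doc/_bulk --data-binary @bulk.json; echo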

How do I code this in Logstash to convert the CSV data as shown above?


Update

I have Python code that works as expected, but I don't know how to modify this code to suit the "output" parameters suggested in the answer below. In the following example, df_json is a Python object, which is nothing but a Python dataframe flattened to JSON.

import copy
import json

with open('myfile.txt', 'w') as f:
    for doc1 in df_json:
        doc = mydict(doc1)  # mydict is a helper defined elsewhere
        docnew = copy.deepcopy(doc)
        del docnew['destination']
        # one bulk action line per record, keyed on the destination number
        action = '{ "update": {"_id": "%s" }}\n' % doc['destination']
        f.write(action)
        # params and upsert must be valid JSON, hence json.dumps rather than the dict repr
        entry = '{ "script" : { "inline": "ctx._source.parent += [\'user\': user, \'messageid\': messageid, \'message\': message, \'code\': code, \'mobile\': mobile, \'mytimestamp\': mytimestamp]", "lang" : "groovy", "params" : %s}, "upsert": {"parent" : [%s] }}\n' % (json.dumps(doc), json.dumps(docnew))
        f.write(entry)

# run from IPython/Jupyter:
!curl -s -XPOST xxx.xx.xx.x:9200/test_index222/doc/_bulk --data-binary @myfile.txt; echo

Update 2

I tried the following configuration, but it replaces (rather than updates, as per the script) the documents:

output {
    elasticsearch {
        action => "index"
        hosts => ["172.17.0.1"]
        document_id => "%{destination}"
        index => "logstash3-%{+YYYY.MM.dd}"
        workers => 1
        script => "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]"
        script_type => "inline"
        script_lang => "groovy"
        scripted_upsert => "true"
    }
}

When I changed the action to "update", I got the following error:

:response=>{"update"=>{"_index"=>"logstash4-2016.07.29", "_type"=>"csv", "_id"=>"919359000000",  "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed execute script",  "caused_by"=>{"type"=>"script_exception", "reason"=>"failed run in line script  [ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]]  using lang [groovy]", "caused_by"=>{"type"=>"missing_property_exception", "reason"=>"no such property: user class: fe1b423dc4966b0f0b511b732474637705bf3bb1"}}}}}, :level=>:warn} 

Update 3

As per Val's answer, I added the event variable and got this error:

:response=>{"update"=>{"_index"=>"logstash4-2016.08.06", "_type"=>"csv", "_id"=>"%{destination}", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"failed run inline script [ctx._source.parent += ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]] using lang [groovy]", "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"cannot execute null+{user=null, messageid=null, message=, code=null, mobile=null, mytimestamp=null}"}}}}} 

Update 4

As per Val's updated answer, I tried this:

script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]" 

and got this error:

{:timestamp=>"2016-08-12T09:40:48.869000+0000", :message=>"Pipeline main started"}
{:timestamp=>"2016-08-12T09:40:49.517000+0000", :message=>"Error parsing csv", :field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}

Only 2 records were added to the database.

Answer

The elasticsearch output plugin supports script parameters:

output {
    elasticsearch {
        action => "update"
        hosts => ["172.17.0.1"]
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
        script => "<your script here>"
        script_type => "inline"
        # set the language of the used script
        # script_lang => ...
        # if enabled, the script is in charge of creating the non-existent document (scripted update)
        # scripted_upsert => ... (default is false)
    }
}
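For this CSV example, a filled-in output block would look roughly as follows. This is only a sketch assembled from the updates above: document_id comes from Update 2, and the null-safe, event-prefixed script from Update 4 (the plugin passes the Logstash event into the script under the event variable):

output {
    elasticsearch {
        action => "update"
        hosts => ["172.17.0.1"]
        # use the destination number as the document id so rows merge into one document
        document_id => "%{destination}"
        index => "logstash-%{+YYYY.MM.dd}"
        workers => 1
        # initialize parent to an empty list on first upsert, then append the current event's fields
        script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]"
        script_type => "inline"
        script_lang => "groovy"
        scripted_upsert => "true"
    }
}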
