Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
614 views
in Technique[技术] by (71.8m points)

logstash - Elasticsearch merge multiple indexes based on common field

I'm using ELK to generate views out of the data from two different DB. One is mysql other one is PostgreSQL. There is no way of writing join query between those two DB instance. But I have a common field call "nic". Following are the documents from each index.

MySQL

index: user_detail

"_id": "871123365V",
"_source": {
    "type": "db-poc-user",
    "fname": "Iraj",
    "@version": "1",
    "field_lname": "Sanjeewa",
    "nic": "871456365V",
    "@timestamp": "2020-07-22T04:12:00.376Z",
    "id": 2,
    "lname": "Santhosh"
  }

PostgreSQL

Index: track_details

"_id": "871456365V",
"_source": {
   "@version": "1",
   "nic": "871456365V",
   "@timestamp": "2020-07-22T04:12:00.213Z",
   "track": "ELK",
   "type": "db-poc-ceg"
},

I want to merge both index in to single index using common field "nic". And create new index. So I can create visualization on Kibana. How can this be achieved?

Please note that each document in new index should have "nic,fname,lname,track" as fields. Not the aggregation.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I would leverage the enrich processor to achieve this.

First, you need to create an enrich policy (use the smallest index, let's say it's user_detail):

PUT /_enrich/policy/user-policy
{
  "match": {
    "indices": "user_detail",
    "match_field": "nic",
    "enrich_fields": ["fname", "lname"]
  }
}

Then you can execute that policy in order to create an enrichment index

POST /_enrich/policy/user-policy/_execute

The next step requires you to create an ingest pipeline that uses the above enrich policy/index:

PUT /_ingest/pipeline/user_lookup
{
  "description" : "Enriching user details with tracks",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "user-policy",
        "field" : "nic",
        "target_field": "tmp",
        "max_matches": "1"
      }
    },
    {
      "script": {
        "if": "ctx.tmp != null",
        "source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
      }
    },
    {
      "remove": {
        "field": ["@version", "@timestamp", "type"]
      }
    }
  ]
}

Finally, you're now ready to create your target index with the joined data. Simply leverage the _reindex API combined with the ingest pipeline we've just created:

POST _reindex
{
  "source": {
    "index": "track_details"
  },
  "dest": {
    "index": "user_tracks",
    "pipeline": "user_lookup"
  }
}

After running this, the user_tracks index will contain exactly what you need, for instance:

  {
    "_index" : "user_tracks",
    "_type" : "_doc",
    "_id" : "0uA8dXMBU9tMsBeoajlw",
    "_score" : 1.0,
    "_source" : {
      "fname" : "Iraj",
      "nic" : "871456365V",
      "lname" : "Santhosh",
      "track" : "ELK"
    }
  }

If your source indexes ever change (new users, changed names, etc), you'll need to re-run the above steps, but before doing it, you need to delete the ingest pipeline and the ingest policy (in that order):

DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/user-policy

After that you can freely re-run the above steps.

PS: Just note that I cheated a bit since the record in user_detail doesn't have the same nic in your example, but I guess it was a copy/paste issue.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...