Skip to content

Extracting and Loading Data From Elasticsearch With Python Updated⚓︎

Executive Summary⚓︎

This article is an update to the previous one from 2018. It uses the opensearch-py library, as it is more generic, and compresses the JSON output using gzip.

helper.bulk⚓︎

At a high level, the steps are:
* Import the required packages
* Set up some configuration variables
* Create the scan iterator
* Then write all the data from the iterator to disk

## Install/Load in Libraries
!pip3 install opensearch-py compress_json
from opensearchpy import OpenSearch as Elasticsearch
from opensearchpy import helpers
import json 
import compress_json

## Set variables: connection details and extract options.
## The string values below are placeholders; replace them with real settings.
elasticProtocol = 'https'                    # URL scheme
elastichost     = 'hostDetails'              # host part of the URL
elasticPrefix   = 'elasticsearchURLPrefix'   # path appended after host:port in the URL
elasticport     = '443'                      # kept as a string; it is only interpolated into the URL
elasticUser     = 'username'                 # user name embedded in the URL
elasticPassword = 'password'                 # password embedded in the URL
elasticIndex    = 'index'                    # index to scan; also the prefix of the output file names
actions         = []                         # buffer of documents waiting to be written to a file
fileRecordCount = 100000                     # number of documents written to each output file
fileCounter     = 0                          # numeric suffix of the next output file

## Generate RFC-1738 formatted URL
## The user name and password are percent-encoded so that reserved characters
## in the credentials (for example '@', ':' or '/') cannot corrupt the URL.
from urllib.parse import quote

elasticURL = '%s://%s:%s@%s:%s/%s' % (
    elasticProtocol,
    quote(elasticUser, safe=''),
    quote(elasticPassword, safe=''),
    elastichost,
    elasticport,
    elasticPrefix,
)

## Create Connection to Elasticsearch
es = Elasticsearch([elasticURL], verify_certs=True)

## Create the scan iterator over every document in the index
output = helpers.scan(
    es,
    index=elasticIndex,
    # Documents fetched per scroll page. The server rejects values above the
    # index's max_result_window setting (10000 by default), so raise that
    # setting first if a larger page size is wanted.
    size=10000,
    query={"query": {"match_all": {}}},
)

## Write Everything Out to Disk
def _write_extract(batch, number):
    """Dump one batch of documents to a gzip-compressed JSON file and report it.

    The file is named '<elasticIndex>-extract-<number>.json.gz'.
    """
    target = '%s-extract-%d.json.gz' % (elasticIndex, number)
    gzip_options = {"compresslevel": 9}
    json_options = {"ensure_ascii": False, "indent": 4, "sort_keys": True}
    compress_json.dump(batch, target, gzip_options, json_options)
    print('file %d written' % number)


for hit in output:
    # Only the document body is kept; the hit metadata is discarded.
    actions.append(hit['_source'])
    if len(actions) < fileRecordCount:
        continue
    # Buffer is full: flush it to its own file and start a new buffer.
    _write_extract(actions, fileCounter)
    actions = []
    fileCounter += 1

# Flush whatever is left over after the last full batch.
if actions:
    _write_extract(actions, fileCounter)

Now the data is read back from the compressed files and written into a different index.

import glob

## Reload the extract files and bulk-index their documents into another index
outputIndex = 'index-backup'   # index that receives the reloaded documents
saveSize    = 1000             # number of documents sent per bulk request

# The extract step names its files '<index>-extract-<n>.json.gz', so the
# pattern must end in '.json.gz' to find them.
files_to_load = glob.glob("./*.json.gz")
for file in files_to_load:
    file_data = compress_json.load(file)  # for loading a gzip file
    actions = []
    for record in file_data:
        action = {
            "_index": outputIndex,
            '_op_type': 'index',
            # NOTE(review): assumes every document carries its own 'id' field.
            # The extract step saved only '_source', so the original
            # Elasticsearch '_id' is not available here -- confirm.
            "_id": record['id'],
            "_source": record,
        }
        actions.append(action)
        if len(actions) >= saveSize:
            helpers.bulk(es, actions)
            actions.clear()

    # Send the final partial batch for this file.
    if actions:
        helpers.bulk(es, actions)