45 changes: 36 additions & 9 deletions Cross_Cluster_Replication/cross_cluster_replication.py
@@ -7,6 +7,8 @@

import requests
import json
import datetime
import random

# --------------
# Step 1
@@ -26,6 +28,36 @@
leader_os_url = '<os_domain_endpoint_url>'
follower_os_url = '<os_domain_endpoint_url>'

os_cred = ('OSMasterUser', 'AwS#OpenSearch1')
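# (The credentials above are demo placeholders; substitute the master user credentials for your own domains.)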


def generate_insert():
    log_messages = ["Error out of memory", "High CPU!", "Things are normal", "I'm afraid I can't do that Dave."]
    return {
        "eventtime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "ip-address": "8.8.8.8",
        "application-id": random.randint(1, 9),
        "log-message": random.choice(log_messages)
    }

# Check if the index already exists. If so, just insert a new log record.
check_index_r = requests.get(leader_os_url + '/log-data-1', auth=os_cred)
if check_index_r.status_code != 404:
    ## just insert records
    print("Index exists. Adding more data.")
    insert_document_r_body = generate_insert()
    print(insert_document_r_body)
    insert_document_r = requests.post(leader_os_url + '/log-data-1/_doc', auth=os_cred, headers={'Content-type': 'application/json'}, data=json.dumps(insert_document_r_body))
    print(insert_document_r.text)

    ## get replication status
    print("\nReplication status:")
    repl_status = requests.get(follower_os_url + '/_plugins/_replication/log-data-1/_status', auth=os_cred)
    print(repl_status.text)
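    ## optional sanity check (sketch): search the follower copy of the index to confirm
    ## the documents are arriving; assumes the standard _search API and the same credentials
    follower_search_r = requests.get(follower_os_url + '/log-data-1/_search', auth=os_cred)
    print("\nDocuments on follower:")
    print(follower_search_r.text)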

    exit(0)  # index already existed, so nothing more to set up on subsequent runs


# --------------
# Step 3
# --------------
@@ -57,7 +89,7 @@
}
}

create_index_r = requests.put(leader_os_url + '/log-data-1', auth=('OSMasterUser', 'AwS#OpenSearch1'), headers= {'Content-type': 'application/json'}, data=json.dumps(create_index_r_body))
create_index_r = requests.put(leader_os_url + '/log-data-1', auth=os_cred, headers= {'Content-type': 'application/json'}, data=json.dumps(create_index_r_body))

print('------ Step 3 - Create an index on leader os cluster ------')
print(create_index_r.text)
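# Optional check (sketch): read back the index settings from the leader to confirm the
# index was created as expected; assumes the standard _settings API and the same credentials.
index_settings_r = requests.get(leader_os_url + '/log-data-1/_settings', auth=os_cred)
print(index_settings_r.text)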
@@ -68,14 +100,9 @@
# --------------

# Insert a document in an index log-data-1 (on leader os cluster)
insert_document_r_body = {
    "eventtime": "2022-02-25 01:00:00",
    "ip-address": "52.95.4.6",
    "application-id": 1,
    "log-message": "Error out of memory"
}
insert_document_r_body = generate_insert()

insert_document_r = requests.put(leader_os_url + '/log-data-1/_doc/1', auth=('OSMasterUser', 'AwS#OpenSearch1'), headers= {'Content-type': 'application/json'}, data=json.dumps(insert_document_r_body))
insert_document_r = requests.put(leader_os_url + '/log-data-1/_doc/1', auth=os_cred, headers= {'Content-type': 'application/json'}, data=json.dumps(insert_document_r_body))

print('------ Step 4 - Insert a document in an index log-data-1 (on leader os cluster) ------')
print(insert_document_r.text)
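# Alternative to the autofollow rule in Step 5 (sketch): replication can also be started for a
# single index via the replication plugin's _start endpoint on the follower cluster.
# '<connection_alias>' and the role names are placeholders; kept commented out so it does not
# conflict with the autofollow rule created below.
# start_replication_body = {
#     "leader_alias": "<connection_alias>",
#     "leader_index": "log-data-1",
#     "use_roles": {
#         "leader_cluster_role": "all_access",
#         "follower_cluster_role": "all_access"
#     }
# }
# start_replication_r = requests.put(follower_os_url + '/_plugins/_replication/log-data-1/_start', auth=os_cred, headers={'Content-type': 'application/json'}, data=json.dumps(start_replication_body))
# print(start_replication_r.text)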
@@ -96,7 +123,7 @@
}
}

create_replicate_r = requests.post(follower_os_url + '/_plugins/_replication/_autofollow', auth=('OSMasterUser', 'AwS#OpenSearch1'), headers= {'Content-type': 'application/json'}, data=json.dumps(create_replicate_r_body))
create_replicate_r = requests.post(follower_os_url + '/_plugins/_replication/_autofollow', auth=os_cred, headers= {'Content-type': 'application/json'}, data=json.dumps(create_replicate_r_body))

print('------ Step 5 - Create a replication rule on the follower cluster to auto replicate all indices starting with log ------')
print(create_replicate_r.text)
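# Follow-up (sketch): the replication plugin also reports autofollow statistics, which can help
# confirm the new rule has picked up matching indices; endpoint name per the plugin docs.
autofollow_stats_r = requests.get(follower_os_url + '/_plugins/_replication/autofollow_stats', auth=os_cred)
print('------ Autofollow stats (on follower os cluster) ------')
print(autofollow_stats_r.text)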