curl -D responseHeader -H "Content-Type: application/json" –H "Accept: application/json -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs |
curl -D responseHeader -H "Content-Type: application/json" –H "Accept: application/json -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs?protectedSiteType=VcVpg |
curl -D responseHeader -H "Content-Type: application/json" –H "Accept: application/json -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs/a8c3d56d-1289-48ba-a054-72447ee73821/checkpoints |
curl -D responseHeader -H "Content-Type: application/json" –H "Accept: application/json -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs/a8c3d56d-1289-48ba-a054-72447ee73821/failovertest -d "{\"CheckpointId\":2419}" |
curl -D responseHeader -H "Content-Type: application/json" –H "Accept: application/json -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs/a8c3d56d-1289-48ba-a054-72447ee73821/failoverteststop |
import requests import base64 def GetVpgIdFromName(zvmIp, sessionId, vpgName): url = "https://" + zvmIp + ":9669/v1/vpgs?name=" + vpgName headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'} r = requests.get(url, headers=headers, verify=False) if(r.status_code != requests.codes.ok): raise Exception("Call to ZVM failed") print r.status_code print r.text vpgData = json.loads(r.text) if(len(vpgData) == 0): raise Exception("VPG by name " + vpgName + " not found") if(len(vpgData) > 1): raise Exception("multiple vpgs named " + vpgName + " found") thisVpg = vpgData[0] res = thisVpg["VpgIdentifier"] print thisVpg print res return res |
import requests import base64 def GetLatestCheckpointForVpg(zvmIp, sessionId, vpgId): url = "https://" + zvmIp + ":9669/v1/vpgs/" + vpgId + "/checkpoints" headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'} r = requests.get(url, headers=headers, verify=False) print r.status_code if(r.status_code != requests.codes.ok): raise Exception("Failed GetLatestCheckpointForVpg") allcpData = json.loads(r.text) print str(len(allcpData)) if(len(allcpData) == 0): raise Exception("no checkpoints found for vpg " + vpgId) sortedCps = sorted(allcpData, key=lambda x: x["TimeStamp"], reverse=true) latestCp = sortedCps[0] print latestCp return latestCp["CheckpointIdentifier"] |
import requests import base64 def StartFailoverTest(zvmIp, sessionId, vpgId, cpId): url = "https://" + zvmIp + ":9669/v1/vpgs/" + vpgId + "/failovertest" payload = {"CheckpointId": cpId} dataval = json.dumps(payload) headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'} r = requests.post(url, data=dataval, headers=headers, verify=False) print r.status_code if(r.status_code != requests.codes.ok): raise Exception("Failed StartFailoverTest") taskId = json.loads(r.text) print "task id = " + taskId return taskId |
import requests import base64 def StopFailoverTest(zvmIp, sessionId, vpgId, status): url = "https://" + zvmIp + ":9669/v1/vpgs/" + vpgId + "/failoverteststop" payload = {"FailoverTestSuccess": status, "FailoverTestSummary":"Automatically"} dataval = json.dumps(payload) headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'} r = requests.post(url, data=dataval, headers=headers, verify=False) print r.status_code if(r.status_code != requests.codes.ok): raise Exception("Failed StopFailoverTest") taskId = json.loads(r.text) print "task id = " + taskId return taskId |
/* * Example - Failover Test a VPG */ private void FailoverTestVpg() { Console.WriteLine("\nDo a Failover Test for the first VPG"); Console.WriteLine("======================================"); // Fetch all vpgs - convert data to a zerto api object string apiAddress = m_baseAddress + "vpgs"; string result = ExecuteHttpGet(apiAddress, m_format, m_sessionId); List<VpgApi> vpgs = (List<VpgApi>)DeserializeObjectFromJson(result, typeof(List<VpgApi>)); Console.WriteLine("Found " + vpgs.Count + " vpgs."); // if any vpgs are retrieved, take first one for a failover test if (vpgs.Count > 0) { VpgApi vpg = vpgs[0]; Console.WriteLine("Work on Vpg: " + vpg.VpgName + "."); // Fetch all checkpoints for the VPG string cpApiAddress = m_baseAddress + "vpgs/" + vpg.Link.Identifier + "/checkpoints"; string cpRes = ExecuteHttpGet(cpApiAddress, m_format, m_sessionId); List<CheckpointApi> cps = (List<CheckpointApi>)DeserializeObjectFromJson(cpRes, typeof(List<CheckpointApi>)); Console.WriteLine("Found " + cps.Count + " valid checkpoints."); if (cps.Count > 0) { // Post a request for failover test with the latest checkopint FailOverTestStartDataApi fotData = new FailOverTestStartDataApi(cps.Last().CheckpointIdentifier); Console.WriteLine("Start Failover Test."); string fotApiAddress = m_baseAddress + "vpgs/" + vpg.Link.Identifier + "/failovertest"; // Wait till the task is done bool isFotOk = FollowTask(ExecuteHttpPost(fotApiAddress, m_format, m_sessionId, SerializeObjectToJson(fotData, typeof(FailOverTestStartDataApi))), "Start Failover Test"); if (isFotOk) { Console.WriteLine("Failover is in Test."); // Post a request to stop the failover test StopFailoverTestDataApi stopFotData = new StopFailoverTestDataApi(true, "OK"); Console.WriteLine("Stop Failover Test."); string sfotApiAddress = m_baseAddress + "vpgs/" + vpg.Link.Identifier + "/failoverteststop"; bool isStopFotOk = FollowTask(ExecuteHttpPost(sfotApiAddress, m_format, m_sessionId, SerializeObjectToJson(stopFotData, typeof(StopFailoverTestDataApi))), "Stop Failover Test"); Console.WriteLine(isStopFotOk ? "Failover Test stopped successfully." : "Failover Test not stopped!!."); } else { Console.WriteLine("Could not start Failover Test."); } } else { Console.WriteLine("Vpg has no Checkpoints - cannot Failover Test."); } } Console.WriteLine("Testing completed."); } |