Code Examples
For complete code examples, see “Code Samples”, on page 22.
/v1/vpgs cURL Code Example
Retrieve the list of all VPGs.
curl -D responseHeader -H "Content-Type: application/json" -H "Accept: application/json" -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs
Retrieve the list of all VPGs where the protected site is a vCenter Server.
curl -D responseHeader -H "Content-Type: application/json" -H "Accept: application/json" -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" "https://127.0.0.1:9669/v1/vpgs?protectedSiteType=VcVpg"
Retrieve the list of checkpoints for a specified VPG with the identifier a8c3d56d-1289-48ba-a054-72447ee73821.
curl -D responseHeader -H "Content-Type: application/json" -H "Accept: application/json" -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs/a8c3d56d-1289-48ba-a054-72447ee73821/checkpoints
Perform a failover test on a specified VPG with the identifier a8c3d56d-1289-48ba-a054-72447ee73821, with a specified checkpoint as the request body.
curl -D responseHeader -H "Content-Type: application/json" -H "Accept: application/json" -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs/a8c3d56d-1289-48ba-a054-72447ee73821/failovertest -d "{\"CheckpointId\":2419}"
Stop a failover test on a specified VPG with the identifier a8c3d56d-1289-48ba-a054-72447ee73821.
curl -X POST -D responseHeader -H "Content-Type: application/json" -H "Accept: application/json" -H "x-zerto-session: 9UDQD6RG7YF33QJLWQXGJV8C453N277NA22P7FSNWVZCJTWCBRHQ" https://127.0.0.1:9669/v1/vpgs/a8c3d56d-1289-48ba-a054-72447ee73821/failoverteststop
For more code examples, see “cURL Code”, on page 22.
/v1/vpgs Python Code Example
The following code samples are extracts from the fuller code example, “Python Code”, on page 23.
The following code sample retrieves a VPG identifier based on the VPG name.
import json
import requests
 
def GetVpgIdFromName(zvmIp, sessionId, vpgName):
  # Query the VPGs endpoint, filtering by VPG name.
  url = "https://" + zvmIp + ":9669/v1/vpgs?name=" + vpgName
  headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'}
  # verify=False skips TLS certificate validation.
  r = requests.get(url, headers=headers, verify=False)
  if r.status_code != requests.codes.ok:
    raise Exception("Call to ZVM failed")
  print(r.status_code)
  print(r.text)
  vpgData = json.loads(r.text)
  # Exactly one VPG must match the name.
  if len(vpgData) == 0:
    raise Exception("VPG by name " + vpgName + " not found")
  if len(vpgData) > 1:
    raise Exception("multiple vpgs named " + vpgName + " found")
  thisVpg = vpgData[0]
  res = thisVpg["VpgIdentifier"]
  print(thisVpg)
  print(res)
  return res
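The lookup above appends the VPG name to the query string as-is, so a name containing spaces or reserved characters would produce a malformed URL. The following is a minimal sketch of one way to percent-encode the filter, using the Python 3 standard library. The helper name build_vpgs_url and the sample VPG name are hypothetical and are not part of the Zerto API.

```python
from urllib.parse import urlencode, quote

def build_vpgs_url(zvm_ip, **filters):
    """Return the /v1/vpgs URL with the given filters percent-encoded."""
    base = "https://" + zvm_ip + ":9669/v1/vpgs"
    if not filters:
        return base
    # quote_via=quote encodes a space as %20 rather than '+'.
    return base + "?" + urlencode(filters, quote_via=quote)

# Hypothetical VPG name containing a space.
print(build_vpgs_url("127.0.0.1", name="Finance VPG"))
# Filter taken from the cURL example for vCenter Server protected sites.
print(build_vpgs_url("127.0.0.1", protectedSiteType="VcVpg"))
```

The resulting string could be passed to requests.get in place of the concatenated url.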
The following code sample retrieves the latest checkpoint for the VPG.
import json
import requests
 
def GetLatestCheckpointForVpg(zvmIp, sessionId, vpgId):
  # List all checkpoints for the VPG.
  url = "https://" + zvmIp + ":9669/v1/vpgs/" + vpgId + "/checkpoints"
  headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'}
  r = requests.get(url, headers=headers, verify=False)
  print(r.status_code)
  if r.status_code != requests.codes.ok:
    raise Exception("Failed GetLatestCheckpointForVpg")
  allcpData = json.loads(r.text)
  print(str(len(allcpData)))
  if len(allcpData) == 0:
    raise Exception("no checkpoints found for vpg " + vpgId)
  # Sort newest first, then take the first entry.
  sortedCps = sorted(allcpData, key=lambda x: x["TimeStamp"], reverse=True)
  latestCp = sortedCps[0]
  print(latestCp)
  return latestCp["CheckpointIdentifier"]
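The selection step above can be tried without a ZVM connection. The sketch below uses max() instead of a full sort, because only one item is needed. It assumes that every TimeStamp value is a string in the same ISO 8601 format, so that string order matches chronological order. The function name latest_checkpoint and the sample records are hypothetical.

```python
def latest_checkpoint(checkpoints):
    """Return the checkpoint dict with the greatest TimeStamp value."""
    if not checkpoints:
        raise ValueError("no checkpoints supplied")
    # max() makes a single pass over the list; no full sort is required.
    return max(checkpoints, key=lambda cp: cp["TimeStamp"])

# Hypothetical sample data with only the two fields the code above reads.
sample = [
    {"CheckpointIdentifier": "2417", "TimeStamp": "2015-03-01T10:00:05Z"},
    {"CheckpointIdentifier": "2419", "TimeStamp": "2015-03-01T10:00:15Z"},
    {"CheckpointIdentifier": "2418", "TimeStamp": "2015-03-01T10:00:10Z"},
]
print(latest_checkpoint(sample)["CheckpointIdentifier"])
```

If the timestamps were returned in mixed formats or time zones, they would need to be parsed into datetime values before comparison.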
The following code starts a failover test for the VPG, using the VPG identifier and checkpoint identifier retrieved by the previous code examples. The code returns the task identifier, which can then be used to track the test.
import json
import requests
 
def StartFailoverTest(zvmIp, sessionId, vpgId, cpId):
  url = "https://" + zvmIp + ":9669/v1/vpgs/" + vpgId + "/failovertest"
  # The request body names the checkpoint to test against.
  payload = {"CheckpointId": cpId}
  dataval = json.dumps(payload)
  headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'}
  r = requests.post(url, data=dataval, headers=headers, verify=False)
  print(r.status_code)
  if r.status_code != requests.codes.ok:
    raise Exception("Failed StartFailoverTest")
  # The response body is the task identifier.
  taskId = json.loads(r.text)
  print("task id = " + taskId)
  return taskId
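The code above concatenates the decoded response with a string, which only works if the response body is a single JSON string. The following sketch makes that assumption explicit and fails with a clear error otherwise. The helper name parse_task_id and the sample body are hypothetical; a real task identifier will look different.

```python
import json

def parse_task_id(body_text):
    """Decode a response body that is expected to hold one JSON string."""
    task_id = json.loads(body_text)
    if not isinstance(task_id, str):
        raise ValueError("expected a JSON string, got " + type(task_id).__name__)
    return task_id

# Hypothetical response body: a JSON string literal, including its quotes.
print("task id = " + parse_task_id('"example-task-id"'))
```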
The following code stops the failover test for the VPG.
import json
import requests
 
def StopFailoverTest(zvmIp, sessionId, vpgId, status):
  url = "https://" + zvmIp + ":9669/v1/vpgs/" + vpgId + "/failoverteststop"
  # Report whether the test succeeded, with a short summary.
  payload = {"FailoverTestSuccess": status, "FailoverTestSummary":"Automatically"}
  dataval = json.dumps(payload)
  headers = {'x-zerto-session': sessionId, 'content-type': 'application/json'}
  r = requests.post(url, data=dataval, headers=headers, verify=False)
  print(r.status_code)
  if r.status_code != requests.codes.ok:
    raise Exception("Failed StopFailoverTest")
  # The response body is the task identifier.
  taskId = json.loads(r.text)
  print("task id = " + taskId)
  return taskId
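The four samples are meant to be chained: look up the VPG, pick a checkpoint, start the test, then stop it. The following is a minimal sketch of that sequence, not a definitive implementation. The function run_failover_test is hypothetical, and the four operations are passed in as parameters so that the flow can be exercised with stand-in functions, as the demonstration does.

```python
def run_failover_test(zvm_ip, session_id, vpg_name,
                      get_vpg_id, get_latest_cp, start_test, stop_test):
    """Run one failover test cycle and return (start_task, stop_task)."""
    vpg_id = get_vpg_id(zvm_ip, session_id, vpg_name)
    cp_id = get_latest_cp(zvm_ip, session_id, vpg_id)
    start_task = start_test(zvm_ip, session_id, vpg_id, cp_id)
    # A real script would wait here until the test has been verified.
    stop_task = stop_test(zvm_ip, session_id, vpg_id, True)
    return start_task, stop_task

# Stand-in functions that record the call order instead of calling a ZVM.
calls = []

def fake_get_vpg_id(ip, sid, name):
    calls.append("lookup")
    return "vpg-1"

def fake_get_latest_cp(ip, sid, vpg_id):
    calls.append("checkpoint")
    return "2419"

def fake_start(ip, sid, vpg_id, cp_id):
    calls.append("start")
    return "start-task"

def fake_stop(ip, sid, vpg_id, status):
    calls.append("stop")
    return "stop-task"

result = run_failover_test("127.0.0.1", "session", "Finance VPG",
                           fake_get_vpg_id, fake_get_latest_cp,
                           fake_start, fake_stop)
print(calls)
print(result)
```

Against a real ZVM, the same call would pass GetVpgIdFromName, GetLatestCheckpointForVpg, StartFailoverTest, and StopFailoverTest as the last four arguments.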