Here's a simple example of how a basic automation can be set up to create, monitor, and download batches of requests using Python. The code below creates jobs from an array of request parameters, monitors the jobs until they are processed, and then downloads the exported data to a local directory. The script creates, monitors, and downloads requests in batches of 20.
First, create a requests.json file that contains an array of the requests to be created. Each JSON object contains two main properties: MeasureType, which can be marketsize, companyshare, or brandshare, and Params, which contains the request parameters for the respective measure type. The JSON should look like:
[
  {
    "MeasureType": "marketsize",
    "Params": {
      "IndustryCode": "HD",
      "GeographyIds": [11]
    }
  },
  {
    "MeasureType": "marketsize",
    "Params": {
      "IndustryCode": "HD",
      "GeographyIds": [243]
    }
  },
  {
    "MeasureType": "marketsize",
    "Params": {
      "IndustryCode": "HD",
      "GeographyIds": [389]
    }
  },
  {
    "MeasureType": "marketsize",
    "Params": {
      "IndustryCode": "HD",
      "GeographyIds": [11]
    }
  },
  {
    "MeasureType": "marketsize",
    "Params": {
      "IndustryCode": "HD",
      "GeographyIds": [243]
    }
  },
  {
    "MeasureType": "marketsize",
    "Params": {
      "IndustryCode": "HD",
      "GeographyIds": [389]
    }
  }
]
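If you have many similar requests, you can also generate requests.json programmatically instead of writing it by hand. Below is a minimal sketch, assuming the marketsize request shape shown above; the industry code and geography IDs reuse the example values and should be replaced with your own:

import json

# Example values taken from the array above; replace with your own.
industry_code = "HD"
geography_ids = [11, 243, 389]

# Build one marketsize request per geography ID.
requests_list = [
    {
        "MeasureType": "marketsize",
        "Params": {
            "IndustryCode": industry_code,
            "GeographyIds": [gid]
        }
    }
    for gid in geography_ids
]

# Write the array to requests.json for the script below to consume.
with open("requests.json", "w") as f:
    json.dump(requests_list, f, indent=2)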
Create a script.py file with the following code, replacing <<apim_key>>, <<username>>, and <<password>> with your own details:
import datetime
import json
import os
import time

import requests
url = "https://api.euromonitor.com"
extractor_apim_key = "<<apim_key>>"
username = "<<username>>"
password = "<<password>>"
payload = f"username={username}&password={password}"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Ocp-Apim-Subscription-Key": extractor_apim_key,
"Accept": "application/json; api-version=1.0"
}
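# Exchange the username and password for a bearer token.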
def get_token():
authTokenResponse = requests.post(f"{url}/authentication/connect/token", headers = headers, data = payload)
authTokenResponse.raise_for_status()
authToken = authTokenResponse.json()
return authToken['access_token']
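# Create an extraction job; the endpoint name is built from the request's
# MeasureType (e.g. createmarketsizejob).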
def create_job(request):
response = requests.post(f'{url}/statistics/create{request["MeasureType"]}job', headers = headers, data = json.dumps(request['Params']))
response.raise_for_status()
return response.json()
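# Retrieve the job history, including each job's processing status.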
def job_history():
response = requests.get(f'{url}/statistics/jobHistory', headers = headers)
response.raise_for_status()
return response.json()
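# Look up the download URL for a finished job and save the exported file.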
def job_download(job_id, file_extension):
response = requests.get(f'{url}/statistics/getjobdownloadurl/{job_id}', headers = headers)
response.raise_for_status()
jobDownloadJson = response.json()
jobDownloadURL = jobDownloadJson['jobDownloadUri']
print(f"The URL for downloading the job is {jobDownloadURL}")
download_file(jobDownloadURL, f"{job_id}.{file_extension}")
def download_file(url, destination_path):
try:
response = requests.get(url)
response.raise_for_status()
with open(f'./export/{destination_path}', "wb") as f:
f.write(response.content)
print("File downloaded successfully!")
except requests.exceptions.RequestException as e:
print("Request error:", e)
except Exception as e:
print("An error occurred:", e)
def get_jobs():
    with open('jobs.json') as jobs_stream:
        return json.load(jobs_stream)
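# Overwrite the given file with the serialized job list.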
def write_jobs(file, data):
    with open(file, "w") as myfile:
        myfile.write(data)
try:
export_path = "export"
# Check whether the specified path exists or not
if not os.path.exists(export_path):
# Create an export directory because it does not exist
os.makedirs(export_path)
print("The export directory is created.")
write_jobs("jobs.json", '[]')
write_jobs("jobs_bkp.json", '[]')
job_count = 0
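    # Authenticate once and reuse the bearer token for all subsequent requests.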
auth_token = get_token()
headers["Content-Type"] = "application/json"
headers["Ocp-Apim-Subscription-Key"] = extractor_apim_key
headers["Authorization"] = "Bearer " + auth_token
    print('Auth token obtained.')
    with open('requests.json') as requests_stream:
        requests_json = json.load(requests_stream)
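    # Submit the requests in batches; the duration check at the top of the
    # loop spaces consecutive batches at least an hour apart.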
batch_size = 20
index = 0
duration = 3600
while index < len(requests_json):
if duration < 3600:
print(f'Waiting for {3600 - duration} seconds')
time.sleep(3600 - duration)
requests_batch = requests_json[index : index + batch_size]
# Create jobs
for r in requests_batch:
print(f'Creating job for {r}')
createJobJson = create_job(r)
print(createJobJson)
            jobId = createJobJson['jobId']
print(f"The job {jobId} has been created.")
jobs = get_jobs()
jobs.append({"JobId": jobId})
write_jobs("jobs.json", json.dumps(jobs))
write_jobs("jobs_bkp.json", json.dumps(jobs))
job_count += 1
#start timer
start = time.time()
# Check job status, download if 'Processed'
while job_count > 0:
print(f'{job_count} jobs remaining')
time.sleep(300)
jobs = get_jobs()
jobHistoryJson = job_history()
for j in jobs:
                job_details = list(filter(lambda i: i['jobId'] == j["JobId"], jobHistoryJson))
                if not job_details:
                    # Skip jobs that have not yet appeared in the job history
                    continue
timestamp = datetime.datetime.now()
formatted_timestamp = timestamp.strftime("%Y-%m-%d_%H:%M:%S")
print(f"Job: {j['JobId']}, status: {job_details[0]['processingStatus']} at %s" % formatted_timestamp)
if job_details[0]['processingStatus'] in ["Processed", "NoData"]:
jobs = get_jobs()
remaining_jobs = list(filter(lambda i: i['JobId'] != j['JobId'], jobs))
print(remaining_jobs)
write_jobs("jobs.json", json.dumps(remaining_jobs))
job_count = len(remaining_jobs)
if job_details[0]['processingStatus'] in ["Processed"]:
job_download(j["JobId"], json.loads(job_details[0]['executionList'])['FileType'])
index += batch_size
#end timer
end = time.time()
duration = int(end - start)
except requests.exceptions.RequestException as e:
print("Request error:", e)
except Exception as e:
    print("An error occurred:", e)
Run the Python script using the following command:
python script.py
The exported data will be downloaded to the export directory created in the same path as the Python script.