# -*- coding: utf-8 -*- """ Created on Wed May 16 17:40:55 2018 @author: jagmeet.singh """ #==================================================================================== #On Demand request demo code, using the following steps: # - Authentication token request # - On Demand extraction request # - Extraction status polling request # Extraction notes retrieval # - Data retrieval and save to disk (the data file is gzipped) # Includes AWS download capability #==================================================================================== #Imports: import requests import json import shutil import time import urllib3 import gzip import pandas as pd import timeit import sys import re import datetime if sys.version_info[0] < 3: from StringIO import StringIO else: from io import StringIO import scipy.stats.mstats import numpy as np #Set these parameters before running the code: # 'R:/New Structure/Quant Insight/Quant/Thomson Reuters/Futures Normalised Data from TR/Drafts/FilesUsedInCode/' filePath0 = 'R:/New Structure/Quant Insight/Quant/Thomson Reuters/Futures Normalised Data from TR/Drafts/FilesGenerated/Block6/' #Location to save downloaded files #fileNameRoot = "" #Root of the name for the downloaded files myUsername = "myUsername" myPassword = "myPassword" useAws = True #Set the last parameter (useAws) above to: # False to download from TRTH servers # True to download from Amazon Web Services cloud (recommended, it is faster) def generateChainRICs(startDate,nContracts,rootRIC,dTable): #in the format "YYYY-MM-DD" """ This function takes generates a series of RICs for follownig inputs - Start Date - Root RIC - Number of contracts to be generated in the series - Mapping table of months (eg - January is F, February in G and so on) """ rr = rootRIC nc = nContracts sd = startDate dt = dTable cr = [] #Will store chain of RICs based on input parameteres - ie monthly contracts sy = int(sd[3:4]) #Starting year - year of 1st contract in the series sm = int(sd[5:7]) #Starting month - month of 1st contract in the series mi = [] #Will store months of all the futures to be generated - eg - for a series of April-June2018, it will store 4, 5, and 6 mi.append(sm) for i in range(1,nc): if mi[i-1]==12: mi.append(mi[i-1]+1-12) else: mi.append(mi[i-1]+1) cr.append(rr+dt.Code[sm-1]+str(sy%10)) #Appending the full RIC of 1st contract in the series #Loop below will append to cr, the full RICs of all the other contracts in the series for i in range(1,nc): if cr[i-1][len(cr[i-1])-2] == 'Z': sy = sy+1 cr.append(rr+dt.Code[mi[i]-1]+str(sy%10)) else: cr.append(rr+dt.Code[mi[i]-1]+str(sy%10)) #Creating a dataframe to store the RICs dfx = pd.DataFrame(columns = ['Identifier','IdentifierType']) dfx.Identifier = cr dfx.IdentifierType = 'Ric' #Converting the dataframe into format readable in the subsequest API requests crFormat = list(dfx.T.to_dict().values()) return crFormat def paramsProcessed(filepath,filename): """ This function reads data from an excel/csv file with given name, stored at the given path """ fp = filepath fn = filename #full string for file location fs = fp+fn if fn.split(".")[1] == 'csv': obj = pd.read_csv(fs) else: obj = pd.ExcelFile(fs) if fn.split(".")[1] == 'csv': dfParams = obj else: dfParams = obj.parse('Sheet1') cols = list(dfParams.columns) values = [] for i in range(len(cols)): values.append(dfParams.count()[i]) dfSummaryX = pd.DataFrame(columns = ['Column','NoOfValues']) dfSummaryX['Column'] = cols dfSummaryX['NoOfValues'] = values return dfParams def paramsSummary(filepath,filename): """ This function just summarises the data read from a given file from a given path """ fp = filepath fn = filename #full string for file location fs = fp+fn if fn.split(".")[1] == 'csv': obj = pd.read_csv(fs) else: obj = pd.ExcelFile(fs) if fn.split(".")[1] == 'csv': dfParams = obj else: dfParams = obj.parse('Sheet1') cols = list(dfParams.columns) values = [] for i in range(len(cols)): values.append(dfParams.count()[i]) dfSummaryX = pd.DataFrame(columns = ['Column','NoOfValues']) dfSummaryX['Column'] = cols dfSummaryX['NoOfValues'] = values return dfSummaryX def dateProcess(date): """ To convert an excel date (user input) in a timestamp format to a string format - which then is read into the data request part of the code """ d0 = date dconv = d0.to_pydatetime() if len(str(dconv.month))==1: z1 = '-0' else: z1 = '-' if len(str(dconv.day))==1: z2 = '-0' else: z2 = '-' dateProcessed = str(dconv.year)+str(z1)+str(dconv.month)+str(z2)+str(dconv.day) return dateProcessed def processTradingHourCSV(baseTicker, filepath, filename): """ To read Trading Hours info for a given ticker/RIC from the file where such info for all the RICs is stored Need to process is there as in the request code, the time stamp goes in in a particular format only """ bt = baseTicker fp = filepath fn = filename csv0 = paramsProcessed(fp,fn) instLoc = csv0.loc[csv0.RIC == bt].index[0] instOpenTime = csv0['OpenTimeForCode'][instLoc] instCloseTime = csv0['CloseTimeForCode'][instLoc] instOpenHour = instOpenTime//100 instOpenMinute = instOpenTime%100 instCloseHour = instCloseTime//100 instCloseMinute = instCloseTime%100 if instOpenMinute == 0: if instOpenHour < 10: instOpenString = (str('T0'+str(instOpenHour)+':00:00.000Z')) else: instOpenString = (str('T'+str(instOpenHour)+':00:00.000Z')) else: if instOpenHour < 10: instOpenString = (str('T0'+str(instOpenHour)+':'+str(instOpenMinute)+':00.000Z')) else: instOpenString = (str('T'+str(instOpenHour)+':'+str(instOpenMinute)+':00.000Z')) if instCloseMinute == 0: if instCloseHour < 10: instCloseString = (str('T0'+str(instCloseHour)+':00:00.000Z')) else: instCloseString = (str('T'+str(instCloseHour)+':00:00.000Z')) else: if instCloseHour < 10: instCloseString = (str('T0'+str(instCloseHour)+':'+str(instCloseMinute)+':00.000Z')) else: instCloseString = (str('T'+str(instCloseHour)+':'+str(instCloseMinute)+':00.000Z')) return instOpenString, instCloseString def requestCode(baseTicker, nContracts, FID, filepath, filename): """ This function requests data for a RIC based on user inputs like - RIC (eg Gold - GC, Copper - HG etc.) - number of chain contracts (eg 12/36/120 etc.) - FID for which data is needed (eg Price - 70, OpenInterest 64 etc.) - Name and path of the master file (which contains all the names of paths of other files) Output of the function is Uncompressed data (in a single string form), for the given set of parameters """ bt = baseTicker nc = nContracts fp = filepath fn = filename fi = FID dateforfilename = str(bt)+'__'+str(datetime.date.today())+'__FID'+fi dfMaster = paramsProcessed(fp,fn) #Fetching params fn0 = dfMaster[dfMaster['FileName']=='Params.xlsx']['FileName'][dfMaster[dfMaster['FileName']=='Params.xlsx']['FileName'].index[0]] dParams = paramsProcessed(fp,fn0) sd = dateProcess(dParams['StartDate'][0]) ed = dateProcess(dParams['EndDate'][0]) #Fetching the mapping table data fn1 = dfMaster[dfMaster['FileName']=='MappingTables.xlsx']['FileName'][dfMaster[dfMaster['FileName']=='MappingTables.xlsx']['FileName'].index[0]] dTable = paramsProcessed(fp,fn1) #fetching the trading hour data fn2 = dfMaster[dfMaster['FileName']=='TradingHours.csv']['FileName'][dfMaster[dfMaster['FileName']=='TradingHours.csv']['FileName'].index[0]] # csv1 = paramsProcessed(fp,fn2) openString = processTradingHourCSV(bt,fp,fn2)[0] closeString = processTradingHourCSV(bt,fp,fn2)[1] requestUrl = "https://hosted.datascopeapi.reuters.com/RestApi/v1/Authentication/RequestToken" requestHeaders={ "Prefer":"respond-async", "Content-Type":"application/json" } requestBody={ "Credentials": { "Username": xxx "Password": xxx } } r1 = requests.post(requestUrl, json=requestBody,headers=requestHeaders) if r1.status_code == 200 : jsonResponse = json.loads(r1.text.encode('ascii', 'ignore')) token = jsonResponse["value"] print ('Authentication token (valid 24 hours):') print (token) else: print ('Replace myUserName and myPassword with valid credentials, then repeat the request') #Step 2: send an on demand extraction request using the received token requestUrl='https://hosted.datascopeapi.reuters.com/RestApi/v1/Extractions/ExtractRaw' requestHeaders={ "Prefer":"respond-async", "Content-Type":"application/json", "Authorization": "token " + token } requestBody={ "ExtractionRequest": { "@odata.type": "#ThomsonReuters.Dss.Api.Extractions.ExtractionRequests.TickHistoryRawExtractionRequest", "IdentifierList": { "@odata.type": "#ThomsonReuters.Dss.Api.Extractions.ExtractionRequests.InstrumentIdentifierList", "InstrumentIdentifiers": generateChainRICs(sd,nc,bt,dTable), "ValidationOptions": { "AllowHistoricalInstruments": "true" } }, "Condition": { "MessageTimeStampIn": "LocalExchangeTime", #To be linked to Instrument's time zone - a mapping table needed #"ApplyCorrectionsAndCancellations": "false", "ReportDateRangeType": "Range", "QueryStartDate": sd+openString, #"T18:00:00.000Z", #To be user defined - linked to variables "QueryEndDate": ed+closeString, #"T17:00:00.000Z", #To be user defined - linked to variables "ExtractBy": "Ric", "SortBy": "SingleByRic", "DomainCode": "MarketPrice", "Fids" : fi, # "70", "DateRangeTimeZone" : "UTC", "DisplaySourceRIC": "true" } } } r2 = requests.post(requestUrl, json=requestBody,headers=requestHeaders) #Step 3: poll the status of the request using the received location URL. #Once the request has completed, retrieve the jobId and extraction notes. requestUrl = r2.headers["location"] requestHeaders={ "Prefer":"respond-async", "Content-Type":"application/json", "Authorization":"token " + token } r3 = requests.get(requestUrl,headers=requestHeaders) while (r3.status_code == 202): print ('As we received a 202, we wait 900 seconds, then poll again (until we receive a 200)') time.sleep(900) r3 = requests.get(requestUrl,headers=requestHeaders) if r3.status_code == 200 : r3Json = json.loads(r3.text.encode('ascii', 'ignore')) jobId = r3Json["JobId"] if r3.status_code != 200 : print ('An error occured. Try to run this cell again. If it fails, re-run the previous cell.\n') #Step 5: get the extraction results, using the received jobId. #We also save the compressed data to disk, as a GZIP. #We only display a few lines of the data. #IMPORTANT NOTE: #This code is much better than that of step 4; it should not fail even with large data sets. #If you need to manipulate the data, read and decompress the file, instead of decompressing #data from the server on the fly. #This is the recommended way to proceed, to avoid data loss issues. #For more information, see the related document: # Advisory: avoid incomplete output - decompress then download requestUrl = "https://hosted.datascopeapi.reuters.com/RestApi/v1/Extractions/RawExtractionResults" + "('" + jobId + "')" + "/$value" print "Job ID" print jobId #AWS requires an additional header: X-Direct-Download if useAws: requestHeaders={ "Prefer":"respond-async", "Content-Type":"text/plain", "Accept-Encoding":"gzip", "X-Direct-Download":"true", "Authorization": "token " + token, #'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36', #"X-Client-Session-Id":"Direct AWS" } else: requestHeaders={ "Prefer":"respond-async", "Content-Type":"text/plain", "Accept-Encoding":"gzip", "Authorization": "token " + token, #'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36', #"X-Client-Session-Id":"From DSS" } #import pdb;pdb.set_trace() r5 = requests.get(requestUrl,headers=requestHeaders,stream=True) #time.sleep(3) #Ensure we do not automatically decompress the data on the fly: r5.raw.decode_content = False if useAws: print ('Content response headers (AWS server): type: ' + r5.headers["Content-Type"] + '\n') #AWS does not set header Content-Encoding="gzip". else: print ('Content response headers (TRTH server): type: ' + r5.headers["Content-Type"] + ' - encoding: ' + r5.headers["Content-Encoding"] + '\n') #Next 2 lines display some of the compressed data, but if you uncomment them save to file fails fileName = filePath0 + dateforfilename+".csv.gz" chunk_size = 1024 rr = r5.raw print (rr) with open(fileName, 'wb') as fd: shutil.copyfileobj(rr, fd, chunk_size) fd.close #Now let us read and decompress the file we just created. #For the demo we limit the treatment to a few lines: maxLines = 100000000000000000 uncompressedData = "" count = 0 with gzip.open(fileName, 'rb') as fd: for line in fd: dataLine = line.decode("utf-8") uncompressedData = uncompressedData + dataLine count += 1 if count >= maxLines: break fd.close() return uncompressedData