Unable to post to ATS service using on-stream example code

I am using the example code that came with the documentation for ADS 3.2.3. I want to poke a value into the ATS service for a test instrument. The Refinitiv example code includes a posting method, but when I run it I receive the following message:

"NakCode":"DeniedBySrc",
"Text":"[400]: Invalid Request",
"Type":"Ack"


The code is almost identical to the example, except that the Service is our service name for ATS and the instrument is not TRI.N.

Changed

'Key': {
'Name': 'TRI.N',

< Can you really contribute and poke a value to TRI.N ??? >

To

'Key': {
'Service': 'DTS',
'Name': '.AAAATEST'
},



Hello @graham.street

The "NakCode":"DeniedBySrc" and "Text":"[400]: Invalid Request" error messages are sent from the TREP and ATS servers. The "[400]: Invalid Request" text is generated by the ATS server, and its description is "The format is incorrect so the contribution cannot be processed.".
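As a rough illustration of how a client could tell a positive acknowledgement from a NAK like the one above, here is a minimal sketch. It assumes the Ack arrives as a JSON object inside a list (as in the example's on_message handler) and that a NAK is signalled by the presence of a "NakCode" field. The describe_ack helper and the "ID" and "AckID" values in the sample are illustrative assumptions, not part of the original example.

```python
import json


def describe_ack(message):
    """Return (ok, detail) for a single Ack message dict.

    Assumption: an Ack without a "NakCode" field is a positive acknowledgement.
    """
    if message.get("Type") != "Ack":
        raise ValueError("not an Ack message")
    nak_code = message.get("NakCode")
    if nak_code is None:
        return True, "post acknowledged"
    # A NAK carries a code and, usually, a human-readable text.
    return False, "{}: {}".format(nak_code, message.get("Text", ""))


# Sample payload modelled on the error reported in this thread.
# The "ID" and "AckID" values are illustrative.
raw = ('[{"ID": 2, "Type": "Ack", "AckID": 1, '
       '"NakCode": "DeniedBySrc", "Text": "[400]: Invalid Request"}]')

for msg in json.loads(raw):
    ok, detail = describe_ack(msg)
    print("OK" if ok else "NAK", detail)
```

A check like this could be called from process_message() so that a rejected post is reported clearly instead of only being dumped as raw JSON.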


I ran a quick test with the market_price_posting.py example from ADS 3.2.3, using your send_market_price_post() function. The example application can request data from my ATS version 1.4 and can On-Stream post back to it successfully (please see the attached post-result.txt).

What is the version of your ATS, ADS and ADH servers?

Can you On-Stream post to the same ATS with other Elektron SDK or TREP APIs?

I did a quick check of the ADH documentation and found the following ADH 3.x behavior regarding the *adh*routename.route*servicename*forwardOnStreamPostMsgKey configuration parameter (see the attached adh.png). Could you please check whether your ADH has this parameter set to True?
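To make the placeholders in that parameter name concrete, here is a small sketch that fills in a route name and a service name. The route name "myRoute" is hypothetical, "DTS" is the service name used in this thread, and the " : " separator between name and value is an assumption about the cnf file syntax, so please verify the exact form against your ADH documentation.

```python
def forward_post_key_line(route_name, service_name, enabled=True):
    """Format one cnf entry for the forwardOnStreamPostMsgKey parameter.

    Assumption: entries are written as "<parameter name> : <value>".
    """
    return "*adh*{}.route*{}*forwardOnStreamPostMsgKey : {}".format(
        route_name, service_name, "True" if enabled else "False")


# "myRoute" is a hypothetical route name; "DTS" is the service from this thread.
print(forward_post_key_line("myRoute", "DTS"))
```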





post-result.txt (5.3 KiB)
adh.png (56.5 KiB)

Hello @graham.street

Is the item '.AAAATEST' already available in your ATS server? Basically, for an On-Stream post the application needs to subscribe to that item from the same service first.
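The relationship between the subscription and the On-Stream post can be sketched as follows: the item request opens a stream with a given ID, and the post is then sent on that same stream ID. The helper names are hypothetical, and the message layout is taken from the example code discussed in this thread rather than from a formal specification.

```python
import json

STREAM_ID = 2  # stream opened by the item request; the post reuses it


def build_item_request(service, name, stream_id=STREAM_ID):
    """Market Price request that opens the stream for the item."""
    return {"ID": stream_id, "Key": {"Service": service, "Name": name}}


def build_on_stream_post(fields, post_id, stream_id=STREAM_ID):
    """Post message sent on the already-open item stream."""
    return {
        "ID": stream_id,
        "Type": "Post",
        "Domain": "MarketPrice",
        "Ack": True,
        "PostID": post_id,
        # Placeholder identity; the example uses the host IP and process ID.
        "PostUserInfo": {"Address": "127.0.0.1", "UserID": 1},
        "Message": {
            "ID": 0,
            "Type": "Update",
            "Domain": "MarketPrice",
            "Fields": fields,
        },
    }


request = build_item_request("DTS", ".AAAATEST")
post = build_on_stream_post({"BID": 44.55, "ASK": 66.77}, post_id=1)

# The post is only valid once the request's stream is open.
print(json.dumps(request, sort_keys=True))
print(json.dumps(post, sort_keys=True))
```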

Could you please share your complete JSON messages? (Please remove your infrastructure IP addresses and password.)

You can find more detail regarding how to use the Elektron WebSocket API with ATS from the following articles:


Yes, the instrument .AAAATEST exists.

My colleague wrote an off-stream program in Python that was able to poke data into that service and instrument.
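For comparison, an Off-Stream post does not rely on an open item stream. The sketch below assumes the usual WebSocket API convention that an Off-Stream post is sent on the Login stream (ID 1 in the example code) and carries the item key itself. The colleague's actual program is not shown in this thread, so the helper name and the exact layout here are assumptions.

```python
import json

LOGIN_STREAM_ID = 1  # the example code logs in on stream ID 1


def build_off_stream_post(service, name, fields, post_id):
    """Post message sent on the Login stream, identifying the item by Key."""
    return {
        "ID": LOGIN_STREAM_ID,
        "Type": "Post",
        "Domain": "MarketPrice",
        "Ack": True,
        "PostID": post_id,
        # The key travels with the post because no item stream identifies it.
        "Key": {"Service": service, "Name": name},
        # Placeholder identity; the example uses the host IP and process ID.
        "PostUserInfo": {"Address": "127.0.0.1", "UserID": 1},
        "Message": {
            "ID": 0,
            "Type": "Update",
            "Domain": "MarketPrice",
            "Fields": fields,
        },
    }


off_stream_post = build_off_stream_post(
    "DTS", ".AAAATEST", {"BID": 44.55, "ASK": 66.77}, post_id=1)
print(json.dumps(off_stream_post, sort_keys=True, indent=2))
```

Because the key is carried in the post itself, this path does not depend on the infrastructure adding the key on the application's behalf.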

The code is pasted below; you will see that it is just the Refinitiv example code with the service added and the instrument name changed. Incidentally, when I changed the service name to one of our ADH cached services and the instrument to one that exists in that ADH cache, the code worked fine.




#!/usr/bin/env python
""" Simple example of posting Market Price JSON data using Websockets """

import sys
import time
import getopt
import socket
import json
import websocket
import threading
import os
from threading import Thread, Event

# Global Default Variables
hostname = 'xxxxxxxxx'
port = '15000'
user = 'xxxxxxxxxxxxxxx'
app_id = '256'
position = socket.gethostbyname(socket.gethostname())

# Global Variables
next_post_time = 0
web_socket_app = None
web_socket_open = False
post_id = 1


def process_message(ws, message_json):
    """ Parse at high level and output JSON of message """
    message_type = message_json['Type']

    if message_type == "Refresh":
        if 'Domain' in message_json:
            message_domain = message_json['Domain']
            if message_domain == "Login":
                process_login_response(ws, message_json)
    elif message_type == "Ping":
        pong_json = { 'Type':'Pong' }
        ws.send(json.dumps(pong_json))
        print("SENT:")
        print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))

    # If our TRI stream is now open, we can start sending posts.
    global next_post_time
    if ('ID' in message_json and message_json['ID'] == 2 and next_post_time == 0 and
            (not 'State' in message_json or message_json['State']['Stream'] == "Open" and message_json['State']['Data'] == "Ok")):
        next_post_time = time.time() + 3


def process_login_response(ws, message_json):
    """ Send item request """
    send_market_price_request(ws)


def send_market_price_request(ws):
    """ Create and send simple Market Price request """
    mp_req_json = {
        'ID': 2,
        'Key': {
            'Service': 'DTS',
            'Name': '.AAAATEST'
        },
    }
    ws.send(json.dumps(mp_req_json))
    print("SENT:")
    print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':')))

def send_market_price_post(ws):
    """ Send a post message containing market-price content for TRI.N """
    global post_id
    print("send_market_price_post")
    time.sleep(1)
    mp_post_json = {
        'ID': 2,
        'Type':'Post',
        'Domain':'MarketPrice',
        'Ack':True,
        'PostID':post_id,
        'PostUserInfo': {
            'Address':position,  # Use IP address as the Post User Address.
            'UserID':os.getpid()  # Use process ID as the Post User Id.
        },
        'Message': {
            'ID': 0,
            'Type':'Update',
            'Domain':'MarketPrice',
            'Fields':{'BID': 44.55,'ASK': 66.77}
        }
    }

    time.sleep(1)
    ws.send(json.dumps(mp_post_json))
    print("Sending this :")
    print(json.dumps(mp_post_json))
    print("SENT POST:")
    time.sleep(1)
    print(json.dumps(mp_post_json, sort_keys=True, indent=2, separators=(',', ':')))
    print("SENT POSTEND:")
    post_id += 1

def send_login_request(ws):
    """ Generate a login request from command line data (or defaults) and send """
    login_json = {
        'ID': 1,
        'Domain': 'Login',
        'Key': {
            'Name': '',
            'Elements': {
                'ApplicationId': '',
                'Position': ''
            }
        }
    }

    login_json['Key']['Name'] = user
    login_json['Key']['Elements']['ApplicationId'] = app_id
    login_json['Key']['Elements']['Position'] = position

    ws.send(json.dumps(login_json))
    print("SENT:")
    print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))


def on_message(ws, message):
    """ Called when message received, parse message into JSON for processing """
    print("RECEIVED: ")
    message_json = json.loads(message)
    print("on_message")
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))

    for singleMsg in message_json:
        process_message(ws, singleMsg)


def on_error(ws, error):
    """ Called when websocket error has occurred """
    print("on_error")
    print(error)


def on_close(ws):
    """ Called when websocket is closed """
    global web_socket_open
    print("WebSocket Closed")
    web_socket_open = False


def on_open(ws):
    """ Called when handshake is complete and websocket is open, send login """
    print("on_open")
    print("WebSocket successfully connected!")
    global web_socket_open
    web_socket_open = True
    send_login_request(ws)


if __name__ == "__main__":

    # Get command line parameters
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", ["help", "hostname=", "port=", "app_id=", "user=", "position="])
    except getopt.GetoptError:
        print('Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]')
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("--help"):
            print('Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]')
            sys.exit(0)
        elif opt in ("--hostname"):
            hostname = arg
        elif opt in ("--port"):
            port = arg
        elif opt in ("--app_id"):
            app_id = arg
        elif opt in ("--user"):
            user = arg
        elif opt in ("--position"):
            position = arg

    # Start websocket handshake
    ws_address = "ws://{}:{}/WebSocket".format(hostname, port)
    print("Connecting to WebSocket " + ws_address + " ...")
    web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'],
                                            on_message=on_message,
                                            on_error=on_error,
                                            on_close=on_close,
                                            subprotocols=['tr_json2'])
    web_socket_app.on_open = on_open

    # Event loop
    wst = threading.Thread(target=web_socket_app.run_forever)
    wst.start()
    print("just started wst")
    try:
        while True:
            time.sleep(1)
            print("+++++++++++++++++++++++++++++")
            if next_post_time != 0 and time.time() > next_post_time:
                send_market_price_post(web_socket_app)
                next_post_time = time.time() + 3
    except KeyboardInterrupt:
        web_socket_app.close()


I added the line *adh*routename.route*servicename*forwardOnStreamPostMsgKey to my cnf file and restarted the ADHs, and the code can now push data to the ATS service. I am using ADH 3.2.3, but it seems this parameter is only documented from 3.3 onwards. Well done Wasin for solving this problem.
