The main method in the code below simulates a servlet call. I was aiming to connect to the servers once, then create and destroy a client for each call, as I believe the client only needs to exist while publishing.
When running the client, the latest price is published. However, each call returns one more ACK than the previous call, and all of them carry the last Handle ID published. For example:
- 1st call = 1 ACK and HandleID 1
- 2nd call = 2 ACKs and all Handle ID 2
- 3rd call = 3 ACKs and all Handle ID 3 etc.
However, the ACK ID is always the first Handle ID.
I was assuming that there would be some kind of 'unregisterClient' to deal with this, but there is none in EMA. What am I not unregistering?
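To make the pattern above easier to confirm, here is a minimal sketch of a tally that records which handle each ACK arrives on. `AckTally` is a hypothetical helper of my own, not part of EMA, and the values in its `main` are placeholders that only loosely mirror the counts listed above. In the real client it would presumably be fed from `onAckMsg` with `event.handle()` and `ackMsg.ackId()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper (not part of EMA): records which item handle
// each ACK arrived on, so repeated ACKs per call become easy to see.
class AckTally {
    // handle -> ACK IDs seen on that handle, in arrival order
    private final Map<Long, List<Long>> acksByHandle = new LinkedHashMap<>();

    // Intended to be called from onAckMsg with event.handle() and ackMsg.ackId().
    synchronized void record(long handle, long ackId) {
        acksByHandle.computeIfAbsent(handle, h -> new ArrayList<>()).add(ackId);
    }

    // Number of ACKs recorded for the given handle (0 if none).
    synchronized int countFor(long handle) {
        List<Long> ids = acksByHandle.get(handle);
        return ids == null ? 0 : ids.size();
    }

    // One line per handle: how many ACKs it received and their ACK IDs.
    synchronized String summary() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Long, List<Long>> e : acksByHandle.entrySet()) {
            sb.append("handle ").append(e.getKey())
              .append(": ").append(e.getValue().size())
              .append(" ACK(s), ackIds=").append(e.getValue())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        AckTally tally = new AckTally();
        // Placeholder values only: one ACK on the first call, two on the second.
        tally.record(1, 1);
        tally.record(2, 1);
        tally.record(2, 1);
        System.out.print(tally.summary());
    }
}
```

Printing the summary after each loop iteration would show whether the extra ACKs arrive on the newly registered handle or on handles from earlier iterations.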
The values are publishing correctly, I am just concerned that there appears to be a leak.
The code is:
public class ReutersPublisherClientStandAloneTest2 implements OmmConsumerClient {

    public long bid, ask, postId;
    CountDownLatch counterDownLatch = new CountDownLatch(1);

    public ReutersPublisherClientStandAloneTest2(long bid, long ask) {
        this.bid = bid;
        this.ask = ask;
    }

    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
        // System.out.println("onRefreshMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + refreshMsg.toString());
        if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN
                && refreshMsg.state().streamState() == OmmState.StreamState.OPEN
                && refreshMsg.state().dataState() == OmmState.DataState.OK) { // #1
            PostMsg postMsg = EmaFactory.createPostMsg();
            UpdateMsg updateMessage = EmaFactory.createUpdateMsg();
            FieldList nestedFieldList = EmaFactory.createFieldList();
            nestedFieldList.add(EmaFactory.createFieldEntry().real(22, bid, OmmReal.MagnitudeType.EXPONENT_NEG_4));
            nestedFieldList.add(EmaFactory.createFieldEntry().real(25, ask, OmmReal.MagnitudeType.EXPONENT_NEG_4));
            updateMessage.payload(nestedFieldList);
            ((OmmConsumer) event.closure()).submit(
                postMsg
                    .postId(postId++)
                    .serviceName(WRITE_SERVICE)
                    .name(RIC)
                    .solicitAck(true)
                    .complete(true)
                    .payload(updateMessage),
                event.handle()
            );
        } // #1 -end
    }

    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
        // System.out.println("onUpdateMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + updateMsg.toString());
    }

    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
        // System.out.println("onStatusMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + statusMsg.toString());
    }

    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent event) {
        System.out.println("onAckMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + ackMsg.toString());
        counterDownLatch.countDown();
    }

    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent event) {}

    public void onAllMsg(Msg msg, OmmConsumerEvent event) {}

    public static String READ_SERVICE = "IDN_RDF";
    public static String WRITE_SERVICE = "MARKETLINK";
    public static String RIC = "XXX=YYY";

    public static void main(String[] args) throws Exception {
        OmmConsumer consumer = EmaFactory.createOmmConsumer(
            EmaFactory
                .createOmmConsumerConfig("C:/cfg")
                .username("user")
        );

        long bid = 35000, ask = 35100;
        for (int i = 0; i < 3; i++) {
            System.out.println("Sending ============ " + bid + " " + ask);
            ReutersPublisherClientStandAloneTest2 appClient =
                new ReutersPublisherClientStandAloneTest2(bid += 100, ask += 100); // #2
            ReqMsg reqMsg = EmaFactory.createReqMsg();
            long handle1 = consumer.registerClient(reqMsg.domainType(EmaRdm.MMT_LOGIN), appClient, consumer);
            long handle2 = consumer.registerClient(
                reqMsg.clear().serviceName(READ_SERVICE).name(RIC).interestAfterRefresh(false),
                appClient, consumer); // #3
            appClient.counterDownLatch.await();
            consumer.unregister(handle1); // #5
            consumer.unregister(handle2); // #6
            Thread.sleep(5000);
        }

        if (consumer != null) consumer.uninitialize();
    }
}