
Too Many ACKs on Off-Line Posting Consumer

The main method in the code below simulates a servlet call. I was aiming to connect to the servers once, then create and destroy a client for each call, as I believe the client only needs to exist for the time of publishing.

When running the client, the latest price is published. However, each call returns one more ACK than the previous one, and all of them correspond to the last Handle ID published. For example:

  • 1st call = 1 ACK and Handle ID 1
  • 2nd call = 2 ACKs and all Handle ID 2
  • 3rd call = 3 ACKs and all Handle ID 3 etc.

However, the ACK ID is always the first Handle ID.

I was assuming that there would be some kind of 'unregisterClient' to deal with this, but there is none in EMA. What am I not unregistering?

The values are publishing correctly; I am just concerned that there appears to be a leak.

The code is:

public class ReutersPublisherClientStandAloneTest2 implements OmmConsumerClient {

    public long bid, ask, postId;
    CountDownLatch counterDownLatch = new CountDownLatch(1);
    
    public ReutersPublisherClientStandAloneTest2(long bid, long ask) {
        this.bid = bid;
        this.ask = ask;
    }


    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)    {
        
//        System.out.println("onRefreshMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + refreshMsg.toString());
        
        if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 
                refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
                refreshMsg.state().dataState() == OmmState.DataState.OK )
        {
// #1
                PostMsg postMsg = EmaFactory.createPostMsg();
                UpdateMsg updateMessage = EmaFactory.createUpdateMsg();
                FieldList nestedFieldList = EmaFactory.createFieldList();
                
                nestedFieldList.add(EmaFactory.createFieldEntry().real(22, bid, OmmReal.MagnitudeType.EXPONENT_NEG_4));
                nestedFieldList.add(EmaFactory.createFieldEntry().real(25, ask, OmmReal.MagnitudeType.EXPONENT_NEG_4));
                
                updateMessage.payload(nestedFieldList);
                
                ((OmmConsumer) event.closure()).submit( 
                    postMsg
                        .postId(postId++)
                        .serviceName(WRITE_SERVICE)
                        .name(RIC)
                        .solicitAck(true)
                        .complete(true)
                        .payload(updateMessage)
                    , event.handle() 
                );
        }
// #1 -end
    }
        
    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
//        System.out.println("onUpdateMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + updateMsg.toString());
    }

    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
//        System.out.println("onStatusMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + statusMsg.toString());
    }
        
    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent event)    {
        System.out.println("onAckMsg. Item Handle: " + event.handle() + " Closure: " + event.closure() + " Message: " + ackMsg.toString());
        counterDownLatch.countDown();
    }
        
    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent event){}
    public void onAllMsg(Msg msg, OmmConsumerEvent event){}


    public static String READ_SERVICE = "IDN_RDF";
    public static String WRITE_SERVICE = "MARKETLINK";
    public static String RIC = "XXX=YYY";
        
    public static void main(String[] args) throws Exception    {
        OmmConsumer    consumer = EmaFactory.createOmmConsumer(
            EmaFactory
                .createOmmConsumerConfig("C:/cfg")
                .username("user")
        );

        long bid = 35000, ask = 35100;
        for (int i = 0; i < 3; i++) {
            System.out.println("Sending ============ " + bid + " " + ask);
            ReutersPublisherClientStandAloneTest2 appClient = new ReutersPublisherClientStandAloneTest2(bid += 100, ask += 100); // #2
            ReqMsg reqMsg = EmaFactory.createReqMsg();
            long handle1 = consumer.registerClient(reqMsg.domainType( EmaRdm.MMT_LOGIN), appClient, consumer);
            long handle2 = consumer.registerClient(reqMsg.clear().serviceName(READ_SERVICE).name(RIC).interestAfterRefresh(false), appClient, consumer); // #3
            appClient.counterDownLatch.await();
            consumer.unregister(handle1); // #5
            consumer.unregister(handle2); // #6
            Thread.sleep(5000);
        }

        if (consumer != null) consumer.uninitialize();
    }
    
}

1 Answer

Hi @anthony.leon

I cannot comment on the additional ACKs without testing, and it may require you to create a Support ticket for offline investigation.

However, one thing I can point out is that your current implementation is not ideal.

Each time you call registerClient for MMT_LOGIN, you are sending a Login request to the server, and you are then logging out again when you call unregister on that handle.

If your servlet requirement allows it, I would call registerClient just once and, when the initial Login Refresh is received, grab the handle, hold on to it, and use it as and when I want to carry out an off-stream post. I would only unregister the handle when finished.

You don't have to post from within the onRefreshMsg call - as long as you are still logged in, you can use the Login Handle to post. For that reason, you should handle the onStatusMsg for the Login, just in case you get logged out by the server for whatever reason (e.g. a disconnect).
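
To make the suggestion above more concrete, here is a minimal sketch in plain Java. It is not EMA code and is only one way this could be arranged: `PostSender`, `LoginSession` and `OffStreamPostSketch` are hypothetical names made up for illustration. `PostSender` stands in for a thin wrapper that would build the PostMsg and call `consumer.submit(postMsg, loginHandle)`; `onLoginAccepted` would be called from onRefreshMsg when the Login stream is OPEN / OK, and `onLoginLost` from onStatusMsg when it is not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for "build a PostMsg and call consumer.submit(postMsg, loginHandle)". */
interface PostSender {
    void submit(long postId, long bid, long ask, long loginHandle);
}

/** Holds the single Login handle for the lifetime of the application (hypothetical helper). */
final class LoginSession {
    // null means "not currently logged in"
    private final AtomicReference<Long> loginHandle = new AtomicReference<>(null);
    private final AtomicLong nextPostId = new AtomicLong(1);
    private final PostSender sender;

    LoginSession(PostSender sender) {
        this.sender = sender;
    }

    /** Would be called from onRefreshMsg once the Login stream is OPEN / OK. */
    void onLoginAccepted(long handle) {
        loginHandle.set(handle);
    }

    /** Would be called from onStatusMsg when the Login stream is no longer usable. */
    void onLoginLost() {
        loginHandle.set(null);
    }

    /** Called from any servlet thread; returns false if there is no login to post on. */
    boolean post(long bid, long ask) {
        Long handle = loginHandle.get();
        if (handle == null) {
            return false;
        }
        sender.submit(nextPostId.getAndIncrement(), bid, ask, handle);
        return true;
    }
}

final class OffStreamPostSketch {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // The lambda records what a real sender would submit.
        LoginSession session = new LoginSession((postId, bid, ask, handle) ->
                sent.add("postId=" + postId + " handle=" + handle + " bid=" + bid + " ask=" + ask));

        session.onLoginAccepted(1L);   // login happens once
        session.post(35100, 35200);    // each "servlet call" reuses the same handle
        session.post(35200, 35300);
        session.onLoginLost();         // e.g. a disconnect reported via onStatusMsg
        System.out.println("posted after logout? " + session.post(35300, 35400));
        sent.forEach(System.out::println);
    }
}
```

The point of the sketch is that the Login is registered once, every post reuses the same handle, and the post path checks the login state rather than logging in and out for each call.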



Having the login run just once solved it. I also didn't realize that one could place the submit() outside the OmmConsumerClient, so I did that too. I had just blindly copied the example, assuming that the submit() had to be within the OmmConsumerClient (maybe for threading reasons).

Hi @anthony.leon

No worries - you are not the only one to think this. We have to keep our examples relatively simple and hence they may not always reflect all the possibilities.

We are hoping to do a hints and tips newsletter in the future, and hopefully we can include things like this in it.
