question

Upvotes
Accepted
5 0 0 0

EMA Java: NullPointerException on FieldListImpl when using Flux

Dear Support,

I am using EMA Java with reactor core Flux to receive Reuter stock prices asynchronously, and it works for a while (this varies for an hour to a couple of days) but eventually I keep getting NullPointerException on EVERY item emitted. Looking at the stack trace, it seems to be coming from internal linkedlist's get() while iterating FieldListImpl inside Flux.flatMap. Below is the source code. As you can see, each FieldListImpl emitted should be accessed by single thread at any given point in time so I can't imagine having race condition while working with FieldListImpl. Could you please help figure out what could be the issue?

public Flux<FieldList> createPublisher(ReqMsg reqMsg) {
    return Flux.create(sink ->
                    consumer.registerClient(reqMsg, new CommonOmmListener() {
                        @Override
                        public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
                            if (refreshMsg.payload().dataType() == DataType.DataTypes.SERIES) {
                                if ("RWFFld".equals(refreshMsg.name())) {
                                    dataDictionary.decodeFieldDictionary(refreshMsg.payload().series(), EmaRdm.DICTIONARY_NORMAL);
                                } else if ("RWFEnum".equals(refreshMsg.name())) {
                                    dataDictionary.decodeEnumTypeDictionary(refreshMsg.payload().series(), EmaRdm.DICTIONARY_NORMAL);
                                }
                            } else if (refreshMsg.payload().dataType() == DataType.DataTypes.FIELD_LIST) {
                                sink.next(refreshMsg.payload().fieldList());
                            }
                        }

                        @Override
                        public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
                            if (DataType.DataTypes.FIELD_LIST == updateMsg.payload().dataType()) {
                                sink.next(updateMsg.payload().fieldList());
                            }
                        }
                    })
            , FluxSink.OverflowStrategy.LATEST
    );
}
...
private void registerClient(String stockCode, ReqMsg reqMsg, List<ReactiveAdapter> adapters) {
  
appClientFlux.createPublisher(reqMsg)
.filter(Objects::nonNull)
.flatMap(FieldEntryDecoder::decode) // NullPointerException occurs here
...
.subscribe();
}
...
public static Mono<Map<String, String>> decode(FieldList fieldList) {
    try {
        Map<String, String> hashMap = Maps.newHashMap();
        for (FieldEntry fieldEntry : fieldList) { // NullPointerException occurs here
            Object fieldValue = null;
            int fieldId = fieldEntry.fieldId();
            if (Data.DataCode.BLANK == fieldEntry.code()) {
                fieldValue = "blank";
            } else {
                switch (fieldEntry.loadType()) {
                    case DataType.DataTypes.REAL:
                        fieldValue = fieldEntry.real().asDouble();
                        break;
                    case DataType.DataTypes.DATE:
                        fieldValue = fieldEntry.date().toString();
                        break;
                    case DataType.DataTypes.TIME:
                        fieldValue = fieldEntry.time().toString();
                        break;
                    case DataType.DataTypes.INT:
                        fieldValue = fieldEntry.intValue();
                        break;
                    case DataType.DataTypes.UINT:
                        fieldValue = fieldEntry.uintValue();
                        break;
                    case DataType.DataTypes.ASCII:
                        fieldValue = fieldEntry.ascii();
                        break;
                    case DataType.DataTypes.ENUM:
                        if (fieldEntry.hasEnumDisplay()) {
                            fieldValue = fieldEntry.enumDisplay();
                        }
                        break;
                    case DataType.DataTypes.RMTES:
                        fieldValue = fieldEntry.rmtes().toString();
                        break;
                    case DataType.DataTypes.ERROR:
                        log.error("FID: {}, Error Code: {}", fieldEntry.error().errorCode(), fieldEntry.error().errorCodeAsString());
                        break;
                    default:
                        break;
                }
            }
            hashMap.put(String.valueOf(fieldId), ofNullable(fieldValue).orElse("").toString());
        }
        hashMap.put("timestamp", String.valueOf(System.currentTimeMillis()));
        return Mono.just(hashMap);
    } catch (Exception exception) {
        log.error("Error occurred while decoding field-list.", exception);
    }
    return Mono.empty();
}

Artifact versions in gradle.build (with Spring Boot 2.0.6.RELEASE):

implementation 'io.projectreactor:reactor-core:3.1.10.RELEASE'
compile('com.thomsonreuters.ema:ema:3.2.0.2') {    exclude group: 'org.slf4j'}implementation 'com.thomsonreuters.upa:upa:3.2.0.2'implementation 'com.thomsonreuters.upa.valueadd:upaValueAdd:3.2.0.2'implementation 'com.thomsonreuters.upa.valueadd.cache:upaValueAddCache:3.2.0.2'implementation 'com.thomsonreuters.upa.ansi:ansipage:3.2.0.2'

Below are the full stack traces. First I get #1 then keeps getting #2 on what seems to be EVERY item emitted because I don't receive any item at all from this Flux.

#1:

2018-12-07 09:00:28.288 ERROR [-,,,] 142729 --- [pool-2-thread-1] c.l.s.c.e.reuters.FieldEntryDecoder : Error occurred while decoding field-list. java.lang.NullPointerException: null at java.util.LinkedList$ListItr.next(LinkedList.java:893) at com.thomsonreuters.ema.access.EmaIterator.next(EmaIterator.java:30) at com.##.consumer.external.reuters.FieldEntryDecoder.decode(FieldEntryDecoder.java:22) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:347) at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:97) at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) at reactor.core.publisher.FluxCreate$LatestAsyncSink.drain(FluxCreate.java:870) at reactor.core.publisher.FluxCreate$LatestAsyncSink.next(FluxCreate.java:801) at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:151) at com.##consumer.daemon.AppClientFlux$1.onUpdateMsg(AppClientFlux.java:49) at com.thomsonreuters.ema.access.ItemCallbackClientConsumer.notifyOnUpdateMsg(ItemCallbackClient.java:2522) at com.thomsonreuters.ema.access.ItemCallbackClient.processUpdateMsg(ItemCallbackClient.java:1803) at com.thomsonreuters.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1607) at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:808) at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:823) at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:2814) at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readUpdateMsg(WlItemHandler.java:2153) at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:1787) at com.thomsonreuters.upa.valueadd.reactor.Watchlist.readMsg(Watchlist.java:287) at com.thomsonreuters.upa.valueadd.reactor.Reactor.performChannelRead(Reactor.java:1758) at com.thomsonreuters.upa.valueadd.reactor.Reactor.dispatchChannel(Reactor.java:1389) at com.thomsonreuters.upa.valueadd.reactor.ReactorChannel.dispatch(ReactorChannel.java:469) at com.thomsonreuters.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1142) at com.thomsonreuters.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1275) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

#2:

2018-12-07 09:00:28.289 ERROR [-,,,] 142729 --- [pool-2-thread-1] c.l.s.c.e.reuters.FieldEntryDecoder : Error occurred while decoding field-list. java.lang.NullPointerException: null at java.util.LinkedList.get(LinkedList.java:477) at com.thomsonreuters.ema.access.FieldListImpl.clearCollection(FieldListImpl.java:559) at com.thomsonreuters.ema.access.FieldListImpl.fillCollection(FieldListImpl.java:329) at com.thomsonreuters.ema.access.FieldListImpl.iterator(FieldListImpl.java:99) at com.##.consumer.external.reuters.FieldEntryDecoder.decode(FieldEntryDecoder.java:22) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:347) at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:97) at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) at reactor.core.publisher.FluxCreate$LatestAsyncSink.drain(FluxCreate.java:870) at reactor.core.publisher.FluxCreate$LatestAsyncSink.next(FluxCreate.java:801) at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:151) at com.##.consumer.daemon.AppClientFlux$1.onUpdateMsg(AppClientFlux.java:49) at com.thomsonreuters.ema.access.ItemCallbackClientConsumer.notifyOnUpdateMsg(ItemCallbackClient.java:2522) at com.thomsonreuters.ema.access.ItemCallbackClient.processUpdateMsg(ItemCallbackClient.java:1803) at com.thomsonreuters.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1607) at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:808) at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:823) at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:2814) at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readUpdateMsg(WlItemHandler.java:2153) at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:1787) at com.thomsonreuters.upa.valueadd.reactor.Watchlist.readMsg(Watchlist.java:287) at com.thomsonreuters.upa.valueadd.reactor.Reactor.performChannelRead(Reactor.java:1758) at com.thomsonreuters.upa.valueadd.reactor.Reactor.dispatchChannel(Reactor.java:1389) at com.thomsonreuters.upa.valueadd.reactor.ReactorChannel.dispatch(ReactorChannel.java:469) at com.thomsonreuters.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1142) at com.thomsonreuters.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1275) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

elektronrefinitiv-realtimeelektron-sdkrrtema-apielektron-message-api
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvotes
Accepted
9.5k 10 5 7

Hello @791ee348-0190-4aab-b009-7b3d6875ca84

Have you tested with the lastest Elektron-SDK version, 1.2.2 yet?

If the problem still occurs with the latest version, to get the problem investigated, the best approach is to contact Elektron-SDK development team who can help you to fix the problem. Since the problem is complex, investigating the problem, analyzing EMA/ETA Java source code in depth by Elektron-SDK experts (the development team) are required. You can submit your problem to the development team directly via Elektron-SDK Github then click "New issue" button. This way makes you can contact and follow up with the development team directly.

Moreover, the forum is suitable for asking general, how to or non-complex problem.

Hope this help.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvotes
5 0 0 0

Yes, I have tried EMA version 3.2.2.0 but still same issue. Let me post it on github. Thanks!

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvotes
26 2 1 4

I have a similar issue on 3.2.2.0 using similar code to the above.

In my case the `map` just got lost. `System.out.println("XXX" + map + "XXX")` produced an empty line (not even the "XXX" was printed !!).

I removed the Flux code and just used a `BlockingQueue` and had a separate `Thread` to `take()` but still had the same issue (lost map).

In the `FieldList` Iterator, I tried `new()` on every value copied from the `payload` ... same issue (lost map).

I put a `CountDownLatch` before and after the iterator so that the `add` to the queue would wait ... same issue.

Instead of parsing to payload I am queuing the `toString()` of the original message.

When I `toString()` with a `BlockingQueue` or to `Flux` the message is not lost.

So I assume that there is something unsafe about the iterating `FieldList` and not a problem with `Flux`. For now, I am parsing the `toString()` to capture.

Edit: I forgot to add that;

This occurs on ItemRefresh on as little as 2 RIC subscriptions. When I subscribe to 1 RIC, the issue does not occur. On requesting >= 1, random ItemRefreshes are lost and others are parsed.

The issue also doesn't seem to occur on ItemUpdates. So I guess that is it related to the far larger number of fields which need to be iterated in the ItemRefresh.

I also suspected that it may have been connected to the dictionary download not being complete before the ItemRefresh was being parsed. So I changed my configuration to read the dictionary from disk, but the issue still occurs.

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Click below to post an Idea Post Idea