Dear Support,
I am using EMA Java with Reactor Core's Flux to receive Reuters stock prices asynchronously. It works for a while (anywhere from an hour to a couple of days), but eventually I get a NullPointerException on EVERY item emitted. Judging by the stack trace, it comes from the internal LinkedList's get() while iterating FieldListImpl inside Flux.flatMap. The source code is below. As you can see, each emitted FieldListImpl should be accessed by a single thread at any given point in time, so I can't see how a race condition could occur while working with FieldListImpl. Could you please help me figure out what the issue could be?
public Flux<FieldList> createPublisher(ReqMsg reqMsg) {
    return Flux.create(sink ->
        consumer.registerClient(reqMsg, new CommonOmmListener() {
            @Override
            public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
                if (refreshMsg.payload().dataType() == DataType.DataTypes.SERIES) {
                    if ("RWFFld".equals(refreshMsg.name())) {
                        dataDictionary.decodeFieldDictionary(refreshMsg.payload().series(), EmaRdm.DICTIONARY_NORMAL);
                    } else if ("RWFEnum".equals(refreshMsg.name())) {
                        dataDictionary.decodeEnumTypeDictionary(refreshMsg.payload().series(), EmaRdm.DICTIONARY_NORMAL);
                    }
                } else if (refreshMsg.payload().dataType() == DataType.DataTypes.FIELD_LIST) {
                    sink.next(refreshMsg.payload().fieldList());
                }
            }

            @Override
            public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
                if (DataType.DataTypes.FIELD_LIST == updateMsg.payload().dataType()) {
                    sink.next(updateMsg.payload().fieldList());
                }
            }
        }),
        FluxSink.OverflowStrategy.LATEST
    );
}
...
private void registerClient(String stockCode, ReqMsg reqMsg, List<ReactiveAdapter> adapters) {
    appClientFlux.createPublisher(reqMsg)
        .filter(Objects::nonNull)
        .flatMap(FieldEntryDecoder::decode) // NullPointerException occurs here
        ...
        .subscribe();
}
...
public static Mono<Map<String, String>> decode(FieldList fieldList) {
    try {
        Map<String, String> hashMap = Maps.newHashMap();
        for (FieldEntry fieldEntry : fieldList) { // NullPointerException occurs here
            Object fieldValue = null;
            int fieldId = fieldEntry.fieldId();
            if (Data.DataCode.BLANK == fieldEntry.code()) {
                fieldValue = "blank";
            } else {
                switch (fieldEntry.loadType()) {
                    case DataType.DataTypes.REAL:
                        fieldValue = fieldEntry.real().asDouble();
                        break;
                    case DataType.DataTypes.DATE:
                        fieldValue = fieldEntry.date().toString();
                        break;
                    case DataType.DataTypes.TIME:
                        fieldValue = fieldEntry.time().toString();
                        break;
                    case DataType.DataTypes.INT:
                        fieldValue = fieldEntry.intValue();
                        break;
                    case DataType.DataTypes.UINT:
                        fieldValue = fieldEntry.uintValue();
                        break;
                    case DataType.DataTypes.ASCII:
                        fieldValue = fieldEntry.ascii();
                        break;
                    case DataType.DataTypes.ENUM:
                        if (fieldEntry.hasEnumDisplay()) {
                            fieldValue = fieldEntry.enumDisplay();
                        }
                        break;
                    case DataType.DataTypes.RMTES:
                        fieldValue = fieldEntry.rmtes().toString();
                        break;
                    case DataType.DataTypes.ERROR:
                        log.error("FID: {}, Error Code: {}", fieldEntry.error().errorCode(), fieldEntry.error().errorCodeAsString());
                        break;
                    default:
                        break;
                }
            }
            hashMap.put(String.valueOf(fieldId), ofNullable(fieldValue).orElse("").toString());
        }
        hashMap.put("timestamp", String.valueOf(System.currentTimeMillis()));
        return Mono.just(hashMap);
    } catch (Exception exception) {
        log.error("Error occurred while decoding field-list.", exception);
    }
    return Mono.empty();
}
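To make the single-thread assumption concrete, here is a minimal sketch that uses neither EMA nor Reactor. It only illustrates the kind of hazard I am trying to rule out, under the unconfirmed assumption that a library clears and refills one mutable container per callback. `ReusedFieldList` and its methods are hypothetical stand-ins, not EMA's FieldListImpl API. If a reference to such a container escapes the callback, a later read sees whatever the next message wrote, whereas a copy taken inside the callback stays stable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SnapshotSketch {

    /** Hypothetical stand-in for a container that a library clears and refills per message. */
    static final class ReusedFieldList {
        private final List<String> entries = new ArrayList<>();

        /** Simulates the library decoding the next message into the same object. */
        void refill(String... values) {
            entries.clear();
            Collections.addAll(entries, values);
        }

        /** Live view: later refills are visible through this reference. */
        List<String> view() {
            return entries;
        }

        /** Defensive copy: unaffected by later refills. */
        List<String> snapshot() {
            return new ArrayList<>(entries);
        }
    }

    public static void main(String[] args) {
        ReusedFieldList shared = new ReusedFieldList();

        // First "message" arrives; one consumer keeps the live reference, another copies.
        shared.refill("BID=10.1", "ASK=10.2");
        List<String> escaped = shared.view();
        List<String> copied = shared.snapshot();

        // Second "message" overwrites the same container.
        shared.refill("BID=11.0");

        System.out.println("escaped reference: " + escaped);
        System.out.println("snapshot:          " + copied);
    }
}
```

Whether anything like this applies to my case is exactly what I am unsure about, since the stack traces show decode running on the EMA dispatch thread inside onUpdateMsg.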
Artifact versions in build.gradle (with Spring Boot 2.0.6.RELEASE):
implementation 'io.projectreactor:reactor-core:3.1.10.RELEASE'
compile('com.thomsonreuters.ema:ema:3.2.0.2') { exclude group: 'org.slf4j' }
implementation 'com.thomsonreuters.upa:upa:3.2.0.2'
implementation 'com.thomsonreuters.upa.valueadd:upaValueAdd:3.2.0.2'
implementation 'com.thomsonreuters.upa.valueadd.cache:upaValueAddCache:3.2.0.2'
implementation 'com.thomsonreuters.upa.ansi:ansipage:3.2.0.2'
Below are the full stack traces. First I get #1, and then I keep getting #2 on what seems to be EVERY item emitted, because I no longer receive any items from this Flux.
#1:
2018-12-07 09:00:28.288 ERROR [-,,,] 142729 --- [pool-2-thread-1] c.l.s.c.e.reuters.FieldEntryDecoder : Error occurred while decoding field-list.
java.lang.NullPointerException: null
    at java.util.LinkedList$ListItr.next(LinkedList.java:893)
    at com.thomsonreuters.ema.access.EmaIterator.next(EmaIterator.java:30)
    at com.##.consumer.external.reuters.FieldEntryDecoder.decode(FieldEntryDecoder.java:22)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:347)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:97)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81)
    at reactor.core.publisher.FluxCreate$LatestAsyncSink.drain(FluxCreate.java:870)
    at reactor.core.publisher.FluxCreate$LatestAsyncSink.next(FluxCreate.java:801)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:151)
    at com.##consumer.daemon.AppClientFlux$1.onUpdateMsg(AppClientFlux.java:49)
    at com.thomsonreuters.ema.access.ItemCallbackClientConsumer.notifyOnUpdateMsg(ItemCallbackClient.java:2522)
    at com.thomsonreuters.ema.access.ItemCallbackClient.processUpdateMsg(ItemCallbackClient.java:1803)
    at com.thomsonreuters.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1607)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:808)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:823)
    at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:2814)
    at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readUpdateMsg(WlItemHandler.java:2153)
    at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:1787)
    at com.thomsonreuters.upa.valueadd.reactor.Watchlist.readMsg(Watchlist.java:287)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.performChannelRead(Reactor.java:1758)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.dispatchChannel(Reactor.java:1389)
    at com.thomsonreuters.upa.valueadd.reactor.ReactorChannel.dispatch(ReactorChannel.java:469)
    at com.thomsonreuters.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1142)
    at com.thomsonreuters.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1275)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
#2:
2018-12-07 09:00:28.289 ERROR [-,,,] 142729 --- [pool-2-thread-1] c.l.s.c.e.reuters.FieldEntryDecoder : Error occurred while decoding field-list.
java.lang.NullPointerException: null
    at java.util.LinkedList.get(LinkedList.java:477)
    at com.thomsonreuters.ema.access.FieldListImpl.clearCollection(FieldListImpl.java:559)
    at com.thomsonreuters.ema.access.FieldListImpl.fillCollection(FieldListImpl.java:329)
    at com.thomsonreuters.ema.access.FieldListImpl.iterator(FieldListImpl.java:99)
    at com.##.consumer.external.reuters.FieldEntryDecoder.decode(FieldEntryDecoder.java:22)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:347)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:97)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81)
    at reactor.core.publisher.FluxCreate$LatestAsyncSink.drain(FluxCreate.java:870)
    at reactor.core.publisher.FluxCreate$LatestAsyncSink.next(FluxCreate.java:801)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:151)
    at com.##.consumer.daemon.AppClientFlux$1.onUpdateMsg(AppClientFlux.java:49)
    at com.thomsonreuters.ema.access.ItemCallbackClientConsumer.notifyOnUpdateMsg(ItemCallbackClient.java:2522)
    at com.thomsonreuters.ema.access.ItemCallbackClient.processUpdateMsg(ItemCallbackClient.java:1803)
    at com.thomsonreuters.ema.access.ItemCallbackClient.defaultMsgCallback(ItemCallbackClient.java:1607)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendDefaultMsgCallback(Reactor.java:808)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.sendAndHandleDefaultMsgCallback(Reactor.java:823)
    at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.callbackUser(WlItemHandler.java:2814)
    at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readUpdateMsg(WlItemHandler.java:2153)
    at com.thomsonreuters.upa.valueadd.reactor.WlItemHandler.readMsg(WlItemHandler.java:1787)
    at com.thomsonreuters.upa.valueadd.reactor.Watchlist.readMsg(Watchlist.java:287)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.performChannelRead(Reactor.java:1758)
    at com.thomsonreuters.upa.valueadd.reactor.Reactor.dispatchChannel(Reactor.java:1389)
    at com.thomsonreuters.upa.valueadd.reactor.ReactorChannel.dispatch(ReactorChannel.java:469)
    at com.thomsonreuters.ema.access.OmmBaseImpl.rsslReactorDispatchLoop(OmmBaseImpl.java:1142)
    at com.thomsonreuters.ema.access.OmmBaseImpl.run(OmmBaseImpl.java:1275)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)