After cloning the update message in onUpdateMsg, my memory utilization is very high; it goes up to 16 GB. Below is my code for cloning the update message. Is there anything wrong?
Thank you!
Hello @MayassK
You use EmaFactory.createUpdateMsg(other) to clone every message and then decode the clones inside the callback method, i.e. onUpdateMsg(..). This can cause the memory growth problem you are facing. The goal of cloning a message is to be able to decode the message's payload outside of the message callback methods.
To prevent this memory growth, minimize the time spent processing incoming data in the callback methods, i.e. onRefreshMsg(..) and onUpdateMsg(..). Avoid making database and/or other blocking calls in these methods. The application can use an additional thread to process the received messages separately. The EMA thread then spends less time in the callback methods for each data message and can handle a high volume of data messages in a timely manner. You can use the cloning functionality for message types to copy the entire encoded buffer to another object. This helps to cache and process the EMA message types outside of the callback methods, e.g. onRefreshMsg(..), onUpdateMsg(..), onStatusMsg(..). The com.thomsonreuters.ema.access.EmaFactory class provides the following methods to create a clone of a message:
Hope this helps.
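The hand-off described above (the callback only enqueues, a separate thread does the decoding) could be sketched roughly as follows. This is a minimal illustration, not EMA code: the class HandOffSketch, the method handOff, the STOP sentinel, and the String values standing in for cloned messages are all hypothetical. A real application would enqueue the object returned by EmaFactory.createUpdateMsg(..) instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandOffSketch {
    // Hypothetical sentinel value that tells the worker thread to stop.
    private static final String STOP = "__STOP__";

    // Simulates callbacks that only enqueue, while one worker thread decodes.
    static List<String> handOff(List<String> incoming) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();      // blocks until a message arrives
                    if (STOP.equals(msg)) {
                        return;
                    }
                    processed.add("decoded:" + msg); // stand-in for the slow decode/database work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "decoder");
        worker.start();

        // Callback side: the only work done here is a cheap enqueue.
        for (String msg : incoming) {
            queue.offer(msg);
        }
        queue.offer(STOP);

        try {
            worker.join(); // join makes the worker's writes to 'processed' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(handOff(Arrays.asList("IBM.N", "TRI.N")));
    }
}
```

With a single worker the messages are processed in arrival order, which may matter if updates for the same item must be applied sequentially.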
Thank you Pimchaya for your response!
I followed the example below, provided on Git, for cloning.
For database calls I am using separate thread classes, which process them independently.
Can you please provide me a relevant cloning example?
Should I start a separate thread like below?
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
    try {
        UpdateMsg cloneUpdateMsg = EmaFactory.createUpdateMsg(updateMsg);
        new decode(cloneUpdateMsg).start();
    } catch (Exception e) {
        loggerInfo.error("on update MSG error" + e.getMessage());
    }
}
Hello @MayassK
Yes. You should hand the message to a separate thread after cloning it. However, a new thread should not be created every time an update message is received. Your CPU probably has only 4 or 8 cores, so it cannot run more than that many threads at the same time anyway. Having many more threads than cores, e.g. 15, is not useful because most of them will be waiting until a CPU core is free to run them. Each thread also has its own call stack, which consumes memory. That means more threads use more memory.
You should use a thread pool instead. Java has a whole framework for thread pools in the java.util.concurrent package, e.g. ExecutorService, which can be created with a limited number of threads using the Executors class.
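As a rough illustration of the thread-pool suggestion, the sketch below uses Executors.newFixedThreadPool to run decode tasks on a fixed number of threads. The class DecodePoolSketch, the method decodeAll, and the String values standing in for cloned UpdateMsg objects are hypothetical. In a real consumer the pool would be created once, kept for the lifetime of the application, and shut down only on exit; the sketch shuts it down immediately just so it can count the finished tasks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DecodePoolSketch {
    // Submits one task per message to a fixed-size pool and returns how many were decoded.
    static int decodeAll(List<String> clonedMessages, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger decoded = new AtomicInteger();

        for (String msg : clonedMessages) {
            // No new thread is created here; the task waits for a free pool thread.
            pool.execute(() -> {
                if (!msg.isEmpty()) {          // stand-in for decoding the payload
                    decoded.incrementAndGet();
                }
            });
        }

        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("decode tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return decoded.get();
    }

    public static void main(String[] args) {
        System.out.println(decodeAll(Arrays.asList("IBM.N", "TRI.N", "VOD.L"), 4));
    }
}
```

Note that tasks on a multi-threaded pool may complete out of order, so this layout suits decoding work where ordering between messages is handled elsewhere.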
Hi @MayassK
To manage memory when you clone messages, make sure the cloned messages become unreachable once you are done with them, so that they are eligible for Java garbage collection. If you are following these guidelines and memory is still peaking, you may need to request garbage collection manually (System.gc() / Runtime.getRuntime().gc()) periodically to reduce the memory pressure.
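One place where clones can stay reachable longer than expected is the executor's work queue: a pool created with Executors.newFixedThreadPool uses an unbounded queue, so every clone waiting for a free thread is still held in memory. The sketch below shows one possible way to cap that backlog with a bounded ArrayBlockingQueue. It is an assumption about what might help, not a recommendation from the EMA documentation, and the names BoundedPoolSketch, newBoundedPool, and submitAndCount are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {
    // Fixed number of threads plus a queue that holds at most queueCapacity waiting tasks.
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits 'messages' trivial tasks and returns how many of them ran.
    static int submitAndCount(int messages, int threads, int queueCapacity) {
        ThreadPoolExecutor pool = newBoundedPool(threads, queueCapacity);
        AtomicInteger decoded = new AtomicInteger();

        for (int i = 0; i < messages; i++) {
            pool.execute(decoded::incrementAndGet); // stand-in for decoding one clone
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return decoded.get();
    }

    public static void main(String[] args) {
        System.out.println(submitAndCount(1000, 4, 16));
    }
}
```

The trade-off is that CallerRunsPolicy makes the submitting thread (here, the callback thread) do the work whenever the queue is full, which slows message dispatch in exchange for bounded memory. Whether that is acceptable depends on the application.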
Thank you Pimchaya and Nick,
I am now using a thread pool to call the decode method.
But I am getting a blank value for the cloneUpdateMsg object in the decode method.
Is the approach in the code below correct?
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {
    try {
        UpdateMsg cloneUpdateMsg = EmaFactory.createUpdateMsg(updateMsg);
        Runnable decodeUpdate = new ProcessDecode(updateMsg);
        this.pool.execute(decodeUpdate);
    } catch (Exception e) {
        loggerInfo.error("on update MSG error" + e.getMessage());
    }
}
class ProcessDecode implements Runnable {
    UpdateMsg cloneUpdateMsg;
    public ProcessDecode(UpdateMsg updateMsg) {
        this.cloneUpdateMsg = updateMsg;
    }
    public void run() {
        if (DataType.DataTypes.FIELD_LIST == cloneUpdateMsg.payload().dataType() && cloneUpdateMsg.hasName() && cloneUpdateMsg.hasServiceName()) {
            decode(cloneUpdateMsg.payload().fieldList(), cloneUpdateMsg);
        }
    }
}
Hello @MayassK
Runnable decodeUpdate =new ProcessDecode(updateMsg);
The line above passes the original update message, not its clone, to the thread. It should be:
Runnable decodeUpdate =new ProcessDecode(cloneUpdateMsg);
This line passes the clone of the update message to the thread.
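To see why passing the original can produce a blank value, the sketch below imitates the situation with plain Java objects. It assumes, as the purpose of the cloning feature suggests, that the original message is only valid while the callback is running and may be cleared or reused afterwards. A StringBuilder stands in for the message, and the names CloneVsOriginalSketch and seenByTask are hypothetical; no EMA classes are used.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CloneVsOriginalSketch {
    // Returns what the pool task reads when it is handed the clone or the original.
    static String seenByTask(boolean passClone) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            StringBuilder original = new StringBuilder("BID=101.5"); // stand-in for updateMsg
            StringBuilder clone = new StringBuilder(original);       // stand-in for the cloned message
            StringBuilder handedOver = passClone ? clone : original;

            CountDownLatch callbackReturned = new CountDownLatch(1);
            Future<String> result = pool.submit(() -> {
                callbackReturned.await();      // the task runs after the callback has returned
                return handedOver.toString();
            });

            original.setLength(0);             // assumed behaviour: original is cleared after the callback
            callbackReturned.countDown();
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("clone:    '" + seenByTask(true) + "'");
        System.out.println("original: '" + seenByTask(false) + "'");
    }
}
```

In this imitation the task that receives the clone still sees the data, while the task that receives the original sees an empty value, which matches the symptom described in the question.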