question

Upvotes
Accepted
35 3 3 9

How to send directory messages with no services

Hi,

I'm developing an EMA C++ IProvider that publishes services which creation depends on the availability of external data sources.

Those sources and their names are not under the provider control.

To do so, I'm managing the requests on the directory domain at application level, as described in various samples of the EMA SDK.

I'm trying to cover also the corner case where the IProvider can find itself with no services available.

To understand how to build a refresh message on the directory domain with no services, I've used one of the IProvider samples of the SDK where the directory requests are managed by EMA itself, then I removed all the configured services from the IProvider configuration, to learn how EMA behaves in case of any incoming directory requests.

Surprisingly, I've discovered that in case of no configured services, EMA adds always a "DIRECT_FEED" service in the in memory configuration.

The service is created "on the fly" in the EMA method

thomsonreuters::ema::access::DirectoryServiceStore::loadConfigDirectory(...)

Could you explain this behavior? Should I do the same in my application?

Otherwise, what is the right way to signal no services available? Avoid the OmmProvider creation, avoid to reply at all to any directory requests, or reply in a specific way?

I tried to follow the approach documented to reply to an unknown service request, but it doesn't seem to work: the consumer requests to get all the available services go in timeout.

I'm referring to this excerpt of the RDM Usage Guide (pag. 54):

... If the requested service name or service ID is not available, EMA should send a service directory containing an empty map entry in the payload.

If the service becomes available later, the client receives an update message which contains the required service information.

More in details, I tried to build a message as below, please help me to understand if it is correct (I need to send it also to reply to unknown services in case of punctual requests)

event.getProvider().submit(RefreshMsg().domainType(MMT_DIRECTORY).filter(SERVICE_INFO_FILTER | SERVICE_STATE_FILTER).solicited(true).payload(Map().addKeyUInt(0, MapEntry::AddEnum, FilterList().complete()).complete()).complete(), event.getHandle());


Thanks

Best Regards,

Paolo

elektronrefinitiv-realtimeelektron-sdkrrtema-apielektron-message-api
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvotes
Accepted
35 3 3 9

Hi,

if adding also the SERVICE_INFO_FILTER and content, I get an exception on the consumer EMA while de-serializesthe message: the consumer api tries to notify a null UpdateMsg (DirectoryCallbackClient.cpp::1206).

Would it be possible to have a working example of provider sending Updates on the directory domain?

Thanks

Best Regards,

Paolo


Please find some details below about this new test:

Provider message body:

UpdateMsg msg; msg.domainType(MMT_DIRECTORY).filter(SERVICE_INFO_FILTER | SERVICE_STATE_FILTER). payload(Map(). addKeyUInt(2, MapEntry::AddEnum, FilterList(). add(SERVICE_INFO_ID, FilterEntry::UpdateEnum, ElementList(). addAscii(ENAME_NAME, "DIRECT_FEED"). addArray(ENAME_CAPABILITIES, OmmArray(). addUInt(MMT_MARKET_PRICE). addUInt(MMT_MARKET_BY_PRICE). complete()). addArray(ENAME_DICTIONARYS_USED, OmmArray(). addAscii("RWFFld"). addAscii("RWFEnum"). complete()). complete()). add(SERVICE_STATE_ID, FilterEntry::UpdateEnum, ElementList(). addUInt(ENAME_SVC_STATE, i % 2 ? SERVICE_UP : SERVICE_DOWN). addUInt(ENAME_ACCEPTING_REQS, i % 2 ? SERVICE_YES : SERVICE_NO). complete()). complete()). complete()); provider.submit(msg, 0);

Consumer message received in attachmentEmaTrace.txt


ematrace.txt (7.1 KiB)
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Hi Paolo,

Which version of EMA you are using? I have tried your code on IProvider_331 and Consumer_331. The Consumer application can received service status properly even with/without SERVICE_INFO_FILTER. I use ESDK 1.3.0 version.

Also, according to EMA RDM Usage Guide, the SERVICE_INFO_FILTER should be included when the connected consumer is ADH.

Upvotes
11.3k 25 8 13

Hi @Paolo Parlapiano,

I have tested the same scenario on ADS. ADS will send Directory response containing empty MapEntry in payload, when there is no available service on ADS. Below is the sample of message.

<refreshMsg domainType="RSSL_DMT_SOURCE" streamId="2" containerType="RSSL_DT_MAP" flags="0x168 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_SOLICITED|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_CLEAR_CACHE)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text=""  dataSize="5">
    <key  flags="0x8 (RSSL_MKF_HAS_FILTER)"  filter="63"/>
    <dataBody>
        <map flags="0x0" countHint="0" keyPrimitiveType="RSSL_DT_UINT" containerType="RSSL_DT_FILTER_LIST" >
        </map>
    </dataBody>
</refreshMsg>

However, when I tried to create the same Directory message, EMA returns invalid usage exception.

    event.getProvider().submit(RefreshMsg().domainType(MMT_DIRECTORY).filter(SERVICE_INFO_FILTER | SERVICE_STATE_FILTER).
        payload(Map().complete()).solicited(true), event.getHandle());
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Hi @Paolo Parlapiano,

I have been explained that although EMA allows encoding of empty Maps in general, encoding of SourceDirectory Message has a few restrictions, as a result, the code snippet I am using will not work.

The suggested directory message with no service is similar to your message you built in your question. Below is the sample code.

    event.getProvider().submit(RefreshMsg().domainType(MMT_DIRECTORY).payload(Map().addKeyUInt(1, MapEntry::AddEnum,FilterList().complete()).complete()).complete(), event.getHandle());

Message:

<refreshMsg domainType="RSSL_DMT_SOURCE" streamId="2" containerType="RSSL_DT_MAP" flags="0x40 SSL_RFMF_REFRESH_COMPLETE)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="" dataSize="12">
    <dataBody>
        <map flags="0x0" countHint="0" keyPrimitiveType="RSSL_DT_UINT" containerType="RSSL_DT_FILTER_LIST" >
            <mapEntry flags="0x0" action="RSSL_MPEA_ADD_ENTRY" key="1" >
                <filterList containerType="RSSL_DT_NO_DATA" countHint="0" flags="0x0">
                </filterList>
            </mapEntry>
        </map>
    </dataBody>
Upvotes
20.3k 73 10 20

Hi @Paolo Parlapiano

It is possible you may have discovered a bug in the API and I am sure my colleague Veerapath will deal with that scenario accordingly.

However, as side step - I just wanted to understand your required usage scenario. You mention that the availability of service(s) depends on the availability of external sources.

Is this non-availability of external sources typically transient in nature or longer-term/unlikely to change in the (near) future?

I ask because if it is of a transient nature, then perhaps publishing an empty Directory message may not be the best approach - depending on your implementation requirements...

As you may be aware, when populating a Directory message entry for a particular service, you can set the ServiceState and AcceptingRequests elements in the State filter entry work together to indicate the ability of a particular service to provide data:

• ServiceState indicates whether the source of the data is accepting requests.

• AcceptingRequests indicates whether the immediate upstream provider (the provider to which the consumer is directly connected) can accept new requests. If False, new requests are rejected while existing streams remain unaffected (reissue requests can still be made for any item streams that are currently open to the provider).

So, if it is a transient situation, you could set ServiceState:Down and AcceptingRequests:No - in the short term, until such a time as when the external source does become available - at which point you would change these values accordingly (e.g. Up & Yes)

If the non-availability of external sources is longer-term in nature, then it would be more appropriate to publish an empty directory - once the exception issue is resolved.


1568715842157.png (44.7 KiB)
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvotes
35 3 3 9

Hi @Umer Nalla,

thanks for the provided details.
To answer to your question: apart from "long term" misconfigurations of the provider, this non-availability of external sources is a transient condition.
Basically the provider can be also configured at runtime to add services, therefore theoretically I can have the case of startup with no services at all, I just wanted to cover it.
Apart from this, I've also investigated the ServiceState and AcceptingRequests flags that you are referring to, as they could be useful to me anyway.
However, when using them I've experienced another issue. Thinking about a possible misuse of the API, I didn't investigate further, but know I'd like to put here are my findings.

To test the programmatic usage of the ServiceState and AcceptingRequests flags, I've setup a scenario as follows:

  • IProvider, handling directory requests for a single service
  • Consumer, loading the dictionary from files, and handling directory messages in the application code (dictionary loaded from files so that I could play with the AcceptingRequests flag without causing the OmmConsumer to be unable to be created)

Aim: test my provider and look closer to the directory messages received on consumer side. Result: refresh messages on the Directory domain are correctly received by the consumer, however further updates are not notified.
Note: enabling the log tracing on the consumer (XmlTraceToFile), I can see the updates on the directory domain.

I'm pretty sure that I'm doing something wrong here, perhaps the UpdateMsgs are built incorrectly?
Please find an excerpt of the provider (baseline is IProv331) and an excerpt of the consumer.

Thank you for your help
Best Regards,

Paolo

Provider relevant code

void AppClient::processDirectoryRequest(const ReqMsg&, const OmmProviderEvent& event)
{
	event.getProvider().submit(RefreshMsg().domainType(MMT_DIRECTORY).filter(SERVICE_INFO_FILTER | SERVICE_STATE_FILTER).
		payload(Map().
			addKeyUInt(2, MapEntry::AddEnum, FilterList().
				add(SERVICE_INFO_ID, FilterEntry::SetEnum, ElementList().
					addAscii(ENAME_NAME, "DIRECT_FEED").
					addArray(ENAME_CAPABILITIES, OmmArray().
						addUInt(MMT_MARKET_PRICE).
						addUInt(MMT_MARKET_BY_PRICE).
						complete()).
					addArray(ENAME_DICTIONARYS_USED, OmmArray().
						addAscii("RWFFld").
						addAscii("RWFEnum").
						complete()).
					complete()).
				add(SERVICE_STATE_ID, FilterEntry::SetEnum, ElementList().
					addUInt(ENAME_SVC_STATE, SERVICE_UP).
					addUInt(ENAME_ACCEPTING_REQS, SERVICE_NO).
					complete()).
				complete()).
			complete()).complete().solicited(true), event.getHandle());

	directoryRequestReceived = true;
}
int main()
{
	try
	{
		AppClient appClient;
		OmmProvider provider( OmmIProviderConfig().adminControlDirectory( OmmIProviderConfig::UserControlEnum ), appClient );
		while (directoryRequestReceived == false) sleep(1000);
		for (Int32 i = 0; i < 10; i++)
		{
			provider.submit(UpdateMsg().domainType(MMT_DIRECTORY).filter(SERVICE_INFO_FILTER | SERVICE_STATE_FILTER).
				payload(Map().
					addKeyUInt(2, MapEntry::UpdateEnum, FilterList().
						add(SERVICE_STATE_ID, FilterEntry::UpdateEnum, ElementList().
							addUInt(ENAME_SVC_STATE, i % 2 ? SERVICE_UP : SERVICE_DOWN).
							addUInt(ENAME_ACCEPTING_REQS, i % 2 ? SERVICE_YES : SERVICE_NO).
							complete()).
						complete()).
					complete()), 0);
			sleep(1000);
		}
	}
	catch ( const OmmException& excp )
	{
		cout << excp << endl;
	}
	return 0;
}

Consumer relevant code

void ServiceNotifier::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommConsEvent)
{
	handleServiceStatus(refreshMsg.getPayload());
}
void ServiceNotifier::onUpdateMsg(const UpdateMsg& updMsg, const OmmConsumerEvent& ommConsEvent)
{
	handleServiceStatus(updMsg.getPayload());
}
void ServiceNotifier::handleServiceStatus(const Payload& serviceInfo)
{
	if (DataType::MapEnum == serviceInfo.getDataType())
	{
		const Map& map = serviceInfo.getMap();
		bool serviceReady = false;
		bool serviceAcceptingRequests = true;
		while (map.forth())
		{
			const MapEntry& me = map.getEntry();
			if (me.getLoadType() == DataType::FilterListEnum)
			{
				const FilterList& fl = me.getFilterList();
				while (fl.forth())
				{
					const FilterEntry& fe = fl.getEntry();
					switch (fe.getFilterId())
					{
					case SERVICE_STATE_FILTER:
					{
						if (fe.getLoadType() == DataType::ElementListEnum)
						{
							const ElementList& eList = fe.getElementList();
							while (eList.forth())
							{
								const ElementEntry& ee = eList.getEntry();
								if (ee.getName() == ENAME_SVC_STATE)
								{
									serviceReady = ee.getUInt() == 1 ? true : false;
								}
								else if (ee.getName() == ENAME_ACCEPTING_REQS)
								{
									serviceAcceptingRequests = ee.getUInt() == 1 ? true : false;
								}
							}
						}
						break;
					}
					default:
						break;
					}
				}
			}
		}
		if (serviceReady)
		{
			notify();
		}
	}
}
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Hi @Paolo Parlapiano

As far as I know, to change the state of service, the directory update message needs to contain two filters which are SERVICE_INFO_FILTER and SERVICE_STATE_FILTER. Please try the test again with SERVICE_INFO_FILTER included.

Click below to post an Idea Post Idea