Skip to content

Commit

Permalink
RDKB-58390: RBUS- Configure timeout values for each rbus_handle (#244)
Browse files Browse the repository at this point in the history
* RBUS- Configure timeout values for each rbus_handle

* Added Unit_tests for TimeoutValues
  • Loading branch information
NetajiPanigrahi authored Jan 22, 2025
1 parent aed0560 commit 9df93ee
Show file tree
Hide file tree
Showing 23 changed files with 631 additions and 367 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ jobs:
export PREFIX=$PWD
export LD_LIBRARY_PATH=$PREFIX/lib
nohup ./bin/rtrouted -f -l DEBUG > /tmp/rtrouted_log.txt &
- name: Run RbusTestTimeoutValues Unit test
run: |
cd install/usr
export PREFIX=$PWD
export LD_LIBRARY_PATH=$PREFIX/lib
nohup ./bin/rbusTestTimeoutValuesProvider &
./bin/rbusTestTimeoutValuesConsumer
- name: Run Unit test
run: |
cd install/usr
Expand Down
75 changes: 75 additions & 0 deletions include/rbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ typedef enum _rbusError
} rbusError_t;



char const * rbusError_ToString(rbusError_t e);

/** @struct rbusSetOptions_t
Expand Down Expand Up @@ -1965,6 +1966,80 @@ rbusError_t rbus_registerDynamicTableSyncHandler(
rbusHandle_t handle,
char const* tableName,
rbusTableSyncHandler_t syncHandler);

typedef struct _rbusTimeoutValues
{
uint32_t setTimeout; /* default timeout in miliseconds for SET API*/
uint32_t getTimeout; /* default timeout in miliseconds for GET API*/
uint32_t setMultiTimeout; /* default timeout in miliseconds for SET Multi API*/
uint32_t getMultiTimeout; /* default timeout in miliseconds for Wildcard Query GET API*/
uint32_t subscribeTimeout; /* default timeout in miliseconds for Subscribe operation*/
}rbusTimeoutValues_t;

/** @fn rbusError_t rbusHandle_ConfigTimeoutValues(
* rbusHandle_t handle,
* rbusTimeoutValues_t timeoutValues);
*
* @brief Configure timeout values for each rbus_handle.
*
* Used by: Component wants to configure its own timeout values per rbus_handle.
*
* @param handle Bus Handle
* @param timeoutValues Timeout values in miliseconds for SET, GET operations.
* @return RBus error code as defined by rbusError_t.
*/

rbusError_t rbusHandle_ConfigTimeoutValues(rbusHandle_t handle, rbusTimeoutValues_t timeoutValues);

/** @fn int rbusHandle_ConfigSetTimeout(rbusHandle_t handle,int timeout)
* @brief function to update SET Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_set operation in milliseconds,
* set to default value if timeout is Zero.
* @return RBus error code as defined by rbusError_t.
*/
rbusError_t rbusHandle_ConfigSetTimeout(rbusHandle_t handle, uint32_t timeout);

/** @fn int rbusHandle_ConfigGetTimeout(rbusHandle_t handle,int timeout)
* @brief function to update GET Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_get operation in milliseconds,
* set to default value if timeout is Zero.
* @return RBus error code as defined by rbusError_t.
*/
rbusError_t rbusHandle_ConfigGetTimeout(rbusHandle_t handle, uint32_t timeout);

/** @fn int rbusHandle_ConfigGetMultiTimeout(rbusHandle_t handle,int timeout)
* @brief function to update GET Wildcard query Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_get operation in milliseconds,
* set to default value if timeout is Zero.
* @return RBus error code as defined by rbusError_t.
*/
rbusError_t rbusHandle_ConfigGetMultiTimeout(rbusHandle_t handle, uint32_t timeout);

/** @fn int rbusHandle_ConfigSetMultiTimeout(rbusHandle_t handle,int timeout)
* @brief function to update SetMulti Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for rbus_setMulti operation in milliseconds,
* set to default value if timeout is Zero.
* @return RBus error code as defined by rbusError_t.
*/
rbusError_t rbusHandle_ConfigSetMultiTimeout(rbusHandle_t handle, uint32_t timeout);

/** @fn int rbusHandle_ConfigSubscribeTimeout(rbusHandle_t handle,int timeout)
* @brief function to update Subscribe Timeout value.
*
* @param handle The Bus handle.
* @param timeout The Timeout value for Subscribe operation in milliseconds,
* set to default value if timeout is Zero.
* @return RBus error code as defined by rbusError_t.
*/
rbusError_t rbusHandle_ConfigSubscribeTimeout(rbusHandle_t handle, uint32_t timeout);
/** @} */

#ifdef __cplusplus
Expand Down
6 changes: 3 additions & 3 deletions src/core/rbuscore.c
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ rbusCoreError_t rbus_pushObj(const char * object_name, rbusMessage message, int
return ret;
}

static rtError rbus_sendRequest(rtConnection con, rbusMessage req, char const* topic, rbusMessage* res, int32_t timeout)
static rtError rbus_sendRequest(rtConnection con, rbusMessage req, char const* topic, rbusMessage* res, uint32_t timeout)
{
rtError err = RT_OK;
uint8_t* data = NULL;
Expand All @@ -1103,12 +1103,12 @@ static rtError rbus_sendRequest(rtConnection con, rbusMessage req, char const* t
return err;
}

rbusCoreError_t rbus_invokeRemoteMethod(const char * object_name, const char *method, rbusMessage out, int timeout_millisecs, rbusMessage *in)
rbusCoreError_t rbus_invokeRemoteMethod(const char * object_name, const char *method, rbusMessage out, uint32_t timeout_millisecs, rbusMessage *in)
{
return rbus_invokeRemoteMethod2(g_connection, object_name, method, out, timeout_millisecs, in);
}

rbusCoreError_t rbus_invokeRemoteMethod2(rtConnection myConn, const char * object_name, const char *method, rbusMessage out, int timeout_millisecs, rbusMessage *in)
rbusCoreError_t rbus_invokeRemoteMethod2(rtConnection myConn, const char * object_name, const char *method, rbusMessage out, uint32_t timeout_millisecs, rbusMessage *in)
{
rtError err = RT_OK;
rbusCoreError_t ret = RBUSCORE_SUCCESS;
Expand Down
4 changes: 2 additions & 2 deletions src/core/rbuscore.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ rbusCoreError_t rbus_unregisterMethodTable(const char * object_name, rbus_method
* of the operation when it's complete. Marshalling of input arguments and output response is the responsibility of the caller. This function blocks until it receives a response
* from the remote recipient, or times out after 'timeout_milliseconds'. rbus will release 'out' internally. If call is successful, it's caller's responsibility
* to release 'in'. */
rbusCoreError_t rbus_invokeRemoteMethod(const char * object_name, const char *method, rbusMessage out, int timeout_millisecs, rbusMessage *in);
rbusCoreError_t rbus_invokeRemoteMethod2(rtConnection conn, const char * object_name, const char *method, rbusMessage out, int timeout_millisecs, rbusMessage *in);
rbusCoreError_t rbus_invokeRemoteMethod(const char * object_name, const char *method, rbusMessage out, uint32_t timeout_millisecs, rbusMessage *in);
rbusCoreError_t rbus_invokeRemoteMethod2(rtConnection conn, const char * object_name, const char *method, rbusMessage out, uint32_t timeout_millisecs, rbusMessage *in);

/* Invoke a remote procedure call 'method' on a destination/object object_name. 'out' has the input arguments necessary for the RPC. This function does not block for response
* from the remote end. It returns immediately after the outbound message is dispatched. 'callback' is invoked when it receives the response to the RPC call, or if it times out
Expand Down
3 changes: 1 addition & 2 deletions src/rbus/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ add_library(
rbus_subscriptions.c
rbus_tokenchain.c
rbus_asyncsubscribe.c
rbus_intervalsubscription.c
rbus_config.c)
rbus_intervalsubscription.c)

target_link_libraries(rbus rbuscore rtMessage -fPIC -pthread)

Expand Down
53 changes: 33 additions & 20 deletions src/rbus/rbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "rbus_subscriptions.h"
#include "rbus_asyncsubscribe.h"
#include "rbus_intervalsubscription.h"
#include "rbus_config.h"
#include "rbus_log.h"
#include "rbus_handle.h"
#include "rbus_message.h"
Expand Down Expand Up @@ -279,6 +278,17 @@ static rbusEventSubscriptionInternal_t* rbusEventSubscription_find(rtVector even
return NULL;
}

rbusError_t rbusHandle_ConfigTimeoutValues(rbusHandle_t handle, rbusTimeoutValues_t timeoutValues)
{
VERIFY_NULL(handle);
rbusHandle_ConfigSetTimeout(handle, timeoutValues.setTimeout);
rbusHandle_ConfigGetTimeout(handle, timeoutValues.getTimeout);
rbusHandle_ConfigSetMultiTimeout(handle, timeoutValues.setMultiTimeout);
rbusHandle_ConfigGetMultiTimeout(handle, timeoutValues.getMultiTimeout);
rbusHandle_ConfigSubscribeTimeout(handle, timeoutValues.subscribeTimeout);
return RBUS_ERROR_SUCCESS;
}

rbusError_t rbusOpenDirect_SubAdd(rbusHandle_t handle, rtVector eventSubs, char const* eventName)
{
size_t i;
Expand Down Expand Up @@ -2790,13 +2800,11 @@ static void _rbus_open_pre_initialize(bool retain)

if(retain && !sRetained)
{
rbusConfig_CreateOnce();
rbus_registerMasterEventHandler(_master_event_callback_handler, NULL);
sRetained = true;
}
else if(!retain && sRetained)
{
rbusConfig_Destroy();
rbusElement_mutex_destroy();
sRetained = false;
}
Expand Down Expand Up @@ -2880,7 +2888,11 @@ rbusError_t rbus_open(rbusHandle_t* handle, char const* componentName)
RBUSLOG_ERROR("(%s): rbus_registerObj error %d", componentName, err);
goto exit_error2;
}

if (rbusHandle_TimeoutValuesInit(tmpHandle) != 0)
{
RBUSLOG_ERROR("(%s): rbusHandle_TimeoutValuesInit failed", componentName);
goto exit_error2;
}
tmpHandle->componentName = strdup(componentName);
tmpHandle->componentId = ++sLastComponentId;
tmpHandle->m_connection = rbus_getConnection();
Expand Down Expand Up @@ -3179,7 +3191,7 @@ rbusError_t rbus_regDataElements(

if(handleInfo->subscriptions == NULL)
{
rbusSubscriptions_create(&handleInfo->subscriptions, handle, handleInfo->componentName, handleInfo->elementRoot, rbusConfig_Get()->tmpDir);
rbusSubscriptions_create(&handleInfo->subscriptions, handle, handleInfo->componentName, handleInfo->elementRoot, RBUS_TMP_DIRECTORY);
}

if((err = rbus_addElement(handleInfo->componentName, name)) != RBUSCORE_SUCCESS)
Expand Down Expand Up @@ -3371,7 +3383,7 @@ rbusError_t rbus_get(rbusHandle_t handle, char const* name, rbusValue_t* value)
if (NULL == myConn)
myConn = handleInfo->m_connection;

err = rbus_invokeRemoteMethod2(myConn, name, METHOD_GETPARAMETERVALUES, request, rbusConfig_ReadGetTimeout(), &response);
err = rbus_invokeRemoteMethod2(myConn, name, METHOD_GETPARAMETERVALUES, request, rbusHandle_FetchGetTimeout(handle), &response);

if(err != RBUSCORE_SUCCESS)
{
Expand Down Expand Up @@ -3532,7 +3544,7 @@ rbusError_t rbus_getExt(rbusHandle_t handle, int paramCount, char const** pParam
rbusMessage_SetString(request, pParamNames[0]);
/* Invoke the method */
err = rbus_invokeRemoteMethod(destinations[i], METHOD_GETPARAMETERVALUES,
request, rbusConfig_ReadWildcardGetTimeout(), &response);
request, rbusHandle_FetchGetMultiTimeout(handle), &response);

if(err != RBUSCORE_SUCCESS)
{
Expand Down Expand Up @@ -3695,7 +3707,7 @@ rbusError_t rbus_getExt(rbusHandle_t handle, int paramCount, char const** pParam
RBUSLOG_DEBUG("sending batch request with %d params to component %s", batchCount, componentName);
free(componentName);

if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_GETPARAMETERVALUES, request, rbusConfig_ReadGetTimeout(), &response)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_GETPARAMETERVALUES, request, rbusHandle_FetchGetTimeout(handle), &response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("get by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, firstParamName);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -3849,7 +3861,7 @@ rbusError_t rbus_set(rbusHandle_t handle, char const* name,rbusValue_t value, rb
if (NULL == myConn)
myConn = handleInfo->m_connection;

if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_SETPARAMETERVALUES, setRequest, rbusConfig_ReadSetTimeout(), &setResponse)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_SETPARAMETERVALUES, setRequest, rbusHandle_FetchSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("set by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, name);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -3904,6 +3916,7 @@ rbusError_t rbus_setCommit(rbusHandle_t handle, char const* name, rbusSetOptions
else
rbusMessage_SetInt32(setRequest, 0);


/* Set the Component name that invokes the set */
rbusMessage_SetString(setRequest, handleInfo->componentName);
/* Set the Size of params */
Expand All @@ -3916,7 +3929,7 @@ rbusError_t rbus_setCommit(rbusHandle_t handle, char const* name, rbusSetOptions
rtConnection myConn = rbuscore_FindClientPrivateConnection(name);
if (NULL == myConn)
myConn = handleInfo->m_connection;
if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_COMMIT, setRequest, rbusConfig_ReadSetTimeout(), &setResponse)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod2(myConn, name, METHOD_COMMIT, setRequest, rbusHandle_FetchSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("set commit by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, name);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -4087,7 +4100,7 @@ rbusError_t rbus_setMulti(rbusHandle_t handle, int numProps, rbusProperty_t prop
/* Set the Commit value; FIXME: Should we use string? */
rbusMessage_SetString(setRequest, (!opts || opts->commit) ? "TRUE" : "FALSE");

if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_SETPARAMETERVALUES, setRequest, rbusConfig_ReadSetTimeout(), &setResponse)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod(firstParamName, METHOD_SETPARAMETERVALUES, setRequest, rbusHandle_FetchSetTimeout(handle), &setResponse)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("set by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, firstParamName);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -4254,7 +4267,7 @@ rbusError_t rbusTable_addRow(
because the broker simlpy looks at the top level nodes that are owned by a component route. maybe this breaks if the broker changes*/
METHOD_ADDTBLROW,
request,
rbusConfig_ReadSetTimeout(),
rbusHandle_FetchSetTimeout(handle),
&response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("Add row by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, tableName);
Expand Down Expand Up @@ -4320,7 +4333,7 @@ rbusError_t rbusTable_removeRow(
rowName,
METHOD_DELETETBLROW,
request,
rbusConfig_ReadSetTimeout(),
rbusHandle_FetchSetTimeout(handle),
&response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("Delete row by %s failed; Received error %d from RBUS Daemon for the object %s", handle->componentName, err, rowName);
Expand Down Expand Up @@ -4448,7 +4461,7 @@ rbusError_t rbusTable_getRowNames(
if (NULL == myConn)
myConn = handleInfo->m_connection;

if((err = rbus_invokeRemoteMethod2(myConn, tableName, METHOD_GETPARAMETERNAMES, request, rbusConfig_ReadGetTimeout(), &response)) == RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod2(myConn, tableName, METHOD_GETPARAMETERNAMES, request, rbusHandle_FetchGetTimeout(handle), &response)) == RBUSCORE_SUCCESS)
{
rbusLegacyReturn_t legacyRetCode = RBUS_LEGACY_ERR_FAILURE;
int ret = -1;
Expand Down Expand Up @@ -4588,7 +4601,7 @@ rbusError_t rbusElementInfo_get(
rbusMessage_SetInt32(request, depth);/*depth*/
rbusMessage_SetInt32(request, 0);/*not row names*/

if((err = rbus_invokeRemoteMethod(destinations[d], METHOD_GETPARAMETERNAMES, request, rbusConfig_ReadGetTimeout(), &response)) != RBUSCORE_SUCCESS)
if((err = rbus_invokeRemoteMethod(destinations[d], METHOD_GETPARAMETERNAMES, request, rbusHandle_FetchGetTimeout(handle), &response)) != RBUSCORE_SUCCESS)
{
RBUSLOG_ERROR("invokeRemoteMethod %s destination=%s object=%s failed: err=%d", METHOD_GETPARAMETERNAMES, destinations[d], elemName, err);
errorcode = rbusCoreError_to_rbusError(err);
Expand Down Expand Up @@ -4819,7 +4832,7 @@ static rbusError_t rbusEvent_SubscribeWithRetries(

if(timeout == -1)
{
destNotFoundTimeout = rbusConfig_Get()->subscribeTimeout;
destNotFoundTimeout = rbusHandle_FetchSubscribeTimeout(handle);
}
else
{
Expand Down Expand Up @@ -4875,8 +4888,8 @@ static rbusError_t rbusEvent_SubscribeWithRetries(
destNotFoundSleep *= 2;

//cap it so the wait time still allows frequent retries
if(destNotFoundSleep > rbusConfig_Get()->subscribeMaxWait)
destNotFoundSleep = rbusConfig_Get()->subscribeMaxWait;
if(destNotFoundSleep > RBUS_SUBSCRIBE_MAXWAIT)
destNotFoundSleep = RBUS_SUBSCRIBE_MAXWAIT;
}
else
{
Expand Down Expand Up @@ -5880,7 +5893,7 @@ rbusError_t rbusMethod_Invoke(
if (handleInfo->m_handleType != RBUS_HWDL_TYPE_REGULAR)
return RBUS_ERROR_INVALID_HANDLE;

return rbusMethod_InvokeInternal(handle, methodName, inParams, outParams, rbusConfig_ReadSetTimeout());
return rbusMethod_InvokeInternal(handle, methodName, inParams, outParams, rbusHandle_FetchSetTimeout(handle));
}

typedef struct _rbusMethodInvokeAsyncData_t
Expand Down Expand Up @@ -5943,7 +5956,7 @@ rbusError_t rbusMethod_InvokeAsync(
data->methodName = strdup(methodName);
data->inParams = inParams;
data->callback = callback;
data->timeout = timeout > 0 ? (timeout * 1000) : rbusConfig_ReadSetTimeout(); /* convert seconds to milliseconds */
data->timeout = timeout > 0 ? (timeout * 1000) : (int)rbusHandle_FetchSetTimeout(handle); /* convert seconds to milliseconds */

if((err = pthread_create(&pid, NULL, rbusMethod_InvokeAsyncThreadFunc, data)) != 0)
{
Expand Down
Loading

0 comments on commit 9df93ee

Please sign in to comment.