diff --git a/src/urpc/include/umq/transport_layer/umq_tp_api.h b/src/urpc/include/umq/transport_layer/umq_tp_api.h index a2aabc0142a3a72d1fda3221b1c8385224540bfb..99cd69e918e439ff0938d428d6a09241c60785e8 100644 --- a/src/urpc/include/umq/transport_layer/umq_tp_api.h +++ b/src/urpc/include/umq/transport_layer/umq_tp_api.h @@ -79,6 +79,15 @@ typedef struct umq_ops { */ int (*umq_tp_unbind)(uint64_t umqh_tp); + /** + * User should ensure thread safety if io_lock_free is true + * Set umq state + * @param[in] umqh_tp: umq handle + * @param[in] state: umq state want to set(Only Support Set ERR STATE) + * Return 0 on success, error code on failure + */ + int (*umq_tp_state_set)(uint64_t umqh_tp, umq_state_t state); + /** * User should ensure thread safety if io_lock_free is true * Query umq state diff --git a/src/urpc/include/umq/umq_api.h b/src/urpc/include/umq/umq_api.h index b8cec114fdc0ad4cd766f5d59a6602c09505dc29..a5d395f97591e46407d63391d1d4c9ddc4ea1eef 100644 --- a/src/urpc/include/umq/umq_api.h +++ b/src/urpc/include/umq/umq_api.h @@ -79,6 +79,15 @@ int umq_bind(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); */ int umq_unbind(uint64_t umqh); +/** + * User should ensure thread safety if io_lock_free is true + * Set umq state + * @param[in] umqh: umq handle + * @param[in] state: umq state want to set (Only Support Set ERR STATE) + * Return 0 on success, error code on failure + */ +int umq_state_set(uint64_t umqh, umq_state_t state); + /** * User should ensure thread safety if io_lock_free is true * Query umq state diff --git a/src/urpc/umq/umq_api.c b/src/urpc/umq/umq_api.c index f817d168f978e833b71bd48a50eb53b5d8c68c6e..415a68ee01cffeea2671eca5f073aefdd35a4250 100644 --- a/src/urpc/umq/umq_api.c +++ b/src/urpc/umq/umq_api.c @@ -889,6 +889,17 @@ int umq_buf_split(umq_buf_t *head, umq_buf_t *node) return UMQ_SUCCESS; } +int umq_state_set(uint64_t umqh, umq_state_t state) +{ + umq_t *umq = (umq_t *)(uintptr_t)umqh; + if (umq == NULL || umq->umqh_tp == UMQ_INVALID_HANDLE || umq->tp_ops->umq_tp_state_set == NULL) { + UMQ_VLOG_ERR("parameter invalid\n"); + return -UMQ_ERR_EINVAL; + } + + return umq->tp_ops->umq_tp_state_set(umq->umqh_tp, state); +} + umq_state_t umq_state_get(uint64_t umqh) { umq_t *umq = (umq_t *)(uintptr_t)umqh; diff --git a/src/urpc/umq/umq_ub/core/umq_ub_impl.c b/src/urpc/umq/umq_ub/core/umq_ub_impl.c index 99377accdbdffb0ae2109588e674f51969107aae..cb5823488f7d6dbd01bc86b10cb15247111c226f 100644 --- a/src/urpc/umq/umq_ub/core/umq_ub_impl.c +++ b/src/urpc/umq/umq_ub/core/umq_ub_impl.c @@ -1020,6 +1020,33 @@ util_id_allocator_t *umq_ub_get_msg_id_generator(uint64_t umqh_tp) return umq_ub_id_allocator_get(); } +int umq_ub_state_set_impl(uint64_t umqh_tp, umq_state_t state) +{ + ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; + if (queue->create_flag & UMQ_CREATE_FLAG_SUB_UMQ) { + UMQ_VLOG_ERR("set state only support main umq\n"); + return -UMQ_ERR_EINVAL; + } + + if (state != QUEUE_STATE_ERR) { + UMQ_VLOG_ERR("set state only support error\n"); + return -UMQ_ERR_EINVAL; + } + + if (queue->state == QUEUE_STATE_ERR) { + UMQ_VLOG_INFO("queue state already in error state\n"); + return UMQ_SUCCESS; + } + + int ret = umq_modify_ubq_to_err(queue); + if (ret) { + UMQ_VLOG_ERR("modify queue state failed\n"); + return -ret; + } + + return UMQ_SUCCESS; +} + umq_state_t umq_ub_state_get_impl(uint64_t umqh_tp) { ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; diff --git a/src/urpc/umq/umq_ub/core/umq_ub_impl.h b/src/urpc/umq/umq_ub/core/umq_ub_impl.h index b56d3d237483271d37b3e7737348d2cec3c4e5dd..810878b4353debcf1497470f8f800a45c064d057 100644 --- a/src/urpc/umq/umq_ub/core/umq_ub_impl.h +++ b/src/urpc/umq/umq_ub/core/umq_ub_impl.h @@ -29,6 +29,7 @@ int32_t umq_ub_destroy_impl(uint64_t umqh); int umq_ub_bind_info_get_impl(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); int umq_ub_bind_impl(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); int umq_ub_unbind_impl(uint64_t umqh); +int umq_ub_state_set_impl(uint64_t umqh_tp, umq_state_t state); umq_state_t umq_ub_state_get_impl(uint64_t umqh_tp); int32_t umq_ub_register_memory_impl(void *buf, uint64_t size); diff --git a/src/urpc/umq/umq_ub/umq_ub_api.c b/src/urpc/umq/umq_ub/umq_ub_api.c index 227d6860bbb3a74b9fbe6934935c0a61560df9ad..5e46972a02ff501147961dc11825fd8825c95f6b 100644 --- a/src/urpc/umq/umq_ub/umq_ub_api.c +++ b/src/urpc/umq/umq_ub/umq_ub_api.c @@ -69,6 +69,11 @@ static int umq_tp_ub_unbind(uint64_t umqh_tp) return umq_ub_unbind_impl(umqh_tp); } +static int umq_tp_ub_state_set(uint64_t umqh_tp, umq_state_t state) +{ + return umq_ub_state_set_impl(umqh_tp, state); +} + static umq_state_t umq_tp_ub_state_get(uint64_t umqh_tp) { return umq_ub_state_get_impl(umqh_tp); @@ -184,6 +189,7 @@ static umq_ops_t g_umq_ub_ops = { .umq_tp_bind_info_get = umq_tp_ub_bind_info_get, .umq_tp_bind = umq_tp_ub_bind, .umq_tp_unbind = umq_tp_ub_unbind, + .umq_tp_state_set = umq_tp_ub_state_set, .umq_tp_state_get = umq_tp_ub_state_get, .umq_tp_log_config_set = umq_tp_ub_log_config_set, .umq_tp_log_config_reset = umq_tp_ub_log_config_reset, diff --git a/src/urpc/umq/umq_ub/umq_ub_plus_api.c b/src/urpc/umq/umq_ub/umq_ub_plus_api.c index 45fb058b150838bfc955848acf4f626acb8f1085..70c9fc110d196191e3c690f9b4dfa5edf6abf25e 100644 --- a/src/urpc/umq/umq_ub/umq_ub_plus_api.c +++ b/src/urpc/umq/umq_ub/umq_ub_plus_api.c @@ -79,6 +79,11 @@ static int umq_tp_ub_plus_unbind(uint64_t umqh_tp) return umq_ub_unbind_impl(umqh_tp); } +static int umq_tp_ub_plus_state_set(uint64_t umqh_tp, umq_state_t state) +{ + return umq_ub_state_set_impl(umqh_tp, state); +} + static umq_state_t umq_tp_ub_plus_state_get(uint64_t umqh_tp) { return umq_ub_state_get_impl(umqh_tp); @@ -198,6 +203,7 @@ static umq_ops_t g_umq_ub_plus_ops = { .umq_tp_bind_info_get = umq_tp_ub_plus_bind_info_get, .umq_tp_bind = umq_tp_ub_plus_bind, .umq_tp_unbind = umq_tp_ub_plus_unbind, + .umq_tp_state_set = umq_tp_ub_plus_state_set, .umq_tp_state_get = umq_tp_ub_plus_state_get, .umq_tp_log_config_set = umq_tp_ub_plus_log_config_set, .umq_tp_log_config_reset = umq_tp_ub_plus_log_config_reset,