summaryrefslogtreecommitdiff
path: root/src/lib/cdap.c
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-08-18 14:22:06 +0200
committerSander Vrijders <[email protected]>2016-08-19 13:24:39 +0200
commit2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c (patch)
tree6807a23a6def167a2b9ab26937fe25bbcc2a8064 /src/lib/cdap.c
parent0192488015770b4855165db8502214dad1941dd2 (diff)
downloadouroboros-2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c.tar.gz
ouroboros-2c8e29ca7a997c5aa9d34e3fa956b120a0bbf20c.zip
ipcpd: normal: Handle enrollment replies
This adds a condition variable with a timeout to the CDAP request so that we can respond correctly to the answer from the remote. It also adds a timeout to the condition variable waiting on completion of enrollment. Furthermore, for every CDAP callback a new thread is now spawned, to avoid deadlocking in case a callback is stuck.
Diffstat (limited to 'src/lib/cdap.c')
-rw-r--r--src/lib/cdap.c153
1 files changed, 92 insertions, 61 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 4c70b2e4..5dc050a4 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -43,6 +43,12 @@ struct cdap {
struct cdap_ops * ops;
};
+struct cdap_info {
+ pthread_t thread;
+ struct cdap * instance;
+ cdap_t * msg;
+};
+
static int next_invoke_id(struct cdap * instance)
{
int ret;
@@ -66,12 +72,84 @@ static int release_invoke_id(struct cdap * instance,
return ret;
}
+static void * handle_cdap_msg(void * o)
+{
+ struct cdap_info * info = (struct cdap_info *) o;
+ struct cdap * instance = info->instance;
+ cdap_t * msg = info->msg;
+
+ switch (msg->opcode) {
+ case OPCODE__READ:
+ if (msg->name != NULL)
+ instance->ops->cdap_read(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__WRITE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_write(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len,
+ msg->flags);
+ break;
+ case OPCODE__CREATE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_create(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len);
+ break;
+ case OPCODE__DELETE:
+ if (msg->name != NULL &&
+ msg->has_value)
+ instance->ops->cdap_create(instance,
+ msg->invoke_id,
+ msg->name,
+ msg->value.data,
+ msg->value.len);
+ break;
+ case OPCODE__START:
+ if (msg->name != NULL)
+ instance->ops->cdap_start(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__STOP:
+ if (msg->name != NULL)
+ instance->ops->cdap_stop(instance,
+ msg->invoke_id,
+ msg->name);
+ break;
+ case OPCODE__REPLY:
+ instance->ops->cdap_reply(instance,
+ msg->invoke_id,
+ msg->result,
+ msg->value.data,
+ msg->value.len);
+ release_invoke_id(instance, msg->invoke_id);
+ break;
+ default:
+ break;
+ }
+
+ free(info);
+ cdap__free_unpacked(msg, NULL);
+
+ return (void *) 0;
+}
+
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
cdap_t * msg;
uint8_t buf[BUF_SIZE];
ssize_t len;
+ struct cdap_info * cdap_info;
while (true) {
len = flow_read(instance->fd, buf, BUF_SIZE);
@@ -82,69 +160,22 @@ static void * sdu_reader(void * o)
if (msg == NULL)
continue;
- switch (msg->opcode) {
- case OPCODE__READ:
- if (msg->name != NULL)
- instance->ops->cdap_read(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__WRITE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_write(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len,
- msg->flags);
- }
- break;
- case OPCODE__CREATE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_create(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len);
- }
- break;
- case OPCODE__DELETE:
- if (msg->name != NULL &&
- msg->has_value) {
- instance->ops->cdap_create(instance,
- msg->invoke_id,
- msg->name,
- msg->value.data,
- msg->value.len);
- }
- break;
- case OPCODE__START:
- if (msg->name != NULL)
- instance->ops->cdap_start(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__STOP:
- if (msg->name != NULL)
- instance->ops->cdap_stop(instance,
- msg->invoke_id,
- msg->name);
- break;
- case OPCODE__REPLY:
- instance->ops->cdap_reply(instance,
- msg->invoke_id,
- msg->result,
- msg->value.data,
- msg->value.len);
- release_invoke_id(instance, msg->invoke_id);
- break;
- default:
- break;
+ cdap_info = malloc(sizeof(*cdap_info));
+ if (cdap_info == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
}
- cdap__free_unpacked(msg, NULL);
+ cdap_info->instance = instance;
+ cdap_info->msg = msg;
+
+ pthread_create(&cdap_info->thread,
+ NULL,
+ handle_cdap_msg,
+ (void *) cdap_info);
+
+ pthread_detach(cdap_info->thread);
+
}
return (void *) 0;