diff --git a/src/libpcp_web/src/webgroup.c b/src/libpcp_web/src/webgroup.c index 6829cf16ef..9ba4987f45 100644 --- a/src/libpcp_web/src/webgroup.c +++ b/src/libpcp_web/src/webgroup.c @@ -66,6 +66,9 @@ typedef struct webgroups { struct context *pending_timer_init; unsigned int active; unsigned int gc_timer_started; + + pmWebGroupModule *module; /* owner; used when freeing after uv_close */ + unsigned int close_pending; /* remaining embedded handle closes */ } webgroups; static struct webgroups * @@ -76,6 +79,7 @@ webgroups_lookup(pmWebGroupModule *module) if (module->privdata == NULL) { module->privdata = calloc(1, sizeof(struct webgroups)); groups = (struct webgroups *)module->privdata; + groups->module = module; uv_mutex_init(&groups->mutex); } return groups; @@ -102,6 +106,34 @@ webgroup_release_context(uv_handle_t *handle) pmwebapi_free_context(context); } +/* + * Drop a context from the deferred timer-init queue (see webgroup_new_context + * and webgroup_async_cb). If the context is freed while still queued, the + * async callback would otherwise follow dangling next_pending pointers. + */ +static void +webgroup_pending_timer_init_remove(struct context *context, struct webgroups *groups) +{ + struct context **pp, *p; + + if (groups == NULL) + groups = (struct webgroups *)context->privdata; + if (groups == NULL) + return; + + uv_mutex_lock(&groups->mutex); + pp = &groups->pending_timer_init; + while ((p = *pp) != NULL) { + if (p == context) { + *pp = p->next_pending; + context->next_pending = NULL; + break; + } + pp = &p->next_pending; + } + uv_mutex_unlock(&groups->mutex); +} + static void webgroup_drop_context(struct context *context, struct webgroups *groups) { @@ -110,6 +142,7 @@ webgroup_drop_context(struct context *context, struct webgroups *groups) context, context->refcount); if (webgroup_deref_context(context) == 0) { + webgroup_pending_timer_init_remove(context, groups); if (context->garbage == 0) { context->garbage = 1; if (context->timer_init) @@ -546,9 +579,15 @@ pmWebGroupContext(pmWebGroupSettings *sp, sds id, dict *params, void *arg) pmWebSource context; sds msg = NULL; int sts = 0; + unsigned int context_key = 0; + int have_context_key = 0; + char *endptr = NULL; if ((cp = webgroup_lookup_context(sp, &id, params, &sts, &msg, arg))) { id = cp->origin; + context_key = (unsigned int)strtoul(id, &endptr, 10); + if (*endptr == '\0') + have_context_key = 1; pmwebapi_context_hash(cp); context.source = pmwebapi_hash_sds(NULL, cp->name.hash); context.hostspec = cp->host; @@ -564,7 +603,16 @@ pmWebGroupContext(pmWebGroupSettings *sp, sds id, dict *params, void *arg) } sp->callbacks.on_done(id, sts, msg, arg); - webgroup_deref_context(cp); + + if (have_context_key) { + struct webgroups *groups = webgroups_lookup(&sp->module); + dictEntry *entry; + + entry = dictFind(groups->contexts, &context_key); + if (entry != NULL) + webgroup_deref_context((struct context *)dictGetVal(entry)); + } + sdsfree(msg); return sts; } @@ -2633,11 +2681,30 @@ pmWebGroupSetMetricRegistry(pmWebGroupModule *module, mmv_registry_t *registry) return -ENOMEM; } +static void +webgroups_handle_closed(uv_handle_t *handle) +{ + struct webgroups *groups = (struct webgroups *)handle->data; + pmWebGroupModule *module; + + if (groups == NULL) + return; + if (--groups->close_pending != 0) + return; + + module = groups->module; + uv_mutex_destroy(&groups->mutex); + free(groups); + if (module != NULL) + module->privdata = NULL; +} + void pmWebGroupClose(pmWebGroupModule *module) { struct webgroups *groups = (struct webgroups *)module->privdata; dictEntry *entry; + unsigned int close_pending; if (groups) { /* walk the contexts, stop timers and free resources */ @@ -2648,12 +2715,30 @@ pmWebGroupClose(pmWebGroupModule *module) webgroup_drop_context((context_t *)dictGetVal(entry), NULL); } dictRelease(groups->contexts); - webgroup_timers_stop(groups); - if (groups->events) - uv_close((uv_handle_t *)&groups->async, NULL); - memset(groups, 0, sizeof(struct webgroups)); - free(groups); - module->privdata = NULL; + groups->contexts = NULL; + + close_pending = 0; + if (groups->active && groups->gc_timer_started) { + uv_timer_stop(&groups->timer); + groups->timer.data = (void *)groups; + uv_close((uv_handle_t *)&groups->timer, webgroups_handle_closed); + close_pending++; + } + groups->active = 0; + groups->gc_timer_started = 0; + + if (groups->events) { + groups->async.data = (void *)groups; + uv_close((uv_handle_t *)&groups->async, webgroups_handle_closed); + close_pending++; + } + + if (close_pending == 0) { + uv_mutex_destroy(&groups->mutex); + free(groups); + module->privdata = NULL; + } else + groups->close_pending = close_pending; } sdsfree(PARAM_HOSTNAME); diff --git a/src/pmfind/source.c b/src/pmfind/source.c index f9230ee441..fdbe02746b 100644 --- a/src/pmfind/source.c +++ b/src/pmfind/source.c @@ -57,6 +57,7 @@ sources_release(void *arg, const struct dictEntry *entry) if (pmDebugOptions.discovery) fprintf(stderr, "releasing context %s\n", ctx); + dictDelete(sp->contexts, ctx); source_release(sp, cp, ctx); } @@ -166,8 +167,9 @@ on_source_done(sds context, int status, sds message, void *arg) if (remove) { if (pmDebugOptions.discovery) fprintf(stderr, "remove context %s\n", context); - source_release(sp, cp, context); + dictDelete(sp->contexts, context); + source_release(sp, cp, context); } if (release) { @@ -231,6 +233,8 @@ sources_discovery_start(uv_timer_t *arg) dictRelease(dp); pmWebTimerClose(); + + uv_stop(((uv_timer_t *)arg)->loop); } /* @@ -281,12 +285,17 @@ source_discovery(int count, char **urls) uv_timer_init(loop, &timing); uv_timer_start(&timing, sources_discovery_start, 0, 0); uv_run(loop, UV_RUN_DEFAULT); + + pmWebGroupClose(&settings.module); + + uv_close((uv_handle_t *)&timing, NULL); + (void)uv_run(loop, UV_RUN_DEFAULT); + uv_loop_close(loop); /* * Finished, release all resources acquired so far */ - pmWebGroupClose(&settings.module); uv_mutex_destroy(&find.mutex); dictRelease(find.uniq); dictRelease(find.params);