From 023484ca0b7ea110b5713b0a4cb2d81cb1a675a2 Mon Sep 17 00:00:00 2001 From: Juan RP Date: Tue, 17 Sep 2013 16:30:13 +0200 Subject: [PATCH] Introduce xbps_{array,pkgdb}_foreach_cb_multi() and use it where appropiate. In some tasks the single threaded implementation outperms the multithreaded one. Use it where it really makes a difference. The _multi() routines do not spawn any thread if _SC_NPROCESSORS_ONLN == 1. Bump XBPS_API_VERSION. --- bin/xbps-pkgdb/check.c | 2 +- bin/xbps-query/list.c | 2 +- bin/xbps-query/ownedby.c | 9 --- bin/xbps-query/search.c | 9 --- bin/xbps-rindex/index-clean.c | 2 +- bin/xbps-rindex/remove-obsoletes.c | 2 +- include/xbps.h.in | 53 +++++++++++++++- lib/pkgdb.c | 18 ++++++ lib/plist.c | 99 +++++++++++++++++------------- 9 files changed, 131 insertions(+), 65 deletions(-) diff --git a/bin/xbps-pkgdb/check.c b/bin/xbps-pkgdb/check.c index 7e726da8..ff8c0115 100644 --- a/bin/xbps-pkgdb/check.c +++ b/bin/xbps-pkgdb/check.c @@ -69,7 +69,7 @@ check_pkg_integrity_all(struct xbps_handle *xhp) /* force an update to get total pkg count */ (void)xbps_pkgdb_update(xhp, false); - rv = xbps_pkgdb_foreach_cb(xhp, pkgdb_cb, NULL); + rv = xbps_pkgdb_foreach_cb_multi(xhp, pkgdb_cb, NULL); if ((rv = xbps_pkgdb_update(xhp, true)) != 0) { xbps_error_printf("failed to write pkgdb: %s\n", diff --git a/bin/xbps-query/list.c b/bin/xbps-query/list.c index b281365f..6ef69ea7 100644 --- a/bin/xbps-query/list.c +++ b/bin/xbps-query/list.c @@ -215,7 +215,7 @@ find_longest_pkgver(struct xbps_handle *xhp, xbps_object_t o) _find_longest_pkgver_cb, &ffl); xbps_object_release(array); } else { - (void)xbps_pkgdb_foreach_cb(xhp, + (void)xbps_pkgdb_foreach_cb_multi(xhp, _find_longest_pkgver_cb, &ffl); } diff --git a/bin/xbps-query/ownedby.c b/bin/xbps-query/ownedby.c index 60028259..7f0049af 100644 --- a/bin/xbps-query/ownedby.c +++ b/bin/xbps-query/ownedby.c @@ -31,13 +31,11 @@ #include #include #include -#include #include #include "defs.h" struct ffdata { - pthread_mutex_t mtx; int npatterns; char **patterns; const char *repouri; @@ -102,8 +100,6 @@ ownedby_pkgdb_cb(struct xbps_handle *xhp, xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); - pthread_mutex_lock(&ffd->mtx); - pkgmetad = xbps_pkgdb_get_pkg_metadata(xhp, pkgver); assert(pkgmetad); @@ -115,8 +111,6 @@ ownedby_pkgdb_cb(struct xbps_handle *xhp, xbps_object_release(pkgmetad); xbps_object_release(files_keys); - pthread_mutex_unlock(&ffd->mtx); - return 0; } @@ -128,7 +122,6 @@ ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) ffd.npatterns = npatterns; ffd.patterns = patterns; - pthread_mutex_init(&ffd.mtx, NULL); for (int i = 0; i < npatterns; i++) { rfile = realpath(patterns[i], NULL); @@ -189,7 +182,6 @@ repo_ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) char *rfile; int rv; - pthread_mutex_init(&ffd.mtx, NULL); ffd.npatterns = npatterns; ffd.patterns = patterns; @@ -199,7 +191,6 @@ repo_ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) patterns[i] = rfile; } rv = xbps_rpool_foreach(xhp, repo_ownedby_cb, &ffd); - pthread_mutex_destroy(&ffd.mtx); return rv; } diff --git a/bin/xbps-query/search.c b/bin/xbps-query/search.c index 7755dbaf..e978bad8 100644 --- a/bin/xbps-query/search.c +++ b/bin/xbps-query/search.c @@ -40,7 +40,6 @@ #include #include #include -#include #include #include "defs.h" @@ -49,7 +48,6 @@ struct search_data { int npatterns; char **patterns; int maxcols; - pthread_mutex_t mtx; xbps_array_t results; }; @@ -119,10 +117,8 @@ search_array_cb(struct xbps_handle *xhp _unused, if ((xbps_pkgpattern_match(pkgver, sd->patterns[x])) || (strcasestr(pkgver, sd->patterns[x])) || (strcasestr(desc, sd->patterns[x])) || vpkgfound) { - pthread_mutex_lock(&sd->mtx); xbps_array_add_cstring_nocopy(sd->results, pkgver); xbps_array_add_cstring_nocopy(sd->results, desc); - pthread_mutex_unlock(&sd->mtx); } } return 0; @@ -135,9 +131,6 @@ search_pkgs_cb(struct xbps_repo *repo, void *arg, bool *done _unused) struct search_data *sd = arg; int rv; - if (repo->idx == NULL) - return 0; - allkeys = xbps_dictionary_all_keys(repo->idx); rv = xbps_array_foreach_cb(repo->xhp, allkeys, repo->idx, search_array_cb, sd); xbps_object_release(allkeys); @@ -151,7 +144,6 @@ repo_search(struct xbps_handle *xhp, int npatterns, char **patterns) struct search_data sd; int rv; - pthread_mutex_init(&sd.mtx, NULL); sd.npatterns = npatterns; sd.patterns = patterns; sd.maxcols = get_maxcols(); @@ -166,7 +158,6 @@ repo_search(struct xbps_handle *xhp, int npatterns, char **patterns) print_results(xhp, &sd); xbps_object_release(sd.results); } - pthread_mutex_destroy(&sd.mtx); return rv; } diff --git a/bin/xbps-rindex/index-clean.c b/bin/xbps-rindex/index-clean.c index b8a1bbca..d8ad3edb 100644 --- a/bin/xbps-rindex/index-clean.c +++ b/bin/xbps-rindex/index-clean.c @@ -119,7 +119,7 @@ index_clean(struct xbps_handle *xhp, const char *repodir) cbd.array = xbps_array_create(); allkeys = xbps_dictionary_all_keys(idx); - rv = xbps_array_foreach_cb(xhp, allkeys, idx, idx_cleaner_cb, &cbd); + rv = xbps_array_foreach_cb_multi(xhp, allkeys, idx, idx_cleaner_cb, &cbd); xbps_object_release(allkeys); for (unsigned int x = 0; x < xbps_array_count(cbd.array); x++) { diff --git a/bin/xbps-rindex/remove-obsoletes.c b/bin/xbps-rindex/remove-obsoletes.c index bc33ff5a..02884456 100644 --- a/bin/xbps-rindex/remove-obsoletes.c +++ b/bin/xbps-rindex/remove-obsoletes.c @@ -147,7 +147,7 @@ remove_obsoletes(struct xbps_handle *xhp, const char *repodir) } (void)closedir(dirp); - rv = xbps_array_foreach_cb(xhp, array, NULL, cleaner_cb, repo); + rv = xbps_array_foreach_cb_multi(xhp, array, NULL, cleaner_cb, repo); xbps_repo_close(repo); xbps_object_release(array); diff --git a/include/xbps.h.in b/include/xbps.h.in index 2c96bc84..f58fab48 100644 --- a/include/xbps.h.in +++ b/include/xbps.h.in @@ -46,7 +46,7 @@ * * This header documents the full API for the XBPS Library. */ -#define XBPS_API_VERSION "20130727-1" +#define XBPS_API_VERSION "20130917" #ifndef XBPS_VERSION #define XBPS_VERSION "UNSET" @@ -699,7 +699,7 @@ xbps_array_t xbps_find_pkg_obsoletes(struct xbps_handle *xhp, /** * Executes a function callback per a package dictionary registered - * in master package database (pkgdb) plist (downwards). + * in master package database (pkgdb) plist. * * @param[in] xhp The pointer to the xbps_handle struct. * @param[in] fn Function callback to run for any pkg dictionary. @@ -712,6 +712,24 @@ int xbps_pkgdb_foreach_cb(struct xbps_handle *xhp, int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), void *arg); +/** + * Executes a function callback per a package dictionary registered + * in master package database (pkgdb) plist. + * + * This is a multithreaded implementation spawning a thread per core. Each + * thread processes a fraction of total objects in the pkgdb dictionary. + * + * @param[in] xhp The pointer to the xbps_handle struct. + * @param[in] fn Function callback to run for any pkg dictionary. + * @param[in] arg Argument to be passed to the function callback. + * + * @return 0 on success (all objects were processed), otherwise + * the value returned by the function callback. + */ +int xbps_pkgdb_foreach_cb_multi(struct xbps_handle *xhp, + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), + void *arg); + /** * Returns a package dictionary from master package database (pkgdb) plist, * matching pkgname or pkgver object in \a pkg. @@ -813,12 +831,43 @@ int xbps_pkg_exec_script(struct xbps_handle *xhp, /** @addtogroup plist */ /*@{*/ +/** + * Executes a function callback (\a fn) per object in the proplib array \a array. + * + * @param[in] xhp The pointer to the xbps_handle struct. + * @param[in] array The proplib array to traverse. + * @param[in] dict The dictionary associated with the array. + * @param[in] fn Function callback to run for any pkg dictionary. + * @param[in] arg Argument to be passed to the function callback. + * + * @return 0 on success (all objects were processed), otherwise + * the value returned by the function callback. + */ int xbps_array_foreach_cb(struct xbps_handle *xhp, xbps_array_t array, xbps_dictionary_t dict, int (*fn)(struct xbps_handle *, xbps_object_t obj, const char *, void *arg, bool *done), void *arg); +/** + * Executes a function callback (\a fn) per object in the proplib array \a array. + * This is a multithreaded implementation spawning a thread per core. Each + * thread processes a fraction of total objects in the array. + * + * @param[in] xhp The pointer to the xbps_handle struct. + * @param[in] array The proplib array to traverse. + * @param[in] dict The dictionary associated with the array. + * @param[in] fn Function callback to run for any pkg dictionary. + * @param[in] arg Argument to be passed to the function callback. + * + * @return 0 on success (all objects were processed), otherwise + * the value returned by the function callback. + */ +int xbps_array_foreach_cb_multi(struct xbps_handle *xhp, + xbps_array_t array, + xbps_dictionary_t dict, + int (*fn)(struct xbps_handle *, xbps_object_t obj, const char *, void *arg, bool *done), + void *arg); /** * Match a virtual package name or pattern by looking at package's * dictionary "provides" array object. diff --git a/lib/pkgdb.c b/lib/pkgdb.c index f618592e..25413b76 100644 --- a/lib/pkgdb.c +++ b/lib/pkgdb.c @@ -153,6 +153,24 @@ xbps_pkgdb_foreach_cb(struct xbps_handle *xhp, return rv; } +int +xbps_pkgdb_foreach_cb_multi(struct xbps_handle *xhp, + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), + void *arg) +{ + xbps_array_t allkeys; + int rv; + + if ((rv = xbps_pkgdb_init(xhp)) != 0) + return rv; + + allkeys = xbps_dictionary_all_keys(xhp->pkgdb); + assert(allkeys); + rv = xbps_array_foreach_cb_multi(xhp, allkeys, xhp->pkgdb, fn, arg); + xbps_object_release(allkeys); + return rv; +} + xbps_dictionary_t xbps_pkgdb_get_pkg(struct xbps_handle *xhp, const char *pkg) { diff --git a/lib/plist.c b/lib/plist.c index 071e0348..3735ab33 100644 --- a/lib/plist.c +++ b/lib/plist.c @@ -85,7 +85,7 @@ array_foreach_thread(void *arg) } int -xbps_array_foreach_cb(struct xbps_handle *xhp, +xbps_array_foreach_cb_multi(struct xbps_handle *xhp, xbps_array_t array, xbps_dictionary_t dict, int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), @@ -106,50 +106,67 @@ xbps_array_foreach_cb(struct xbps_handle *xhp, return 0; maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN); - if (maxthreads > 1) { - thd = calloc(maxthreads, sizeof(*thd)); - assert(thd); - slicecount = arraycount / maxthreads; - pkgcount = 0; - pthread_mutex_init(&mtx, NULL); + if (maxthreads == 1) /* use single threaded routine */ + return xbps_array_foreach_cb(xhp, array, dict, fn, arg); - for (int i = 0; i < maxthreads; i++) { - thd[i].mtx = &mtx; - thd[i].array = array; - thd[i].dict = dict; - thd[i].xhp = xhp; - thd[i].fn = fn; - thd[i].fn_arg = arg; - thd[i].start = pkgcount; - if (i + 1 >= maxthreads) - thd[i].end = arraycount; - else - thd[i].end = pkgcount + slicecount; - pthread_create(&thd[i].thread, NULL, - array_foreach_thread, &thd[i]); - pkgcount += slicecount; - } - /* wait for all threads */ - for (int i = 0; i < maxthreads; i++) - pthread_join(thd[i].thread, NULL); + thd = calloc(maxthreads, sizeof(*thd)); + assert(thd); + slicecount = arraycount / maxthreads; + pkgcount = 0; + pthread_mutex_init(&mtx, NULL); - pthread_mutex_destroy(&mtx); - free(thd); - } else { - /* single threaded */ - struct thread_data mythd; - - mythd.mtx = NULL; - mythd.array = array; - mythd.dict = dict; - mythd.xhp = xhp; - mythd.start = 0; - mythd.end = arraycount; - mythd.fn = fn; - mythd.fn_arg = arg; - array_foreach_thread(&mythd); + for (int i = 0; i < maxthreads; i++) { + thd[i].mtx = &mtx; + thd[i].array = array; + thd[i].dict = dict; + thd[i].xhp = xhp; + thd[i].fn = fn; + thd[i].fn_arg = arg; + thd[i].start = pkgcount; + if (i + 1 >= maxthreads) + thd[i].end = arraycount; + else + thd[i].end = pkgcount + slicecount; + pthread_create(&thd[i].thread, NULL, + array_foreach_thread, &thd[i]); + pkgcount += slicecount; } + /* wait for all threads */ + for (int i = 0; i < maxthreads; i++) + rv = pthread_join(thd[i].thread, NULL); + pthread_mutex_destroy(&mtx); + free(thd); + + return rv; +} + +int +xbps_array_foreach_cb(struct xbps_handle *xhp, + xbps_array_t array, + xbps_dictionary_t dict, + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), + void *arg) +{ + xbps_dictionary_t pkgd; + xbps_object_t obj; + const char *key; + int rv = 0; + bool loop_done = false; + + for (unsigned int i = 0; i < xbps_array_count(array); i++) { + obj = xbps_array_get(array, i); + if (xbps_object_type(dict) == XBPS_TYPE_DICTIONARY) { + pkgd = xbps_dictionary_get_keysym(dict, obj); + key = xbps_dictionary_keysym_cstring_nocopy(obj); + } else { + pkgd = obj; + key = NULL; + } + rv = (*fn)(xhp, pkgd, key, arg, &loop_done); + if (rv != 0 || loop_done) + break; + } return rv; }