From 0c7701f2bc33932ec84a8693bfd39c34514513dc Mon Sep 17 00:00:00 2001 From: Juan RP Date: Sat, 27 Jul 2013 09:47:16 +0200 Subject: [PATCH] Introduce xbps_array_foreach_cb() and use it in random code. This routine will spawn a thread per core to process N items stored in the specified array, the last thread gets the remainder of items left. Results have shown that xbps benefits if there is a considerable amount of items and number of threads being spawned. Use it in xbps_pkgdb_foreach_cb(), xbps-pkgdb(8), xbps-query(8) and xbps-rindex(8). On UP systems there's no overhead because pthread(3) is not used at all. WIP! investigate if it can be used in libxbps (xbps_rpool_foreach()), and finish conversion of xbps-rindex(8) -c. --- bin/xbps-pkgdb/check.c | 79 ++++------------- bin/xbps-query/defs.h | 4 +- bin/xbps-query/list.c | 16 ++-- bin/xbps-query/ownedby.c | 58 +++++++----- bin/xbps-query/search.c | 67 ++++++++------ bin/xbps-rindex/remove-obsoletes.c | 117 +++++++++--------------- include/xbps.h.in | 49 ++-------- lib/pkgdb.c | 22 ++--- lib/plist.c | 138 ++++++++++++++++++++--------- 9 files changed, 255 insertions(+), 295 deletions(-) diff --git a/bin/xbps-pkgdb/check.c b/bin/xbps-pkgdb/check.c index 1262ba3f..d82e0e53 100644 --- a/bin/xbps-pkgdb/check.c +++ b/bin/xbps-pkgdb/check.c @@ -30,89 +30,46 @@ #include #include #include -#include #include #include #include "defs.h" -struct thread_data { - pthread_t thread; - struct xbps_handle *xhp; - unsigned int start; - unsigned int end; - int thread_num; -}; - -static void * -pkgdb_thread_worker(void *arg) +static int +pkgdb_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *key, void *arg, bool *done) { - xbps_dictionary_t pkgd; - xbps_array_t array; - xbps_object_t obj; - struct thread_data *thd = arg; const char *pkgver; char *pkgname; - unsigned int i; int rv; - array = xbps_dictionary_all_keys(thd->xhp->pkgdb); - assert(array); + (void)key; + (void)arg; + (void)done; - /* process pkgs from start until end */ - for (i = thd->start; i < thd->end; i++) { - obj = xbps_array_get(array, i); - pkgd = xbps_dictionary_get_keysym(thd->xhp->pkgdb, obj); - xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver); - if (thd->xhp->flags & XBPS_FLAG_VERBOSE) - printf("Checking %s ...\n", pkgver); + xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); + if (xhp->flags & XBPS_FLAG_VERBOSE) + printf("Checking %s ...\n", pkgver); - pkgname = xbps_pkg_name(pkgver); - assert(pkgname); - rv = check_pkg_integrity(thd->xhp, pkgd, pkgname); - free(pkgname); - if (rv != 0) - fprintf(stderr, "pkgdb[%d] failed for %s: %s\n", - thd->thread_num, pkgver, strerror(rv)); - } - xbps_object_release(array); + pkgname = xbps_pkg_name(pkgver); + assert(pkgname); + rv = check_pkg_integrity(xhp, obj, pkgname); + free(pkgname); + if (rv != 0) + fprintf(stderr, "pkgdb failed for %s: %s\n", + pkgver, strerror(rv)); - return NULL; + return rv; } int check_pkg_integrity_all(struct xbps_handle *xhp) { - struct thread_data *thd; - unsigned int slicecount, pkgcount; - int rv, maxthreads, i; + int rv; /* force an update to get total pkg count */ (void)xbps_pkgdb_update(xhp, false); - maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN); - thd = calloc(maxthreads, sizeof(*thd)); - assert(thd); - - slicecount = xbps_dictionary_count(xhp->pkgdb) / maxthreads; - pkgcount = 0; - - for (i = 0; i < maxthreads; i++) { - thd[i].thread_num = i; - thd[i].xhp = xhp; - thd[i].start = pkgcount; - if (i + 1 >= maxthreads) - thd[i].end = xbps_dictionary_count(xhp->pkgdb); - else - thd[i].end = pkgcount + slicecount; - pthread_create(&thd[i].thread, NULL, - pkgdb_thread_worker, &thd[i]); - pkgcount += slicecount; - } - - /* wait for all threads */ - for (i = 0; i < maxthreads; i++) - pthread_join(thd[i].thread, NULL); + rv = xbps_pkgdb_foreach_cb(xhp, pkgdb_cb, NULL); if ((rv = xbps_pkgdb_update(xhp, true)) != 0) { xbps_error_printf("failed to write pkgdb: %s\n", diff --git a/bin/xbps-query/defs.h b/bin/xbps-query/defs.h index 4fb9be31..d59edaa9 100644 --- a/bin/xbps-query/defs.h +++ b/bin/xbps-query/defs.h @@ -59,8 +59,8 @@ int repo_ownedby(struct xbps_handle *, int, char **); /* From list.c */ unsigned int find_longest_pkgver(struct xbps_handle *, xbps_object_t); -int list_pkgs_in_dict(struct xbps_handle *, xbps_object_t, void *, bool *); -int list_manual_pkgs(struct xbps_handle *, xbps_object_t, void *, bool *); +int list_pkgs_in_dict(struct xbps_handle *, xbps_object_t, const char *, void *, bool *); +int list_manual_pkgs(struct xbps_handle *, xbps_object_t, const char *, void *, bool *); int list_orphans(struct xbps_handle *); int list_pkgs_pkgdb(struct xbps_handle *); diff --git a/bin/xbps-query/list.c b/bin/xbps-query/list.c index 6ee72c23..f692a48d 100644 --- a/bin/xbps-query/list.c +++ b/bin/xbps-query/list.c @@ -41,6 +41,7 @@ struct list_pkgver_cb { int list_pkgs_in_dict(struct xbps_handle *xhp, xbps_object_t obj, + const char *key, void *arg, bool *loop_done) { @@ -51,6 +52,7 @@ list_pkgs_in_dict(struct xbps_handle *xhp, pkg_state_t state; (void)xhp; + (void)key; (void)loop_done; xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); @@ -93,6 +95,7 @@ list_pkgs_in_dict(struct xbps_handle *xhp, int list_manual_pkgs(struct xbps_handle *xhp, xbps_object_t obj, + const char *key, void *arg, bool *loop_done) { @@ -100,6 +103,7 @@ list_manual_pkgs(struct xbps_handle *xhp, bool automatic = false; (void)xhp; + (void)key; (void)arg; (void)loop_done; @@ -176,23 +180,19 @@ struct fflongest { static int _find_longest_pkgver_cb(struct xbps_handle *xhp, xbps_object_t obj, + const char *key, void *arg, bool *loop_done) { struct fflongest *ffl = arg; - xbps_dictionary_t pkgd; const char *pkgver; unsigned int len; (void)xhp; + (void)key; (void)loop_done; - if (xbps_object_type(obj) == XBPS_TYPE_DICT_KEYSYM) - pkgd = xbps_dictionary_get_keysym(ffl->d, obj); - else - pkgd = obj; - - xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver); + xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); len = strlen(pkgver); if (ffl->len == 0 || len > ffl->len) ffl->len = len; @@ -212,7 +212,7 @@ find_longest_pkgver(struct xbps_handle *xhp, xbps_object_t o) xbps_array_t array; array = xbps_dictionary_all_keys(o); - (void)xbps_callback_array_iter(xhp, array, + (void)xbps_array_foreach_cb(xhp, array, o, _find_longest_pkgver_cb, &ffl); xbps_object_release(array); } else { diff --git a/bin/xbps-query/ownedby.c b/bin/xbps-query/ownedby.c index e85b0d82..98f8a9c5 100644 --- a/bin/xbps-query/ownedby.c +++ b/bin/xbps-query/ownedby.c @@ -31,14 +31,18 @@ #include #include #include +#include #include #include "defs.h" struct ffdata { + pthread_mutex_t mtx; int npatterns; char **patterns; const char *repouri; + xbps_array_t allkeys; + xbps_dictionary_t filesd; }; static void @@ -83,7 +87,7 @@ match_files_by_pattern(xbps_dictionary_t pkg_filesd, } static int -ownedby_pkgdb_cb(struct xbps_handle *xhp, xbps_object_t obj, void *arg, bool *done) +ownedby_pkgdb_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *obj_key, void *arg, bool *done) { xbps_dictionary_t pkgmetad; xbps_array_t files_keys; @@ -91,9 +95,13 @@ ownedby_pkgdb_cb(struct xbps_handle *xhp, xbps_object_t obj, void *arg, bool *do unsigned int i; const char *pkgver; + (void)obj_key; (void)done; xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); + + pthread_mutex_lock(&ffd->mtx); + pkgmetad = xbps_pkgdb_get_pkg_metadata(xhp, pkgver); assert(pkgmetad); @@ -105,6 +113,8 @@ ownedby_pkgdb_cb(struct xbps_handle *xhp, xbps_object_t obj, void *arg, bool *do xbps_object_release(pkgmetad); xbps_object_release(files_keys); + pthread_mutex_unlock(&ffd->mtx); + return 0; } @@ -117,6 +127,7 @@ ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) ffd.npatterns = npatterns; ffd.patterns = patterns; + pthread_mutex_init(&ffd.mtx, NULL); for (i = 0; i < npatterns; i++) { rfile = realpath(patterns[i], NULL); @@ -126,52 +137,51 @@ ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) return xbps_pkgdb_foreach_cb(xhp, ownedby_pkgdb_cb, &ffd); } -static void -repo_match_files_by_pattern(xbps_array_t files, - const char *pkgver, - struct ffdata *ffd) +static int +repo_match_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *key, void *arg, bool *done) { + struct ffdata *ffd = arg; const char *filestr; unsigned int i; int x; - for (i = 0; i < xbps_array_count(files); i++) { - xbps_array_get_cstring_nocopy(files, i, &filestr); + (void)xhp; + (void)done; + + for (i = 0; i < xbps_array_count(obj); i++) { + xbps_array_get_cstring_nocopy(obj, i, &filestr); for (x = 0; x < ffd->npatterns; x++) { if ((fnmatch(ffd->patterns[x], filestr, FNM_PERIOD)) == 0) { printf("%s: %s (%s)\n", - pkgver, filestr, ffd->repouri); + key, filestr, ffd->repouri); } } } + + return 0; } static int repo_ownedby_cb(struct xbps_repo *repo, void *arg, bool *done) { - xbps_array_t allkeys, pkgar; + xbps_array_t allkeys; xbps_dictionary_t filesd; - xbps_dictionary_keysym_t ksym; struct ffdata *ffd = arg; - const char *pkgver; - unsigned int i; + int rv; (void)done; filesd = xbps_repo_get_plist(repo, XBPS_PKGINDEX_FILES); + if (filesd == NULL) + return 0; + ffd->repouri = repo->uri; allkeys = xbps_dictionary_all_keys(filesd); - - for (i = 0; i < xbps_array_count(allkeys); i++) { - ksym = xbps_array_get(allkeys, i); - pkgar = xbps_dictionary_get_keysym(filesd, ksym); - pkgver = xbps_dictionary_keysym_cstring_nocopy(ksym); - repo_match_files_by_pattern(pkgar, pkgver, ffd); - } + rv = xbps_array_foreach_cb(repo->xhp, allkeys, filesd, repo_match_cb, ffd); xbps_object_release(filesd); xbps_object_release(allkeys); - return 0; + return rv; } int @@ -179,8 +189,9 @@ repo_ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) { struct ffdata ffd; char *rfile; - int i; + int i, rv; + pthread_mutex_init(&ffd.mtx, NULL); ffd.npatterns = npatterns; ffd.patterns = patterns; @@ -189,5 +200,8 @@ repo_ownedby(struct xbps_handle *xhp, int npatterns, char **patterns) if (rfile) patterns[i] = rfile; } - return xbps_rpool_foreach(xhp, repo_ownedby_cb, &ffd); + rv = xbps_rpool_foreach(xhp, repo_ownedby_cb, &ffd); + pthread_mutex_destroy(&ffd.mtx); + + return rv; } diff --git a/bin/xbps-query/search.c b/bin/xbps-query/search.c index c014215f..d211c147 100644 --- a/bin/xbps-query/search.c +++ b/bin/xbps-query/search.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include "defs.h" @@ -48,6 +49,7 @@ struct search_data { int npatterns; char **patterns; int maxcols; + pthread_mutex_t mtx; xbps_array_t results; }; @@ -95,44 +97,52 @@ print_results(struct xbps_handle *xhp, struct search_data *sd) } } +static int +search_array_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *key, void *arg, bool *done) +{ + struct search_data *sd = arg; + const char *pkgver, *desc; + int x; + + (void)xhp; + (void)key; + (void)done; + + xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); + xbps_dictionary_get_cstring_nocopy(obj, "short_desc", &desc); + + for (x = 0; x < sd->npatterns; x++) { + bool vpkgfound = false; + + if (xbps_match_virtual_pkg_in_dict(obj, sd->patterns[x], false)) + vpkgfound = true; + + if ((xbps_pkgpattern_match(pkgver, sd->patterns[x])) || + (strcasestr(pkgver, sd->patterns[x])) || + (strcasestr(desc, sd->patterns[x])) || vpkgfound) { + pthread_mutex_lock(&sd->mtx); + xbps_array_add_cstring_nocopy(sd->results, pkgver); + xbps_array_add_cstring_nocopy(sd->results, desc); + pthread_mutex_unlock(&sd->mtx); + } + } + return 0; +} + static int search_pkgs_cb(struct xbps_repo *repo, void *arg, bool *done) { xbps_array_t allkeys; - xbps_dictionary_t pkgd; - xbps_dictionary_keysym_t ksym; struct search_data *sd = arg; - const char *pkgver, *desc; - unsigned int i; - int x; + int rv; (void)done; allkeys = xbps_dictionary_all_keys(repo->idx); - for (i = 0; i < xbps_array_count(allkeys); i++) { - ksym = xbps_array_get(allkeys, i); - pkgd = xbps_dictionary_get_keysym(repo->idx, ksym); - - xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver); - xbps_dictionary_get_cstring_nocopy(pkgd, "short_desc", &desc); - - for (x = 0; x < sd->npatterns; x++) { - bool vpkgfound = false; - - if (xbps_match_virtual_pkg_in_dict(pkgd, sd->patterns[x], false)) - vpkgfound = true; - - if ((xbps_pkgpattern_match(pkgver, sd->patterns[x])) || - (strcasestr(pkgver, sd->patterns[x])) || - (strcasestr(desc, sd->patterns[x])) || vpkgfound) { - xbps_array_add_cstring_nocopy(sd->results, pkgver); - xbps_array_add_cstring_nocopy(sd->results, desc); - } - } - } + rv = xbps_array_foreach_cb(repo->xhp, allkeys, repo->idx, search_array_cb, sd); xbps_object_release(allkeys); - return 0; + return rv; } int @@ -141,6 +151,7 @@ repo_search(struct xbps_handle *xhp, int npatterns, char **patterns) struct search_data sd; int rv; + pthread_mutex_init(&sd.mtx, NULL); sd.npatterns = npatterns; sd.patterns = patterns; sd.maxcols = get_maxcols(); @@ -152,6 +163,8 @@ repo_search(struct xbps_handle *xhp, int npatterns, char **patterns) strerror(rv)); print_results(xhp, &sd); + pthread_mutex_destroy(&sd.mtx); + xbps_object_release(sd.results); return rv; } diff --git a/bin/xbps-rindex/remove-obsoletes.c b/bin/xbps-rindex/remove-obsoletes.c index 2663fb0d..66050ad7 100644 --- a/bin/xbps-rindex/remove-obsoletes.c +++ b/bin/xbps-rindex/remove-obsoletes.c @@ -32,20 +32,10 @@ #include #include #include -#include #include #include "defs.h" -struct thread_data { - pthread_t thread; - xbps_array_t array; - struct xbps_repo *repo; - unsigned int start; - unsigned int end; - int thread_num; -}; - static int remove_pkg(const char *repodir, const char *arch, const char *file) { @@ -81,51 +71,49 @@ remove_pkg(const char *repodir, const char *arch, const char *file) return 0; } -static void * -cleaner_thread(void *arg) +static int +cleaner_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *key, void *arg, bool *done) { xbps_dictionary_t pkgd; - struct thread_data *thd = arg; + struct xbps_repo *repo = arg; const char *binpkg, *pkgver, *arch; - unsigned int i; int rv; - /* process pkgs from start until end */ - for (i = thd->start; i < thd->end; i++) { - xbps_array_get_cstring_nocopy(thd->array, i, &binpkg); - pkgd = xbps_get_pkg_plist_from_binpkg(binpkg, "./props.plist"); - if (pkgd == NULL) { - rv = remove_pkg(thd->repo->uri, arch, binpkg); - if (rv != 0) { - xbps_object_release(pkgd); - continue; - } - printf("Removed broken package `%s'.\n", binpkg); - } - xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver); - xbps_dictionary_get_cstring_nocopy(pkgd, "architecture", &arch); - /* ignore pkgs from other archs */ - if (!xbps_pkg_arch_match(thd->repo->xhp, arch, NULL)) { - xbps_object_release(pkgd); - continue; - } - xbps_dbg_printf(thd->repo->xhp, "thread[%d] checking %s (%s)\n", - thd->thread_num, pkgver, binpkg); - /* - * If binpkg is not registered in index, remove binpkg. - */ - if (!xbps_repo_get_pkg(thd->repo, pkgver)) { - rv = remove_pkg(thd->repo->uri, arch, binpkg); - if (rv != 0) { - xbps_object_release(pkgd); - continue; - } - printf("Removed obsolete package `%s'.\n", binpkg); - } - xbps_object_release(pkgd); - } + (void)key; + (void)done; - return NULL; + binpkg = xbps_string_cstring_nocopy(obj); + pkgd = xbps_get_pkg_plist_from_binpkg(binpkg, "./props.plist"); + if (pkgd == NULL) { + rv = remove_pkg(repo->uri, arch, binpkg); + if (rv != 0) { + xbps_object_release(pkgd); + return 0; + } + printf("Removed broken package `%s'.\n", binpkg); + } + xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver); + xbps_dictionary_get_cstring_nocopy(pkgd, "architecture", &arch); + /* ignore pkgs from other archs */ + if (!xbps_pkg_arch_match(xhp, arch, NULL)) { + xbps_object_release(pkgd); + return 0; + } + printf("checking %s (%s)\n", pkgver, binpkg); + /* + * If binpkg is not registered in index, remove binpkg. + */ + if (!xbps_repo_get_pkg(repo, pkgver)) { + rv = remove_pkg(repo->uri, arch, binpkg); + if (rv != 0) { + xbps_object_release(pkgd); + return 0; + } + printf("Removed obsolete package `%s'.\n", binpkg); + } + xbps_object_release(pkgd); + + return 0; } int @@ -133,12 +121,10 @@ remove_obsoletes(struct xbps_handle *xhp, const char *repodir) { xbps_array_t array = NULL; struct xbps_repo *repo; - struct thread_data *thd; DIR *dirp; struct dirent *dp; char *ext; - int i, maxthreads, rv = 0; - unsigned int slicecount, pkgcount; + int rv = 0; repo = xbps_repo_open(xhp, repodir); if (repo == NULL) { @@ -173,34 +159,13 @@ remove_obsoletes(struct xbps_handle *xhp, const char *repodir) if (array == NULL) array = xbps_array_create(); - xbps_array_add_cstring(array, dp->d_name); + xbps_array_add_cstring_nocopy(array, dp->d_name); } (void)closedir(dirp); - maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN); - thd = calloc(maxthreads, sizeof(*thd)); - - slicecount = xbps_array_count(array) / maxthreads; - pkgcount = 0; - - for (i = 0; i < maxthreads; i++) { - thd[i].thread_num = i; - thd[i].array = array; - thd[i].repo = repo; - thd[i].start = pkgcount; - if (i + 1 >= maxthreads) - thd[i].end = xbps_array_count(array); - else - thd[i].end = pkgcount + slicecount; - pthread_create(&thd[i].thread, NULL, cleaner_thread, &thd[i]); - pkgcount += slicecount; - } - - /* wait for all threads */ - for (i = 0; i < maxthreads; i++) - pthread_join(thd[i].thread, NULL); - + rv = xbps_array_foreach_cb(xhp, array, NULL, cleaner_cb, repo); xbps_repo_close(repo); + xbps_object_release(array); return rv; } diff --git a/include/xbps.h.in b/include/xbps.h.in index 80e3dce2..2c96bc84 100644 --- a/include/xbps.h.in +++ b/include/xbps.h.in @@ -46,7 +46,7 @@ * * This header documents the full API for the XBPS Library. */ -#define XBPS_API_VERSION "20130727" +#define XBPS_API_VERSION "20130727-1" #ifndef XBPS_VERSION #define XBPS_VERSION "UNSET" @@ -709,7 +709,7 @@ xbps_array_t xbps_find_pkg_obsoletes(struct xbps_handle *xhp, * the value returned by the function callback. */ int xbps_pkgdb_foreach_cb(struct xbps_handle *xhp, - int (*fn)(struct xbps_handle *, xbps_object_t, void *, bool *), + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), void *arg); /** @@ -813,46 +813,11 @@ int xbps_pkg_exec_script(struct xbps_handle *xhp, /** @addtogroup plist */ /*@{*/ -/** - * Executes a function callback specified in \a fn with \a arg passed - * as its argument into they array \a array. - * - * @param[in] xhp The pointer to the xbps_handle struct. - * @param[in] array Proplib array to iterate. - * @param[in] fn Function callback to run on every object in the array. - * While running the function callback, the hird parameter (a pointer to - * a boolean) can be set to true to stop immediately the loop. - * @param[in] arg Argument to be passed to the function callback. - * - * @return 0 on success, otherwise the value returned by the function - * callback. - */ -int xbps_callback_array_iter(struct xbps_handle *xhp, - xbps_array_t array, - int (*fn)(struct xbps_handle *, xbps_object_t, void *, bool *), - void *arg); - -/** - * Executes a function callback into the array associated with key \a key, - * contained in a proplib dictionary. - * - * @param[in] xhp The pointer to the xbps_handle struct. - * @param[in] dict Proplib dictionary where the array resides. - * @param[in] key Key associated with array. - * @param[in] fn Function callback to run on every - * object in the array. While running the function callback, the third - * parameter (a pointer to a boolean) can be set to true to stop - * immediately the loop. - * @param[in] arg Argument to be passed to the function callback. - * - * @return 0 on success (all objects were processed), otherwise - * the value returned by the function callback. - */ -int xbps_callback_array_iter_in_dict(struct xbps_handle *xhp, - xbps_dictionary_t dict, - const char *key, - int (*fn)(struct xbps_handle *, xbps_object_t, void *, bool *), - void *arg); +int xbps_array_foreach_cb(struct xbps_handle *xhp, + xbps_array_t array, + xbps_dictionary_t dict, + int (*fn)(struct xbps_handle *, xbps_object_t obj, const char *, void *arg, bool *done), + void *arg); /** * Match a virtual package name or pattern by looking at package's diff --git a/lib/pkgdb.c b/lib/pkgdb.c index 5650ab4e..fa7217f1 100644 --- a/lib/pkgdb.c +++ b/lib/pkgdb.c @@ -137,27 +137,19 @@ xbps_pkgdb_release(struct xbps_handle *xhp) int xbps_pkgdb_foreach_cb(struct xbps_handle *xhp, - int (*fn)(struct xbps_handle *, xbps_object_t, void *, bool *), - void *arg) + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), + void *arg) { - xbps_object_t obj; - xbps_object_iterator_t iter; - xbps_dictionary_t pkgd; + xbps_array_t allkeys; int rv; - bool done = false; if ((rv = xbps_pkgdb_init(xhp)) != 0) return rv; - iter = xbps_dictionary_iterator(xhp->pkgdb); - assert(iter); - while ((obj = xbps_object_iterator_next(iter))) { - pkgd = xbps_dictionary_get_keysym(xhp->pkgdb, obj); - rv = (*fn)(xhp, pkgd, arg, &done); - if (rv != 0 || done) - break; - } - xbps_object_iterator_release(iter); + allkeys = xbps_dictionary_all_keys(xhp->pkgdb); + assert(allkeys); + rv = xbps_array_foreach_cb(xhp, allkeys, xhp->pkgdb, fn, arg); + xbps_object_release(allkeys); return rv; } diff --git a/lib/plist.c b/lib/plist.c index bf9a600f..14ce9dd2 100644 --- a/lib/plist.c +++ b/lib/plist.c @@ -28,9 +28,22 @@ #include #include #include +#include #include "xbps_api_impl.h" +struct thread_data { + pthread_t thread; + pthread_mutex_t *mtx; + xbps_array_t array; + xbps_dictionary_t dict; + struct xbps_handle *xhp; + unsigned int start; + unsigned int end; + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *); + void *fn_arg; +}; + /** * @file lib/plist.c * @brief PropertyList generic routines @@ -73,60 +86,101 @@ xbps_add_obj_to_array(xbps_array_t array, xbps_object_t obj) return true; } +static void * +array_foreach_thread(void *arg) +{ + xbps_object_t obj, pkgd; + struct thread_data *thd = arg; + const char *key; + unsigned int i; + int rv; + bool loop_done = false; + + /* process pkgs from start until end */ + for (i = thd->start; i < thd->end; i++) { + if (thd->mtx) + pthread_mutex_lock(thd->mtx); + + obj = xbps_array_get(thd->array, i); + if (xbps_object_type(thd->dict) == XBPS_TYPE_DICTIONARY) { + pkgd = xbps_dictionary_get_keysym(thd->dict, obj); + key = xbps_dictionary_keysym_cstring_nocopy(obj); + } else { + pkgd = obj; + key = NULL; + } + if (thd->mtx) + pthread_mutex_unlock(thd->mtx); + + rv = (*thd->fn)(thd->xhp, pkgd, key, thd->fn_arg, &loop_done); + if (rv != 0 || loop_done) + break; + } + return NULL; +} + int -xbps_callback_array_iter(struct xbps_handle *xhp, +xbps_array_foreach_cb(struct xbps_handle *xhp, xbps_array_t array, - int (*fn)(struct xbps_handle *, xbps_object_t, void *, bool *), + xbps_dictionary_t dict, + int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), void *arg) { - xbps_object_t obj; - xbps_object_iterator_t iter; - int rv = 0; - bool loop_done = false; + struct thread_data *thd; + pthread_mutex_t mtx; + unsigned int arraycount, slicecount, pkgcount; + int rv = 0, maxthreads, i; assert(xbps_object_type(array) == XBPS_TYPE_ARRAY); assert(fn != NULL); - iter = xbps_array_iterator(array); - if (iter == NULL) - return ENOMEM; + arraycount = xbps_array_count(array); + if (arraycount == 0) + return 0; - while ((obj = xbps_object_iterator_next(iter)) != NULL) { - rv = (*fn)(xhp, obj, arg, &loop_done); - if (rv != 0 || loop_done) - break; - } - xbps_object_iterator_release(iter); + maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN); + if (maxthreads > 1) { + thd = calloc(maxthreads, sizeof(*thd)); + assert(thd); + slicecount = arraycount / maxthreads; + pkgcount = 0; + pthread_mutex_init(&mtx, NULL); - return rv; -} + for (i = 0; i < maxthreads; i++) { + thd[i].mtx = &mtx; + thd[i].array = array; + thd[i].dict = dict; + thd[i].xhp = xhp; + thd[i].fn = fn; + thd[i].fn_arg = arg; + thd[i].start = pkgcount; + if (i + 1 >= maxthreads) + thd[i].end = arraycount; + else + thd[i].end = pkgcount + slicecount; + pthread_create(&thd[i].thread, NULL, + array_foreach_thread, &thd[i]); + pkgcount += slicecount; + } + /* wait for all threads */ + for (i = 0; i < maxthreads; i++) + pthread_join(thd[i].thread, NULL); -int -xbps_callback_array_iter_in_dict(struct xbps_handle *xhp, - xbps_dictionary_t dict, - const char *key, - int (*fn)(struct xbps_handle *, xbps_object_t, void *, bool *), - void *arg) -{ - xbps_object_t obj; - xbps_array_t array; - unsigned int i; - int rv = 0; - bool cbloop_done = false; + pthread_mutex_destroy(&mtx); + free(thd); + } else { + /* single threaded */ + struct thread_data mythd; - assert(xbps_object_type(dict) == XBPS_TYPE_DICTIONARY); - assert(xhp != NULL); - assert(key != NULL); - assert(fn != NULL); - - array = xbps_dictionary_get(dict, key); - for (i = 0; i < xbps_array_count(array); i++) { - obj = xbps_array_get(array, i); - if (obj == NULL) - continue; - rv = (*fn)(xhp, obj, arg, &cbloop_done); - if (rv != 0 || cbloop_done) - break; + mythd.mtx = NULL; + mythd.array = array; + mythd.dict = dict; + mythd.xhp = xhp; + mythd.start = 0; + mythd.end = arraycount; + mythd.fn = fn; + mythd.fn_arg = arg; + array_foreach_thread(&mythd); } return rv;