Introduce xbps_array_foreach_cb() and use it in random code.

This routine will spawn a thread per core to process N items stored
in the specified array, the last thread gets the remainder of items left.

Results have shown that xbps benefits if there is a considerable amount
of items and number of threads being spawned.

Use it in xbps_pkgdb_foreach_cb(), xbps-pkgdb(8), xbps-query(8)
and xbps-rindex(8).

On UP systems there's no overhead because pthread(3) is not used at all.

WIP! investigate if it can be used in libxbps (xbps_rpool_foreach()),
and finish conversion of xbps-rindex(8) -c.
This commit is contained in:
Juan RP
2013-07-27 09:47:16 +02:00
parent 432067de48
commit 0c7701f2bc
9 changed files with 255 additions and 295 deletions

View File

@@ -32,20 +32,10 @@
#include <dirent.h>
#include <libgen.h>
#include <assert.h>
#include <pthread.h>
#include <xbps.h>
#include "defs.h"
struct thread_data {
pthread_t thread;
xbps_array_t array;
struct xbps_repo *repo;
unsigned int start;
unsigned int end;
int thread_num;
};
static int
remove_pkg(const char *repodir, const char *arch, const char *file)
{
@@ -81,51 +71,49 @@ remove_pkg(const char *repodir, const char *arch, const char *file)
return 0;
}
static void *
cleaner_thread(void *arg)
static int
cleaner_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *key, void *arg, bool *done)
{
xbps_dictionary_t pkgd;
struct thread_data *thd = arg;
struct xbps_repo *repo = arg;
const char *binpkg, *pkgver, *arch;
unsigned int i;
int rv;
/* process pkgs from start until end */
for (i = thd->start; i < thd->end; i++) {
xbps_array_get_cstring_nocopy(thd->array, i, &binpkg);
pkgd = xbps_get_pkg_plist_from_binpkg(binpkg, "./props.plist");
if (pkgd == NULL) {
rv = remove_pkg(thd->repo->uri, arch, binpkg);
if (rv != 0) {
xbps_object_release(pkgd);
continue;
}
printf("Removed broken package `%s'.\n", binpkg);
}
xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver);
xbps_dictionary_get_cstring_nocopy(pkgd, "architecture", &arch);
/* ignore pkgs from other archs */
if (!xbps_pkg_arch_match(thd->repo->xhp, arch, NULL)) {
xbps_object_release(pkgd);
continue;
}
xbps_dbg_printf(thd->repo->xhp, "thread[%d] checking %s (%s)\n",
thd->thread_num, pkgver, binpkg);
/*
* If binpkg is not registered in index, remove binpkg.
*/
if (!xbps_repo_get_pkg(thd->repo, pkgver)) {
rv = remove_pkg(thd->repo->uri, arch, binpkg);
if (rv != 0) {
xbps_object_release(pkgd);
continue;
}
printf("Removed obsolete package `%s'.\n", binpkg);
}
xbps_object_release(pkgd);
}
(void)key;
(void)done;
return NULL;
binpkg = xbps_string_cstring_nocopy(obj);
pkgd = xbps_get_pkg_plist_from_binpkg(binpkg, "./props.plist");
if (pkgd == NULL) {
rv = remove_pkg(repo->uri, arch, binpkg);
if (rv != 0) {
xbps_object_release(pkgd);
return 0;
}
printf("Removed broken package `%s'.\n", binpkg);
}
xbps_dictionary_get_cstring_nocopy(pkgd, "pkgver", &pkgver);
xbps_dictionary_get_cstring_nocopy(pkgd, "architecture", &arch);
/* ignore pkgs from other archs */
if (!xbps_pkg_arch_match(xhp, arch, NULL)) {
xbps_object_release(pkgd);
return 0;
}
printf("checking %s (%s)\n", pkgver, binpkg);
/*
* If binpkg is not registered in index, remove binpkg.
*/
if (!xbps_repo_get_pkg(repo, pkgver)) {
rv = remove_pkg(repo->uri, arch, binpkg);
if (rv != 0) {
xbps_object_release(pkgd);
return 0;
}
printf("Removed obsolete package `%s'.\n", binpkg);
}
xbps_object_release(pkgd);
return 0;
}
int
@@ -133,12 +121,10 @@ remove_obsoletes(struct xbps_handle *xhp, const char *repodir)
{
xbps_array_t array = NULL;
struct xbps_repo *repo;
struct thread_data *thd;
DIR *dirp;
struct dirent *dp;
char *ext;
int i, maxthreads, rv = 0;
unsigned int slicecount, pkgcount;
int rv = 0;
repo = xbps_repo_open(xhp, repodir);
if (repo == NULL) {
@@ -173,34 +159,13 @@ remove_obsoletes(struct xbps_handle *xhp, const char *repodir)
if (array == NULL)
array = xbps_array_create();
xbps_array_add_cstring(array, dp->d_name);
xbps_array_add_cstring_nocopy(array, dp->d_name);
}
(void)closedir(dirp);
maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN);
thd = calloc(maxthreads, sizeof(*thd));
slicecount = xbps_array_count(array) / maxthreads;
pkgcount = 0;
for (i = 0; i < maxthreads; i++) {
thd[i].thread_num = i;
thd[i].array = array;
thd[i].repo = repo;
thd[i].start = pkgcount;
if (i + 1 >= maxthreads)
thd[i].end = xbps_array_count(array);
else
thd[i].end = pkgcount + slicecount;
pthread_create(&thd[i].thread, NULL, cleaner_thread, &thd[i]);
pkgcount += slicecount;
}
/* wait for all threads */
for (i = 0; i < maxthreads; i++)
pthread_join(thd[i].thread, NULL);
rv = xbps_array_foreach_cb(xhp, array, NULL, cleaner_cb, repo);
xbps_repo_close(repo);
xbps_object_release(array);
return rv;
}