diff options
Diffstat (limited to 'server/.svn/text-base/fqm.c.svn-base')
-rw-r--r-- | server/.svn/text-base/fqm.c.svn-base | 448 |
1 files changed, 0 insertions, 448 deletions
diff --git a/server/.svn/text-base/fqm.c.svn-base b/server/.svn/text-base/fqm.c.svn-base deleted file mode 100644 index 73c9e54..0000000 --- a/server/.svn/text-base/fqm.c.svn-base +++ /dev/null @@ -1,448 +0,0 @@ -#include "conf.h" -#include "fqm.h" -#include "log.h" -#include "mysqldb.h" -#include "thread.h" - -#include <errno.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -RCSID("$Id: fqm.c,v 1.33 2001/10/29 11:17:13 shmit Exp $"); - -/* Global Variables */ -fqm_t *fqm; - -request_t * -req_new(int sock, short reqid, REQHANDLER((*req_handler)), - const char *reqstr, int reqlen) -{ - request_t *tmp; - - tmp = malloc(sizeof(request_t)); - if (tmp == NULL) { - log_err("Couldn't allocate request: %s.", strerror(errno)); - return NULL; - } - - tmp->req = malloc((reqlen + 1) * sizeof(char)); - if (tmp->req == NULL) { - log_err("Couldn't allocate request string: %s.", - strerror(errno)); - req_delete(tmp); - return NULL; - } - memcpy(tmp->req, reqstr, reqlen); - tmp->req[reqlen] = '\0'; - tmp->sock = sock; - tmp->reqid = reqid; - tmp->req_handler = req_handler; - - return tmp; -} - -void -req_delete(request_t *req) -{ - if (req) { - if (req->req) - free(req->req); - free(req); - } -} - -static request_t * -fqm_pop(reqthread_t *req) -{ - fqm_t *fqm; - request_t *p; - - if (req == NULL || req->fqm == NULL) { - mutex_unlock(&req->fqm->q_cond->lock); - return NULL; - } - fqm = req->fqm; - - /* Wait for something to arrive on the queue. */ - mutex_lock(&req->fqm->q_cond->lock); - req->status = IDLE; - - if (fqm->sreq == NULL) - cond_wait(fqm->q_cond); - - /* Something waiting on the queue. Scarf it off and unlock. */ - p = fqm->sreq; - if (p == NULL) { - mutex_unlock(&fqm->q_cond->lock); - return NULL; - } - fqm->sreq = fqm->sreq->next; - fqm->nitems--; - req->status = BUSY; - mutex_unlock(&fqm->q_cond->lock); - - p->next = NULL; - return p; -} - -static void * -fqm_popper(void *arg) -{ - reqthread_t *self; - request_t *req; - - /* Detach this thread. */ - (void)pthread_detach(pthread_self()); - - self = (reqthread_t *)arg; - for (;;) { - req = fqm_pop(self); - - /* - * Check to see if we were told to die. If we were, - * do so. - * Note: there's the possibility for the dropping of - * a request here. Que sera sera. - */ - if (self->dieflag) - break; - if (req == NULL) - continue; - if (req->req_handler(req, self) == -1) - log_info("Couldn't handle request"); - req_delete(req); - } - - log_info("Got a kill flag; dying off."); - mutex_lock(&self->fqm->t_cond->lock); - self->status = NOALLOC; - cond_signal(self->fqm->t_cond); - mutex_unlock(&self->fqm->t_cond->lock); - return NULL; -} - -static reqthread_t * -fqm_thr_new(fqm_t *fqm) -{ - reqthread_t *tmp; - - tmp = malloc(sizeof(reqthread_t)); - if (tmp == NULL) { - log_err("Couldn't allocate fallthrough thread: %s.", - strerror(errno)); - return NULL; - } - - tmp->arg = NULL; - tmp->fqm = fqm; - tmp->dieflag = 0; - tmp->status = NOALLOC; - - tmp->tid = malloc(sizeof(thread_t)); - if (tmp->tid == NULL) { - log_err("Couldn't allocate fallthrough thread: %s.", - strerror(errno)); - free(tmp); - return NULL; - } - - if (thread_new(tmp->tid, fqm_popper, tmp)) { - log_err("Couldn't start fallthrough thread: %s.", - strerror(errno)); - free(tmp->tid); - tmp->tid = NULL; - free(tmp); - return NULL; - } - tmp->status = IDLE; - - return tmp; -} - -static int -fqm_thr_wait(fqm_t *fqm) -{ - int deadthreads; - int i; - - deadthreads = 0; - - for (i = 0; i < fqm->maxitems; i++) { - if (fqm->tids[i] && fqm->tids[i]->status == NOALLOC) { - mysqldb_connect_close(fqm->tids[i]->arg); - if (fqm->tids[i]->tid != NULL) - free(fqm->tids[i]->tid); - free(fqm->tids[i]); - fqm->tids[i] = NULL; - deadthreads++; - } - } - - return deadthreads; -} - -fqm_t * -fqm_new(int maxitems) -{ - fqm_t *tmp; - int i; - - tmp = malloc(sizeof(fqm_t)); - if (tmp == NULL) { - log_err("Couldn't create fallthrough queue manager: %s.", - strerror(errno)); - return NULL; - } - - tmp->q_cond = NULL; - tmp->t_cond = NULL; - tmp->tids = NULL; - tmp->sreq = NULL; tmp->ereq = NULL; - tmp->maxitems = 0; - tmp->nitems = 0; - - tmp->q_cond = malloc(sizeof(cond_t)); - if (tmp->q_cond == NULL) { - log_err("Couldn't allocate queue condition variable: %s.", - strerror(errno)); - fqm_delete(tmp); - return NULL; - } - - if (cond_new(tmp->q_cond)) { - log_err("Couldn't initialise queue condition variable: %s.", - strerror(errno)); - fqm_delete(tmp); - return NULL; - } - - tmp->t_cond = malloc(sizeof(cond_t)); - if (tmp->t_cond == NULL) { - log_err("Couldn't allocate queue destroy condition: %s.", - strerror(errno)); - fqm_delete(tmp); - return NULL; - } - - if (cond_new(tmp->t_cond)) { - log_err("Couldn't initialise queue destroy condition: %s.", - strerror(errno)); - fqm_delete(tmp); - return NULL; - } - - tmp->tids = malloc(maxitems * sizeof(reqthread_t *)); - if (tmp->tids == NULL) { - log_err("Couldn't allocate fallthrough queue threads: %s.", - strerror(errno)); - fqm_delete(tmp); - return NULL; - } - - /* Init these so fqm_delete won't break. */ - for (i = 0; i < maxitems; i++) - tmp->tids[i] = NULL; - - /* We cannot adjust max items while threads are busy, so lock. */ - mutex_lock(&tmp->q_cond->lock); - for (i = 0; i < maxitems; i++) { - tmp->tids[i] = fqm_thr_new(tmp); - if (tmp->tids[i] == NULL) { - mutex_unlock(&tmp->q_cond->lock); - fqm_delete(tmp); - return NULL; - } - tmp->maxitems++; - } - mutex_unlock(&tmp->q_cond->lock); - - /* XXX: this may need a lock. */ - fqm = tmp; - return tmp; -} - -/* - * Make sure all threads in this fqm are idle. This is an expensive - * operation, but that's okay, since this code should only be run when - * a client has closed the connection, so we can take as long as we want. - * - * The queue is locked when coming in to this routine and leaving it. - */ -void -verify_idle_threads(fqm_t *fqm) -{ - int i, busy_thread; - - do { - busy_thread = 0; - for (i = 0; i < fqm->maxitems; i++) { - if (fqm->tids[i]->status == BUSY) - busy_thread = 1; - } - - /* - * This is the shitty part. - * We unlock the queue and sleep for a bit to give other - * threads a chance to finish what they're doing. - */ - if (busy_thread) { - mutex_unlock(&fqm->q_cond->lock); - sleep(1); - mutex_lock(&fqm->q_cond->lock); - } - } while (busy_thread); -} - -void -fqm_delete(fqm_t *fqm) -{ - int i; - - if (fqm->tids) { - /* Tell all threads to quit. */ - for (i = 0; i < fqm->maxitems; i++) { - if (fqm->tids[i] != NULL) - fqm->tids[i]->dieflag = 1; - } - - /* - * Lock queue waiting condition to ensure that no - * threads miss this signal. The threads MUST be idle - * in order to guarantee this signal is recieved. - */ - mutex_lock(&fqm->q_cond->lock); - verify_idle_threads(fqm); - pthread_cond_broadcast(&fqm->q_cond->id); - mutex_unlock(&fqm->q_cond->lock); - - /* Now wait for them to die off. */ - i = 0; - while (i < fqm->maxitems) { - mutex_lock(&fqm->t_cond->lock); - i += fqm_thr_wait(fqm); - if (i < fqm->maxitems) - cond_wait(fqm->t_cond); - mutex_unlock(&fqm->t_cond->lock); - } - free(fqm->tids); - fqm->tids = NULL; - } - - /* - * At this point all fqm threads should be dead. - */ - if (fqm->t_cond) { - (void)cond_destroy(fqm->t_cond); - free(fqm->t_cond); - fqm->t_cond = NULL; - } - - if (fqm->q_cond) { - (void)cond_destroy(fqm->q_cond); - free(fqm->q_cond); - fqm->q_cond = NULL; - } - - free(fqm); -} - -int -fqm_changemaxitems(fqm_t *fqm, int maxitems) -{ - reqthread_t **new_tidpool; - int i, j; - - /* - * This code broken. Silently fail here. - */ - return 0; - - if (fqm->maxitems == maxitems) - return 0; - - mutex_lock(&fqm->q_cond->lock); - /* - * Make sure all threads are busy. We can't go changing - * data under them. - */ - verify_idle_threads(fqm); - new_tidpool = malloc(maxitems * sizeof(reqthread_t *)); - if (new_tidpool == NULL) { - mutex_unlock(&fqm->q_cond->lock); - log_err("Couldn't allocate new FQM thread pool: %s.", - strerror(errno)); - return -1; - } - - /* Compress old TID pool. */ - for (j = 0, i = 0; i < fqm->maxitems && j < maxitems; i++) { - if (fqm->tids[i]->status != NOALLOC) { - new_tidpool[j] = fqm->tids[i]; - j++; - } - } - - if (fqm->maxitems < maxitems) { - /* Add more threads. */ - mutex_lock(&fqm->q_cond->lock); - for (i = fqm->maxitems; i < maxitems; i++) { - new_tidpool[i] = fqm_thr_new(fqm); - if (new_tidpool[i] == NULL) { - free(new_tidpool); - mutex_unlock(&fqm->q_cond->lock); - return -1; - } - } - mutex_unlock(&fqm->q_cond->lock); - } else if (fqm->maxitems > maxitems) { - /* Kill some threads. */ - int deadthreads; - - /* Tell them to die, then wake 'em up. */ - for (i = maxitems; i < fqm->maxitems; i++) - fqm->tids[i]->dieflag = 1; - pthread_cond_broadcast(&fqm->q_cond->id); - - deadthreads = 0; - while (deadthreads < fqm->maxitems - maxitems) { - mutex_lock(&fqm->t_cond->lock); - deadthreads += fqm_thr_wait(fqm); - if (deadthreads < fqm->maxitems - maxitems) - cond_wait(fqm->t_cond); - mutex_unlock(&fqm->t_cond->lock); - } - } - - free(fqm->tids); - fqm->tids = new_tidpool; - fqm->maxitems = maxitems; - mutex_unlock(&fqm->q_cond->lock); - return 0; -} - -int -fqm_push(fqm_t *fqm, request_t *req) -{ - if (fqm->nitems == fqm->maxitems) { - log_err("Too many items on the request queue."); - return -1; - } - - /* Lock the queue and add the item. */ - mutex_lock(&fqm->q_cond->lock); - - req->next = NULL; - if (fqm->sreq == NULL) - fqm->sreq = req; - else - fqm->ereq->next = req; - fqm->ereq = req; - fqm->nitems++; - - /* Unlock the queue and signal that there's an item on it. */ - cond_signal(fqm->q_cond); - mutex_unlock(&fqm->q_cond->lock); - return 0; -} |