diff options
Diffstat (limited to 'server/.svn/text-base/fqm.c.svn-base')
-rw-r--r-- | server/.svn/text-base/fqm.c.svn-base | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/server/.svn/text-base/fqm.c.svn-base b/server/.svn/text-base/fqm.c.svn-base new file mode 100644 index 0000000..73c9e54 --- /dev/null +++ b/server/.svn/text-base/fqm.c.svn-base @@ -0,0 +1,448 @@ +#include "conf.h" +#include "fqm.h" +#include "log.h" +#include "mysqldb.h" +#include "thread.h" + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +RCSID("$Id: fqm.c,v 1.33 2001/10/29 11:17:13 shmit Exp $"); + +/* Global Variables */ +fqm_t *fqm; + +request_t * +req_new(int sock, short reqid, REQHANDLER((*req_handler)), + const char *reqstr, int reqlen) +{ + request_t *tmp; + + tmp = malloc(sizeof(request_t)); + if (tmp == NULL) { + log_err("Couldn't allocate request: %s.", strerror(errno)); + return NULL; + } + + tmp->req = malloc((reqlen + 1) * sizeof(char)); + if (tmp->req == NULL) { + log_err("Couldn't allocate request string: %s.", + strerror(errno)); + req_delete(tmp); + return NULL; + } + memcpy(tmp->req, reqstr, reqlen); + tmp->req[reqlen] = '\0'; + tmp->sock = sock; + tmp->reqid = reqid; + tmp->req_handler = req_handler; + + return tmp; +} + +void +req_delete(request_t *req) +{ + if (req) { + if (req->req) + free(req->req); + free(req); + } +} + +static request_t * +fqm_pop(reqthread_t *req) +{ + fqm_t *fqm; + request_t *p; + + if (req == NULL || req->fqm == NULL) { + mutex_unlock(&req->fqm->q_cond->lock); + return NULL; + } + fqm = req->fqm; + + /* Wait for something to arrive on the queue. */ + mutex_lock(&req->fqm->q_cond->lock); + req->status = IDLE; + + if (fqm->sreq == NULL) + cond_wait(fqm->q_cond); + + /* Something waiting on the queue. Scarf it off and unlock. */ + p = fqm->sreq; + if (p == NULL) { + mutex_unlock(&fqm->q_cond->lock); + return NULL; + } + fqm->sreq = fqm->sreq->next; + fqm->nitems--; + req->status = BUSY; + mutex_unlock(&fqm->q_cond->lock); + + p->next = NULL; + return p; +} + +static void * +fqm_popper(void *arg) +{ + reqthread_t *self; + request_t *req; + + /* Detach this thread. */ + (void)pthread_detach(pthread_self()); + + self = (reqthread_t *)arg; + for (;;) { + req = fqm_pop(self); + + /* + * Check to see if we were told to die. If we were, + * do so. + * Note: there's the possibility for the dropping of + * a request here. Que sera sera. + */ + if (self->dieflag) + break; + if (req == NULL) + continue; + if (req->req_handler(req, self) == -1) + log_info("Couldn't handle request"); + req_delete(req); + } + + log_info("Got a kill flag; dying off."); + mutex_lock(&self->fqm->t_cond->lock); + self->status = NOALLOC; + cond_signal(self->fqm->t_cond); + mutex_unlock(&self->fqm->t_cond->lock); + return NULL; +} + +static reqthread_t * +fqm_thr_new(fqm_t *fqm) +{ + reqthread_t *tmp; + + tmp = malloc(sizeof(reqthread_t)); + if (tmp == NULL) { + log_err("Couldn't allocate fallthrough thread: %s.", + strerror(errno)); + return NULL; + } + + tmp->arg = NULL; + tmp->fqm = fqm; + tmp->dieflag = 0; + tmp->status = NOALLOC; + + tmp->tid = malloc(sizeof(thread_t)); + if (tmp->tid == NULL) { + log_err("Couldn't allocate fallthrough thread: %s.", + strerror(errno)); + free(tmp); + return NULL; + } + + if (thread_new(tmp->tid, fqm_popper, tmp)) { + log_err("Couldn't start fallthrough thread: %s.", + strerror(errno)); + free(tmp->tid); + tmp->tid = NULL; + free(tmp); + return NULL; + } + tmp->status = IDLE; + + return tmp; +} + +static int +fqm_thr_wait(fqm_t *fqm) +{ + int deadthreads; + int i; + + deadthreads = 0; + + for (i = 0; i < fqm->maxitems; i++) { + if (fqm->tids[i] && fqm->tids[i]->status == NOALLOC) { + mysqldb_connect_close(fqm->tids[i]->arg); + if (fqm->tids[i]->tid != NULL) + free(fqm->tids[i]->tid); + free(fqm->tids[i]); + fqm->tids[i] = NULL; + deadthreads++; + } + } + + return deadthreads; +} + +fqm_t * +fqm_new(int maxitems) +{ + fqm_t *tmp; + int i; + + tmp = malloc(sizeof(fqm_t)); + if (tmp == NULL) { + log_err("Couldn't create fallthrough queue manager: %s.", + strerror(errno)); + return NULL; + } + + tmp->q_cond = NULL; + tmp->t_cond = NULL; + tmp->tids = NULL; + tmp->sreq = NULL; tmp->ereq = NULL; + tmp->maxitems = 0; + tmp->nitems = 0; + + tmp->q_cond = malloc(sizeof(cond_t)); + if (tmp->q_cond == NULL) { + log_err("Couldn't allocate queue condition variable: %s.", + strerror(errno)); + fqm_delete(tmp); + return NULL; + } + + if (cond_new(tmp->q_cond)) { + log_err("Couldn't initialise queue condition variable: %s.", + strerror(errno)); + fqm_delete(tmp); + return NULL; + } + + tmp->t_cond = malloc(sizeof(cond_t)); + if (tmp->t_cond == NULL) { + log_err("Couldn't allocate queue destroy condition: %s.", + strerror(errno)); + fqm_delete(tmp); + return NULL; + } + + if (cond_new(tmp->t_cond)) { + log_err("Couldn't initialise queue destroy condition: %s.", + strerror(errno)); + fqm_delete(tmp); + return NULL; + } + + tmp->tids = malloc(maxitems * sizeof(reqthread_t *)); + if (tmp->tids == NULL) { + log_err("Couldn't allocate fallthrough queue threads: %s.", + strerror(errno)); + fqm_delete(tmp); + return NULL; + } + + /* Init these so fqm_delete won't break. */ + for (i = 0; i < maxitems; i++) + tmp->tids[i] = NULL; + + /* We cannot adjust max items while threads are busy, so lock. */ + mutex_lock(&tmp->q_cond->lock); + for (i = 0; i < maxitems; i++) { + tmp->tids[i] = fqm_thr_new(tmp); + if (tmp->tids[i] == NULL) { + mutex_unlock(&tmp->q_cond->lock); + fqm_delete(tmp); + return NULL; + } + tmp->maxitems++; + } + mutex_unlock(&tmp->q_cond->lock); + + /* XXX: this may need a lock. */ + fqm = tmp; + return tmp; +} + +/* + * Make sure all threads in this fqm are idle. This is an expensive + * operation, but that's okay, since this code should only be run when + * a client has closed the connection, so we can take as long as we want. + * + * The queue is locked when coming in to this routine and leaving it. + */ +void +verify_idle_threads(fqm_t *fqm) +{ + int i, busy_thread; + + do { + busy_thread = 0; + for (i = 0; i < fqm->maxitems; i++) { + if (fqm->tids[i]->status == BUSY) + busy_thread = 1; + } + + /* + * This is the shitty part. + * We unlock the queue and sleep for a bit to give other + * threads a chance to finish what they're doing. + */ + if (busy_thread) { + mutex_unlock(&fqm->q_cond->lock); + sleep(1); + mutex_lock(&fqm->q_cond->lock); + } + } while (busy_thread); +} + +void +fqm_delete(fqm_t *fqm) +{ + int i; + + if (fqm->tids) { + /* Tell all threads to quit. */ + for (i = 0; i < fqm->maxitems; i++) { + if (fqm->tids[i] != NULL) + fqm->tids[i]->dieflag = 1; + } + + /* + * Lock queue waiting condition to ensure that no + * threads miss this signal. The threads MUST be idle + * in order to guarantee this signal is recieved. + */ + mutex_lock(&fqm->q_cond->lock); + verify_idle_threads(fqm); + pthread_cond_broadcast(&fqm->q_cond->id); + mutex_unlock(&fqm->q_cond->lock); + + /* Now wait for them to die off. */ + i = 0; + while (i < fqm->maxitems) { + mutex_lock(&fqm->t_cond->lock); + i += fqm_thr_wait(fqm); + if (i < fqm->maxitems) + cond_wait(fqm->t_cond); + mutex_unlock(&fqm->t_cond->lock); + } + free(fqm->tids); + fqm->tids = NULL; + } + + /* + * At this point all fqm threads should be dead. + */ + if (fqm->t_cond) { + (void)cond_destroy(fqm->t_cond); + free(fqm->t_cond); + fqm->t_cond = NULL; + } + + if (fqm->q_cond) { + (void)cond_destroy(fqm->q_cond); + free(fqm->q_cond); + fqm->q_cond = NULL; + } + + free(fqm); +} + +int +fqm_changemaxitems(fqm_t *fqm, int maxitems) +{ + reqthread_t **new_tidpool; + int i, j; + + /* + * This code broken. Silently fail here. + */ + return 0; + + if (fqm->maxitems == maxitems) + return 0; + + mutex_lock(&fqm->q_cond->lock); + /* + * Make sure all threads are busy. We can't go changing + * data under them. + */ + verify_idle_threads(fqm); + new_tidpool = malloc(maxitems * sizeof(reqthread_t *)); + if (new_tidpool == NULL) { + mutex_unlock(&fqm->q_cond->lock); + log_err("Couldn't allocate new FQM thread pool: %s.", + strerror(errno)); + return -1; + } + + /* Compress old TID pool. */ + for (j = 0, i = 0; i < fqm->maxitems && j < maxitems; i++) { + if (fqm->tids[i]->status != NOALLOC) { + new_tidpool[j] = fqm->tids[i]; + j++; + } + } + + if (fqm->maxitems < maxitems) { + /* Add more threads. */ + mutex_lock(&fqm->q_cond->lock); + for (i = fqm->maxitems; i < maxitems; i++) { + new_tidpool[i] = fqm_thr_new(fqm); + if (new_tidpool[i] == NULL) { + free(new_tidpool); + mutex_unlock(&fqm->q_cond->lock); + return -1; + } + } + mutex_unlock(&fqm->q_cond->lock); + } else if (fqm->maxitems > maxitems) { + /* Kill some threads. */ + int deadthreads; + + /* Tell them to die, then wake 'em up. */ + for (i = maxitems; i < fqm->maxitems; i++) + fqm->tids[i]->dieflag = 1; + pthread_cond_broadcast(&fqm->q_cond->id); + + deadthreads = 0; + while (deadthreads < fqm->maxitems - maxitems) { + mutex_lock(&fqm->t_cond->lock); + deadthreads += fqm_thr_wait(fqm); + if (deadthreads < fqm->maxitems - maxitems) + cond_wait(fqm->t_cond); + mutex_unlock(&fqm->t_cond->lock); + } + } + + free(fqm->tids); + fqm->tids = new_tidpool; + fqm->maxitems = maxitems; + mutex_unlock(&fqm->q_cond->lock); + return 0; +} + +int +fqm_push(fqm_t *fqm, request_t *req) +{ + if (fqm->nitems == fqm->maxitems) { + log_err("Too many items on the request queue."); + return -1; + } + + /* Lock the queue and add the item. */ + mutex_lock(&fqm->q_cond->lock); + + req->next = NULL; + if (fqm->sreq == NULL) + fqm->sreq = req; + else + fqm->ereq->next = req; + fqm->ereq = req; + fqm->nitems++; + + /* Unlock the queue and signal that there's an item on it. */ + cond_signal(fqm->q_cond); + mutex_unlock(&fqm->q_cond->lock); + return 0; +} |