From 50f3c688a7cd4819680dbd19b9933c6cb5e62ca7 Mon Sep 17 00:00:00 2001 From: Brian Cully Date: Mon, 6 Feb 2012 19:44:47 -0500 Subject: Move most files into lib, add top-level makefile --- .gitignore | 9 +- Makefile | 10 + lib/src/Makefile | 44 +++ lib/src/TODO.org | 6 + lib/src/io.c | 59 +++ lib/src/io.h | 13 + lib/src/log.c | 60 +++ lib/src/log.h | 12 + lib/src/msg.c | 74 ++++ lib/src/msg.h | 17 + lib/src/mysqlerl.app | 12 + lib/src/mysqlerl.c | 704 ++++++++++++++++++++++++++++++++++++ lib/src/mysqlerl.erl | 251 +++++++++++++ lib/src/mysqlerl.hrl | 9 + lib/src/mysqlerl_app.erl | 14 + lib/src/mysqlerl_connection.erl | 64 ++++ lib/src/mysqlerl_connection_sup.erl | 31 ++ lib/src/mysqlerl_port.erl | 83 +++++ lib/src/mysqlerl_port.hrl | 1 + lib/src/mysqlerl_port_sup.erl | 18 + lib/src/mysqlerl_sup.erl | 16 + src/Makefile | 44 --- src/TODO.org | 6 - src/io.c | 59 --- src/io.h | 13 - src/log.c | 60 --- src/log.h | 12 - src/msg.c | 74 ---- src/msg.h | 17 - src/mysqlerl.app | 12 - src/mysqlerl.c | 704 ------------------------------------ src/mysqlerl.erl | 251 ------------- src/mysqlerl.hrl | 9 - src/mysqlerl_app.erl | 14 - src/mysqlerl_connection.erl | 64 ---- src/mysqlerl_connection_sup.erl | 31 -- src/mysqlerl_port.erl | 83 ----- src/mysqlerl_port.hrl | 1 - src/mysqlerl_port_sup.erl | 18 - src/mysqlerl_sup.erl | 16 - 40 files changed, 1504 insertions(+), 1491 deletions(-) create mode 100644 Makefile create mode 100644 lib/src/Makefile create mode 100644 lib/src/TODO.org create mode 100644 lib/src/io.c create mode 100644 lib/src/io.h create mode 100644 lib/src/log.c create mode 100644 lib/src/log.h create mode 100644 lib/src/msg.c create mode 100644 lib/src/msg.h create mode 100644 lib/src/mysqlerl.app create mode 100644 lib/src/mysqlerl.c create mode 100644 lib/src/mysqlerl.erl create mode 100644 lib/src/mysqlerl.hrl create mode 100644 lib/src/mysqlerl_app.erl create mode 100644 lib/src/mysqlerl_connection.erl create mode 100644 lib/src/mysqlerl_connection_sup.erl create mode 100644 lib/src/mysqlerl_port.erl create mode 100644 lib/src/mysqlerl_port.hrl create mode 100644 lib/src/mysqlerl_port_sup.erl create mode 100644 lib/src/mysqlerl_sup.erl delete mode 100644 src/Makefile delete mode 100644 src/TODO.org delete mode 100644 src/io.c delete mode 100644 src/io.h delete mode 100644 src/log.c delete mode 100644 src/log.h delete mode 100644 src/msg.c delete mode 100644 src/msg.h delete mode 100644 src/mysqlerl.app delete mode 100644 src/mysqlerl.c delete mode 100644 src/mysqlerl.erl delete mode 100644 src/mysqlerl.hrl delete mode 100644 src/mysqlerl_app.erl delete mode 100644 src/mysqlerl_connection.erl delete mode 100644 src/mysqlerl_connection_sup.erl delete mode 100644 src/mysqlerl_port.erl delete mode 100644 src/mysqlerl_port.hrl delete mode 100644 src/mysqlerl_port_sup.erl delete mode 100644 src/mysqlerl_sup.erl diff --git a/.gitignore b/.gitignore index c5a0c1c..7cd6bb1 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,9 @@ *~ *.o *.beam -mysqlerl -ebin/* -priv/* \ No newline at end of file +lib/ebin/* +lib/priv/* +lib/test/*.html +lib/test/*.css +lib/test/ct_run.* +lib/test/variables-ct* diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f431ef2 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +.PHONY: all install test + +all: + @@cd lib/src && make all + +install: + @@cd lib/src && make install + +test: + @@cd lib/test && make test diff --git a/lib/src/Makefile b/lib/src/Makefile new file mode 100644 index 0000000..3a1510b --- /dev/null +++ b/lib/src/Makefile @@ -0,0 +1,44 @@ +ERLPREFIX = /usr/local +ERLINTERFACE = erl_interface-3.7.6 +ERLINCS = -I$(ERLPREFIX)/lib/erlang/lib/$(ERLINTERFACE)/include +ERLLIBS = -L$(ERLPREFIX)/lib/erlang/lib/$(ERLINTERFACE)/lib \ + -lerl_interface -lei + +MYSQLPREFIX = /usr/local/mysql +MYSQLINCS = -I$(MYSQLPREFIX)/include +MYSQLLIBS = -L$(MYSQLPREFIX)/lib -lmysqlclient + +CFLAGS = -g -Wall -Werror $(MYSQLINCS) $(ERLINCS) +LDFLAGS = $(MYSQLLIBS) $(ERLLIBS) +EFLAGS = -W +debug_info + +PRIVDIR = ../priv +BEAMDIR = ../ebin + +BINS = $(PRIVDIR)/mysqlerl $(BEAMDIR)/mysqlerl.app +MYSQLERLOBJS = io.o log.o msg.o mysqlerl.o +BEAMS = mysqlerl.beam mysqlerl_app.beam mysqlerl_connection.beam \ + mysqlerl_connection_sup.beam mysqlerl_port.beam \ + mysqlerl_port_sup.beam mysqlerl_sup.beam + +all: $(PRIVDIR) $(BEAMDIR) $(BINS) + +clean: + rm -rf *.o *.beam + rm -rf $(BINS) $(MYSQLERLOBJS) $(BEAMS) $(BEAMDIR)/mysqlerl.app + +%.beam: %.erl + erlc $(EFLAGS) $< + +$(PRIVDIR)/mysqlerl: $(PRIVDIR) $(MYSQLERLOBJS) + $(CC) -o $@ $(MYSQLERLOBJS) $(LDFLAGS) + +$(BEAMDIR)/mysqlerl.app: $(BEAMDIR) $(BEAMS) + cp $(BEAMS) $(BEAMDIR) + cp mysqlerl.app $(BEAMDIR) + +$(PRIVDIR): + mkdir -p $(PRIVDIR) + +$(BEAMDIR): + mkdir -p $(BEAMDIR) diff --git a/lib/src/TODO.org b/lib/src/TODO.org new file mode 100644 index 0000000..636f47d --- /dev/null +++ b/lib/src/TODO.org @@ -0,0 +1,6 @@ +* Support ODBC options where applicable. +* Test functions die (or don't) correctly when not connected to DB. +* Connection pooling with multiple databases. +I'm guessing that it doesn't work right now. Looks like +mysqlerl_connection_sup:connect starts a simple-one-for-one port, and +commands use :random_child to do dispatch. That's bad. diff --git a/lib/src/io.c b/lib/src/io.c new file mode 100644 index 0000000..dcc055f --- /dev/null +++ b/lib/src/io.c @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2008, Brian Cully + */ + +#include "io.h" +#include "log.h" + +#include +#include +#include +#include + +int +restartable_read(unsigned char *buf, size_t buflen) +{ + ssize_t rc, readb; + + rc = 0; + READLOOP: + while (rc < buflen) { + readb = read(STDIN_FILENO, buf + rc, buflen - rc); + if (readb == -1) { + if (errno == EAGAIN || errno == EINTR) + goto READLOOP; + + return -1; + } else if (readb == 0) { + logmsg("ERROR: EOF trying to read additional %d bytes from " + "standard input", buflen - rc); + return -1; + } + + rc += readb; + } + + return rc; +} + +int +restartable_write(const unsigned char *buf, size_t buflen) +{ + ssize_t rc, wroteb; + + rc = 0; + WRITELOOP: + while (rc < buflen) { + wroteb = write(STDOUT_FILENO, buf + rc, buflen - rc); + if (wroteb == -1) { + if (errno == EAGAIN || errno == EINTR) + goto WRITELOOP; + + return -1; + } + + rc += wroteb; + } + + return rc; +} diff --git a/lib/src/io.h b/lib/src/io.h new file mode 100644 index 0000000..bf4e086 --- /dev/null +++ b/lib/src/io.h @@ -0,0 +1,13 @@ +/* + * Copyright (C) 2008, Brian Cully + */ + +#ifndef _IO_H +#define _IO_H + +#include + +int restartable_read(unsigned char *buf, size_t buflen); +int restartable_write(const unsigned char *buf, size_t buflen); + +#endif diff --git a/lib/src/log.c b/lib/src/log.c new file mode 100644 index 0000000..80aa755 --- /dev/null +++ b/lib/src/log.c @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2008, Brian Cully + */ + +#include "log.h" + +#include +#include +#include +#include + +const char *LOGPATH = "/tmp/mysqlerl.log"; +static FILE *logfile = NULL; + +void +openlog() +{ + logfile = fopen(LOGPATH, "a"); +} + +void +closelog() +{ + fclose(logfile); +} + +void +logmsg(const char *format, ...) +{ + FILE *out = logfile; + char timebuf[32] = "\0"; + struct tm now_tm; + time_t now_time; + va_list args; + + va_start(args, format); + + if (logfile == NULL) + out = stderr; + + if (time(&now_time) == (time_t)-1) { + (void)fprintf(out, "LOGERROR - Failed to fetch time: "); + } else { + (void)localtime_r(&now_time, &now_tm); + if (strftime(timebuf, sizeof(timebuf), "%Y%m%d %H:%M:%S ", &now_tm) == 0) { + (void)fprintf(out, "LOGERROR - Failed to parse time (now: %d): ", + (int)now_time); + } else { + (void)fprintf(out, "%s", timebuf); + } + } + (void)fprintf(out, "[%d]: ", getpid()); + (void)vfprintf(out, format, args); + (void)fprintf(out, "\n"); + + fflush(out); + + va_end(args); +} + diff --git a/lib/src/log.h b/lib/src/log.h new file mode 100644 index 0000000..ae9a9bd --- /dev/null +++ b/lib/src/log.h @@ -0,0 +1,12 @@ +/* + * Copyright (C) 2008, Brian Cully + */ + +#ifndef _LOG_H +#define _LOG_H + +void openlog(); +void closelog(); +void logmsg(const char *format, ...); + +#endif diff --git a/lib/src/msg.c b/lib/src/msg.c new file mode 100644 index 0000000..00a5a6a --- /dev/null +++ b/lib/src/msg.c @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2008, Brian Cully + */ + +#include "msg.h" + +#include "io.h" +#include "log.h" + +#include + +ETERM * +read_msg() +{ + ETERM *msg; + unsigned char *buf; + msglen_t len; + + logmsg("DEBUG: reading message length."); + if (restartable_read((unsigned char *)&len, sizeof(len)) == -1) { + logmsg("ERROR: couldn't read %d byte message prefix: %s.", + sizeof(len), strerror(errno)); + + exit(2); + } + + len = ntohl(len); + buf = (unsigned char *)malloc(len); + if (buf == NULL) { + logmsg("ERROR: Couldn't malloc %d bytes: %s.", len, + strerror(errno)); + + exit(2); + } + + logmsg("DEBUG: reading message body (len: %d).", len); + if (restartable_read(buf, len) == -1) { + logmsg("ERROR: couldn't read %d byte message: %s.", + len, strerror(errno)); + + free(buf); + exit(2); + } + + msg = erl_decode(buf); + free(buf); + + return msg; +} + +int +write_msg(ETERM *msg) +{ + unsigned char *buf; + msglen_t nlen, buflen; + + buflen = erl_term_len(msg); + buf = (unsigned char *)malloc(buflen); + erl_encode(msg, buf); + erl_free_term(msg); + + nlen = htonl(buflen); + if (restartable_write((unsigned char *)&nlen, sizeof(nlen)) == -1) { + free(buf); + return -1; + } + if (restartable_write(buf, buflen) == -1) { + free(buf); + return -1; + } + free(buf); + + return 0; +} diff --git a/lib/src/msg.h b/lib/src/msg.h new file mode 100644 index 0000000..6db2aa8 --- /dev/null +++ b/lib/src/msg.h @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2008, Brian Cully + */ + +#ifndef _MSG_H +#define _MSG_H + +#include +#include +#include + +typedef u_int32_t msglen_t; + +ETERM *read_msg(); +int write_msg(ETERM *msg); + +#endif diff --git a/lib/src/mysqlerl.app b/lib/src/mysqlerl.app new file mode 100644 index 0000000..5687463 --- /dev/null +++ b/lib/src/mysqlerl.app @@ -0,0 +1,12 @@ +%% -*- Erlang -*- +%% Copyright (C) 2008, Brian Cully + +{application, mysqlerl, + [{description, "mysqlerl"}, + {vsn, "0"}, + {modules, [mysqlerl, mysqlerl_app, mysqlerl_connection_sup, + mysqlerl_connection, mysql_port_sup, mysql_port]}, + {registered, [mysqlerl, mysqlerl_app, mysqlerl_connection_sup]}, + {applications, [kernel, stdlib]}, + {env, []}, + {mod, {mysqlerl_app, []}}]}. diff --git a/lib/src/mysqlerl.c b/lib/src/mysqlerl.c new file mode 100644 index 0000000..020d9eb --- /dev/null +++ b/lib/src/mysqlerl.c @@ -0,0 +1,704 @@ +/* + * MySQL port driver. + * + * Copyright (C) 2008, Brian Cully + */ + +#include "io.h" +#include "log.h" +#include "msg.h" + +#include +#include +#include + +const int TRUTHY = 1; +const int FALSY = 0; + +const char *CONNECT_MSG = "sql_connect"; +const char *QUERY_MSG = "sql_query"; +const char *PARAM_QUERY_MSG = "sql_param_query"; +const char *SELECT_COUNT_MSG = "sql_select_count"; +const char *SELECT_MSG = "sql_select"; +const char *FIRST_MSG = "sql_first"; +const char *LAST_MSG = "sql_last"; +const char *NEXT_MSG = "sql_next"; +const char *PREV_MSG = "sql_prev"; + +const char *NULL_SQL = "null"; +const char *NUMERIC_SQL = "sql_numeric"; +const char *DECIMAL_SQL = "sql_decimal"; +const char *FLOAT_SQL = "sql_float"; +const char *CHAR_SQL = "sql_char"; +const char *VARCHAR_SQL = "sql_varchar"; +const char *TIMESTAMP_SQL = "sql_timestamp"; +const char *INTEGER_SQL = "sql_integer"; + +MYSQL dbh; +MYSQL_RES *results = NULL; +my_ulonglong resultoffset = 0, numrows = 0; + +void +set_mysql_results() +{ + if (results) + mysql_free_result(results); + results = mysql_store_result(&dbh); + + resultoffset = 0; + numrows = results ? mysql_num_rows(results) : 0; +} + +ETERM * +make_cols(MYSQL_FIELD *fields, unsigned int num_fields) +{ + ETERM **cols, *rc; + unsigned int i; + + cols = (ETERM **)malloc(num_fields * sizeof(ETERM *)); + if (cols == NULL) { + logmsg("ERROR: Couldn't allocate %d bytes for columns: %s", + strerror(errno)); + exit(3); + } + + for (i = 0; i < num_fields; i++) + cols[i] = erl_mk_string(fields[i].name); + + rc = erl_mk_list(cols, num_fields); + + for (i = 0; i < num_fields; i++) + erl_free_term(cols[i]); + free(cols); + + return rc; +} + +ETERM * +make_row(MYSQL_ROW row, unsigned long *lengths, unsigned int num_fields) +{ + ETERM **rowtup, *rc; + unsigned int i; + + rowtup = (ETERM **)malloc(num_fields * sizeof(ETERM *)); + if (rowtup == NULL) { + logmsg("ERROR: Couldn't allocate %d bytes for row: %s", + strerror(errno)); + exit(3); + } + + for (i = 0; i < num_fields; i++) { + if (row[i]) + rowtup[i] = erl_mk_estring(row[i], lengths[i]); + else + rowtup[i] = erl_mk_atom("null"); + } + + rc = erl_mk_tuple(rowtup, num_fields); + if (rc == NULL) { + logmsg("ERROR: couldn't allocate %d-tuple", num_fields); + exit(3); + } + + for (i = 0; i < num_fields; i++) + erl_free_term(rowtup[i]); + free(rowtup); + + return rc; +} + +ETERM * +make_rows(unsigned int num_rows, unsigned int num_fields) +{ + ETERM **rows, *rc; + unsigned int i; + + rows = (ETERM **)malloc(num_rows * sizeof(ETERM *)); + if (rows == NULL) { + logmsg("ERROR: Couldn't allocate %d bytes for rows: %s", + strerror(errno)); + exit(3); + } + + for (i = 0; i < num_rows; i++) { + ETERM *rt; + unsigned long *lengths; + MYSQL_ROW row; + + row = mysql_fetch_row(results); + resultoffset++; + lengths = mysql_fetch_lengths(results); + + rt = make_row(row, lengths, num_fields); + rows[i] = erl_format("~w", rt); + erl_free_term(rt); + } + + rc = erl_mk_list(rows, num_rows); + + for (i = 0; i < num_rows; i++) + erl_free_term(rows[i]); + free(rows); + + return rc; +} + +ETERM * +handle_mysql_result() +{ + MYSQL_FIELD *fields; + ETERM *ecols, *erows, *resp; + unsigned int num_fields; + + num_fields = mysql_num_fields(results); + fields = mysql_fetch_fields(results); + + ecols = make_cols(fields, num_fields); + erows = make_rows(numrows, num_fields); + + resp = erl_format("{selected, ~w, ~w}", ecols, erows); + + erl_free_term(ecols); + erl_free_term(erows); + + return resp; +} + +void +handle_connect(ETERM *msg) +{ + ETERM *resp, *tmp; + char *host, *db_name, *user, *passwd; + int port; + + tmp = erl_element(2, msg); + host = erl_iolist_to_string(tmp); + erl_free_term(tmp); + + tmp = erl_element(3, msg); + port = ERL_INT_VALUE(tmp); + erl_free_term(tmp); + + tmp = erl_element(4, msg); + db_name = erl_iolist_to_string(tmp); + erl_free_term(tmp); + + tmp = erl_element(5, msg); + user = erl_iolist_to_string(tmp); + erl_free_term(tmp); + + tmp = erl_element(6, msg); + passwd = erl_iolist_to_string(tmp); + erl_free_term(tmp); + + /* TODO: handle options, passed in next. */ + + logmsg("INFO: Connecting to %s on %s:%d as %s", db_name, host, port, user); + if (mysql_real_connect(&dbh, host, user, passwd, + db_name, port, NULL, 0) == NULL) { + logmsg("ERROR: Failed to connect to database %s as %s: %s.", + db_name, user, mysql_error(&dbh)); + exit(2); + } + + resp = erl_format("ok"); + write_msg(resp); + erl_free_term(resp); +} + +void +handle_query(ETERM *cmd) +{ + ETERM *query, *resp; + char *q; + + query = erl_element(2, cmd); + q = erl_iolist_to_string(query); + erl_free_term(query); + + logmsg("DEBUG: got query msg: %s.", q); + if (mysql_query(&dbh, q)) { + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } else { + set_mysql_results(); + if (results) { + resp = handle_mysql_result(); + set_mysql_results(); + } else { + if (mysql_field_count(&dbh) == 0) + resp = erl_format("{updated, ~i}", mysql_affected_rows(&dbh)); + else + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } + } + erl_free(q); + + write_msg(resp); + erl_free_term(resp); +} + +/* + * http://dev.mysql.com/doc/refman/5.1/en/mysql-stmt-execute.html + * + * 6 > odbc:param_query(Ref, + * "INSERT INTO EMPLOYEE (NR, FIRSTNAME, " + * "LASTNAME, GENDER) VALUES(?, ?, ?, ?)", + * [{sql_integer,[2,3,4,5,6,7,8]}, + * {{sql_varchar, 20}, + * ["John", "Monica", "Ross", "Rachel", + * "Piper", "Prue", "Louise"]}, + * {{sql_varchar, 20}, + * ["Doe","Geller","Geller", "Green", + * "Halliwell", "Halliwell", "Lane"]}, + * {{sql_char, 1}, ["M","F","M","F","T","F","F"]}]). + * {updated, 7} + */ +void +handle_param_query(ETERM *msg) +{ + ETERM *query, *params, *p, *tmp, *resp; + MYSQL_STMT *sth; + MYSQL_BIND *bind; + char *q; + int param_count, i; + + query = erl_element(2, msg); + q = erl_iolist_to_string(query); + erl_free_term(query); + + params = erl_element(3, msg); + erl_free_term(params); + + logmsg("DEBUG: got param query msg: %s.", q); + + sth = mysql_stmt_init(&dbh); + if (mysql_stmt_prepare(sth, q, strlen(q))) { + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } else { + param_count = mysql_stmt_param_count(sth); + if (param_count != erl_length(params)) { + resp = erl_format("{error, {mysql_error, -1, [expected_params, %d, got_params, %d]}}", param_count, erl_length(params)); + } else { + bind = malloc(param_count * sizeof(MYSQL_BIND)); + if (bind == NULL) { + logmsg("ERROR: Couldn't allocate %d bytes for bind params.", + param_count * sizeof(MYSQL_BIND)); + exit(3); + } + memset(bind, 0, param_count * sizeof(MYSQL_BIND)); + + for (i = 0, tmp = params; + (p = erl_hd(tmp)) != NULL && i < 1000; + i++, tmp = erl_tl(tmp)) { + ETERM *type, *value; + + type = erl_element(1, p); + value = erl_element(2, p); + + if (ERL_IS_TUPLE(type)) { + ETERM *t_type, *t_size; + char *t; + + t_size = erl_element(2, type); + bind[i].buffer_length = ERL_INT_VALUE(t_size); + erl_free_term(t_size); + + t_type = erl_element(1, type); + t = (char *)ERL_ATOM_PTR(t_type); + bind[i].length = malloc(sizeof(unsigned long)); + if (strncmp(t, NUMERIC_SQL, strlen(NUMERIC_SQL)) == 0) { + int val; + + bind[i].buffer_type = MYSQL_TYPE_LONG; + *bind[i].length = bind[i].buffer_length * sizeof(int); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + val = ERL_INT_VALUE(value); + memcpy(bind[i].buffer, &val, *bind[i].length); + } else if (strncmp(t, DECIMAL_SQL, strlen(DECIMAL_SQL)) == 0) { + char *val; + + bind[i].buffer_type = MYSQL_TYPE_STRING; + *bind[i].length = bind[i].buffer_length * sizeof(char *); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + val = erl_iolist_to_string(value); + if (val) { + memcpy(bind[i].buffer, val, *bind[i].length); + free(val); + } + } else if (strncmp(t, FLOAT_SQL, strlen(FLOAT_SQL)) == 0) { + float val; + + bind[i].buffer_type = MYSQL_TYPE_FLOAT; + *bind[i].length = bind[i].buffer_length * sizeof(float); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + val = ERL_FLOAT_VALUE(value); + memcpy(bind[i].buffer, &val, *bind[i].length); + } else if (strncmp(t, CHAR_SQL, strlen(CHAR_SQL)) == 0) { + char *val; + + bind[i].buffer_type = MYSQL_TYPE_STRING; + *bind[i].length = bind[i].buffer_length * sizeof(char *); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + val = erl_iolist_to_string(value); + if (val) { + memcpy(bind[i].buffer, val, *bind[i].length); + free(val); + } + } else if (strncmp(t, VARCHAR_SQL, strlen(VARCHAR_SQL)) == 0) { + char *val; + + bind[i].buffer_type = MYSQL_TYPE_BLOB; + *bind[i].length = bind[i].buffer_length * sizeof(char *); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + val = erl_iolist_to_string(value); + if (val) { + memcpy(bind[i].buffer, val, *bind[i].length); + free(val); + } + } else { + logmsg("ERROR: Unknown sized type: {%s, %d}", t, + bind[i].buffer_length); + exit(3); + } + erl_free_term(t_type); + } else { + char *t; + + t = (char *)ERL_ATOM_PTR(type); + if (strncmp(t, TIMESTAMP_SQL, strlen(TIMESTAMP_SQL)) == 0) { + bind[i].buffer_type = MYSQL_TYPE_TIMESTAMP; + *bind[i].length = sizeof(MYSQL_TIME); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + memcpy(bind[i].buffer, value, *bind[i].length); + } else if (strncmp(t, INTEGER_SQL, strlen(INTEGER_SQL)) == 0) { + int val; + + bind[i].buffer_type = MYSQL_TYPE_LONG; + *bind[i].length = sizeof(int); + bind[i].buffer = malloc(*bind[i].length); + memset(bind[i].buffer, 0, *bind[i].length); + + val = ERL_INT_VALUE(value); + memcpy(bind[i].buffer, &val, *bind[i].length); + } else { + logmsg("ERROR: Unknown type: %s", t); + exit(3); + } + } + + bind[i].is_null = malloc(sizeof(int)); + if (ERL_IS_ATOM(value) + && strncmp((char *)ERL_ATOM_PTR(value), + NULL_SQL, strlen(NULL_SQL)) == 0) + memcpy(bind[i].is_null, &TRUTHY, sizeof(int)); + else + memcpy(bind[i].is_null, &FALSY, sizeof(int)); + + erl_free_term(value); + erl_free_term(type); + } + erl_free_term(params); + + if (mysql_stmt_bind_param(sth, bind)) { + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } else { + if (mysql_stmt_execute(sth)) { + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } else { + set_mysql_results(); + if (results) { + resp = handle_mysql_result(); + set_mysql_results(); + } else { + if (mysql_field_count(&dbh) == 0) + resp = erl_format("{updated, ~i}", mysql_affected_rows(&dbh)); + else + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } + } + } + + for (i = 0; i < param_count; i++) { + free(bind[i].buffer); + free(bind[i].is_null); + } + free(bind); + } + } + erl_free(q); + + mysql_stmt_close(sth); + + write_msg(resp); + erl_free_term(resp); +} + +void +handle_select_count(ETERM *msg) +{ + ETERM *query, *resp; + char *q; + + query = erl_element(2, msg); + q = erl_iolist_to_string(query); + erl_free_term(query); + + logmsg("DEBUG: got select count msg: %s.", q); + if (mysql_query(&dbh, q)) { + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } else { + set_mysql_results(); + if (results) { + resp = erl_format("{ok, ~i}", numrows); + } else { + if (mysql_field_count(&dbh) == 0) + resp = erl_format("{ok, ~i}", mysql_affected_rows(&dbh)); + else + resp = erl_format("{error, {mysql_error, ~i, ~s}}", + mysql_errno(&dbh), mysql_error(&dbh)); + } + } + erl_free(q); + + write_msg(resp); + erl_free_term(resp); +} + +void +handle_select(ETERM *msg) +{ + MYSQL_FIELD *fields; + ETERM *epos, *enum_items, *ecols, *erows, *resp; + my_ulonglong pos, num_items; + unsigned int num_fields; + + epos = erl_element(2, msg); + enum_items = erl_element(3, msg); + pos = ERL_INT_UVALUE(epos); + num_items = ERL_INT_UVALUE(enum_items); + erl_free_term(enum_items); + erl_free_term(epos); + + logmsg("DEBUG: got select pos: %d, n: %d.", erl_size(msg), pos, num_items); + if (results == NULL) { + logmsg("ERROR: select message w/o cursor."); + exit(2); + } + + num_fields = mysql_num_fields(results); + fields = mysql_fetch_fields(results); + if (resultoffset > 0) + resultoffset = pos - 1; + if (num_items > numrows - resultoffset) + num_items = numrows - resultoffset; + mysql_data_seek(results, resultoffset); + + ecols = make_cols(fields, num_fields); + erows = make_rows(num_items, num_fields); + resp = erl_format("{selected, ~w, ~w}", ecols, erows); + erl_free_term(erows); + + erl_free_term(ecols); + write_msg(resp); + erl_free_term(resp); +} + +void +handle_first(ETERM *msg) +{ + MYSQL_FIELD *fields; + ETERM *ecols, *erows, *resp; + unsigned int num_fields; + + logmsg("DEBUG: got first msg."); + if (results == NULL) { + logmsg("ERROR: got first message w/o cursor."); + exit(2); + } + + num_fields = mysql_num_fields(results); + fields = mysql_fetch_fields(results); + resultoffset = 0; + mysql_data_seek(results, resultoffset); + + ecols = make_cols(fields, num_fields); + erows = make_rows(1, num_fields); + resp = erl_format("{selected, ~w, ~w}", ecols, erows); + erl_free_term(erows); + + erl_free_term(ecols); + write_msg(resp); + erl_free_term(resp); +} + +void +handle_last(ETERM *msg) +{ + MYSQL_FIELD *fields; + ETERM *ecols, *erows, *resp; + unsigned int num_fields; + + logmsg("DEBUG: got last msg."); + if (results == NULL) { + logmsg("ERROR: got last message w/o cursor."); + exit(2); + } + + num_fields = mysql_num_fields(results); + fields = mysql_fetch_fields(results); + resultoffset = numrows - 1; + mysql_data_seek(results, resultoffset); + + ecols = make_cols(fields, num_fields); + erows = make_rows(1, num_fields); + resp = erl_format("{selected, ~w, ~w}", ecols, erows); + erl_free_term(erows); + + erl_free_term(ecols); + write_msg(resp); + erl_free_term(resp); +} + +void +handle_next(ETERM *msg) +{ + MYSQL_FIELD *fields; + ETERM *ecols, *erows, *resp; + unsigned int num_fields; + + logmsg("DEBUG: got next msg."); + if (results == NULL) { + logmsg("ERROR: got next message w/o cursor."); + exit(2); + } + + num_fields = mysql_num_fields(results); + fields = mysql_fetch_fields(results); + + ecols = make_cols(fields, num_fields); + logmsg("resultoffset: %d, num_rows: %d", resultoffset, numrows); + if (resultoffset == numrows) { + resp = erl_format("{selected, ~w, []}", ecols); + } else { + erows = make_rows(1, num_fields); + resp = erl_format("{selected, ~w, ~w}", ecols, erows); + erl_free_term(erows); + } + + erl_free_term(ecols); + write_msg(resp); + erl_free_term(resp); +} + +void +handle_prev(ETERM *msg) +{ + MYSQL_FIELD *fields; + ETERM *ecols, *erows, *resp; + unsigned int num_fields; + + logmsg("DEBUG: got prev msg."); + if (results == NULL) { + logmsg("ERROR: got prev message w/o cursor."); + exit(2); + } + + num_fields = mysql_num_fields(results); + fields = mysql_fetch_fields(results); + + ecols = make_cols(fields, num_fields); + logmsg("resultoffset: %d, num_rows: %d", resultoffset, numrows); + if (resultoffset == 0) { + resp = erl_format("{selected, ~w, []}", ecols); + } else { + resultoffset = resultoffset - 1; + mysql_data_seek(results, resultoffset); + erows = make_rows(1, num_fields); + + /* Rewind to position at the point we returned. */ + resultoffset = resultoffset - 1; + mysql_data_seek(results, resultoffset); + resp = erl_format("{selected, ~w, ~w}", ecols, erows); + erl_free_term(erows); + } + + erl_free_term(ecols); + write_msg(resp); + erl_free_term(resp); +} + +void +dispatch_db_cmd(ETERM *msg) +{ + ETERM *tag; + char *tag_name; + + tag = erl_element(1, msg); + tag_name = (char *)ERL_ATOM_PTR(tag); + if (strncmp(tag_name, CONNECT_MSG, strlen(CONNECT_MSG)) == 0) + handle_connect(msg); + else if (strncmp(tag_name, QUERY_MSG, strlen(QUERY_MSG)) == 0) + handle_query(msg); + else if (strncmp(tag_name, PARAM_QUERY_MSG, strlen(PARAM_QUERY_MSG)) == 0) + handle_param_query(msg); + else if (strncmp(tag_name, SELECT_COUNT_MSG, strlen(SELECT_COUNT_MSG)) == 0) + handle_select_count(msg); + else if (strncmp(tag_name, SELECT_MSG, strlen(SELECT_MSG)) == 0) + handle_select(msg); + else if (strncmp(tag_name, FIRST_MSG, strlen(FIRST_MSG)) == 0) + handle_first(msg); + else if (strncmp(tag_name, LAST_MSG, strlen(LAST_MSG)) == 0) + handle_last(msg); + else if (strncmp(tag_name, NEXT_MSG, strlen(NEXT_MSG)) == 0) + handle_next(msg); + else if (strncmp(tag_name, PREV_MSG, strlen(PREV_MSG)) == 0) + handle_prev(msg); + else { + logmsg("WARNING: message type %s unknown.", (char *)ERL_ATOM_PTR(tag)); + erl_free_term(tag); + exit(3); + } + + erl_free_term(tag); +} + +int +main(int argc, char *argv[]) +{ + ETERM *msg; + + openlog(); + logmsg("INFO: starting up."); + erl_init(NULL, 0); + + mysql_init(&dbh); + while ((msg = read_msg()) != NULL) { + dispatch_db_cmd(msg); + erl_free_term(msg); + } + mysql_close(&dbh); + + logmsg("INFO: shutting down."); + closelog(); + + return 0; +} diff --git a/lib/src/mysqlerl.erl b/lib/src/mysqlerl.erl new file mode 100644 index 0000000..946e9c5 --- /dev/null +++ b/lib/src/mysqlerl.erl @@ -0,0 +1,251 @@ +%% Modeled from ODBC +%% http://www.erlang.org/doc/apps/odbc/ + +-module(mysqlerl). +-author('bjc@kublai.com'). + +-include("mysqlerl.hrl"). + +-export([convert_type/1]). + +-export([test_start/0, test_msg/0, test_query/0, test_param_query/0]). + +-export([start/0, start/1, stop/0, commit/2, commit/3, + connect/6, disconnect/1, describe_table/2, + describe_table/3, first/1, first/2, + last/1, last/2, next/1, next/2, prev/1, + prev/2, select_count/2, select_count/3, + select/3, select/4, param_query/3, param_query/4, + sql_query/2, sql_query/3]). + +-define(CONFIG, "/Users/bjc/tmp/test-server.cfg"). + +test_start() -> + {ok, [{Host, Port, DB, User, Pass, Options}]} = file:consult(?CONFIG), + mysqlerl:connect(Host, Port, DB, User, Pass, Options). + +test_msg() -> + commit(mysqlerl_connection_sup:random_child(), + rollback, 2000). + +test_query() -> + sql_query(mysqlerl_connection_sup:random_child(), + "SELECT COUNT(*) FROM user", 2000). + +test_param_query() -> + %% This should really be an update or something, since that's how + %% it'll be used. + param_query(mysqlerl_connection_sup:random_child(), + "SELECT * FROM user WHERE username=?", + [{{sql_varchar, 20}, "bjc"}]). + +start() -> + start(temporary). + +%% Arguments: +%% Type = permanent | transient | temporary +%% +%% Returns: +%% ok | {error, Reason} +start(Type) -> + application:start(sasl), + application:start(mysqlerl, Type). + +stop() -> + application:stop(mysqlerl). + +commit(Ref, CommitMode) -> + commit(Ref, CommitMode, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Timeout = time_out() +%% CommitMode = commit | rollback +%% Reason = not_an_explicit_commit_connection | +%% process_not_owner_of_odbc_connection | +%% common_reason() +%% ok | {error, Reason} +commit(Ref, commit, Timeout) -> + case sql_query(Ref, "COMMIT", Timeout) of + {num_rows, _} -> ok; + Other -> Other + end; +commit(Ref, rollback, Timeout) -> + case sql_query(Ref, "ROLLBACK", Timeout) of + {num_rows, _} -> ok; + Other -> Other + end. + +%% Arguments: +%% Host = string() +%% Port = integer() +%% Database = string() +%% User = string() +%% Password = string() +%% Options = list() +%% +%% Returns: +%% {ok, Ref} | {error, Reason} +%% Ref = connection_reference() +connect(Host, Port, Database, User, Password, Options) -> + mysqlerl_connection_sup:connect(Host, Port, Database, + User, Password, Options). + +%% Arguments: +%% Ref = connection_reference() +%% +%% Returns: +%% ok | {error, Reason} +disconnect(Ref) -> + mysqlerl_connection:stop(Ref). + +describe_table(Ref, Table) -> + describe_table(Ref, Table, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Table = string() +%% Timeout = time_out() +%% +%% Returns: +%% {ok, Description} | {error, Reason} +%% Description = [{col_name(), odbc_data_type()}] +describe_table(Ref, Table, Timeout) -> + Q = ["DESCRIBE ", Table], + {selected, _, Rows} = sql_query(Ref, Q, Timeout), + Description = [{Name, convert_type(T)} || {Name, T, _, _, _, _} <- Rows], + {ok, Description}. + +first(Ref) -> + first(Ref, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +first(Ref, Timeout) -> + conn_fwd(Ref, #sql_first{}, Timeout). + +last(Ref) -> + last(Ref, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +last(Ref, Timeout) -> + conn_fwd(Ref, #sql_last{}, Timeout). + +next(Ref) -> + next(Ref, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +next(Ref, Timeout) -> + conn_fwd(Ref, #sql_next{}, Timeout). + +prev(Ref) -> + prev(Ref, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +prev(Ref, Timeout) -> + conn_fwd(Ref, #sql_prev{}, Timeout). + +select_count(Ref, SQLQuery) -> + select_count(Ref, SQLQuery, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% SQLQuery = string() +%% Timeout = time_out() +%% Returns: +%% {ok, NrRows} | {error, Reason} +%% NrRows = n_rows() +select_count(Ref, SQLQuery, Timeout) -> + conn_fwd(Ref, #sql_select_count{q = SQLQuery}, Timeout). + +select(Ref, Pos, N) -> + select(Ref, Pos, N, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% Pos = integer() +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +select(Ref, Pos, N, Timeout) -> + conn_fwd(Ref, #sql_select{pos = Pos, n = N}, Timeout). + +param_query(Ref, SQLQuery, Params) -> + param_query(Ref, SQLQuery, Params, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% SQLQuery = string() +%% Params = [{odbc_data_type(), [value()]}] +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +param_query(Ref, SQLQuery, Params, Timeout) -> + conn_fwd(Ref, #sql_param_query{q = SQLQuery, params = Params}, Timeout). + +sql_query(Ref, SQLQuery) -> + sql_query(Ref, SQLQuery, infinity). + +%% Arguments: +%% Ref = connection_reference() +%% SQLQuery = string() +%% Timeout = time_out() +%% Returns: +%% {selected, ColNames, Rows} | {error, Reason} +%% Rows = rows() +sql_query(Ref, SQLQuery, Timeout) -> + conn_fwd(Ref, #sql_query{q = SQLQuery}, Timeout). + +conn_fwd(Ref, Msg, Timeout) -> + gen_server:call(Ref, {Msg, Timeout}, infinity). + +%% Convert type needs some love! Cover all bases here instead of +%% fudging. +convert_type("timestamp") -> + sql_timestamp; +convert_type("int") -> + sql_integer; +convert_type("int(" ++ Rest) -> + Size = find_data_size(Rest), + {sql_numeric, list_to_integer(Size)}; +convert_type("decimal(" ++ Rest) -> + Size = find_data_size(Rest), + {sql_decimal, list_to_integer(Size)}; +convert_type("float(" ++ Rest) -> + Size = find_data_size(Rest), + {sql_float, list_to_float(Size)}; +convert_type("char(" ++ Rest) -> + Size = find_data_size(Rest), + {sql_char, list_to_integer(Size)}; +convert_type("varchar(" ++ Rest) -> + Size = find_data_size(Rest), + {sql_varchar, list_to_integer(Size)}. + +find_data_size(Str) -> + find_data_size(Str, []). + +find_data_size([$) | _Rest], Accum) -> + lists:reverse(Accum); +find_data_size([H | T], Accum) -> + find_data_size(T, [H | Accum]). diff --git a/lib/src/mysqlerl.hrl b/lib/src/mysqlerl.hrl new file mode 100644 index 0000000..99e674f --- /dev/null +++ b/lib/src/mysqlerl.hrl @@ -0,0 +1,9 @@ +-record(sql_connect, {host, port, database, user, password, options}). +-record(sql_query, {q}). +-record(sql_param_query, {q, params}). +-record(sql_select_count, {q}). +-record(sql_select, {pos, n}). +-record(sql_first, {}). +-record(sql_last, {}). +-record(sql_next, {}). +-record(sql_prev, {}). diff --git a/lib/src/mysqlerl_app.erl b/lib/src/mysqlerl_app.erl new file mode 100644 index 0000000..cdb8ade --- /dev/null +++ b/lib/src/mysqlerl_app.erl @@ -0,0 +1,14 @@ +-module(mysqlerl_app). +-author('bjc@kublai.com'). + +-behavior(application). + +%% Behavior callbacks. +-export([start/2, stop/1]). + +start(normal, []) -> + register(?MODULE, self()), + mysqlerl_sup:start_link(). + +stop([]) -> + ok. diff --git a/lib/src/mysqlerl_connection.erl b/lib/src/mysqlerl_connection.erl new file mode 100644 index 0000000..1a3d900 --- /dev/null +++ b/lib/src/mysqlerl_connection.erl @@ -0,0 +1,64 @@ +-module(mysqlerl_connection). +-author('bjc@kublai.com'). + +-include("mysqlerl.hrl"). +-include("mysqlerl_port.hrl"). + +-behavior(gen_server). + +-export([start_link/7, stop/1]). + +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2]). + +-record(state, {sup, owner}). + +start_link(Owner, Host, Port, Database, User, Password, Options) -> + gen_server:start_link(?MODULE, [Owner, Host, Port, Database, + User, Password, Options], []). + +stop(Pid) -> + gen_server:call(Pid, stop). + +init([Owner, Host, Port, Database, User, Password, Options]) -> + process_flag(trap_exit, true), + link(Owner), + {ok, Sup} = mysqlerl_port_sup:start_link(helper(), Host, Port, Database, + User, Password, Options), + {ok, #state{sup = Sup, owner = Owner}}. + +terminate(Reason, _State) -> + io:format("DEBUG: connection got terminate: ~p~n", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call(Request, From, #state{owner = Owner} = State) + when Owner /= element(1, From) -> + error_logger:warning_msg("Request from ~p (owner: ~p): ~p", + [element(1, From), Owner, Request]), + {reply, {error, process_not_owner_of_odbc_connection}, State}; +handle_call(stop, _From, State) -> + {stop, normal, State}; +handle_call(Request, _From, State) -> + {reply, gen_server:call(port_ref(State#state.sup), + #req{request = Request}, infinity), State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, #state{owner = Pid} = State) -> + io:format("DEBUG: owner ~p shut down.~n", [Pid]), + {stop, normal, State}. + +helper() -> + case code:priv_dir(mysqlerl) of + PrivDir when is_list(PrivDir) -> ok; + {error, bad_name} -> PrivDir = filename:join(["..", "priv"]) + end, + filename:join([PrivDir, "mysqlerl"]). + +port_ref(Sup) -> + [{mysqlerl_port, Ref, worker, _}] = supervisor:which_children(Sup), + Ref. diff --git a/lib/src/mysqlerl_connection_sup.erl b/lib/src/mysqlerl_connection_sup.erl new file mode 100644 index 0000000..6e1632a --- /dev/null +++ b/lib/src/mysqlerl_connection_sup.erl @@ -0,0 +1,31 @@ +-module(mysqlerl_connection_sup). +-author('bjc@kublai.com'). + +-behavior(supervisor). + +-export([random_child/0]). +-export([start_link/0, connect/6]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +connect(Host, Port, Database, User, Password, Options) -> + supervisor:start_child(?MODULE, [self(), Host, Port, Database, + User, Password, Options]). + +random_child() -> + case get_pids() of + [] -> {error, no_connections}; + Pids -> lists:nth(erlang:phash(now(), length(Pids)), Pids) + end. + +init([]) -> + Connection = {undefined, {mysqlerl_connection, start_link, []}, + transient, 5, worker, [mysqlerl_connection]}, + {ok, {{simple_one_for_one, 10, 5}, + [Connection]}}. + +get_pids() -> + [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(?MODULE)]. diff --git a/lib/src/mysqlerl_port.erl b/lib/src/mysqlerl_port.erl new file mode 100644 index 0000000..74fe177 --- /dev/null +++ b/lib/src/mysqlerl_port.erl @@ -0,0 +1,83 @@ +-module(mysqlerl_port). +-author('bjc@kublai.com'). + +-include("mysqlerl.hrl"). +-include("mysqlerl_port.hrl"). + +-behavior(gen_server). + +-export([start_link/7]). +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2]). + +-define(CONNECT_TIMEOUT, 30000). + +-record(state, {ref}). +-record(port_closed, {reason}). + +start_link(Cmd, Host, Port, Database, User, Password, Options) -> + gen_server:start_link(?MODULE, + [Cmd, Host, Port, Database, User, Password, Options], + []). + +init([Cmd, Host, Port, Database, User, Password, Options]) -> + process_flag(trap_exit, true), + Ref = open_port({spawn, Cmd}, [{packet, 4}, binary]), + {data, ok} = send_port_cmd(Ref, #sql_connect{host = Host, + port = Port, + database = Database, + user = User, + password = Password, + options = Options}, + ?CONNECT_TIMEOUT), + {ok, #state{ref = Ref}}. + +terminate(#port_closed{reason = Reason}, #state{ref = Ref}) -> + io:format("DEBUG: mysqlerl connection ~p shutting down (~p).~n", + [Ref, Reason]), + ok; +terminate(Reason, State) -> + catch port_close(State#state.ref), + io:format("DEBUG: mysqlerl_port got terminate: ~p~n", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call(#req{request = {Request, Timeout}}, From, + #state{ref = Ref} = State) -> + case send_port_cmd(Ref, Request, Timeout) of + {data, Res} -> + {reply, Res, State}; + {'EXIT', Ref, Reason} -> + gen_server:reply(From, {error, connection_closed}), + {stop, #port_closed{reason = Reason}, State}; + timeout -> + gen_server:reply(From, timeout), + {stop, timeout, State}; + Other -> + error_logger:warning_msg("Got unknown query response: ~p~n", + [Other]), + gen_server:reply(From, {error, connection_closed}), + {stop, {unknownreply, Other}, State} + end. + + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({'EXIT', Ref, Reason}, #state{ref = Ref} = State) -> + io:format("DEBUG: Port ~p closed on ~p.~n", [Ref, State]), + {stop, #port_closed{reason = Reason}, State}. + + +send_port_cmd(Ref, Request, Timeout) -> + io:format("DEBUG: Sending request: ~p~n", [Request]), + port_command(Ref, term_to_binary(Request)), + receive + {Ref, {data, Res}} -> + {data, binary_to_term(Res)}; + Other -> Other + after Timeout -> + timeout + end. diff --git a/lib/src/mysqlerl_port.hrl b/lib/src/mysqlerl_port.hrl new file mode 100644 index 0000000..a7a3300 --- /dev/null +++ b/lib/src/mysqlerl_port.hrl @@ -0,0 +1 @@ +-record(req, {request}). diff --git a/lib/src/mysqlerl_port_sup.erl b/lib/src/mysqlerl_port_sup.erl new file mode 100644 index 0000000..3053cdc --- /dev/null +++ b/lib/src/mysqlerl_port_sup.erl @@ -0,0 +1,18 @@ +-module(mysqlerl_port_sup). +-author('bjc@kublai.com'). + +-behavior(supervisor). + +-export([start_link/7]). +-export([init/1]). + +start_link(Cmd, Host, Port, Database, User, Password, Options) -> + supervisor:start_link(?MODULE, [Cmd, Host, Port, Database, + User, Password, Options]). + +init([Cmd, Host, Port, Database, User, Password, Options]) -> + Ref = {mysqlerl_port, {mysqlerl_port, start_link, + [Cmd, Host, Port, Database, + User, Password, Options]}, + transient, 5, worker, [mysqlerl_port]}, + {ok, {{one_for_one, 10, 5}, [Ref]}}. diff --git a/lib/src/mysqlerl_sup.erl b/lib/src/mysqlerl_sup.erl new file mode 100644 index 0000000..1ebfbf2 --- /dev/null +++ b/lib/src/mysqlerl_sup.erl @@ -0,0 +1,16 @@ +-module(mysqlerl_sup). +-author('bjc@kublai.com'). + +-behavior(supervisor). + +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ConnectionSup = {mysqlerl_connection_sup, + {mysqlerl_connection_sup, start_link, []}, + permanent, infinity, supervisor, + [mysqlerl_connection_sup]}, + {ok, {{one_for_one, 10, 5}, [ConnectionSup]}}. diff --git a/src/Makefile b/src/Makefile deleted file mode 100644 index 3a1510b..0000000 --- a/src/Makefile +++ /dev/null @@ -1,44 +0,0 @@ -ERLPREFIX = /usr/local -ERLINTERFACE = erl_interface-3.7.6 -ERLINCS = -I$(ERLPREFIX)/lib/erlang/lib/$(ERLINTERFACE)/include -ERLLIBS = -L$(ERLPREFIX)/lib/erlang/lib/$(ERLINTERFACE)/lib \ - -lerl_interface -lei - -MYSQLPREFIX = /usr/local/mysql -MYSQLINCS = -I$(MYSQLPREFIX)/include -MYSQLLIBS = -L$(MYSQLPREFIX)/lib -lmysqlclient - -CFLAGS = -g -Wall -Werror $(MYSQLINCS) $(ERLINCS) -LDFLAGS = $(MYSQLLIBS) $(ERLLIBS) -EFLAGS = -W +debug_info - -PRIVDIR = ../priv -BEAMDIR = ../ebin - -BINS = $(PRIVDIR)/mysqlerl $(BEAMDIR)/mysqlerl.app -MYSQLERLOBJS = io.o log.o msg.o mysqlerl.o -BEAMS = mysqlerl.beam mysqlerl_app.beam mysqlerl_connection.beam \ - mysqlerl_connection_sup.beam mysqlerl_port.beam \ - mysqlerl_port_sup.beam mysqlerl_sup.beam - -all: $(PRIVDIR) $(BEAMDIR) $(BINS) - -clean: - rm -rf *.o *.beam - rm -rf $(BINS) $(MYSQLERLOBJS) $(BEAMS) $(BEAMDIR)/mysqlerl.app - -%.beam: %.erl - erlc $(EFLAGS) $< - -$(PRIVDIR)/mysqlerl: $(PRIVDIR) $(MYSQLERLOBJS) - $(CC) -o $@ $(MYSQLERLOBJS) $(LDFLAGS) - -$(BEAMDIR)/mysqlerl.app: $(BEAMDIR) $(BEAMS) - cp $(BEAMS) $(BEAMDIR) - cp mysqlerl.app $(BEAMDIR) - -$(PRIVDIR): - mkdir -p $(PRIVDIR) - -$(BEAMDIR): - mkdir -p $(BEAMDIR) diff --git a/src/TODO.org b/src/TODO.org deleted file mode 100644 index 636f47d..0000000 --- a/src/TODO.org +++ /dev/null @@ -1,6 +0,0 @@ -* Support ODBC options where applicable. -* Test functions die (or don't) correctly when not connected to DB. -* Connection pooling with multiple databases. -I'm guessing that it doesn't work right now. Looks like -mysqlerl_connection_sup:connect starts a simple-one-for-one port, and -commands use :random_child to do dispatch. That's bad. diff --git a/src/io.c b/src/io.c deleted file mode 100644 index dcc055f..0000000 --- a/src/io.c +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (C) 2008, Brian Cully - */ - -#include "io.h" -#include "log.h" - -#include -#include -#include -#include - -int -restartable_read(unsigned char *buf, size_t buflen) -{ - ssize_t rc, readb; - - rc = 0; - READLOOP: - while (rc < buflen) { - readb = read(STDIN_FILENO, buf + rc, buflen - rc); - if (readb == -1) { - if (errno == EAGAIN || errno == EINTR) - goto READLOOP; - - return -1; - } else if (readb == 0) { - logmsg("ERROR: EOF trying to read additional %d bytes from " - "standard input", buflen - rc); - return -1; - } - - rc += readb; - } - - return rc; -} - -int -restartable_write(const unsigned char *buf, size_t buflen) -{ - ssize_t rc, wroteb; - - rc = 0; - WRITELOOP: - while (rc < buflen) { - wroteb = write(STDOUT_FILENO, buf + rc, buflen - rc); - if (wroteb == -1) { - if (errno == EAGAIN || errno == EINTR) - goto WRITELOOP; - - return -1; - } - - rc += wroteb; - } - - return rc; -} diff --git a/src/io.h b/src/io.h deleted file mode 100644 index bf4e086..0000000 --- a/src/io.h +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright (C) 2008, Brian Cully - */ - -#ifndef _IO_H -#define _IO_H - -#include - -int restartable_read(unsigned char *buf, size_t buflen); -int restartable_write(const unsigned char *buf, size_t buflen); - -#endif diff --git a/src/log.c b/src/log.c deleted file mode 100644 index 80aa755..0000000 --- a/src/log.c +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2008, Brian Cully - */ - -#include "log.h" - -#include -#include -#include -#include - -const char *LOGPATH = "/tmp/mysqlerl.log"; -static FILE *logfile = NULL; - -void -openlog() -{ - logfile = fopen(LOGPATH, "a"); -} - -void -closelog() -{ - fclose(logfile); -} - -void -logmsg(const char *format, ...) -{ - FILE *out = logfile; - char timebuf[32] = "\0"; - struct tm now_tm; - time_t now_time; - va_list args; - - va_start(args, format); - - if (logfile == NULL) - out = stderr; - - if (time(&now_time) == (time_t)-1) { - (void)fprintf(out, "LOGERROR - Failed to fetch time: "); - } else { - (void)localtime_r(&now_time, &now_tm); - if (strftime(timebuf, sizeof(timebuf), "%Y%m%d %H:%M:%S ", &now_tm) == 0) { - (void)fprintf(out, "LOGERROR - Failed to parse time (now: %d): ", - (int)now_time); - } else { - (void)fprintf(out, "%s", timebuf); - } - } - (void)fprintf(out, "[%d]: ", getpid()); - (void)vfprintf(out, format, args); - (void)fprintf(out, "\n"); - - fflush(out); - - va_end(args); -} - diff --git a/src/log.h b/src/log.h deleted file mode 100644 index ae9a9bd..0000000 --- a/src/log.h +++ /dev/null @@ -1,12 +0,0 @@ -/* - * Copyright (C) 2008, Brian Cully - */ - -#ifndef _LOG_H -#define _LOG_H - -void openlog(); -void closelog(); -void logmsg(const char *format, ...); - -#endif diff --git a/src/msg.c b/src/msg.c deleted file mode 100644 index 00a5a6a..0000000 --- a/src/msg.c +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (C) 2008, Brian Cully - */ - -#include "msg.h" - -#include "io.h" -#include "log.h" - -#include - -ETERM * -read_msg() -{ - ETERM *msg; - unsigned char *buf; - msglen_t len; - - logmsg("DEBUG: reading message length."); - if (restartable_read((unsigned char *)&len, sizeof(len)) == -1) { - logmsg("ERROR: couldn't read %d byte message prefix: %s.", - sizeof(len), strerror(errno)); - - exit(2); - } - - len = ntohl(len); - buf = (unsigned char *)malloc(len); - if (buf == NULL) { - logmsg("ERROR: Couldn't malloc %d bytes: %s.", len, - strerror(errno)); - - exit(2); - } - - logmsg("DEBUG: reading message body (len: %d).", len); - if (restartable_read(buf, len) == -1) { - logmsg("ERROR: couldn't read %d byte message: %s.", - len, strerror(errno)); - - free(buf); - exit(2); - } - - msg = erl_decode(buf); - free(buf); - - return msg; -} - -int -write_msg(ETERM *msg) -{ - unsigned char *buf; - msglen_t nlen, buflen; - - buflen = erl_term_len(msg); - buf = (unsigned char *)malloc(buflen); - erl_encode(msg, buf); - erl_free_term(msg); - - nlen = htonl(buflen); - if (restartable_write((unsigned char *)&nlen, sizeof(nlen)) == -1) { - free(buf); - return -1; - } - if (restartable_write(buf, buflen) == -1) { - free(buf); - return -1; - } - free(buf); - - return 0; -} diff --git a/src/msg.h b/src/msg.h deleted file mode 100644 index 6db2aa8..0000000 --- a/src/msg.h +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (C) 2008, Brian Cully - */ - -#ifndef _MSG_H -#define _MSG_H - -#include -#include -#include - -typedef u_int32_t msglen_t; - -ETERM *read_msg(); -int write_msg(ETERM *msg); - -#endif diff --git a/src/mysqlerl.app b/src/mysqlerl.app deleted file mode 100644 index 5687463..0000000 --- a/src/mysqlerl.app +++ /dev/null @@ -1,12 +0,0 @@ -%% -*- Erlang -*- -%% Copyright (C) 2008, Brian Cully - -{application, mysqlerl, - [{description, "mysqlerl"}, - {vsn, "0"}, - {modules, [mysqlerl, mysqlerl_app, mysqlerl_connection_sup, - mysqlerl_connection, mysql_port_sup, mysql_port]}, - {registered, [mysqlerl, mysqlerl_app, mysqlerl_connection_sup]}, - {applications, [kernel, stdlib]}, - {env, []}, - {mod, {mysqlerl_app, []}}]}. diff --git a/src/mysqlerl.c b/src/mysqlerl.c deleted file mode 100644 index 020d9eb..0000000 --- a/src/mysqlerl.c +++ /dev/null @@ -1,704 +0,0 @@ -/* - * MySQL port driver. - * - * Copyright (C) 2008, Brian Cully - */ - -#include "io.h" -#include "log.h" -#include "msg.h" - -#include -#include -#include - -const int TRUTHY = 1; -const int FALSY = 0; - -const char *CONNECT_MSG = "sql_connect"; -const char *QUERY_MSG = "sql_query"; -const char *PARAM_QUERY_MSG = "sql_param_query"; -const char *SELECT_COUNT_MSG = "sql_select_count"; -const char *SELECT_MSG = "sql_select"; -const char *FIRST_MSG = "sql_first"; -const char *LAST_MSG = "sql_last"; -const char *NEXT_MSG = "sql_next"; -const char *PREV_MSG = "sql_prev"; - -const char *NULL_SQL = "null"; -const char *NUMERIC_SQL = "sql_numeric"; -const char *DECIMAL_SQL = "sql_decimal"; -const char *FLOAT_SQL = "sql_float"; -const char *CHAR_SQL = "sql_char"; -const char *VARCHAR_SQL = "sql_varchar"; -const char *TIMESTAMP_SQL = "sql_timestamp"; -const char *INTEGER_SQL = "sql_integer"; - -MYSQL dbh; -MYSQL_RES *results = NULL; -my_ulonglong resultoffset = 0, numrows = 0; - -void -set_mysql_results() -{ - if (results) - mysql_free_result(results); - results = mysql_store_result(&dbh); - - resultoffset = 0; - numrows = results ? mysql_num_rows(results) : 0; -} - -ETERM * -make_cols(MYSQL_FIELD *fields, unsigned int num_fields) -{ - ETERM **cols, *rc; - unsigned int i; - - cols = (ETERM **)malloc(num_fields * sizeof(ETERM *)); - if (cols == NULL) { - logmsg("ERROR: Couldn't allocate %d bytes for columns: %s", - strerror(errno)); - exit(3); - } - - for (i = 0; i < num_fields; i++) - cols[i] = erl_mk_string(fields[i].name); - - rc = erl_mk_list(cols, num_fields); - - for (i = 0; i < num_fields; i++) - erl_free_term(cols[i]); - free(cols); - - return rc; -} - -ETERM * -make_row(MYSQL_ROW row, unsigned long *lengths, unsigned int num_fields) -{ - ETERM **rowtup, *rc; - unsigned int i; - - rowtup = (ETERM **)malloc(num_fields * sizeof(ETERM *)); - if (rowtup == NULL) { - logmsg("ERROR: Couldn't allocate %d bytes for row: %s", - strerror(errno)); - exit(3); - } - - for (i = 0; i < num_fields; i++) { - if (row[i]) - rowtup[i] = erl_mk_estring(row[i], lengths[i]); - else - rowtup[i] = erl_mk_atom("null"); - } - - rc = erl_mk_tuple(rowtup, num_fields); - if (rc == NULL) { - logmsg("ERROR: couldn't allocate %d-tuple", num_fields); - exit(3); - } - - for (i = 0; i < num_fields; i++) - erl_free_term(rowtup[i]); - free(rowtup); - - return rc; -} - -ETERM * -make_rows(unsigned int num_rows, unsigned int num_fields) -{ - ETERM **rows, *rc; - unsigned int i; - - rows = (ETERM **)malloc(num_rows * sizeof(ETERM *)); - if (rows == NULL) { - logmsg("ERROR: Couldn't allocate %d bytes for rows: %s", - strerror(errno)); - exit(3); - } - - for (i = 0; i < num_rows; i++) { - ETERM *rt; - unsigned long *lengths; - MYSQL_ROW row; - - row = mysql_fetch_row(results); - resultoffset++; - lengths = mysql_fetch_lengths(results); - - rt = make_row(row, lengths, num_fields); - rows[i] = erl_format("~w", rt); - erl_free_term(rt); - } - - rc = erl_mk_list(rows, num_rows); - - for (i = 0; i < num_rows; i++) - erl_free_term(rows[i]); - free(rows); - - return rc; -} - -ETERM * -handle_mysql_result() -{ - MYSQL_FIELD *fields; - ETERM *ecols, *erows, *resp; - unsigned int num_fields; - - num_fields = mysql_num_fields(results); - fields = mysql_fetch_fields(results); - - ecols = make_cols(fields, num_fields); - erows = make_rows(numrows, num_fields); - - resp = erl_format("{selected, ~w, ~w}", ecols, erows); - - erl_free_term(ecols); - erl_free_term(erows); - - return resp; -} - -void -handle_connect(ETERM *msg) -{ - ETERM *resp, *tmp; - char *host, *db_name, *user, *passwd; - int port; - - tmp = erl_element(2, msg); - host = erl_iolist_to_string(tmp); - erl_free_term(tmp); - - tmp = erl_element(3, msg); - port = ERL_INT_VALUE(tmp); - erl_free_term(tmp); - - tmp = erl_element(4, msg); - db_name = erl_iolist_to_string(tmp); - erl_free_term(tmp); - - tmp = erl_element(5, msg); - user = erl_iolist_to_string(tmp); - erl_free_term(tmp); - - tmp = erl_element(6, msg); - passwd = erl_iolist_to_string(tmp); - erl_free_term(tmp); - - /* TODO: handle options, passed in next. */ - - logmsg("INFO: Connecting to %s on %s:%d as %s", db_name, host, port, user); - if (mysql_real_connect(&dbh, host, user, passwd, - db_name, port, NULL, 0) == NULL) { - logmsg("ERROR: Failed to connect to database %s as %s: %s.", - db_name, user, mysql_error(&dbh)); - exit(2); - } - - resp = erl_format("ok"); - write_msg(resp); - erl_free_term(resp); -} - -void -handle_query(ETERM *cmd) -{ - ETERM *query, *resp; - char *q; - - query = erl_element(2, cmd); - q = erl_iolist_to_string(query); - erl_free_term(query); - - logmsg("DEBUG: got query msg: %s.", q); - if (mysql_query(&dbh, q)) { - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } else { - set_mysql_results(); - if (results) { - resp = handle_mysql_result(); - set_mysql_results(); - } else { - if (mysql_field_count(&dbh) == 0) - resp = erl_format("{updated, ~i}", mysql_affected_rows(&dbh)); - else - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } - } - erl_free(q); - - write_msg(resp); - erl_free_term(resp); -} - -/* - * http://dev.mysql.com/doc/refman/5.1/en/mysql-stmt-execute.html - * - * 6 > odbc:param_query(Ref, - * "INSERT INTO EMPLOYEE (NR, FIRSTNAME, " - * "LASTNAME, GENDER) VALUES(?, ?, ?, ?)", - * [{sql_integer,[2,3,4,5,6,7,8]}, - * {{sql_varchar, 20}, - * ["John", "Monica", "Ross", "Rachel", - * "Piper", "Prue", "Louise"]}, - * {{sql_varchar, 20}, - * ["Doe","Geller","Geller", "Green", - * "Halliwell", "Halliwell", "Lane"]}, - * {{sql_char, 1}, ["M","F","M","F","T","F","F"]}]). - * {updated, 7} - */ -void -handle_param_query(ETERM *msg) -{ - ETERM *query, *params, *p, *tmp, *resp; - MYSQL_STMT *sth; - MYSQL_BIND *bind; - char *q; - int param_count, i; - - query = erl_element(2, msg); - q = erl_iolist_to_string(query); - erl_free_term(query); - - params = erl_element(3, msg); - erl_free_term(params); - - logmsg("DEBUG: got param query msg: %s.", q); - - sth = mysql_stmt_init(&dbh); - if (mysql_stmt_prepare(sth, q, strlen(q))) { - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } else { - param_count = mysql_stmt_param_count(sth); - if (param_count != erl_length(params)) { - resp = erl_format("{error, {mysql_error, -1, [expected_params, %d, got_params, %d]}}", param_count, erl_length(params)); - } else { - bind = malloc(param_count * sizeof(MYSQL_BIND)); - if (bind == NULL) { - logmsg("ERROR: Couldn't allocate %d bytes for bind params.", - param_count * sizeof(MYSQL_BIND)); - exit(3); - } - memset(bind, 0, param_count * sizeof(MYSQL_BIND)); - - for (i = 0, tmp = params; - (p = erl_hd(tmp)) != NULL && i < 1000; - i++, tmp = erl_tl(tmp)) { - ETERM *type, *value; - - type = erl_element(1, p); - value = erl_element(2, p); - - if (ERL_IS_TUPLE(type)) { - ETERM *t_type, *t_size; - char *t; - - t_size = erl_element(2, type); - bind[i].buffer_length = ERL_INT_VALUE(t_size); - erl_free_term(t_size); - - t_type = erl_element(1, type); - t = (char *)ERL_ATOM_PTR(t_type); - bind[i].length = malloc(sizeof(unsigned long)); - if (strncmp(t, NUMERIC_SQL, strlen(NUMERIC_SQL)) == 0) { - int val; - - bind[i].buffer_type = MYSQL_TYPE_LONG; - *bind[i].length = bind[i].buffer_length * sizeof(int); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - val = ERL_INT_VALUE(value); - memcpy(bind[i].buffer, &val, *bind[i].length); - } else if (strncmp(t, DECIMAL_SQL, strlen(DECIMAL_SQL)) == 0) { - char *val; - - bind[i].buffer_type = MYSQL_TYPE_STRING; - *bind[i].length = bind[i].buffer_length * sizeof(char *); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - val = erl_iolist_to_string(value); - if (val) { - memcpy(bind[i].buffer, val, *bind[i].length); - free(val); - } - } else if (strncmp(t, FLOAT_SQL, strlen(FLOAT_SQL)) == 0) { - float val; - - bind[i].buffer_type = MYSQL_TYPE_FLOAT; - *bind[i].length = bind[i].buffer_length * sizeof(float); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - val = ERL_FLOAT_VALUE(value); - memcpy(bind[i].buffer, &val, *bind[i].length); - } else if (strncmp(t, CHAR_SQL, strlen(CHAR_SQL)) == 0) { - char *val; - - bind[i].buffer_type = MYSQL_TYPE_STRING; - *bind[i].length = bind[i].buffer_length * sizeof(char *); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - val = erl_iolist_to_string(value); - if (val) { - memcpy(bind[i].buffer, val, *bind[i].length); - free(val); - } - } else if (strncmp(t, VARCHAR_SQL, strlen(VARCHAR_SQL)) == 0) { - char *val; - - bind[i].buffer_type = MYSQL_TYPE_BLOB; - *bind[i].length = bind[i].buffer_length * sizeof(char *); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - val = erl_iolist_to_string(value); - if (val) { - memcpy(bind[i].buffer, val, *bind[i].length); - free(val); - } - } else { - logmsg("ERROR: Unknown sized type: {%s, %d}", t, - bind[i].buffer_length); - exit(3); - } - erl_free_term(t_type); - } else { - char *t; - - t = (char *)ERL_ATOM_PTR(type); - if (strncmp(t, TIMESTAMP_SQL, strlen(TIMESTAMP_SQL)) == 0) { - bind[i].buffer_type = MYSQL_TYPE_TIMESTAMP; - *bind[i].length = sizeof(MYSQL_TIME); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - memcpy(bind[i].buffer, value, *bind[i].length); - } else if (strncmp(t, INTEGER_SQL, strlen(INTEGER_SQL)) == 0) { - int val; - - bind[i].buffer_type = MYSQL_TYPE_LONG; - *bind[i].length = sizeof(int); - bind[i].buffer = malloc(*bind[i].length); - memset(bind[i].buffer, 0, *bind[i].length); - - val = ERL_INT_VALUE(value); - memcpy(bind[i].buffer, &val, *bind[i].length); - } else { - logmsg("ERROR: Unknown type: %s", t); - exit(3); - } - } - - bind[i].is_null = malloc(sizeof(int)); - if (ERL_IS_ATOM(value) - && strncmp((char *)ERL_ATOM_PTR(value), - NULL_SQL, strlen(NULL_SQL)) == 0) - memcpy(bind[i].is_null, &TRUTHY, sizeof(int)); - else - memcpy(bind[i].is_null, &FALSY, sizeof(int)); - - erl_free_term(value); - erl_free_term(type); - } - erl_free_term(params); - - if (mysql_stmt_bind_param(sth, bind)) { - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } else { - if (mysql_stmt_execute(sth)) { - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } else { - set_mysql_results(); - if (results) { - resp = handle_mysql_result(); - set_mysql_results(); - } else { - if (mysql_field_count(&dbh) == 0) - resp = erl_format("{updated, ~i}", mysql_affected_rows(&dbh)); - else - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } - } - } - - for (i = 0; i < param_count; i++) { - free(bind[i].buffer); - free(bind[i].is_null); - } - free(bind); - } - } - erl_free(q); - - mysql_stmt_close(sth); - - write_msg(resp); - erl_free_term(resp); -} - -void -handle_select_count(ETERM *msg) -{ - ETERM *query, *resp; - char *q; - - query = erl_element(2, msg); - q = erl_iolist_to_string(query); - erl_free_term(query); - - logmsg("DEBUG: got select count msg: %s.", q); - if (mysql_query(&dbh, q)) { - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } else { - set_mysql_results(); - if (results) { - resp = erl_format("{ok, ~i}", numrows); - } else { - if (mysql_field_count(&dbh) == 0) - resp = erl_format("{ok, ~i}", mysql_affected_rows(&dbh)); - else - resp = erl_format("{error, {mysql_error, ~i, ~s}}", - mysql_errno(&dbh), mysql_error(&dbh)); - } - } - erl_free(q); - - write_msg(resp); - erl_free_term(resp); -} - -void -handle_select(ETERM *msg) -{ - MYSQL_FIELD *fields; - ETERM *epos, *enum_items, *ecols, *erows, *resp; - my_ulonglong pos, num_items; - unsigned int num_fields; - - epos = erl_element(2, msg); - enum_items = erl_element(3, msg); - pos = ERL_INT_UVALUE(epos); - num_items = ERL_INT_UVALUE(enum_items); - erl_free_term(enum_items); - erl_free_term(epos); - - logmsg("DEBUG: got select pos: %d, n: %d.", erl_size(msg), pos, num_items); - if (results == NULL) { - logmsg("ERROR: select message w/o cursor."); - exit(2); - } - - num_fields = mysql_num_fields(results); - fields = mysql_fetch_fields(results); - if (resultoffset > 0) - resultoffset = pos - 1; - if (num_items > numrows - resultoffset) - num_items = numrows - resultoffset; - mysql_data_seek(results, resultoffset); - - ecols = make_cols(fields, num_fields); - erows = make_rows(num_items, num_fields); - resp = erl_format("{selected, ~w, ~w}", ecols, erows); - erl_free_term(erows); - - erl_free_term(ecols); - write_msg(resp); - erl_free_term(resp); -} - -void -handle_first(ETERM *msg) -{ - MYSQL_FIELD *fields; - ETERM *ecols, *erows, *resp; - unsigned int num_fields; - - logmsg("DEBUG: got first msg."); - if (results == NULL) { - logmsg("ERROR: got first message w/o cursor."); - exit(2); - } - - num_fields = mysql_num_fields(results); - fields = mysql_fetch_fields(results); - resultoffset = 0; - mysql_data_seek(results, resultoffset); - - ecols = make_cols(fields, num_fields); - erows = make_rows(1, num_fields); - resp = erl_format("{selected, ~w, ~w}", ecols, erows); - erl_free_term(erows); - - erl_free_term(ecols); - write_msg(resp); - erl_free_term(resp); -} - -void -handle_last(ETERM *msg) -{ - MYSQL_FIELD *fields; - ETERM *ecols, *erows, *resp; - unsigned int num_fields; - - logmsg("DEBUG: got last msg."); - if (results == NULL) { - logmsg("ERROR: got last message w/o cursor."); - exit(2); - } - - num_fields = mysql_num_fields(results); - fields = mysql_fetch_fields(results); - resultoffset = numrows - 1; - mysql_data_seek(results, resultoffset); - - ecols = make_cols(fields, num_fields); - erows = make_rows(1, num_fields); - resp = erl_format("{selected, ~w, ~w}", ecols, erows); - erl_free_term(erows); - - erl_free_term(ecols); - write_msg(resp); - erl_free_term(resp); -} - -void -handle_next(ETERM *msg) -{ - MYSQL_FIELD *fields; - ETERM *ecols, *erows, *resp; - unsigned int num_fields; - - logmsg("DEBUG: got next msg."); - if (results == NULL) { - logmsg("ERROR: got next message w/o cursor."); - exit(2); - } - - num_fields = mysql_num_fields(results); - fields = mysql_fetch_fields(results); - - ecols = make_cols(fields, num_fields); - logmsg("resultoffset: %d, num_rows: %d", resultoffset, numrows); - if (resultoffset == numrows) { - resp = erl_format("{selected, ~w, []}", ecols); - } else { - erows = make_rows(1, num_fields); - resp = erl_format("{selected, ~w, ~w}", ecols, erows); - erl_free_term(erows); - } - - erl_free_term(ecols); - write_msg(resp); - erl_free_term(resp); -} - -void -handle_prev(ETERM *msg) -{ - MYSQL_FIELD *fields; - ETERM *ecols, *erows, *resp; - unsigned int num_fields; - - logmsg("DEBUG: got prev msg."); - if (results == NULL) { - logmsg("ERROR: got prev message w/o cursor."); - exit(2); - } - - num_fields = mysql_num_fields(results); - fields = mysql_fetch_fields(results); - - ecols = make_cols(fields, num_fields); - logmsg("resultoffset: %d, num_rows: %d", resultoffset, numrows); - if (resultoffset == 0) { - resp = erl_format("{selected, ~w, []}", ecols); - } else { - resultoffset = resultoffset - 1; - mysql_data_seek(results, resultoffset); - erows = make_rows(1, num_fields); - - /* Rewind to position at the point we returned. */ - resultoffset = resultoffset - 1; - mysql_data_seek(results, resultoffset); - resp = erl_format("{selected, ~w, ~w}", ecols, erows); - erl_free_term(erows); - } - - erl_free_term(ecols); - write_msg(resp); - erl_free_term(resp); -} - -void -dispatch_db_cmd(ETERM *msg) -{ - ETERM *tag; - char *tag_name; - - tag = erl_element(1, msg); - tag_name = (char *)ERL_ATOM_PTR(tag); - if (strncmp(tag_name, CONNECT_MSG, strlen(CONNECT_MSG)) == 0) - handle_connect(msg); - else if (strncmp(tag_name, QUERY_MSG, strlen(QUERY_MSG)) == 0) - handle_query(msg); - else if (strncmp(tag_name, PARAM_QUERY_MSG, strlen(PARAM_QUERY_MSG)) == 0) - handle_param_query(msg); - else if (strncmp(tag_name, SELECT_COUNT_MSG, strlen(SELECT_COUNT_MSG)) == 0) - handle_select_count(msg); - else if (strncmp(tag_name, SELECT_MSG, strlen(SELECT_MSG)) == 0) - handle_select(msg); - else if (strncmp(tag_name, FIRST_MSG, strlen(FIRST_MSG)) == 0) - handle_first(msg); - else if (strncmp(tag_name, LAST_MSG, strlen(LAST_MSG)) == 0) - handle_last(msg); - else if (strncmp(tag_name, NEXT_MSG, strlen(NEXT_MSG)) == 0) - handle_next(msg); - else if (strncmp(tag_name, PREV_MSG, strlen(PREV_MSG)) == 0) - handle_prev(msg); - else { - logmsg("WARNING: message type %s unknown.", (char *)ERL_ATOM_PTR(tag)); - erl_free_term(tag); - exit(3); - } - - erl_free_term(tag); -} - -int -main(int argc, char *argv[]) -{ - ETERM *msg; - - openlog(); - logmsg("INFO: starting up."); - erl_init(NULL, 0); - - mysql_init(&dbh); - while ((msg = read_msg()) != NULL) { - dispatch_db_cmd(msg); - erl_free_term(msg); - } - mysql_close(&dbh); - - logmsg("INFO: shutting down."); - closelog(); - - return 0; -} diff --git a/src/mysqlerl.erl b/src/mysqlerl.erl deleted file mode 100644 index 946e9c5..0000000 --- a/src/mysqlerl.erl +++ /dev/null @@ -1,251 +0,0 @@ -%% Modeled from ODBC -%% http://www.erlang.org/doc/apps/odbc/ - --module(mysqlerl). --author('bjc@kublai.com'). - --include("mysqlerl.hrl"). - --export([convert_type/1]). - --export([test_start/0, test_msg/0, test_query/0, test_param_query/0]). - --export([start/0, start/1, stop/0, commit/2, commit/3, - connect/6, disconnect/1, describe_table/2, - describe_table/3, first/1, first/2, - last/1, last/2, next/1, next/2, prev/1, - prev/2, select_count/2, select_count/3, - select/3, select/4, param_query/3, param_query/4, - sql_query/2, sql_query/3]). - --define(CONFIG, "/Users/bjc/tmp/test-server.cfg"). - -test_start() -> - {ok, [{Host, Port, DB, User, Pass, Options}]} = file:consult(?CONFIG), - mysqlerl:connect(Host, Port, DB, User, Pass, Options). - -test_msg() -> - commit(mysqlerl_connection_sup:random_child(), - rollback, 2000). - -test_query() -> - sql_query(mysqlerl_connection_sup:random_child(), - "SELECT COUNT(*) FROM user", 2000). - -test_param_query() -> - %% This should really be an update or something, since that's how - %% it'll be used. - param_query(mysqlerl_connection_sup:random_child(), - "SELECT * FROM user WHERE username=?", - [{{sql_varchar, 20}, "bjc"}]). - -start() -> - start(temporary). - -%% Arguments: -%% Type = permanent | transient | temporary -%% -%% Returns: -%% ok | {error, Reason} -start(Type) -> - application:start(sasl), - application:start(mysqlerl, Type). - -stop() -> - application:stop(mysqlerl). - -commit(Ref, CommitMode) -> - commit(Ref, CommitMode, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Timeout = time_out() -%% CommitMode = commit | rollback -%% Reason = not_an_explicit_commit_connection | -%% process_not_owner_of_odbc_connection | -%% common_reason() -%% ok | {error, Reason} -commit(Ref, commit, Timeout) -> - case sql_query(Ref, "COMMIT", Timeout) of - {num_rows, _} -> ok; - Other -> Other - end; -commit(Ref, rollback, Timeout) -> - case sql_query(Ref, "ROLLBACK", Timeout) of - {num_rows, _} -> ok; - Other -> Other - end. - -%% Arguments: -%% Host = string() -%% Port = integer() -%% Database = string() -%% User = string() -%% Password = string() -%% Options = list() -%% -%% Returns: -%% {ok, Ref} | {error, Reason} -%% Ref = connection_reference() -connect(Host, Port, Database, User, Password, Options) -> - mysqlerl_connection_sup:connect(Host, Port, Database, - User, Password, Options). - -%% Arguments: -%% Ref = connection_reference() -%% -%% Returns: -%% ok | {error, Reason} -disconnect(Ref) -> - mysqlerl_connection:stop(Ref). - -describe_table(Ref, Table) -> - describe_table(Ref, Table, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Table = string() -%% Timeout = time_out() -%% -%% Returns: -%% {ok, Description} | {error, Reason} -%% Description = [{col_name(), odbc_data_type()}] -describe_table(Ref, Table, Timeout) -> - Q = ["DESCRIBE ", Table], - {selected, _, Rows} = sql_query(Ref, Q, Timeout), - Description = [{Name, convert_type(T)} || {Name, T, _, _, _, _} <- Rows], - {ok, Description}. - -first(Ref) -> - first(Ref, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -first(Ref, Timeout) -> - conn_fwd(Ref, #sql_first{}, Timeout). - -last(Ref) -> - last(Ref, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -last(Ref, Timeout) -> - conn_fwd(Ref, #sql_last{}, Timeout). - -next(Ref) -> - next(Ref, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -next(Ref, Timeout) -> - conn_fwd(Ref, #sql_next{}, Timeout). - -prev(Ref) -> - prev(Ref, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -prev(Ref, Timeout) -> - conn_fwd(Ref, #sql_prev{}, Timeout). - -select_count(Ref, SQLQuery) -> - select_count(Ref, SQLQuery, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% SQLQuery = string() -%% Timeout = time_out() -%% Returns: -%% {ok, NrRows} | {error, Reason} -%% NrRows = n_rows() -select_count(Ref, SQLQuery, Timeout) -> - conn_fwd(Ref, #sql_select_count{q = SQLQuery}, Timeout). - -select(Ref, Pos, N) -> - select(Ref, Pos, N, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% Pos = integer() -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -select(Ref, Pos, N, Timeout) -> - conn_fwd(Ref, #sql_select{pos = Pos, n = N}, Timeout). - -param_query(Ref, SQLQuery, Params) -> - param_query(Ref, SQLQuery, Params, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% SQLQuery = string() -%% Params = [{odbc_data_type(), [value()]}] -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -param_query(Ref, SQLQuery, Params, Timeout) -> - conn_fwd(Ref, #sql_param_query{q = SQLQuery, params = Params}, Timeout). - -sql_query(Ref, SQLQuery) -> - sql_query(Ref, SQLQuery, infinity). - -%% Arguments: -%% Ref = connection_reference() -%% SQLQuery = string() -%% Timeout = time_out() -%% Returns: -%% {selected, ColNames, Rows} | {error, Reason} -%% Rows = rows() -sql_query(Ref, SQLQuery, Timeout) -> - conn_fwd(Ref, #sql_query{q = SQLQuery}, Timeout). - -conn_fwd(Ref, Msg, Timeout) -> - gen_server:call(Ref, {Msg, Timeout}, infinity). - -%% Convert type needs some love! Cover all bases here instead of -%% fudging. -convert_type("timestamp") -> - sql_timestamp; -convert_type("int") -> - sql_integer; -convert_type("int(" ++ Rest) -> - Size = find_data_size(Rest), - {sql_numeric, list_to_integer(Size)}; -convert_type("decimal(" ++ Rest) -> - Size = find_data_size(Rest), - {sql_decimal, list_to_integer(Size)}; -convert_type("float(" ++ Rest) -> - Size = find_data_size(Rest), - {sql_float, list_to_float(Size)}; -convert_type("char(" ++ Rest) -> - Size = find_data_size(Rest), - {sql_char, list_to_integer(Size)}; -convert_type("varchar(" ++ Rest) -> - Size = find_data_size(Rest), - {sql_varchar, list_to_integer(Size)}. - -find_data_size(Str) -> - find_data_size(Str, []). - -find_data_size([$) | _Rest], Accum) -> - lists:reverse(Accum); -find_data_size([H | T], Accum) -> - find_data_size(T, [H | Accum]). diff --git a/src/mysqlerl.hrl b/src/mysqlerl.hrl deleted file mode 100644 index 99e674f..0000000 --- a/src/mysqlerl.hrl +++ /dev/null @@ -1,9 +0,0 @@ --record(sql_connect, {host, port, database, user, password, options}). --record(sql_query, {q}). --record(sql_param_query, {q, params}). --record(sql_select_count, {q}). --record(sql_select, {pos, n}). --record(sql_first, {}). --record(sql_last, {}). --record(sql_next, {}). --record(sql_prev, {}). diff --git a/src/mysqlerl_app.erl b/src/mysqlerl_app.erl deleted file mode 100644 index cdb8ade..0000000 --- a/src/mysqlerl_app.erl +++ /dev/null @@ -1,14 +0,0 @@ --module(mysqlerl_app). --author('bjc@kublai.com'). - --behavior(application). - -%% Behavior callbacks. --export([start/2, stop/1]). - -start(normal, []) -> - register(?MODULE, self()), - mysqlerl_sup:start_link(). - -stop([]) -> - ok. diff --git a/src/mysqlerl_connection.erl b/src/mysqlerl_connection.erl deleted file mode 100644 index 1a3d900..0000000 --- a/src/mysqlerl_connection.erl +++ /dev/null @@ -1,64 +0,0 @@ --module(mysqlerl_connection). --author('bjc@kublai.com'). - --include("mysqlerl.hrl"). --include("mysqlerl_port.hrl"). - --behavior(gen_server). - --export([start_link/7, stop/1]). - --export([init/1, terminate/2, code_change/3, - handle_call/3, handle_cast/2, handle_info/2]). - --record(state, {sup, owner}). - -start_link(Owner, Host, Port, Database, User, Password, Options) -> - gen_server:start_link(?MODULE, [Owner, Host, Port, Database, - User, Password, Options], []). - -stop(Pid) -> - gen_server:call(Pid, stop). - -init([Owner, Host, Port, Database, User, Password, Options]) -> - process_flag(trap_exit, true), - link(Owner), - {ok, Sup} = mysqlerl_port_sup:start_link(helper(), Host, Port, Database, - User, Password, Options), - {ok, #state{sup = Sup, owner = Owner}}. - -terminate(Reason, _State) -> - io:format("DEBUG: connection got terminate: ~p~n", [Reason]), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_call(Request, From, #state{owner = Owner} = State) - when Owner /= element(1, From) -> - error_logger:warning_msg("Request from ~p (owner: ~p): ~p", - [element(1, From), Owner, Request]), - {reply, {error, process_not_owner_of_odbc_connection}, State}; -handle_call(stop, _From, State) -> - {stop, normal, State}; -handle_call(Request, _From, State) -> - {reply, gen_server:call(port_ref(State#state.sup), - #req{request = Request}, infinity), State}. - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info({'EXIT', Pid, _Reason}, #state{owner = Pid} = State) -> - io:format("DEBUG: owner ~p shut down.~n", [Pid]), - {stop, normal, State}. - -helper() -> - case code:priv_dir(mysqlerl) of - PrivDir when is_list(PrivDir) -> ok; - {error, bad_name} -> PrivDir = filename:join(["..", "priv"]) - end, - filename:join([PrivDir, "mysqlerl"]). - -port_ref(Sup) -> - [{mysqlerl_port, Ref, worker, _}] = supervisor:which_children(Sup), - Ref. diff --git a/src/mysqlerl_connection_sup.erl b/src/mysqlerl_connection_sup.erl deleted file mode 100644 index 6e1632a..0000000 --- a/src/mysqlerl_connection_sup.erl +++ /dev/null @@ -1,31 +0,0 @@ --module(mysqlerl_connection_sup). --author('bjc@kublai.com'). - --behavior(supervisor). - --export([random_child/0]). --export([start_link/0, connect/6]). - --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -connect(Host, Port, Database, User, Password, Options) -> - supervisor:start_child(?MODULE, [self(), Host, Port, Database, - User, Password, Options]). - -random_child() -> - case get_pids() of - [] -> {error, no_connections}; - Pids -> lists:nth(erlang:phash(now(), length(Pids)), Pids) - end. - -init([]) -> - Connection = {undefined, {mysqlerl_connection, start_link, []}, - transient, 5, worker, [mysqlerl_connection]}, - {ok, {{simple_one_for_one, 10, 5}, - [Connection]}}. - -get_pids() -> - [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(?MODULE)]. diff --git a/src/mysqlerl_port.erl b/src/mysqlerl_port.erl deleted file mode 100644 index 74fe177..0000000 --- a/src/mysqlerl_port.erl +++ /dev/null @@ -1,83 +0,0 @@ --module(mysqlerl_port). --author('bjc@kublai.com'). - --include("mysqlerl.hrl"). --include("mysqlerl_port.hrl"). - --behavior(gen_server). - --export([start_link/7]). --export([init/1, terminate/2, code_change/3, - handle_call/3, handle_cast/2, handle_info/2]). - --define(CONNECT_TIMEOUT, 30000). - --record(state, {ref}). --record(port_closed, {reason}). - -start_link(Cmd, Host, Port, Database, User, Password, Options) -> - gen_server:start_link(?MODULE, - [Cmd, Host, Port, Database, User, Password, Options], - []). - -init([Cmd, Host, Port, Database, User, Password, Options]) -> - process_flag(trap_exit, true), - Ref = open_port({spawn, Cmd}, [{packet, 4}, binary]), - {data, ok} = send_port_cmd(Ref, #sql_connect{host = Host, - port = Port, - database = Database, - user = User, - password = Password, - options = Options}, - ?CONNECT_TIMEOUT), - {ok, #state{ref = Ref}}. - -terminate(#port_closed{reason = Reason}, #state{ref = Ref}) -> - io:format("DEBUG: mysqlerl connection ~p shutting down (~p).~n", - [Ref, Reason]), - ok; -terminate(Reason, State) -> - catch port_close(State#state.ref), - io:format("DEBUG: mysqlerl_port got terminate: ~p~n", [Reason]), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_call(#req{request = {Request, Timeout}}, From, - #state{ref = Ref} = State) -> - case send_port_cmd(Ref, Request, Timeout) of - {data, Res} -> - {reply, Res, State}; - {'EXIT', Ref, Reason} -> - gen_server:reply(From, {error, connection_closed}), - {stop, #port_closed{reason = Reason}, State}; - timeout -> - gen_server:reply(From, timeout), - {stop, timeout, State}; - Other -> - error_logger:warning_msg("Got unknown query response: ~p~n", - [Other]), - gen_server:reply(From, {error, connection_closed}), - {stop, {unknownreply, Other}, State} - end. - - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info({'EXIT', Ref, Reason}, #state{ref = Ref} = State) -> - io:format("DEBUG: Port ~p closed on ~p.~n", [Ref, State]), - {stop, #port_closed{reason = Reason}, State}. - - -send_port_cmd(Ref, Request, Timeout) -> - io:format("DEBUG: Sending request: ~p~n", [Request]), - port_command(Ref, term_to_binary(Request)), - receive - {Ref, {data, Res}} -> - {data, binary_to_term(Res)}; - Other -> Other - after Timeout -> - timeout - end. diff --git a/src/mysqlerl_port.hrl b/src/mysqlerl_port.hrl deleted file mode 100644 index a7a3300..0000000 --- a/src/mysqlerl_port.hrl +++ /dev/null @@ -1 +0,0 @@ --record(req, {request}). diff --git a/src/mysqlerl_port_sup.erl b/src/mysqlerl_port_sup.erl deleted file mode 100644 index 3053cdc..0000000 --- a/src/mysqlerl_port_sup.erl +++ /dev/null @@ -1,18 +0,0 @@ --module(mysqlerl_port_sup). --author('bjc@kublai.com'). - --behavior(supervisor). - --export([start_link/7]). --export([init/1]). - -start_link(Cmd, Host, Port, Database, User, Password, Options) -> - supervisor:start_link(?MODULE, [Cmd, Host, Port, Database, - User, Password, Options]). - -init([Cmd, Host, Port, Database, User, Password, Options]) -> - Ref = {mysqlerl_port, {mysqlerl_port, start_link, - [Cmd, Host, Port, Database, - User, Password, Options]}, - transient, 5, worker, [mysqlerl_port]}, - {ok, {{one_for_one, 10, 5}, [Ref]}}. diff --git a/src/mysqlerl_sup.erl b/src/mysqlerl_sup.erl deleted file mode 100644 index 1ebfbf2..0000000 --- a/src/mysqlerl_sup.erl +++ /dev/null @@ -1,16 +0,0 @@ --module(mysqlerl_sup). --author('bjc@kublai.com'). - --behavior(supervisor). - --export([start_link/0, init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - ConnectionSup = {mysqlerl_connection_sup, - {mysqlerl_connection_sup, start_link, []}, - permanent, infinity, supervisor, - [mysqlerl_connection_sup]}, - {ok, {{one_for_one, 10, 5}, [ConnectionSup]}}. -- cgit v1.2.3