aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorBrian Cully <bjc@kublai.com>2012-02-06 20:15:48 -0500
committerBrian Cully <github.20.shmit@spamgourmet.com>2012-02-06 20:15:48 -0500
commit10b824963ff7b0b012e6b85c8b3f07904c473fd1 (patch)
tree344e00f788ea24c273f841636bdf92f354b40e2c /src
parent591169f86718117151bf9eb5960ba58e2a27a5bb (diff)
downloadmysqlerl-10b824963ff7b0b012e6b85c8b3f07904c473fd1.tar.gz
mysqlerl-10b824963ff7b0b012e6b85c8b3f07904c473fd1.zip
Remove stupid lib directory
Diffstat (limited to 'src')
-rw-r--r--src/Makefile45
-rw-r--r--src/TODO.org6
-rw-r--r--src/io.c59
-rw-r--r--src/io.h13
-rw-r--r--src/log.c60
-rw-r--r--src/log.h12
-rw-r--r--src/msg.c74
-rw-r--r--src/msg.h17
-rw-r--r--src/mysqlerl.app12
-rw-r--r--src/mysqlerl.c704
-rw-r--r--src/mysqlerl.erl251
-rw-r--r--src/mysqlerl.hrl9
-rw-r--r--src/mysqlerl_app.erl14
-rw-r--r--src/mysqlerl_connection.erl64
-rw-r--r--src/mysqlerl_connection_sup.erl31
-rw-r--r--src/mysqlerl_port.erl83
-rw-r--r--src/mysqlerl_port.hrl1
-rw-r--r--src/mysqlerl_port_sup.erl18
-rw-r--r--src/mysqlerl_sup.erl16
19 files changed, 1489 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 0000000..4782487
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,45 @@
+ERLPREFIX = /usr/local
+ERLINTERFACE = erl_interface-3.7.6
+ERLINCS = -I$(ERLPREFIX)/lib/erlang/lib/$(ERLINTERFACE)/include
+ERLLIBS = -L$(ERLPREFIX)/lib/erlang/lib/$(ERLINTERFACE)/lib \
+ -lerl_interface -lei
+
+MYSQLPREFIX = /usr/local/mysql
+MYSQLINCS = -I$(MYSQLPREFIX)/include
+#MYSQLLIBS = -L$(MYSQLPREFIX)/lib -lmysqlclient
+MYSQLLIBS = $(MYSQLPREFIX)/lib/libmysqlclient.a
+
+CFLAGS = -g -Wall -Werror $(MYSQLINCS) $(ERLINCS)
+LDFLAGS = $(MYSQLLIBS) $(ERLLIBS)
+EFLAGS = -W +debug_info
+
+PRIVDIR = ../priv
+BEAMDIR = ../ebin
+
+BINS = $(PRIVDIR)/mysqlerl $(BEAMDIR)/mysqlerl.app
+MYSQLERLOBJS = io.o log.o msg.o mysqlerl.o
+BEAMS = mysqlerl.beam mysqlerl_app.beam mysqlerl_connection.beam \
+ mysqlerl_connection_sup.beam mysqlerl_port.beam \
+ mysqlerl_port_sup.beam mysqlerl_sup.beam
+
+all: $(PRIVDIR) $(BEAMDIR) $(BINS)
+
+clean:
+ rm -rf *.o *.beam
+ rm -rf $(BINS) $(MYSQLERLOBJS) $(BEAMS) $(BEAMDIR)/mysqlerl.app
+
+%.beam: %.erl
+ erlc $(EFLAGS) $<
+
+$(PRIVDIR)/mysqlerl: $(PRIVDIR) $(MYSQLERLOBJS)
+ $(CC) -o $@ $(MYSQLERLOBJS) $(LDFLAGS)
+
+$(BEAMDIR)/mysqlerl.app: $(BEAMDIR) $(BEAMS)
+ cp $(BEAMS) $(BEAMDIR)
+ cp mysqlerl.app $(BEAMDIR)
+
+$(PRIVDIR):
+ mkdir -p $(PRIVDIR)
+
+$(BEAMDIR):
+ mkdir -p $(BEAMDIR)
diff --git a/src/TODO.org b/src/TODO.org
new file mode 100644
index 0000000..636f47d
--- /dev/null
+++ b/src/TODO.org
@@ -0,0 +1,6 @@
+* Support ODBC options where applicable.
+* Test functions die (or don't) correctly when not connected to DB.
+* Connection pooling with multiple databases.
+I'm guessing that it doesn't work right now. Looks like
+mysqlerl_connection_sup:connect starts a simple-one-for-one port, and
+commands use :random_child to do dispatch. That's bad.
diff --git a/src/io.c b/src/io.c
new file mode 100644
index 0000000..dcc055f
--- /dev/null
+++ b/src/io.c
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#include "io.h"
+#include "log.h"
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+int
+restartable_read(unsigned char *buf, size_t buflen)
+{
+ ssize_t rc, readb;
+
+ rc = 0;
+ READLOOP:
+ while (rc < buflen) {
+ readb = read(STDIN_FILENO, buf + rc, buflen - rc);
+ if (readb == -1) {
+ if (errno == EAGAIN || errno == EINTR)
+ goto READLOOP;
+
+ return -1;
+ } else if (readb == 0) {
+ logmsg("ERROR: EOF trying to read additional %d bytes from "
+ "standard input", buflen - rc);
+ return -1;
+ }
+
+ rc += readb;
+ }
+
+ return rc;
+}
+
+int
+restartable_write(const unsigned char *buf, size_t buflen)
+{
+ ssize_t rc, wroteb;
+
+ rc = 0;
+ WRITELOOP:
+ while (rc < buflen) {
+ wroteb = write(STDOUT_FILENO, buf + rc, buflen - rc);
+ if (wroteb == -1) {
+ if (errno == EAGAIN || errno == EINTR)
+ goto WRITELOOP;
+
+ return -1;
+ }
+
+ rc += wroteb;
+ }
+
+ return rc;
+}
diff --git a/src/io.h b/src/io.h
new file mode 100644
index 0000000..bf4e086
--- /dev/null
+++ b/src/io.h
@@ -0,0 +1,13 @@
+/*
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#ifndef _IO_H
+#define _IO_H
+
+#include <sys/types.h>
+
+int restartable_read(unsigned char *buf, size_t buflen);
+int restartable_write(const unsigned char *buf, size_t buflen);
+
+#endif
diff --git a/src/log.c b/src/log.c
new file mode 100644
index 0000000..80aa755
--- /dev/null
+++ b/src/log.c
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#include "log.h"
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <time.h>
+#include <unistd.h>
+
+const char *LOGPATH = "/tmp/mysqlerl.log";
+static FILE *logfile = NULL;
+
+void
+openlog()
+{
+ logfile = fopen(LOGPATH, "a");
+}
+
+void
+closelog()
+{
+ fclose(logfile);
+}
+
+void
+logmsg(const char *format, ...)
+{
+ FILE *out = logfile;
+ char timebuf[32] = "\0";
+ struct tm now_tm;
+ time_t now_time;
+ va_list args;
+
+ va_start(args, format);
+
+ if (logfile == NULL)
+ out = stderr;
+
+ if (time(&now_time) == (time_t)-1) {
+ (void)fprintf(out, "LOGERROR - Failed to fetch time: ");
+ } else {
+ (void)localtime_r(&now_time, &now_tm);
+ if (strftime(timebuf, sizeof(timebuf), "%Y%m%d %H:%M:%S ", &now_tm) == 0) {
+ (void)fprintf(out, "LOGERROR - Failed to parse time (now: %d): ",
+ (int)now_time);
+ } else {
+ (void)fprintf(out, "%s", timebuf);
+ }
+ }
+ (void)fprintf(out, "[%d]: ", getpid());
+ (void)vfprintf(out, format, args);
+ (void)fprintf(out, "\n");
+
+ fflush(out);
+
+ va_end(args);
+}
+
diff --git a/src/log.h b/src/log.h
new file mode 100644
index 0000000..ae9a9bd
--- /dev/null
+++ b/src/log.h
@@ -0,0 +1,12 @@
+/*
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#ifndef _LOG_H
+#define _LOG_H
+
+void openlog();
+void closelog();
+void logmsg(const char *format, ...);
+
+#endif
diff --git a/src/msg.c b/src/msg.c
new file mode 100644
index 0000000..00a5a6a
--- /dev/null
+++ b/src/msg.c
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#include "msg.h"
+
+#include "io.h"
+#include "log.h"
+
+#include <string.h>
+
+ETERM *
+read_msg()
+{
+ ETERM *msg;
+ unsigned char *buf;
+ msglen_t len;
+
+ logmsg("DEBUG: reading message length.");
+ if (restartable_read((unsigned char *)&len, sizeof(len)) == -1) {
+ logmsg("ERROR: couldn't read %d byte message prefix: %s.",
+ sizeof(len), strerror(errno));
+
+ exit(2);
+ }
+
+ len = ntohl(len);
+ buf = (unsigned char *)malloc(len);
+ if (buf == NULL) {
+ logmsg("ERROR: Couldn't malloc %d bytes: %s.", len,
+ strerror(errno));
+
+ exit(2);
+ }
+
+ logmsg("DEBUG: reading message body (len: %d).", len);
+ if (restartable_read(buf, len) == -1) {
+ logmsg("ERROR: couldn't read %d byte message: %s.",
+ len, strerror(errno));
+
+ free(buf);
+ exit(2);
+ }
+
+ msg = erl_decode(buf);
+ free(buf);
+
+ return msg;
+}
+
+int
+write_msg(ETERM *msg)
+{
+ unsigned char *buf;
+ msglen_t nlen, buflen;
+
+ buflen = erl_term_len(msg);
+ buf = (unsigned char *)malloc(buflen);
+ erl_encode(msg, buf);
+ erl_free_term(msg);
+
+ nlen = htonl(buflen);
+ if (restartable_write((unsigned char *)&nlen, sizeof(nlen)) == -1) {
+ free(buf);
+ return -1;
+ }
+ if (restartable_write(buf, buflen) == -1) {
+ free(buf);
+ return -1;
+ }
+ free(buf);
+
+ return 0;
+}
diff --git a/src/msg.h b/src/msg.h
new file mode 100644
index 0000000..6db2aa8
--- /dev/null
+++ b/src/msg.h
@@ -0,0 +1,17 @@
+/*
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#ifndef _MSG_H
+#define _MSG_H
+
+#include <erl_interface.h>
+#include <ei.h>
+#include <stdlib.h>
+
+typedef u_int32_t msglen_t;
+
+ETERM *read_msg();
+int write_msg(ETERM *msg);
+
+#endif
diff --git a/src/mysqlerl.app b/src/mysqlerl.app
new file mode 100644
index 0000000..5687463
--- /dev/null
+++ b/src/mysqlerl.app
@@ -0,0 +1,12 @@
+%% -*- Erlang -*-
+%% Copyright (C) 2008, Brian Cully
+
+{application, mysqlerl,
+ [{description, "mysqlerl"},
+ {vsn, "0"},
+ {modules, [mysqlerl, mysqlerl_app, mysqlerl_connection_sup,
+ mysqlerl_connection, mysql_port_sup, mysql_port]},
+ {registered, [mysqlerl, mysqlerl_app, mysqlerl_connection_sup]},
+ {applications, [kernel, stdlib]},
+ {env, []},
+ {mod, {mysqlerl_app, []}}]}.
diff --git a/src/mysqlerl.c b/src/mysqlerl.c
new file mode 100644
index 0000000..020d9eb
--- /dev/null
+++ b/src/mysqlerl.c
@@ -0,0 +1,704 @@
+/*
+ * MySQL port driver.
+ *
+ * Copyright (C) 2008, Brian Cully <bjc@kublai.com>
+ */
+
+#include "io.h"
+#include "log.h"
+#include "msg.h"
+
+#include <errno.h>
+#include <mysql.h>
+#include <string.h>
+
+const int TRUTHY = 1;
+const int FALSY = 0;
+
+const char *CONNECT_MSG = "sql_connect";
+const char *QUERY_MSG = "sql_query";
+const char *PARAM_QUERY_MSG = "sql_param_query";
+const char *SELECT_COUNT_MSG = "sql_select_count";
+const char *SELECT_MSG = "sql_select";
+const char *FIRST_MSG = "sql_first";
+const char *LAST_MSG = "sql_last";
+const char *NEXT_MSG = "sql_next";
+const char *PREV_MSG = "sql_prev";
+
+const char *NULL_SQL = "null";
+const char *NUMERIC_SQL = "sql_numeric";
+const char *DECIMAL_SQL = "sql_decimal";
+const char *FLOAT_SQL = "sql_float";
+const char *CHAR_SQL = "sql_char";
+const char *VARCHAR_SQL = "sql_varchar";
+const char *TIMESTAMP_SQL = "sql_timestamp";
+const char *INTEGER_SQL = "sql_integer";
+
+MYSQL dbh;
+MYSQL_RES *results = NULL;
+my_ulonglong resultoffset = 0, numrows = 0;
+
+void
+set_mysql_results()
+{
+ if (results)
+ mysql_free_result(results);
+ results = mysql_store_result(&dbh);
+
+ resultoffset = 0;
+ numrows = results ? mysql_num_rows(results) : 0;
+}
+
+ETERM *
+make_cols(MYSQL_FIELD *fields, unsigned int num_fields)
+{
+ ETERM **cols, *rc;
+ unsigned int i;
+
+ cols = (ETERM **)malloc(num_fields * sizeof(ETERM *));
+ if (cols == NULL) {
+ logmsg("ERROR: Couldn't allocate %d bytes for columns: %s",
+ strerror(errno));
+ exit(3);
+ }
+
+ for (i = 0; i < num_fields; i++)
+ cols[i] = erl_mk_string(fields[i].name);
+
+ rc = erl_mk_list(cols, num_fields);
+
+ for (i = 0; i < num_fields; i++)
+ erl_free_term(cols[i]);
+ free(cols);
+
+ return rc;
+}
+
+ETERM *
+make_row(MYSQL_ROW row, unsigned long *lengths, unsigned int num_fields)
+{
+ ETERM **rowtup, *rc;
+ unsigned int i;
+
+ rowtup = (ETERM **)malloc(num_fields * sizeof(ETERM *));
+ if (rowtup == NULL) {
+ logmsg("ERROR: Couldn't allocate %d bytes for row: %s",
+ strerror(errno));
+ exit(3);
+ }
+
+ for (i = 0; i < num_fields; i++) {
+ if (row[i])
+ rowtup[i] = erl_mk_estring(row[i], lengths[i]);
+ else
+ rowtup[i] = erl_mk_atom("null");
+ }
+
+ rc = erl_mk_tuple(rowtup, num_fields);
+ if (rc == NULL) {
+ logmsg("ERROR: couldn't allocate %d-tuple", num_fields);
+ exit(3);
+ }
+
+ for (i = 0; i < num_fields; i++)
+ erl_free_term(rowtup[i]);
+ free(rowtup);
+
+ return rc;
+}
+
+ETERM *
+make_rows(unsigned int num_rows, unsigned int num_fields)
+{
+ ETERM **rows, *rc;
+ unsigned int i;
+
+ rows = (ETERM **)malloc(num_rows * sizeof(ETERM *));
+ if (rows == NULL) {
+ logmsg("ERROR: Couldn't allocate %d bytes for rows: %s",
+ strerror(errno));
+ exit(3);
+ }
+
+ for (i = 0; i < num_rows; i++) {
+ ETERM *rt;
+ unsigned long *lengths;
+ MYSQL_ROW row;
+
+ row = mysql_fetch_row(results);
+ resultoffset++;
+ lengths = mysql_fetch_lengths(results);
+
+ rt = make_row(row, lengths, num_fields);
+ rows[i] = erl_format("~w", rt);
+ erl_free_term(rt);
+ }
+
+ rc = erl_mk_list(rows, num_rows);
+
+ for (i = 0; i < num_rows; i++)
+ erl_free_term(rows[i]);
+ free(rows);
+
+ return rc;
+}
+
+ETERM *
+handle_mysql_result()
+{
+ MYSQL_FIELD *fields;
+ ETERM *ecols, *erows, *resp;
+ unsigned int num_fields;
+
+ num_fields = mysql_num_fields(results);
+ fields = mysql_fetch_fields(results);
+
+ ecols = make_cols(fields, num_fields);
+ erows = make_rows(numrows, num_fields);
+
+ resp = erl_format("{selected, ~w, ~w}", ecols, erows);
+
+ erl_free_term(ecols);
+ erl_free_term(erows);
+
+ return resp;
+}
+
+void
+handle_connect(ETERM *msg)
+{
+ ETERM *resp, *tmp;
+ char *host, *db_name, *user, *passwd;
+ int port;
+
+ tmp = erl_element(2, msg);
+ host = erl_iolist_to_string(tmp);
+ erl_free_term(tmp);
+
+ tmp = erl_element(3, msg);
+ port = ERL_INT_VALUE(tmp);
+ erl_free_term(tmp);
+
+ tmp = erl_element(4, msg);
+ db_name = erl_iolist_to_string(tmp);
+ erl_free_term(tmp);
+
+ tmp = erl_element(5, msg);
+ user = erl_iolist_to_string(tmp);
+ erl_free_term(tmp);
+
+ tmp = erl_element(6, msg);
+ passwd = erl_iolist_to_string(tmp);
+ erl_free_term(tmp);
+
+ /* TODO: handle options, passed in next. */
+
+ logmsg("INFO: Connecting to %s on %s:%d as %s", db_name, host, port, user);
+ if (mysql_real_connect(&dbh, host, user, passwd,
+ db_name, port, NULL, 0) == NULL) {
+ logmsg("ERROR: Failed to connect to database %s as %s: %s.",
+ db_name, user, mysql_error(&dbh));
+ exit(2);
+ }
+
+ resp = erl_format("ok");
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_query(ETERM *cmd)
+{
+ ETERM *query, *resp;
+ char *q;
+
+ query = erl_element(2, cmd);
+ q = erl_iolist_to_string(query);
+ erl_free_term(query);
+
+ logmsg("DEBUG: got query msg: %s.", q);
+ if (mysql_query(&dbh, q)) {
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ } else {
+ set_mysql_results();
+ if (results) {
+ resp = handle_mysql_result();
+ set_mysql_results();
+ } else {
+ if (mysql_field_count(&dbh) == 0)
+ resp = erl_format("{updated, ~i}", mysql_affected_rows(&dbh));
+ else
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ }
+ }
+ erl_free(q);
+
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+/*
+ * http://dev.mysql.com/doc/refman/5.1/en/mysql-stmt-execute.html
+ *
+ * 6 > odbc:param_query(Ref,
+ * "INSERT INTO EMPLOYEE (NR, FIRSTNAME, "
+ * "LASTNAME, GENDER) VALUES(?, ?, ?, ?)",
+ * [{sql_integer,[2,3,4,5,6,7,8]},
+ * {{sql_varchar, 20},
+ * ["John", "Monica", "Ross", "Rachel",
+ * "Piper", "Prue", "Louise"]},
+ * {{sql_varchar, 20},
+ * ["Doe","Geller","Geller", "Green",
+ * "Halliwell", "Halliwell", "Lane"]},
+ * {{sql_char, 1}, ["M","F","M","F","T","F","F"]}]).
+ * {updated, 7}
+ */
+void
+handle_param_query(ETERM *msg)
+{
+ ETERM *query, *params, *p, *tmp, *resp;
+ MYSQL_STMT *sth;
+ MYSQL_BIND *bind;
+ char *q;
+ int param_count, i;
+
+ query = erl_element(2, msg);
+ q = erl_iolist_to_string(query);
+ erl_free_term(query);
+
+ params = erl_element(3, msg);
+ erl_free_term(params);
+
+ logmsg("DEBUG: got param query msg: %s.", q);
+
+ sth = mysql_stmt_init(&dbh);
+ if (mysql_stmt_prepare(sth, q, strlen(q))) {
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ } else {
+ param_count = mysql_stmt_param_count(sth);
+ if (param_count != erl_length(params)) {
+ resp = erl_format("{error, {mysql_error, -1, [expected_params, %d, got_params, %d]}}", param_count, erl_length(params));
+ } else {
+ bind = malloc(param_count * sizeof(MYSQL_BIND));
+ if (bind == NULL) {
+ logmsg("ERROR: Couldn't allocate %d bytes for bind params.",
+ param_count * sizeof(MYSQL_BIND));
+ exit(3);
+ }
+ memset(bind, 0, param_count * sizeof(MYSQL_BIND));
+
+ for (i = 0, tmp = params;
+ (p = erl_hd(tmp)) != NULL && i < 1000;
+ i++, tmp = erl_tl(tmp)) {
+ ETERM *type, *value;
+
+ type = erl_element(1, p);
+ value = erl_element(2, p);
+
+ if (ERL_IS_TUPLE(type)) {
+ ETERM *t_type, *t_size;
+ char *t;
+
+ t_size = erl_element(2, type);
+ bind[i].buffer_length = ERL_INT_VALUE(t_size);
+ erl_free_term(t_size);
+
+ t_type = erl_element(1, type);
+ t = (char *)ERL_ATOM_PTR(t_type);
+ bind[i].length = malloc(sizeof(unsigned long));
+ if (strncmp(t, NUMERIC_SQL, strlen(NUMERIC_SQL)) == 0) {
+ int val;
+
+ bind[i].buffer_type = MYSQL_TYPE_LONG;
+ *bind[i].length = bind[i].buffer_length * sizeof(int);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ val = ERL_INT_VALUE(value);
+ memcpy(bind[i].buffer, &val, *bind[i].length);
+ } else if (strncmp(t, DECIMAL_SQL, strlen(DECIMAL_SQL)) == 0) {
+ char *val;
+
+ bind[i].buffer_type = MYSQL_TYPE_STRING;
+ *bind[i].length = bind[i].buffer_length * sizeof(char *);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ val = erl_iolist_to_string(value);
+ if (val) {
+ memcpy(bind[i].buffer, val, *bind[i].length);
+ free(val);
+ }
+ } else if (strncmp(t, FLOAT_SQL, strlen(FLOAT_SQL)) == 0) {
+ float val;
+
+ bind[i].buffer_type = MYSQL_TYPE_FLOAT;
+ *bind[i].length = bind[i].buffer_length * sizeof(float);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ val = ERL_FLOAT_VALUE(value);
+ memcpy(bind[i].buffer, &val, *bind[i].length);
+ } else if (strncmp(t, CHAR_SQL, strlen(CHAR_SQL)) == 0) {
+ char *val;
+
+ bind[i].buffer_type = MYSQL_TYPE_STRING;
+ *bind[i].length = bind[i].buffer_length * sizeof(char *);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ val = erl_iolist_to_string(value);
+ if (val) {
+ memcpy(bind[i].buffer, val, *bind[i].length);
+ free(val);
+ }
+ } else if (strncmp(t, VARCHAR_SQL, strlen(VARCHAR_SQL)) == 0) {
+ char *val;
+
+ bind[i].buffer_type = MYSQL_TYPE_BLOB;
+ *bind[i].length = bind[i].buffer_length * sizeof(char *);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ val = erl_iolist_to_string(value);
+ if (val) {
+ memcpy(bind[i].buffer, val, *bind[i].length);
+ free(val);
+ }
+ } else {
+ logmsg("ERROR: Unknown sized type: {%s, %d}", t,
+ bind[i].buffer_length);
+ exit(3);
+ }
+ erl_free_term(t_type);
+ } else {
+ char *t;
+
+ t = (char *)ERL_ATOM_PTR(type);
+ if (strncmp(t, TIMESTAMP_SQL, strlen(TIMESTAMP_SQL)) == 0) {
+ bind[i].buffer_type = MYSQL_TYPE_TIMESTAMP;
+ *bind[i].length = sizeof(MYSQL_TIME);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ memcpy(bind[i].buffer, value, *bind[i].length);
+ } else if (strncmp(t, INTEGER_SQL, strlen(INTEGER_SQL)) == 0) {
+ int val;
+
+ bind[i].buffer_type = MYSQL_TYPE_LONG;
+ *bind[i].length = sizeof(int);
+ bind[i].buffer = malloc(*bind[i].length);
+ memset(bind[i].buffer, 0, *bind[i].length);
+
+ val = ERL_INT_VALUE(value);
+ memcpy(bind[i].buffer, &val, *bind[i].length);
+ } else {
+ logmsg("ERROR: Unknown type: %s", t);
+ exit(3);
+ }
+ }
+
+ bind[i].is_null = malloc(sizeof(int));
+ if (ERL_IS_ATOM(value)
+ && strncmp((char *)ERL_ATOM_PTR(value),
+ NULL_SQL, strlen(NULL_SQL)) == 0)
+ memcpy(bind[i].is_null, &TRUTHY, sizeof(int));
+ else
+ memcpy(bind[i].is_null, &FALSY, sizeof(int));
+
+ erl_free_term(value);
+ erl_free_term(type);
+ }
+ erl_free_term(params);
+
+ if (mysql_stmt_bind_param(sth, bind)) {
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ } else {
+ if (mysql_stmt_execute(sth)) {
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ } else {
+ set_mysql_results();
+ if (results) {
+ resp = handle_mysql_result();
+ set_mysql_results();
+ } else {
+ if (mysql_field_count(&dbh) == 0)
+ resp = erl_format("{updated, ~i}", mysql_affected_rows(&dbh));
+ else
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ }
+ }
+ }
+
+ for (i = 0; i < param_count; i++) {
+ free(bind[i].buffer);
+ free(bind[i].is_null);
+ }
+ free(bind);
+ }
+ }
+ erl_free(q);
+
+ mysql_stmt_close(sth);
+
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_select_count(ETERM *msg)
+{
+ ETERM *query, *resp;
+ char *q;
+
+ query = erl_element(2, msg);
+ q = erl_iolist_to_string(query);
+ erl_free_term(query);
+
+ logmsg("DEBUG: got select count msg: %s.", q);
+ if (mysql_query(&dbh, q)) {
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ } else {
+ set_mysql_results();
+ if (results) {
+ resp = erl_format("{ok, ~i}", numrows);
+ } else {
+ if (mysql_field_count(&dbh) == 0)
+ resp = erl_format("{ok, ~i}", mysql_affected_rows(&dbh));
+ else
+ resp = erl_format("{error, {mysql_error, ~i, ~s}}",
+ mysql_errno(&dbh), mysql_error(&dbh));
+ }
+ }
+ erl_free(q);
+
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_select(ETERM *msg)
+{
+ MYSQL_FIELD *fields;
+ ETERM *epos, *enum_items, *ecols, *erows, *resp;
+ my_ulonglong pos, num_items;
+ unsigned int num_fields;
+
+ epos = erl_element(2, msg);
+ enum_items = erl_element(3, msg);
+ pos = ERL_INT_UVALUE(epos);
+ num_items = ERL_INT_UVALUE(enum_items);
+ erl_free_term(enum_items);
+ erl_free_term(epos);
+
+ logmsg("DEBUG: got select pos: %d, n: %d.", erl_size(msg), pos, num_items);
+ if (results == NULL) {
+ logmsg("ERROR: select message w/o cursor.");
+ exit(2);
+ }
+
+ num_fields = mysql_num_fields(results);
+ fields = mysql_fetch_fields(results);
+ if (resultoffset > 0)
+ resultoffset = pos - 1;
+ if (num_items > numrows - resultoffset)
+ num_items = numrows - resultoffset;
+ mysql_data_seek(results, resultoffset);
+
+ ecols = make_cols(fields, num_fields);
+ erows = make_rows(num_items, num_fields);
+ resp = erl_format("{selected, ~w, ~w}", ecols, erows);
+ erl_free_term(erows);
+
+ erl_free_term(ecols);
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_first(ETERM *msg)
+{
+ MYSQL_FIELD *fields;
+ ETERM *ecols, *erows, *resp;
+ unsigned int num_fields;
+
+ logmsg("DEBUG: got first msg.");
+ if (results == NULL) {
+ logmsg("ERROR: got first message w/o cursor.");
+ exit(2);
+ }
+
+ num_fields = mysql_num_fields(results);
+ fields = mysql_fetch_fields(results);
+ resultoffset = 0;
+ mysql_data_seek(results, resultoffset);
+
+ ecols = make_cols(fields, num_fields);
+ erows = make_rows(1, num_fields);
+ resp = erl_format("{selected, ~w, ~w}", ecols, erows);
+ erl_free_term(erows);
+
+ erl_free_term(ecols);
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_last(ETERM *msg)
+{
+ MYSQL_FIELD *fields;
+ ETERM *ecols, *erows, *resp;
+ unsigned int num_fields;
+
+ logmsg("DEBUG: got last msg.");
+ if (results == NULL) {
+ logmsg("ERROR: got last message w/o cursor.");
+ exit(2);
+ }
+
+ num_fields = mysql_num_fields(results);
+ fields = mysql_fetch_fields(results);
+ resultoffset = numrows - 1;
+ mysql_data_seek(results, resultoffset);
+
+ ecols = make_cols(fields, num_fields);
+ erows = make_rows(1, num_fields);
+ resp = erl_format("{selected, ~w, ~w}", ecols, erows);
+ erl_free_term(erows);
+
+ erl_free_term(ecols);
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_next(ETERM *msg)
+{
+ MYSQL_FIELD *fields;
+ ETERM *ecols, *erows, *resp;
+ unsigned int num_fields;
+
+ logmsg("DEBUG: got next msg.");
+ if (results == NULL) {
+ logmsg("ERROR: got next message w/o cursor.");
+ exit(2);
+ }
+
+ num_fields = mysql_num_fields(results);
+ fields = mysql_fetch_fields(results);
+
+ ecols = make_cols(fields, num_fields);
+ logmsg("resultoffset: %d, num_rows: %d", resultoffset, numrows);
+ if (resultoffset == numrows) {
+ resp = erl_format("{selected, ~w, []}", ecols);
+ } else {
+ erows = make_rows(1, num_fields);
+ resp = erl_format("{selected, ~w, ~w}", ecols, erows);
+ erl_free_term(erows);
+ }
+
+ erl_free_term(ecols);
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+handle_prev(ETERM *msg)
+{
+ MYSQL_FIELD *fields;
+ ETERM *ecols, *erows, *resp;
+ unsigned int num_fields;
+
+ logmsg("DEBUG: got prev msg.");
+ if (results == NULL) {
+ logmsg("ERROR: got prev message w/o cursor.");
+ exit(2);
+ }
+
+ num_fields = mysql_num_fields(results);
+ fields = mysql_fetch_fields(results);
+
+ ecols = make_cols(fields, num_fields);
+ logmsg("resultoffset: %d, num_rows: %d", resultoffset, numrows);
+ if (resultoffset == 0) {
+ resp = erl_format("{selected, ~w, []}", ecols);
+ } else {
+ resultoffset = resultoffset - 1;
+ mysql_data_seek(results, resultoffset);
+ erows = make_rows(1, num_fields);
+
+ /* Rewind to position at the point we returned. */
+ resultoffset = resultoffset - 1;
+ mysql_data_seek(results, resultoffset);
+ resp = erl_format("{selected, ~w, ~w}", ecols, erows);
+ erl_free_term(erows);
+ }
+
+ erl_free_term(ecols);
+ write_msg(resp);
+ erl_free_term(resp);
+}
+
+void
+dispatch_db_cmd(ETERM *msg)
+{
+ ETERM *tag;
+ char *tag_name;
+
+ tag = erl_element(1, msg);
+ tag_name = (char *)ERL_ATOM_PTR(tag);
+ if (strncmp(tag_name, CONNECT_MSG, strlen(CONNECT_MSG)) == 0)
+ handle_connect(msg);
+ else if (strncmp(tag_name, QUERY_MSG, strlen(QUERY_MSG)) == 0)
+ handle_query(msg);
+ else if (strncmp(tag_name, PARAM_QUERY_MSG, strlen(PARAM_QUERY_MSG)) == 0)
+ handle_param_query(msg);
+ else if (strncmp(tag_name, SELECT_COUNT_MSG, strlen(SELECT_COUNT_MSG)) == 0)
+ handle_select_count(msg);
+ else if (strncmp(tag_name, SELECT_MSG, strlen(SELECT_MSG)) == 0)
+ handle_select(msg);
+ else if (strncmp(tag_name, FIRST_MSG, strlen(FIRST_MSG)) == 0)
+ handle_first(msg);
+ else if (strncmp(tag_name, LAST_MSG, strlen(LAST_MSG)) == 0)
+ handle_last(msg);
+ else if (strncmp(tag_name, NEXT_MSG, strlen(NEXT_MSG)) == 0)
+ handle_next(msg);
+ else if (strncmp(tag_name, PREV_MSG, strlen(PREV_MSG)) == 0)
+ handle_prev(msg);
+ else {
+ logmsg("WARNING: message type %s unknown.", (char *)ERL_ATOM_PTR(tag));
+ erl_free_term(tag);
+ exit(3);
+ }
+
+ erl_free_term(tag);
+}
+
+int
+main(int argc, char *argv[])
+{
+ ETERM *msg;
+
+ openlog();
+ logmsg("INFO: starting up.");
+ erl_init(NULL, 0);
+
+ mysql_init(&dbh);
+ while ((msg = read_msg()) != NULL) {
+ dispatch_db_cmd(msg);
+ erl_free_term(msg);
+ }
+ mysql_close(&dbh);
+
+ logmsg("INFO: shutting down.");
+ closelog();
+
+ return 0;
+}
diff --git a/src/mysqlerl.erl b/src/mysqlerl.erl
new file mode 100644
index 0000000..946e9c5
--- /dev/null
+++ b/src/mysqlerl.erl
@@ -0,0 +1,251 @@
+%% Modeled from ODBC
+%% http://www.erlang.org/doc/apps/odbc/
+
+-module(mysqlerl).
+-author('bjc@kublai.com').
+
+-include("mysqlerl.hrl").
+
+-export([convert_type/1]).
+
+-export([test_start/0, test_msg/0, test_query/0, test_param_query/0]).
+
+-export([start/0, start/1, stop/0, commit/2, commit/3,
+ connect/6, disconnect/1, describe_table/2,
+ describe_table/3, first/1, first/2,
+ last/1, last/2, next/1, next/2, prev/1,
+ prev/2, select_count/2, select_count/3,
+ select/3, select/4, param_query/3, param_query/4,
+ sql_query/2, sql_query/3]).
+
+-define(CONFIG, "/Users/bjc/tmp/test-server.cfg").
+
+test_start() ->
+ {ok, [{Host, Port, DB, User, Pass, Options}]} = file:consult(?CONFIG),
+ mysqlerl:connect(Host, Port, DB, User, Pass, Options).
+
+test_msg() ->
+ commit(mysqlerl_connection_sup:random_child(),
+ rollback, 2000).
+
+test_query() ->
+ sql_query(mysqlerl_connection_sup:random_child(),
+ "SELECT COUNT(*) FROM user", 2000).
+
+test_param_query() ->
+ %% This should really be an update or something, since that's how
+ %% it'll be used.
+ param_query(mysqlerl_connection_sup:random_child(),
+ "SELECT * FROM user WHERE username=?",
+ [{{sql_varchar, 20}, "bjc"}]).
+
+start() ->
+ start(temporary).
+
+%% Arguments:
+%% Type = permanent | transient | temporary
+%%
+%% Returns:
+%% ok | {error, Reason}
+start(Type) ->
+ application:start(sasl),
+ application:start(mysqlerl, Type).
+
+stop() ->
+ application:stop(mysqlerl).
+
+commit(Ref, CommitMode) ->
+ commit(Ref, CommitMode, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Timeout = time_out()
+%% CommitMode = commit | rollback
+%% Reason = not_an_explicit_commit_connection |
+%% process_not_owner_of_odbc_connection |
+%% common_reason()
+%% ok | {error, Reason}
+commit(Ref, commit, Timeout) ->
+ case sql_query(Ref, "COMMIT", Timeout) of
+ {num_rows, _} -> ok;
+ Other -> Other
+ end;
+commit(Ref, rollback, Timeout) ->
+ case sql_query(Ref, "ROLLBACK", Timeout) of
+ {num_rows, _} -> ok;
+ Other -> Other
+ end.
+
+%% Arguments:
+%% Host = string()
+%% Port = integer()
+%% Database = string()
+%% User = string()
+%% Password = string()
+%% Options = list()
+%%
+%% Returns:
+%% {ok, Ref} | {error, Reason}
+%% Ref = connection_reference()
+connect(Host, Port, Database, User, Password, Options) ->
+ mysqlerl_connection_sup:connect(Host, Port, Database,
+ User, Password, Options).
+
+%% Arguments:
+%% Ref = connection_reference()
+%%
+%% Returns:
+%% ok | {error, Reason}
+disconnect(Ref) ->
+ mysqlerl_connection:stop(Ref).
+
+describe_table(Ref, Table) ->
+ describe_table(Ref, Table, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Table = string()
+%% Timeout = time_out()
+%%
+%% Returns:
+%% {ok, Description} | {error, Reason}
+%% Description = [{col_name(), odbc_data_type()}]
+describe_table(Ref, Table, Timeout) ->
+ Q = ["DESCRIBE ", Table],
+ {selected, _, Rows} = sql_query(Ref, Q, Timeout),
+ Description = [{Name, convert_type(T)} || {Name, T, _, _, _, _} <- Rows],
+ {ok, Description}.
+
+first(Ref) ->
+ first(Ref, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+first(Ref, Timeout) ->
+ conn_fwd(Ref, #sql_first{}, Timeout).
+
+last(Ref) ->
+ last(Ref, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+last(Ref, Timeout) ->
+ conn_fwd(Ref, #sql_last{}, Timeout).
+
+next(Ref) ->
+ next(Ref, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+next(Ref, Timeout) ->
+ conn_fwd(Ref, #sql_next{}, Timeout).
+
+prev(Ref) ->
+ prev(Ref, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+prev(Ref, Timeout) ->
+ conn_fwd(Ref, #sql_prev{}, Timeout).
+
+select_count(Ref, SQLQuery) ->
+ select_count(Ref, SQLQuery, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% SQLQuery = string()
+%% Timeout = time_out()
+%% Returns:
+%% {ok, NrRows} | {error, Reason}
+%% NrRows = n_rows()
+select_count(Ref, SQLQuery, Timeout) ->
+ conn_fwd(Ref, #sql_select_count{q = SQLQuery}, Timeout).
+
+select(Ref, Pos, N) ->
+ select(Ref, Pos, N, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% Pos = integer()
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+select(Ref, Pos, N, Timeout) ->
+ conn_fwd(Ref, #sql_select{pos = Pos, n = N}, Timeout).
+
+param_query(Ref, SQLQuery, Params) ->
+ param_query(Ref, SQLQuery, Params, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% SQLQuery = string()
+%% Params = [{odbc_data_type(), [value()]}]
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+param_query(Ref, SQLQuery, Params, Timeout) ->
+ conn_fwd(Ref, #sql_param_query{q = SQLQuery, params = Params}, Timeout).
+
+sql_query(Ref, SQLQuery) ->
+ sql_query(Ref, SQLQuery, infinity).
+
+%% Arguments:
+%% Ref = connection_reference()
+%% SQLQuery = string()
+%% Timeout = time_out()
+%% Returns:
+%% {selected, ColNames, Rows} | {error, Reason}
+%% Rows = rows()
+sql_query(Ref, SQLQuery, Timeout) ->
+ conn_fwd(Ref, #sql_query{q = SQLQuery}, Timeout).
+
+conn_fwd(Ref, Msg, Timeout) ->
+ gen_server:call(Ref, {Msg, Timeout}, infinity).
+
+%% Convert type needs some love! Cover all bases here instead of
+%% fudging.
+convert_type("timestamp") ->
+ sql_timestamp;
+convert_type("int") ->
+ sql_integer;
+convert_type("int(" ++ Rest) ->
+ Size = find_data_size(Rest),
+ {sql_numeric, list_to_integer(Size)};
+convert_type("decimal(" ++ Rest) ->
+ Size = find_data_size(Rest),
+ {sql_decimal, list_to_integer(Size)};
+convert_type("float(" ++ Rest) ->
+ Size = find_data_size(Rest),
+ {sql_float, list_to_float(Size)};
+convert_type("char(" ++ Rest) ->
+ Size = find_data_size(Rest),
+ {sql_char, list_to_integer(Size)};
+convert_type("varchar(" ++ Rest) ->
+ Size = find_data_size(Rest),
+ {sql_varchar, list_to_integer(Size)}.
+
+find_data_size(Str) ->
+ find_data_size(Str, []).
+
+find_data_size([$) | _Rest], Accum) ->
+ lists:reverse(Accum);
+find_data_size([H | T], Accum) ->
+ find_data_size(T, [H | Accum]).
diff --git a/src/mysqlerl.hrl b/src/mysqlerl.hrl
new file mode 100644
index 0000000..99e674f
--- /dev/null
+++ b/src/mysqlerl.hrl
@@ -0,0 +1,9 @@
+-record(sql_connect, {host, port, database, user, password, options}).
+-record(sql_query, {q}).
+-record(sql_param_query, {q, params}).
+-record(sql_select_count, {q}).
+-record(sql_select, {pos, n}).
+-record(sql_first, {}).
+-record(sql_last, {}).
+-record(sql_next, {}).
+-record(sql_prev, {}).
diff --git a/src/mysqlerl_app.erl b/src/mysqlerl_app.erl
new file mode 100644
index 0000000..cdb8ade
--- /dev/null
+++ b/src/mysqlerl_app.erl
@@ -0,0 +1,14 @@
+-module(mysqlerl_app).
+-author('bjc@kublai.com').
+
+-behavior(application).
+
+%% Behavior callbacks.
+-export([start/2, stop/1]).
+
+start(normal, []) ->
+ register(?MODULE, self()),
+ mysqlerl_sup:start_link().
+
+stop([]) ->
+ ok.
diff --git a/src/mysqlerl_connection.erl b/src/mysqlerl_connection.erl
new file mode 100644
index 0000000..1a3d900
--- /dev/null
+++ b/src/mysqlerl_connection.erl
@@ -0,0 +1,64 @@
+-module(mysqlerl_connection).
+-author('bjc@kublai.com').
+
+-include("mysqlerl.hrl").
+-include("mysqlerl_port.hrl").
+
+-behavior(gen_server).
+
+-export([start_link/7, stop/1]).
+
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2]).
+
+-record(state, {sup, owner}).
+
+start_link(Owner, Host, Port, Database, User, Password, Options) ->
+ gen_server:start_link(?MODULE, [Owner, Host, Port, Database,
+ User, Password, Options], []).
+
+stop(Pid) ->
+ gen_server:call(Pid, stop).
+
+init([Owner, Host, Port, Database, User, Password, Options]) ->
+ process_flag(trap_exit, true),
+ link(Owner),
+ {ok, Sup} = mysqlerl_port_sup:start_link(helper(), Host, Port, Database,
+ User, Password, Options),
+ {ok, #state{sup = Sup, owner = Owner}}.
+
+terminate(Reason, _State) ->
+ io:format("DEBUG: connection got terminate: ~p~n", [Reason]),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_call(Request, From, #state{owner = Owner} = State)
+ when Owner /= element(1, From) ->
+ error_logger:warning_msg("Request from ~p (owner: ~p): ~p",
+ [element(1, From), Owner, Request]),
+ {reply, {error, process_not_owner_of_odbc_connection}, State};
+handle_call(stop, _From, State) ->
+ {stop, normal, State};
+handle_call(Request, _From, State) ->
+ {reply, gen_server:call(port_ref(State#state.sup),
+ #req{request = Request}, infinity), State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({'EXIT', Pid, _Reason}, #state{owner = Pid} = State) ->
+ io:format("DEBUG: owner ~p shut down.~n", [Pid]),
+ {stop, normal, State}.
+
+helper() ->
+ case code:priv_dir(mysqlerl) of
+ PrivDir when is_list(PrivDir) -> ok;
+ {error, bad_name} -> PrivDir = filename:join(["..", "priv"])
+ end,
+ filename:join([PrivDir, "mysqlerl"]).
+
+port_ref(Sup) ->
+ [{mysqlerl_port, Ref, worker, _}] = supervisor:which_children(Sup),
+ Ref.
diff --git a/src/mysqlerl_connection_sup.erl b/src/mysqlerl_connection_sup.erl
new file mode 100644
index 0000000..6e1632a
--- /dev/null
+++ b/src/mysqlerl_connection_sup.erl
@@ -0,0 +1,31 @@
+-module(mysqlerl_connection_sup).
+-author('bjc@kublai.com').
+
+-behavior(supervisor).
+
+-export([random_child/0]).
+-export([start_link/0, connect/6]).
+
+-export([init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+connect(Host, Port, Database, User, Password, Options) ->
+ supervisor:start_child(?MODULE, [self(), Host, Port, Database,
+ User, Password, Options]).
+
+random_child() ->
+ case get_pids() of
+ [] -> {error, no_connections};
+ Pids -> lists:nth(erlang:phash(now(), length(Pids)), Pids)
+ end.
+
+init([]) ->
+ Connection = {undefined, {mysqlerl_connection, start_link, []},
+ transient, 5, worker, [mysqlerl_connection]},
+ {ok, {{simple_one_for_one, 10, 5},
+ [Connection]}}.
+
+get_pids() ->
+ [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(?MODULE)].
diff --git a/src/mysqlerl_port.erl b/src/mysqlerl_port.erl
new file mode 100644
index 0000000..74fe177
--- /dev/null
+++ b/src/mysqlerl_port.erl
@@ -0,0 +1,83 @@
+-module(mysqlerl_port).
+-author('bjc@kublai.com').
+
+-include("mysqlerl.hrl").
+-include("mysqlerl_port.hrl").
+
+-behavior(gen_server).
+
+-export([start_link/7]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2]).
+
+-define(CONNECT_TIMEOUT, 30000).
+
+-record(state, {ref}).
+-record(port_closed, {reason}).
+
+start_link(Cmd, Host, Port, Database, User, Password, Options) ->
+ gen_server:start_link(?MODULE,
+ [Cmd, Host, Port, Database, User, Password, Options],
+ []).
+
+init([Cmd, Host, Port, Database, User, Password, Options]) ->
+ process_flag(trap_exit, true),
+ Ref = open_port({spawn, Cmd}, [{packet, 4}, binary]),
+ {data, ok} = send_port_cmd(Ref, #sql_connect{host = Host,
+ port = Port,
+ database = Database,
+ user = User,
+ password = Password,
+ options = Options},
+ ?CONNECT_TIMEOUT),
+ {ok, #state{ref = Ref}}.
+
+terminate(#port_closed{reason = Reason}, #state{ref = Ref}) ->
+ io:format("DEBUG: mysqlerl connection ~p shutting down (~p).~n",
+ [Ref, Reason]),
+ ok;
+terminate(Reason, State) ->
+ catch port_close(State#state.ref),
+ io:format("DEBUG: mysqlerl_port got terminate: ~p~n", [Reason]),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_call(#req{request = {Request, Timeout}}, From,
+ #state{ref = Ref} = State) ->
+ case send_port_cmd(Ref, Request, Timeout) of
+ {data, Res} ->
+ {reply, Res, State};
+ {'EXIT', Ref, Reason} ->
+ gen_server:reply(From, {error, connection_closed}),
+ {stop, #port_closed{reason = Reason}, State};
+ timeout ->
+ gen_server:reply(From, timeout),
+ {stop, timeout, State};
+ Other ->
+ error_logger:warning_msg("Got unknown query response: ~p~n",
+ [Other]),
+ gen_server:reply(From, {error, connection_closed}),
+ {stop, {unknownreply, Other}, State}
+ end.
+
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({'EXIT', Ref, Reason}, #state{ref = Ref} = State) ->
+ io:format("DEBUG: Port ~p closed on ~p.~n", [Ref, State]),
+ {stop, #port_closed{reason = Reason}, State}.
+
+
+send_port_cmd(Ref, Request, Timeout) ->
+ io:format("DEBUG: Sending request: ~p~n", [Request]),
+ port_command(Ref, term_to_binary(Request)),
+ receive
+ {Ref, {data, Res}} ->
+ {data, binary_to_term(Res)};
+ Other -> Other
+ after Timeout ->
+ timeout
+ end.
diff --git a/src/mysqlerl_port.hrl b/src/mysqlerl_port.hrl
new file mode 100644
index 0000000..a7a3300
--- /dev/null
+++ b/src/mysqlerl_port.hrl
@@ -0,0 +1 @@
+-record(req, {request}).
diff --git a/src/mysqlerl_port_sup.erl b/src/mysqlerl_port_sup.erl
new file mode 100644
index 0000000..3053cdc
--- /dev/null
+++ b/src/mysqlerl_port_sup.erl
@@ -0,0 +1,18 @@
+-module(mysqlerl_port_sup).
+-author('bjc@kublai.com').
+
+-behavior(supervisor).
+
+-export([start_link/7]).
+-export([init/1]).
+
+start_link(Cmd, Host, Port, Database, User, Password, Options) ->
+ supervisor:start_link(?MODULE, [Cmd, Host, Port, Database,
+ User, Password, Options]).
+
+init([Cmd, Host, Port, Database, User, Password, Options]) ->
+ Ref = {mysqlerl_port, {mysqlerl_port, start_link,
+ [Cmd, Host, Port, Database,
+ User, Password, Options]},
+ transient, 5, worker, [mysqlerl_port]},
+ {ok, {{one_for_one, 10, 5}, [Ref]}}.
diff --git a/src/mysqlerl_sup.erl b/src/mysqlerl_sup.erl
new file mode 100644
index 0000000..1ebfbf2
--- /dev/null
+++ b/src/mysqlerl_sup.erl
@@ -0,0 +1,16 @@
+-module(mysqlerl_sup).
+-author('bjc@kublai.com').
+
+-behavior(supervisor).
+
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ ConnectionSup = {mysqlerl_connection_sup,
+ {mysqlerl_connection_sup, start_link, []},
+ permanent, infinity, supervisor,
+ [mysqlerl_connection_sup]},
+ {ok, {{one_for_one, 10, 5}, [ConnectionSup]}}.