aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/mysqlerl.c61
-rw-r--r--src/mysqlerl_connection.erl9
2 files changed, 41 insertions, 29 deletions
diff --git a/src/mysqlerl.c b/src/mysqlerl.c
index b4e6441..b5ad31c 100644
--- a/src/mysqlerl.c
+++ b/src/mysqlerl.c
@@ -19,16 +19,13 @@
const char *LOGPATH = "/tmp/mysqlerl.log";
static FILE *logfile = NULL;
+const char *QUERY_MSG = "sql_query";
+
typedef u_int32_t msglen_t;
-typedef enum {
- QUERY_MSG = 0, COMMIT_MSG = 1, ROLLBACK_MSG = 2, EXTENDED_MSG = 255
-} msgtype_t;
struct msg {
- msgtype_t type;
- char *msg;
- size_t msglen;
- char *buf;
+ ETERM *cmd;
+ unsigned char *buf;
};
typedef struct msg msg_t;
@@ -62,7 +59,7 @@ logmsg(const char *format, ...)
}
int
-restartable_read(char *buf, size_t buflen)
+restartable_read(unsigned char *buf, size_t buflen)
{
ssize_t rc, readb;
@@ -88,7 +85,7 @@ restartable_read(char *buf, size_t buflen)
}
int
-restartable_write(const char *buf, size_t buflen)
+restartable_write(const unsigned char *buf, size_t buflen)
{
ssize_t rc, wroteb;
@@ -124,16 +121,16 @@ read_msg()
}
logmsg("DEBUG: reading message length.");
- if (restartable_read((char *)&len, sizeof(len)) == -1) {
+ if (restartable_read((unsigned char *)&len, sizeof(len)) == -1) {
logmsg("ERROR: couldn't read %d byte message prefix: %s.",
sizeof(len), strerror(errno));
free(msg);
exit(2);
}
- len = ntohl(len);
- msg->buf = malloc(len + 1);
+ len = ntohl(len);
+ msg->buf = malloc(len);
if (msg->buf == NULL) {
logmsg("ERROR: Couldn't malloc %d bytes: %s.", len,
strerror(errno));
@@ -141,7 +138,6 @@ read_msg()
free(msg);
exit(2);
}
- msg->buf[len] = '\0';
logmsg("DEBUG: reading message body (len: %d).", len);
if (restartable_read(msg->buf, len) == -1) {
@@ -153,9 +149,7 @@ read_msg()
exit(2);
}
- msg->type = msg->buf[0];
- msg->msg = msg->buf + 1;
- msg->msglen = len - 1;
+ msg->cmd = erl_decode(msg->buf);
return msg;
}
@@ -166,11 +160,11 @@ write_cmd(const char *cmd, msglen_t len)
msglen_t nlen;
nlen = htonl(len + 3);
- if (restartable_write((char *)&nlen, sizeof(nlen)) == -1)
+ if (restartable_write((unsigned char *)&nlen, sizeof(nlen)) == -1)
return -1;
- if (restartable_write(" - ", 3) == -1)
+ if (restartable_write((unsigned char *)" - ", 3) == -1)
return -1;
- if (restartable_write(cmd, len) == -1)
+ if (restartable_write((unsigned char *)cmd, len) == -1)
return -1;
return 0;
@@ -179,16 +173,26 @@ write_cmd(const char *cmd, msglen_t len)
void
dispatch_db_cmd(MYSQL *dbh, msg_t *msg)
{
- switch (msg->type) {
- case QUERY_MSG:
- logmsg("DEBUG: got query msg: %s.", msg->msg);
- write_cmd(msg->msg, msg->msglen);
- break;
-
- default:
- logmsg("WARNING: message type %d unknown.", msg->type);
+ ETERM *tag;
+
+ tag = erl_element(1, msg->cmd);
+ if (strncmp((char *)ERL_ATOM_PTR(tag), QUERY_MSG, sizeof(QUERY_MSG)) == 0) {
+ ETERM *query;
+ char *q;
+ query = erl_element(2, msg->cmd);
+ q = erl_iolist_to_string(query);
+ erl_free_term(query);
+
+ logmsg("DEBUG: got query msg: %s.", q);
+ write_cmd(q, strlen(q));
+ erl_free(q);
+ } else {
+ logmsg("WARNING: message type %s unknown.", (char *)ERL_ATOM_PTR(tag));
+ erl_free_term(tag);
exit(3);
}
+
+ erl_free_term(tag);
}
void
@@ -217,6 +221,8 @@ main(int argc, char *argv[])
user = argv[4];
passwd = argv[5];
+ erl_init(NULL, 0);
+
mysql_init(&dbh);
if (mysql_real_connect(&dbh, host, user, passwd,
db_name, atoi(port), NULL, 0) == NULL) {
@@ -229,6 +235,7 @@ main(int argc, char *argv[])
dispatch_db_cmd(&dbh, msg);
/* XXX: Move this to function */
+ erl_free_compound(msg->cmd);
free(msg->buf);
free(msg);
}
diff --git a/src/mysqlerl_connection.erl b/src/mysqlerl_connection.erl
index 52a6300..fb0479c 100644
--- a/src/mysqlerl_connection.erl
+++ b/src/mysqlerl_connection.erl
@@ -9,6 +9,8 @@
handle_call/3, handle_cast/2, handle_info/2]).
-define(QUERY_MSG, 0).
+-define(COMMIT_MSG, 1).
+-define(ROLLBACK_MSG, 2).
-define(EXTENDED_MSG, 255).
-record(state, {ref}).
@@ -30,7 +32,7 @@ init([Host, Port, Database, User, Password, Options]) ->
Cmd = lists:flatten(io_lib:format("~s ~s ~w ~s ~s ~s ~s",
[helper(), Host, Port, Database,
User, Password, Options])),
- Ref = open_port({spawn, Cmd}, [{packet, 4}]),
+ Ref = open_port({spawn, Cmd}, [{packet, 4}, binary]),
{ok, #state{ref = Ref}}.
terminate(#port_closed{reason = Reason}, #state{ref = Ref}) ->
@@ -69,7 +71,10 @@ helper() ->
handle_query(Ref, Query) ->
io:format("DEBUG: got query: ~p~n", [Query]),
- port_command(Ref, [?QUERY_MSG | Query]),
+ make_request(Ref, {sql_query, Query}).
+
+make_request(Ref, Req) ->
+ port_command(Ref, term_to_binary(Req)),
receive
{Ref, {data, Res}} -> {ok, Res};
Other ->