diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mysqlerl.c | 61 | ||||
-rw-r--r-- | src/mysqlerl_connection.erl | 9 |
2 files changed, 41 insertions, 29 deletions
diff --git a/src/mysqlerl.c b/src/mysqlerl.c index b4e6441..b5ad31c 100644 --- a/src/mysqlerl.c +++ b/src/mysqlerl.c @@ -19,16 +19,13 @@ const char *LOGPATH = "/tmp/mysqlerl.log"; static FILE *logfile = NULL; +const char *QUERY_MSG = "sql_query"; + typedef u_int32_t msglen_t; -typedef enum { - QUERY_MSG = 0, COMMIT_MSG = 1, ROLLBACK_MSG = 2, EXTENDED_MSG = 255 -} msgtype_t; struct msg { - msgtype_t type; - char *msg; - size_t msglen; - char *buf; + ETERM *cmd; + unsigned char *buf; }; typedef struct msg msg_t; @@ -62,7 +59,7 @@ logmsg(const char *format, ...) } int -restartable_read(char *buf, size_t buflen) +restartable_read(unsigned char *buf, size_t buflen) { ssize_t rc, readb; @@ -88,7 +85,7 @@ restartable_read(char *buf, size_t buflen) } int -restartable_write(const char *buf, size_t buflen) +restartable_write(const unsigned char *buf, size_t buflen) { ssize_t rc, wroteb; @@ -124,16 +121,16 @@ read_msg() } logmsg("DEBUG: reading message length."); - if (restartable_read((char *)&len, sizeof(len)) == -1) { + if (restartable_read((unsigned char *)&len, sizeof(len)) == -1) { logmsg("ERROR: couldn't read %d byte message prefix: %s.", sizeof(len), strerror(errno)); free(msg); exit(2); } - len = ntohl(len); - msg->buf = malloc(len + 1); + len = ntohl(len); + msg->buf = malloc(len); if (msg->buf == NULL) { logmsg("ERROR: Couldn't malloc %d bytes: %s.", len, strerror(errno)); @@ -141,7 +138,6 @@ read_msg() free(msg); exit(2); } - msg->buf[len] = '\0'; logmsg("DEBUG: reading message body (len: %d).", len); if (restartable_read(msg->buf, len) == -1) { @@ -153,9 +149,7 @@ read_msg() exit(2); } - msg->type = msg->buf[0]; - msg->msg = msg->buf + 1; - msg->msglen = len - 1; + msg->cmd = erl_decode(msg->buf); return msg; } @@ -166,11 +160,11 @@ write_cmd(const char *cmd, msglen_t len) msglen_t nlen; nlen = htonl(len + 3); - if (restartable_write((char *)&nlen, sizeof(nlen)) == -1) + if (restartable_write((unsigned char *)&nlen, sizeof(nlen)) == -1) return -1; - if (restartable_write(" - ", 3) == -1) + if (restartable_write((unsigned char *)" - ", 3) == -1) return -1; - if (restartable_write(cmd, len) == -1) + if (restartable_write((unsigned char *)cmd, len) == -1) return -1; return 0; @@ -179,16 +173,26 @@ write_cmd(const char *cmd, msglen_t len) void dispatch_db_cmd(MYSQL *dbh, msg_t *msg) { - switch (msg->type) { - case QUERY_MSG: - logmsg("DEBUG: got query msg: %s.", msg->msg); - write_cmd(msg->msg, msg->msglen); - break; - - default: - logmsg("WARNING: message type %d unknown.", msg->type); + ETERM *tag; + + tag = erl_element(1, msg->cmd); + if (strncmp((char *)ERL_ATOM_PTR(tag), QUERY_MSG, sizeof(QUERY_MSG)) == 0) { + ETERM *query; + char *q; + query = erl_element(2, msg->cmd); + q = erl_iolist_to_string(query); + erl_free_term(query); + + logmsg("DEBUG: got query msg: %s.", q); + write_cmd(q, strlen(q)); + erl_free(q); + } else { + logmsg("WARNING: message type %s unknown.", (char *)ERL_ATOM_PTR(tag)); + erl_free_term(tag); exit(3); } + + erl_free_term(tag); } void @@ -217,6 +221,8 @@ main(int argc, char *argv[]) user = argv[4]; passwd = argv[5]; + erl_init(NULL, 0); + mysql_init(&dbh); if (mysql_real_connect(&dbh, host, user, passwd, db_name, atoi(port), NULL, 0) == NULL) { @@ -229,6 +235,7 @@ main(int argc, char *argv[]) dispatch_db_cmd(&dbh, msg); /* XXX: Move this to function */ + erl_free_compound(msg->cmd); free(msg->buf); free(msg); } diff --git a/src/mysqlerl_connection.erl b/src/mysqlerl_connection.erl index 52a6300..fb0479c 100644 --- a/src/mysqlerl_connection.erl +++ b/src/mysqlerl_connection.erl @@ -9,6 +9,8 @@ handle_call/3, handle_cast/2, handle_info/2]). -define(QUERY_MSG, 0). +-define(COMMIT_MSG, 1). +-define(ROLLBACK_MSG, 2). -define(EXTENDED_MSG, 255). -record(state, {ref}). @@ -30,7 +32,7 @@ init([Host, Port, Database, User, Password, Options]) -> Cmd = lists:flatten(io_lib:format("~s ~s ~w ~s ~s ~s ~s", [helper(), Host, Port, Database, User, Password, Options])), - Ref = open_port({spawn, Cmd}, [{packet, 4}]), + Ref = open_port({spawn, Cmd}, [{packet, 4}, binary]), {ok, #state{ref = Ref}}. terminate(#port_closed{reason = Reason}, #state{ref = Ref}) -> @@ -69,7 +71,10 @@ helper() -> handle_query(Ref, Query) -> io:format("DEBUG: got query: ~p~n", [Query]), - port_command(Ref, [?QUERY_MSG | Query]), + make_request(Ref, {sql_query, Query}). + +make_request(Ref, Req) -> + port_command(Ref, term_to_binary(Req)), receive {Ref, {data, Res}} -> {ok, Res}; Other -> |