async library for manda (adaptive backend manager protocol)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1522 lines
37 KiB

  1. #include "libmanda.h"
  2. #include "libmanda-protocol.h"
  3. #include "idlist.h"
  4. #include <arpa/inet.h>
  5. #include <errno.h>
  6. #include <sys/socket.h>
  7. #include <fcntl.h>
  8. #include <unistd.h>
  9. #include <string.h>
  10. #include <assert.h>
  /* Step added to "now" when scheduling the next timeout check of a pending
   * request; same unit as ctrl->get_time() (presumably seconds - TODO confirm). */
  #define TIMEOUT_STEP (3)
  /* Take a temporary reference on an object that has a `refcount` member. */
  #define ENTER(x) do { ++((x)->refcount); } while(0)
  /* Drop a reference; calls destroy(x) once the count reaches zero. */
  #define LEAVE(x, destroy) do { if (0 == --((x)->refcount)) destroy(x); } while(0)
  /* Silence "unused parameter" warnings. */
  #define UNUSED(x) ((void)(x))
  /* Expands a string literal to "pointer, length" (length excludes the NUL). */
  #define CONST_STR_LEN(x) (x), (sizeof(x) - 1)
  /* Expands a GString* to "pointer, length"; NULL becomes "", 0.
   * Note: evaluates x more than once. */
  #define GSTR_LEN(x) ((x) ? (x)->str : ""), ((x) ? (x)->len : 0)
  /* INLINE: "static inline" on GNU-compatible compilers, plain "static" otherwise. */
  #if __GNUC__
  # define INLINE static inline
  #else
  # define INLINE static
  #endif
  typedef struct messageheader messageheader;
  typedef struct request request;
  /* Called for every received message. orig_command/orig_data describe the
   * request this message responds to (0/NULL if none). The callback is also
   * invoked with mesg_command == 0 and a NULL payload when a pending request
   * is aborted by timeout or connection close. */
  typedef void (*con_message_cb)(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
  /* Called once from con_close() while the connection is being torn down. */
  typedef void (*con_close_cb)(manda_connection *con);
  /* Decoded 8 byte wire header: four 16-bit fields in network byte order.
   * `size` is the total message size including the header itself. */
  struct messageheader {
      guint16 command, size, reqid, respid;
  };
  /* Book-keeping for one outstanding request; stored in
   * manda_connection.requests at the index given by its request id. */
  struct request {
      guint16 command; /* zero means slot is free */
      double timeout, timeout_step; /* final deadline / next time the request gets checked */
      guint16 timeout_prev_id, timeout_next_id; /* links in the timeout queue; 0 means "none" */
      gpointer data;
  };
  struct manda_connection {
      gpointer data; /* application data */
      gpointer priv_data; /* internal data */
      const manda_async_ctrl *ctrl;
      con_message_cb message_cb;
      con_close_cb close_cb;
      int fd; /* -1 once closed */
      gboolean closed;
      manda_fd_watcher fd_watcher;
      manda_timeout req_timeout;
      manda_IDList *request_ids;
      GArray *requests; /* array of `request`, indexed by request id */
      guint16 timeout_first_id, timeout_last_id; /* head / tail of the timeout queue; 0 means empty */
      guint cur_header_pos; /* how many bytes of the header we have */
      guint8 cur_header_buf[8];
      messageheader cur_header;
      guint cur_payload_pos; /* how many payload bytes have been read so far */
      GByteArray *cur_payload;
      guint cur_send_pos; /* bytes already written from the head of send_queue */
      GQueue send_queue; /* queue of GByteArray* messages waiting to be written */
      gint refcount;
  };
  /*
   * Put `fd` into non-blocking mode.
   *
   * Best effort: failures are not reported (the function returns void).
   * The current file status flags are fetched first so that flags which are
   * already set (e.g. O_APPEND) are preserved instead of being overwritten.
   */
  static void fd_no_block(int fd) {
  #ifdef O_NONBLOCK
      int flags = fcntl(fd, F_GETFL, 0);
      if (-1 == flags) {
          /* could not read the flags: fall back to the historic fixed value */
          flags = O_RDWR;
      }
      fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  #elif defined _WIN32
      int i = 1;
      ioctlsocket(fd, FIONBIO, &i);
  #else
  #error No way found to set non-blocking mode for fds.
  #endif
  }
  /*
   * Prepare a freshly obtained descriptor: mark it close-on-exec (when the
   * platform provides FD_CLOEXEC) and switch it to non-blocking mode.
   */
  static void fd_init(int fd) {
  #ifdef FD_CLOEXEC
      /* do not leak the descriptor into exec'ed children (cgi) */
      (void) fcntl(fd, F_SETFD, FD_CLOEXEC);
  #endif

      fd_no_block(fd);
  }
  74. /* -------------------------- */
  75. /* Message building / parsing */
  76. /* -------------------------- */
  77. INLINE guint8 _read_net_uint8(const guint8* buf) {
  78. return *buf;
  79. }
  80. INLINE guint16 _read_net_uint16(const guint8* buf) {
  81. guint16 i;
  82. memcpy(&i, buf, sizeof(i));
  83. return ntohs(i);
  84. }
  85. INLINE guint32 _read_net_uint32(const guint8* buf) {
  86. guint32 i;
  87. memcpy(&i, buf, sizeof(i));
  88. return ntohl(i);
  89. }
  90. INLINE void _write_net_uint8(guint8 *buf, guint8 val) {
  91. *buf = val;
  92. }
  93. INLINE void _write_net_uint16(guint8 *buf, guint16 val) {
  94. val = htons(val);
  95. memcpy(buf, &val, sizeof(val));
  96. }
  97. INLINE void _write_net_uint32(guint8 *buf, guint32 val) {
  98. val = htons(val);
  99. memcpy(buf, &val, sizeof(val));
  100. }
  101. INLINE gboolean read_net_uint8(GByteArray *buf, guint *pos, guint8 *dest) {
  102. if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
  103. /* end of buffer */
  104. *dest = 0;
  105. *pos = buf->len;
  106. return FALSE;
  107. }
  108. *dest = _read_net_uint8(buf->data + *pos);
  109. *pos += sizeof(*dest);
  110. return TRUE;
  111. }
  112. INLINE gboolean read_net_uint16(GByteArray *buf, guint *pos, guint16 *dest) {
  113. if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
  114. /* end of buffer */
  115. *dest = 0;
  116. *pos = buf->len;
  117. return FALSE;
  118. }
  119. *dest = _read_net_uint16(buf->data + *pos);
  120. *pos += sizeof(*dest);
  121. return TRUE;
  122. }
  123. INLINE gboolean read_net_uint32(GByteArray *buf, guint *pos, guint32 *dest) {
  124. if (buf->len < sizeof(*dest) || *pos >= buf->len - sizeof(*dest)) {
  125. /* end of buffer */
  126. *dest = 0;
  127. *pos = buf->len;
  128. return FALSE;
  129. }
  130. *dest = _read_net_uint32(buf->data + *pos);
  131. *pos += sizeof(*dest);
  132. return TRUE;
  133. }
  134. INLINE gboolean read_net_string(GByteArray *buf, guint *pos, GString *dest) {
  135. guint16 slen;
  136. g_string_truncate(dest, 0);
  137. if (!read_net_uint16(buf, pos, &slen)) return FALSE;
  138. if (buf->len < slen || *pos >= buf->len - slen) {
  139. *pos = buf->len;
  140. return FALSE;
  141. }
  142. g_string_set_size(dest, slen);
  143. memcpy(dest->str, buf->data + *pos, slen);
  144. *pos += slen;
  145. return TRUE;
  146. }
  147. INLINE void write_net_uint8(GByteArray *buf, guint8 val) {
  148. guint curlen = buf->len;
  149. g_byte_array_set_size(buf, curlen + sizeof(val));
  150. _write_net_uint8(buf->data + curlen, val);
  151. }
  152. INLINE void write_net_uint16(GByteArray *buf, guint16 val) {
  153. guint curlen = buf->len;
  154. g_byte_array_set_size(buf, curlen + sizeof(val));
  155. _write_net_uint16(buf->data + curlen, val);
  156. }
  157. INLINE void write_net_uint32(GByteArray *buf, guint32 val) {
  158. guint curlen = buf->len;
  159. g_byte_array_set_size(buf, curlen + sizeof(val));
  160. _write_net_uint32(buf->data + curlen, val);
  161. }
  162. INLINE void write_net_string(GByteArray *buf, const gchar *str, guint len) {
  163. guint curlen = buf->len;
  164. g_byte_array_set_size(buf, curlen + sizeof(guint16) + len);
  165. _write_net_uint16(buf->data + curlen, len);
  166. memcpy(buf->data + curlen + sizeof(guint16), str, len);
  167. }
  168. /* ---------------- */
  169. /* Basic Connection */
  170. /* ---------------- */
  171. static void _con_free(manda_connection *con);
  172. static void con_req_unlink(manda_connection *con, request *req) {
  173. request *prev = NULL, *next = NULL;
  174. /* unlink */
  175. if (req->timeout_next_id != 0) {
  176. next = &g_array_index(con->requests, request, req->timeout_next_id);
  177. next->timeout_prev_id = req->timeout_prev_id;
  178. } else {
  179. con->timeout_last_id = req->timeout_prev_id;
  180. }
  181. if (req->timeout_prev_id != 0) {
  182. prev = &g_array_index(con->requests, request, req->timeout_prev_id);
  183. prev->timeout_next_id = req->timeout_next_id;
  184. } else {
  185. con->timeout_first_id = req->timeout_next_id;
  186. }
  187. req->timeout_next_id = req->timeout_prev_id = 0;
  188. }
  189. static void con_req_push(manda_connection *con, request *req, guint16 reqid) {
  190. request *prev;
  191. if (con->timeout_last_id != 0) {
  192. prev = &g_array_index(con->requests, request, con->timeout_last_id);
  193. prev->timeout_next_id = reqid;
  194. } else {
  195. con->timeout_first_id = reqid;
  196. }
  197. req->timeout_prev_id = con->timeout_last_id;
  198. con->timeout_last_id = reqid;
  199. }
  200. static void con_close(manda_connection *con) {
  201. GByteArray *buf;
  202. if (con->closed) return;
  203. con->closed = TRUE;
  204. ENTER(con);
  205. if (NULL != con->fd_watcher.priv) {
  206. con->ctrl->destroy_fd_watcher(con->data, &con->fd_watcher);
  207. con->fd_watcher.priv = NULL;
  208. }
  209. if (NULL != con->req_timeout.priv) {
  210. con->ctrl->destroy_timeout(con->data, &con->req_timeout);
  211. con->req_timeout.priv = NULL;
  212. }
  213. if (-1 != con->fd) {
  214. while (-1 == close(con->fd) && errno == EINTR) ;
  215. con->fd = -1;
  216. }
  217. while (NULL != (buf = g_queue_pop_head(&con->send_queue))) {
  218. g_byte_array_free(buf, TRUE);
  219. }
  220. con->cur_send_pos = 0;
  221. if (NULL != con->close_cb) {
  222. con->close_cb(con);
  223. }
  224. /* "timeout" all requests */
  225. for ( ; con->timeout_first_id > 0; ) {
  226. request *req;
  227. guint16 req_command;
  228. gpointer req_data;
  229. req = &g_array_index(con->requests, request, con->timeout_first_id);
  230. req_command = req->command;
  231. req_data = req->data;
  232. con_req_unlink(con, req);
  233. /* reset request */
  234. req->command = 0;
  235. req->timeout = 0;
  236. req->data = NULL;
  237. if (NULL != con->message_cb) {
  238. con->message_cb(con, req_command, req_data, 0, 0, NULL);
  239. }
  240. }
  241. if (NULL != con->request_ids) {
  242. manda_idlist_free(con->request_ids);
  243. con->request_ids = NULL;
  244. }
  245. if (NULL != con->requests) {
  246. g_array_free(con->requests, TRUE);
  247. con->requests = NULL;
  248. }
  249. if (NULL != con->cur_payload) {
  250. g_byte_array_free(con->cur_payload, TRUE);
  251. con->cur_payload = NULL;
  252. }
  253. LEAVE(con, _con_free);
  254. }
  255. static void _con_free(manda_connection *con) {
  256. ENTER(con); /* don't want to enter _con_free again */
  257. con_close(con);
  258. g_slice_free(manda_connection, con);
  259. }
  260. static void con_free(manda_connection *con) {
  261. LEAVE(con, _con_free);
  262. }
  263. static void con_handle_response(manda_connection *con) {
  264. GByteArray *payload = con->cur_payload;
  265. guint16 orig_command = 0, mesg_command = con->cur_header.command, resp_to = con->cur_header.respid, req_id = con->cur_header.reqid;
  266. gpointer orig_data = NULL;
  267. con->cur_header_pos = 0;
  268. con->cur_payload_pos = 0;
  269. con->cur_payload = NULL;
  270. if (0 != resp_to) {
  271. request *req;
  272. if (!manda_idlist_is_used(con->request_ids, resp_to)) {
  273. /* protocol error */
  274. con_close(con);
  275. goto clean;
  276. }
  277. manda_idlist_put(con->request_ids, resp_to);
  278. /* try to find request data; if we can't find it the timeout already triggered */
  279. if (resp_to >= con->requests->len) goto clean;
  280. req = &g_array_index(con->requests, request, resp_to);
  281. if (0 == req->command) goto clean; /* "empty" request */
  282. orig_command = req->command;
  283. orig_data = req->data;
  284. /* reset request */
  285. con_req_unlink(con, req);
  286. req->command = 0;
  287. req->timeout = req->timeout_step = 0;
  288. req->data = NULL;
  289. }
  290. if (NULL != con->message_cb) {
  291. con->message_cb(con, orig_command, orig_data, mesg_command, req_id, payload);
  292. }
  293. clean:
  294. if (NULL != payload) g_byte_array_free(payload, TRUE);
  295. }
  296. static void con_fd_watcher_update(manda_connection *con) {
  297. if (con->fd != -1) {
  298. int events = (con->send_queue.length > 0) ? MANDA_FD_READ | MANDA_FD_WRITE : MANDA_FD_READ;
  299. if (events != con->fd_watcher.events) {
  300. con->fd_watcher.events = events;
  301. con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
  302. }
  303. }
  304. }
  305. static void con_fd_watcher_cb(manda_fd_watcher *watcher) {
  306. manda_connection *con = watcher->priv;
  307. guint i;
  308. ENTER(con);
  309. /* handle read */
  310. for ( i = 0 ; (!con->closed) && (i < 100) ; i++ ) {
  311. if (con->cur_header_pos < 8) {
  312. ssize_t r = read(con->fd, &con->cur_header_buf[con->cur_header_pos], 8 - con->cur_header_pos);
  313. if (r < 0) {
  314. switch (errno) {
  315. case EINTR:
  316. continue;
  317. case EAGAIN:
  318. #if EWOULDBLOCK != EAGAIN
  319. case EWOULDBLOCK:
  320. #endif
  321. break;
  322. case ECONNRESET: /* "eof" */
  323. con_close(con);
  324. goto out;
  325. default:
  326. con_close(con);
  327. goto out;
  328. }
  329. break;
  330. } else if (r == 0) { /* eof */
  331. con_close(con);
  332. goto out;
  333. } else {
  334. con->cur_header_pos += r;
  335. }
  336. }
  337. if (con->cur_header_pos < 8) break;
  338. /* parse header */
  339. con->cur_header.command = _read_net_uint16(con->cur_header_buf + 0);
  340. con->cur_header.size = _read_net_uint16(con->cur_header_buf + 2);
  341. con->cur_header.reqid = _read_net_uint16(con->cur_header_buf + 4);
  342. con->cur_header.respid = _read_net_uint16(con->cur_header_buf + 6);
  343. if (con->cur_header.size < 8) {
  344. /* error */
  345. con_close(con);
  346. } else if (con->cur_header.size > 8) {
  347. ssize_t r;
  348. if (!con->cur_payload) {
  349. con->cur_payload = g_byte_array_sized_new(con->cur_header.size - 8);
  350. g_byte_array_set_size(con->cur_payload, con->cur_header.size - 8);
  351. con->cur_payload_pos = 0;
  352. }
  353. r = read(con->fd, &con->cur_payload->data[con->cur_payload_pos], con->cur_payload->len - con->cur_payload_pos);
  354. if (r < 0) {
  355. switch (errno) {
  356. case EINTR:
  357. continue;
  358. case EAGAIN:
  359. #if EWOULDBLOCK != EAGAIN
  360. case EWOULDBLOCK:
  361. #endif
  362. break;
  363. case ECONNRESET: /* "eof" */
  364. con_close(con);
  365. goto out;
  366. default:
  367. con_close(con);
  368. goto out;
  369. }
  370. break;
  371. } else if (r == 0) { /* eof */
  372. con_close(con);
  373. goto out;
  374. } else {
  375. con->cur_payload_pos += r;
  376. }
  377. if (con->cur_payload_pos < con->cur_payload->len) break;
  378. con_handle_response(con);
  379. } else {
  380. con_handle_response(con);
  381. }
  382. }
  383. for ( i = 0 ; (!con->closed) && (i < 100) && (con->send_queue.length > 0) ; i++ ) {
  384. GByteArray *buf = g_queue_peek_head(&con->send_queue);
  385. ssize_t written;
  386. written = write(con->fd, &buf->data[con->cur_send_pos], buf->len - con->cur_send_pos);
  387. if (written < 0) {
  388. switch (errno) {
  389. case EINTR:
  390. continue;
  391. case EAGAIN:
  392. #if EWOULDBLOCK != EAGAIN
  393. case EWOULDBLOCK:
  394. #endif
  395. break;
  396. case ECONNRESET:
  397. case EPIPE:
  398. con_close(con);
  399. goto out;
  400. default: /* Fatal error/remote close, connection has to be closed */
  401. con_close(con);
  402. goto out;
  403. }
  404. break;
  405. } else {
  406. con->cur_send_pos += written;
  407. }
  408. if (con->cur_send_pos == buf->len) {
  409. con->cur_send_pos = 0;
  410. g_queue_pop_head(&con->send_queue);
  411. g_byte_array_free(buf, TRUE);
  412. } else {
  413. break;
  414. }
  415. }
  416. con_fd_watcher_update(con);
  417. out:
  418. LEAVE(con, _con_free);
  419. }
  420. static void con_timeout_cb(manda_timeout *timeout) {
  421. manda_connection *con = timeout->priv;
  422. double now = con->ctrl->get_time(con->data);
  423. for ( ; con->timeout_first_id != 0; ) {
  424. request *req;
  425. guint16 req_command;
  426. gpointer req_data;
  427. guint16 reqid = con->timeout_first_id;
  428. req = &g_array_index(con->requests, request, con->timeout_first_id);
  429. if (req->timeout_step > now) break;
  430. if (req->timeout > now) {
  431. req->timeout_step = now + TIMEOUT_STEP;
  432. if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
  433. /* requeue */
  434. con_req_unlink(con, req);
  435. con_req_push(con, req, reqid);
  436. continue;
  437. }
  438. req_command = req->command;
  439. req_data = req->data;
  440. /* reset request */
  441. con_req_unlink(con, req);
  442. req->command = 0;
  443. req->timeout = req->timeout_step = 0;
  444. req->data = NULL;
  445. if (NULL != con->message_cb) {
  446. con->message_cb(con, req_command, req_data, 0, 0, NULL);
  447. }
  448. }
  449. if (con->timeout_first_id != 0) {
  450. request *req;
  451. req = &g_array_index(con->requests, request, con->timeout_first_id);
  452. con->req_timeout.timeout = req->timeout_step;
  453. con->ctrl->start_timeout(con->data, &con->req_timeout);
  454. }
  455. }
  456. static manda_connection* con_new(gpointer srv, const manda_async_ctrl *ctrl, gpointer priv_data, con_message_cb message_cb, con_close_cb close_cb, int fd) {
  457. manda_connection *con = g_slice_new0(manda_connection);
  458. gint first_id;
  459. con->data = srv;
  460. con->ctrl = ctrl;
  461. con->priv_data = priv_data;
  462. con->message_cb = message_cb;
  463. con->close_cb = close_cb;
  464. con->fd = fd;
  465. con->fd_watcher.priv = con;
  466. con->fd_watcher.callback = con_fd_watcher_cb;
  467. con->fd_watcher.events = MANDA_FD_READ;
  468. con->fd_watcher.fd = fd;
  469. con->req_timeout.priv = con;
  470. con->req_timeout.callback = con_timeout_cb;
  471. con->req_timeout.timeout = 0;
  472. con->request_ids = manda_idlist_new(65536);
  473. /* id 0 is reserved so request it here */
  474. first_id = manda_idlist_get(con->request_ids);
  475. assert(0 == first_id);
  476. con->requests = g_array_new(FALSE, TRUE, sizeof(request));
  477. con->ctrl->new_fd_watcher(con->data, &con->fd_watcher);
  478. con->ctrl->update_fd_watcher(con->data, &con->fd_watcher);
  479. con->ctrl->new_timeout(con->data, &con->req_timeout);
  480. return con;
  481. }
  482. static void con_fix_header(GByteArray *payload, guint16 command, guint16 req_id, guint16 resp_id) {
  483. _write_net_uint16(payload->data + 0, command);
  484. _write_net_uint16(payload->data + 2, payload->len);
  485. _write_net_uint16(payload->data + 4, req_id);
  486. _write_net_uint16(payload->data + 6, resp_id);
  487. }
  488. /* payload needs to be prefixed with 8 dummy bytes for the header */
  489. static gboolean con_send_request(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id, gpointer data, double wait_timeout) {
  490. double now;
  491. gint reqid;
  492. request *req;
  493. gboolean res = TRUE;
  494. ENTER(con);
  495. if (con->closed) {
  496. /* connection closed */
  497. goto error;
  498. }
  499. if (payload->len > 65535 || payload->len < 8 || 0 == command || wait_timeout <= 0) {
  500. /* payload too big / invalid parameters */
  501. goto error;
  502. }
  503. reqid = manda_idlist_get(con->request_ids);
  504. if (-1 == reqid) goto error; /* no free request id available */
  505. assert(reqid > 0 && reqid < 65536);
  506. if ((guint) reqid > con->requests->len) {
  507. g_array_set_size(con->requests, reqid+1);
  508. }
  509. req = &g_array_index(con->requests, request, reqid);
  510. now = con->ctrl->get_time(con->data);
  511. req->timeout = now + wait_timeout;;
  512. req->timeout_step = now + TIMEOUT_STEP;
  513. req->data = data;
  514. req->command = command;
  515. if (req->timeout_step > req->timeout) req->timeout_step = req->timeout;
  516. if (con->timeout_first_id == 0) {
  517. con->req_timeout.timeout = req->timeout_step;
  518. con->ctrl->start_timeout(con->data, &con->req_timeout);
  519. }
  520. con_fix_header(payload, command, reqid, resp_id);
  521. con_req_push(con, req, reqid);
  522. g_queue_push_tail(&con->send_queue, payload);
  523. con_fd_watcher_update(con);
  524. goto out;
  525. error:
  526. res = FALSE;
  527. g_byte_array_free(payload, TRUE);
  528. out:
  529. LEAVE(con, _con_free);
  530. return res;
  531. }
  532. /* payload needs to be prefixed with 8 dummy bytes for the header */
  533. static gboolean con_send_notify(manda_connection *con, GByteArray *payload, guint16 command, guint16 resp_id) {
  534. gboolean res = TRUE;
  535. ENTER(con);
  536. if (con->closed) {
  537. /* connection closed */
  538. goto error;
  539. }
  540. if (payload->len > 65535 || payload->len < 8 || 0 == command) {
  541. /* payload too big / invalid parameters */
  542. goto error;
  543. }
  544. con_fix_header(payload, command, 0, resp_id);
  545. g_queue_push_tail(&con->send_queue, payload);
  546. con_fd_watcher_update(con);
  547. goto out;
  548. error:
  549. g_byte_array_free(payload, TRUE);
  550. res = FALSE;
  551. out:
  552. LEAVE(con, _con_free);
  553. return res;
  554. }
  555. /* message construction helper */
  556. static GByteArray* new_payload() {
  557. GByteArray* buf = g_byte_array_new();
  558. g_byte_array_set_size(buf, 8);
  559. return buf;
  560. }
  561. static gboolean send_unkown_command(manda_connection *con, guint16 cmd, guint16 resp_id) {
  562. GByteArray *payload;
  563. if (NULL == con) return FALSE;
  564. payload = new_payload();
  565. write_net_uint16(payload, cmd);
  566. return con_send_notify(con, payload, MANDA_CMD_UNKNOWN_COMMAND, resp_id);
  567. }
  568. static gboolean send_server_bind_backend(manda_connection *con, guint16 resp_id, guint32 backend_id, const gchar *str, guint len) {
  569. GByteArray *payload;
  570. if (NULL == con) return FALSE;
  571. payload = new_payload();
  572. write_net_uint32(payload, backend_id);
  573. write_net_string(payload, str, len);
  574. return con_send_notify(con, payload, MANDA_CMD_BIND_BACKEND, resp_id);
  575. }
  576. static gboolean send_server_bind_backend_failed(manda_connection *con, guint16 resp_id, const gchar *str, guint len) {
  577. return send_server_bind_backend(con, resp_id, 0, str, len);
  578. }
  579. static gboolean send_server_release_backend(manda_connection *con, guint32 backend_id, const gchar *str, guint len) {
  580. GByteArray *payload;
  581. if (NULL == con) return FALSE;
  582. payload = new_payload();
  583. write_net_uint32(payload, backend_id);
  584. write_net_string(payload, str, len);
  585. return con_send_notify(con, payload, MANDA_CMD_RELEASE_BACKEND, 0);
  586. }
  587. static gboolean send_client_bind_backend(manda_connection *con, const gchar *name, guint len, gpointer data, double wait_timeout) {
  588. GByteArray *payload;
  589. if (NULL == con) return FALSE;
  590. payload = new_payload();
  591. write_net_string(payload, name, len);
  592. return con_send_request(con, payload, MANDA_CMD_BIND_BACKEND, 0, data, wait_timeout);
  593. }
  594. static gboolean send_client_release_backend(manda_connection *con, guint32 backend_id) {
  595. GByteArray *payload;
  596. if (NULL == con) return FALSE;
  597. payload = new_payload();
  598. write_net_uint32(payload, backend_id);
  599. return con_send_notify(con, payload, MANDA_CMD_RELEASE_BACKEND, 0);
  600. }
  601. static gboolean send_client_update_backend(manda_connection *con, guint32 backend_id, guint32 load, guint32 workers) {
  602. GByteArray *payload;
  603. if (NULL == con) return FALSE;
  604. payload = new_payload();
  605. write_net_uint32(payload, backend_id);
  606. write_net_uint32(payload, load);
  607. write_net_uint32(payload, workers);
  608. return con_send_notify(con, payload, MANDA_CMD_UPDATE_BACKEND, 0);
  609. }
  /* server connection */
  typedef struct server_socket server_socket;
  /* One listening socket registered with a manda_server. */
  struct server_socket {
      manda_server *srv; /* server this socket belongs to */
      int fd; /* listening descriptor; closed by manda_server_free_socket() */
      gpointer data; /* value passed to manda_server_add_socket() */
      manda_fd_watcher fd_watcher; /* watches fd for readability, i.e. incoming connections */
  };
  618. static void scon_con_closed_cb(manda_connection *con);
  619. static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
  620. static void scon_release(manda_server_connection *con);
  621. static void sbackend_free(manda_server_backend *backend);
  622. static void manda_server_release_backend(manda_server_connection *con, guint32 id);
  623. static void server_listening_cb(manda_fd_watcher *fd_watcher) {
  624. server_socket *sock = fd_watcher->priv;
  625. manda_server *s = sock->srv;
  626. int fd;
  627. int i;
  628. manda_server_connection *con;
  629. fd = accept(sock->fd, NULL, NULL);
  630. if (-1 == fd) return;
  631. fd_init(fd);
  632. con = g_slice_new0(manda_server_connection);
  633. con->srv = s;
  634. con->refcount = 1;
  635. con->backends = g_ptr_array_new();
  636. con->idlist = manda_idlist_new(65536);
  637. i = manda_idlist_get(con->idlist); /* 0 is an invalid backend id, reserve it */
  638. assert(i == 0);
  639. con->con = con_new(s->data, s->ctrl, con, scon_con_message_cb, scon_con_closed_cb, fd);
  640. }
  641. static void scon_free(manda_server_connection *con) {
  642. ENTER(con); /* don't want to enter scon_free again */
  643. manda_server_con_close(con);
  644. g_slice_free(manda_server_connection, con);
  645. }
  646. static void scon_release(manda_server_connection *con) {
  647. LEAVE(con, scon_free);
  648. }
  649. void manda_server_con_close(manda_server_connection *con) {
  650. guint i;
  651. manda_connection *bcon = con->con;
  652. if (NULL == bcon) return;
  653. ENTER(con);
  654. g_ptr_array_remove_fast(con->srv->connections, con);
  655. scon_release(con);
  656. con->con = NULL;
  657. bcon->close_cb = NULL;
  658. con_close(bcon);
  659. con_free(bcon);
  660. if (NULL != con->srv->callbacks->closed_connection) {
  661. con->srv->callbacks->closed_connection(con->srv->data, con);
  662. }
  663. for (i = 0; i < con->backends->len; i++) {
  664. manda_server_backend_use *use = g_ptr_array_index(con->backends, i);
  665. if (NULL == use) continue;
  666. manda_server_release_backend(con, i);
  667. }
  668. g_ptr_array_free(con->backends, TRUE);
  669. con->backends = NULL;
  670. manda_idlist_free(con->idlist);
  671. con->idlist = NULL;
  672. LEAVE(con, scon_free);
  673. }
  674. static void scon_con_closed_cb(manda_connection *con) {
  675. manda_server_connection *scon = con->priv_data;
  676. manda_server_con_close(scon);
  677. }
  678. void manda_server_add_socket(manda_server *s, int fd, gpointer data) {
  679. server_socket *sock = g_slice_new0(server_socket);
  680. sock->srv = s;
  681. sock->fd = fd;
  682. sock->data = data;
  683. sock->fd_watcher.callback = server_listening_cb;
  684. sock->fd_watcher.events = MANDA_FD_READ;
  685. sock->fd_watcher.fd = fd;
  686. sock->fd_watcher.priv = sock;
  687. s->ctrl->new_fd_watcher(s->data, &sock->fd_watcher);
  688. s->ctrl->update_fd_watcher(s->data, &sock->fd_watcher);
  689. g_ptr_array_add(s->sockets, sock);
  690. }
  691. static void manda_server_free_socket(server_socket *sock) {
  692. manda_server *s = sock->srv;
  693. s->ctrl->destroy_fd_watcher(s->data, &sock->fd_watcher);
  694. while (-1 == close(sock->fd) && errno == EINTR) ;
  695. sock->fd = -1;
  696. g_slice_free(server_socket, sock);
  697. }
  698. manda_server* manda_server_new(gpointer srv, const manda_async_ctrl *ctrl, const manda_server_callbacks *callbacks) {
  699. manda_server *s = g_slice_new(manda_server);
  700. s->refcount = 1;
  701. s->data = srv;
  702. s->connections = g_ptr_array_new();
  703. s->ctrl = ctrl;
  704. s->callbacks = callbacks;
  705. s->sockets = g_ptr_array_new();
  706. return s;
  707. }
  708. void manda_server_close(manda_server *s) {
  709. guint i;
  710. for (i = 0; i < s->sockets->len; i++) {
  711. server_socket *sock = g_ptr_array_index(s->sockets, i);
  712. manda_server_free_socket(sock);
  713. }
  714. g_ptr_array_set_size(s->sockets, 0);
  715. while (s->connections->len > 0) {
  716. manda_server_connection *con = g_ptr_array_index(s->connections, 0);
  717. manda_server_con_close(con);
  718. }
  719. }
  720. static void manda_server_free(manda_server *s) {
  721. manda_server_close(s);
  722. g_ptr_array_free(s->sockets, TRUE);
  723. g_ptr_array_free(s->connections, TRUE);
  724. g_slice_free(manda_server, s);
  725. }
  726. void manda_server_acquire(manda_server *s) {
  727. ENTER(s);
  728. }
  729. void manda_server_release(manda_server *s) {
  730. if (NULL == s) return;
  731. LEAVE(s, manda_server_free);
  732. }
  733. static void manda_server_release_backend(manda_server_connection *con, guint32 id) {
  734. manda_server_backend_use *use;
  735. manda_server_backend *b;
  736. if (NULL == con->backends) return;
  737. if (id >= con->backends->len) return;
  738. if (manda_idlist_is_used(con->idlist, id)) {
  739. /* notify client */
  740. send_server_release_backend(con->con, id, CONST_STR_LEN("lost backend"));
  741. }
  742. use = g_ptr_array_index(con->backends, id);
  743. if (NULL == use) return;
  744. g_ptr_array_index(con->backends, id) = NULL;
  745. b = use->backend;
  746. if (use->ndx != b->usage->len - 1) {
  747. manda_server_backend_use *u = g_ptr_array_index(b->usage, b->usage->len - 1);
  748. g_ptr_array_index(b->usage, use->ndx) = u;
  749. u->ndx = use->ndx;
  750. }
  751. b->sum_last_load -= use->last_load;
  752. g_ptr_array_set_size(b->usage, b->usage->len-1);
  753. LEAVE(b, sbackend_free);
  754. if (NULL != con->srv && NULL != con->srv->callbacks->release_backend) {
  755. con->srv->callbacks->release_backend(con->srv->data, b, use->ndx, use);
  756. }
  757. g_slice_free(manda_server_backend_use, use);
  758. }
  759. manda_server_backend* manda_server_backend_new(gpointer data, GString *addr) {
  760. manda_server_backend *backend = g_slice_new0(manda_server_backend);
  761. backend->data = data;
  762. backend->usage = g_ptr_array_new();
  763. backend->sum_last_load = 0;
  764. backend->addr = addr;
  765. backend->refcount = 1;
  766. return backend;
  767. }
  768. static void sbackend_free(manda_server_backend *backend) {
  769. ENTER(backend); /* don't want to enter sbackend_free again */
  770. manda_server_drop_backend(backend);
  771. g_ptr_array_free(backend->usage, TRUE);
  772. backend->usage = NULL;
  773. g_string_free(backend->addr, TRUE);
  774. backend->addr = NULL;
  775. g_slice_free(manda_server_backend, backend);
  776. }
  777. void manda_server_backend_release(manda_server_backend *backend) {
  778. if (NULL == backend) return;
  779. LEAVE(backend, sbackend_free);
  780. }
  781. void manda_server_backend_acquire(manda_server_backend *backend) {
  782. ENTER(backend);
  783. }
  784. gboolean manda_server_return_backend(manda_server_connection *con, gint16 reqid, manda_server_backend *backend) {
  785. manda_server_backend_use *use;
  786. gint i;
  787. if (NULL == con->con || con->con->closed) return FALSE;
  788. i = manda_idlist_get(con->idlist);
  789. if (-1 == i) {
  790. send_server_bind_backend_failed(con->con, reqid, CONST_STR_LEN("no free backend id available"));
  791. return FALSE;
  792. }
  793. assert(i > 0);
  794. use = g_slice_new0(manda_server_backend_use);
  795. use->con = con;
  796. use->backend_id = i;
  797. use->last_load = use->last_workers = 0;
  798. use->backend = backend;
  799. use->ndx = backend->usage->len;
  800. g_ptr_array_add(backend->usage, use);
  801. if (use->backend_id >= con->backends->len) g_ptr_array_set_size(con->backends, use->backend_id + 1);
  802. g_ptr_array_index(con->backends, use->backend_id) = use;
  803. ENTER(backend);
  804. return send_server_bind_backend(con->con, reqid, use->backend_id, GSTR_LEN(backend->addr));
  805. }
  806. void manda_server_return_backend_fail(manda_server_connection *con, gint16 reqid, GString *errmsg) {
  807. send_server_bind_backend_failed(con->con, reqid, GSTR_LEN(errmsg));
  808. }
  809. void manda_server_drop_backend(manda_server_backend *backend) {
  810. ENTER(backend);
  811. while (backend->usage->len > 0) {
  812. manda_server_backend_use *u = g_ptr_array_index(backend->usage, 0);
  813. manda_server_release_backend(u->con, u->backend_id);
  814. }
  815. LEAVE(backend, sbackend_free);
  816. }
  817. static void sbackend_update(manda_server_connection *con, guint32 backend_id, guint32 load, guint32 workers) {
  818. manda_server_backend_use *use;
  819. manda_server_backend *b;
  820. if (NULL == con->backends) return;
  821. if (backend_id >= con->backends->len) return;
  822. use = g_ptr_array_index(con->backends, backend_id);
  823. if (NULL == use) return;
  824. b = use->backend;
  825. b->sum_last_load += load - use->last_load;
  826. use->last_load = load;
  827. use->last_workers = workers;
  828. if (NULL != con->srv->callbacks->update_backend) {
  829. con->srv->callbacks->update_backend(con->srv->data, b, use->ndx);
  830. }
  831. }
  832. static void scon_con_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *payload) {
  833. manda_server_connection *scon = con->priv_data;
  834. manda_server *srv;
  835. guint pos = 0;
  836. UNUSED(orig_command); UNUSED(orig_data);
  837. if (con->closed || NULL == scon || NULL == scon->srv) return;
  838. srv = scon->srv;
  839. ENTER(con);
  840. ENTER(scon);
  841. ENTER(srv);
  842. switch (mesg_command) {
  843. case MANDA_CMD_BIND_BACKEND: {
  844. GString *name = NULL;
  845. if (NULL == payload) goto error;
  846. name = g_string_sized_new(payload->len - 2);
  847. if (!read_net_string(payload, &pos, name)) { g_string_free(name, TRUE); goto error; }
  848. if (pos != payload->len) { g_string_free(name, TRUE); goto error; }
  849. srv->callbacks->bind_backend(srv->data, scon, name, mesg_req_id);
  850. g_string_free(name, TRUE);
  851. }
  852. break;
  853. case MANDA_CMD_RELEASE_BACKEND: {
  854. guint32 backend_id;
  855. if (NULL == payload) goto error;
  856. if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
  857. if (pos != payload->len) goto error;
  858. if (backend_id == 0) goto error; /* invalid id */
  859. if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error;
  860. manda_idlist_put(scon->idlist, backend_id);
  861. manda_server_release_backend(scon, backend_id);
  862. }
  863. break;
  864. case MANDA_CMD_UPDATE_BACKEND: {
  865. guint32 backend_id, load, workers;
  866. if (NULL == payload) goto error;
  867. if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
  868. if (!read_net_uint32(payload, &pos, &load)) goto error;
  869. if (!read_net_uint32(payload, &pos, &workers)) goto error;
  870. if (pos != payload->len) goto error;
  871. if (backend_id == 0) goto error; /* invalid id */
  872. if (!manda_idlist_is_used(scon->idlist, backend_id)) goto error;
  873. sbackend_update(scon, backend_id, load, workers);
  874. }
  875. break;
  876. case MANDA_CMD_UNKNOWN_COMMAND:
  877. /* we require all our commands to be handled: */
  878. goto error;
  879. default:
  880. send_unkown_command(con, mesg_command, mesg_req_id);
  881. break;
  882. }
  883. goto out;
  884. error:
  885. manda_server_con_close(scon);
  886. out:
  887. LEAVE(srv, manda_server_free);
  888. LEAVE(scon, scon_free);
  889. LEAVE(con, _con_free);
  890. }
  891. /* client connection */
  892. static void client_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 mesg_command, guint16 mesg_req_id, GByteArray *mesg_payload);
  893. static void client_close_cb(manda_connection *con);
  894. static void mclient_free(manda_client *c) {
  895. ENTER(c); /* don't want to enter mclient_free again */
  896. manda_client_close(c);
  897. g_free(c->addr);
  898. c->addr = NULL;
  899. if (NULL != c->backends) {
  900. g_ptr_array_free(c->backends, TRUE);
  901. c->backends = NULL;
  902. }
  903. g_slice_free(manda_client, c);
  904. }
  905. /* TODO: start timer to reconnect if connect failed */
  906. static void client_connect_cb(manda_fd_watcher *watcher) {
  907. manda_client *c = watcher->priv;
  908. int s = watcher->fd;
  909. struct sockaddr addr;
  910. socklen_t len;
  911. ENTER(c);
  912. c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
  913. c->sock_watcher.fd = -1;
  914. c->sock_fd = -1;
  915. /* create new connection:
  916. * see http://www.cyberconf.org/~cynbe/ref/nonblocking-connects.html
  917. */
  918. /* Check to see if we can determine our peer's address. */
  919. len = sizeof(addr);
  920. if (getpeername(s, &addr, &len) == -1) {
  921. /* connect failed; find out why */
  922. int err;
  923. len = sizeof(err);
  924. #ifdef SO_ERROR
  925. getsockopt(s, SOL_SOCKET, SO_ERROR, (void*)&err, &len);
  926. #else
  927. {
  928. char ch;
  929. errno = 0;
  930. read(s, &ch, 1);
  931. err = errno;
  932. }
  933. #endif
  934. close(s);
  935. UNUSED(err);
  936. goto out;
  937. } else {
  938. /* connect succeeded */
  939. c->con = con_new(c->data, c->ctrl, c, client_message_cb, client_close_cb, s);
  940. }
  941. out:
  942. LEAVE(c, mclient_free);
  943. }
  944. static void client_connect(manda_client *c) {
  945. int s;
  946. double now;
  947. ENTER(c);
  948. if (NULL != c->con || c->closed) goto out;
  949. if (-1 != c->sock_fd || NULL == c->addr) goto out;
  950. now = c->ctrl->get_time(c->data);
  951. if (c->last_connect_ts + 1.0 > now) goto out; /* only try connect once per second */
  952. c->last_connect_ts = now;
  953. do {
  954. s = socket(c->addr->sa_family, SOCK_STREAM, 0);
  955. } while (-1 == s && errno == EINTR);
  956. if (-1 == s) goto out;
  957. fd_init(s);
  958. c->sock_fd = s;
  959. c->sock_watcher.fd = s;
  960. c->sock_watcher.events = 0;
  961. c->ctrl->new_fd_watcher(c->data, &c->sock_watcher);
  962. if (-1 == connect(s, c->addr, c->addrlen)) {
  963. switch (errno) {
  964. case EINPROGRESS:
  965. case EALREADY:
  966. case EINTR:
  967. c->sock_watcher.events = MANDA_FD_READ | MANDA_FD_WRITE;
  968. c->ctrl->update_fd_watcher(c->data, &c->sock_watcher);
  969. goto out;
  970. case EAGAIN: /* server overloaded */
  971. default:
  972. c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
  973. close(s);
  974. c->sock_watcher.fd = -1;
  975. c->sock_fd = -1;
  976. goto out;
  977. }
  978. }
  979. c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
  980. c->sock_watcher.fd = -1;
  981. c->sock_fd = -1;
  982. c->con = con_new(c->data, c->ctrl, c, client_message_cb, client_close_cb, s);
  983. out:
  984. LEAVE(c, mclient_free);
  985. return;
  986. }
  987. manda_client* manda_client_new(gpointer srv, const manda_async_ctrl *ctrl, struct sockaddr *addr, socklen_t addrlen) {
  988. manda_client *c = g_slice_new0(manda_client);
  989. c->refcount = 1;
  990. c->closed = FALSE;
  991. c->data = srv;
  992. c->ctrl = ctrl;
  993. c->addrlen = addrlen;
  994. c->addr = g_memdup(addr, addrlen);
  995. c->sock_fd = -1;
  996. c->sock_watcher.fd = -1;
  997. c->sock_watcher.priv = c;
  998. c->sock_watcher.events = 0;
  999. c->sock_watcher.callback = client_connect_cb;
  1000. client_connect(c);
  1001. return c;
  1002. }
  1003. static void client_close(manda_client *c) {
  1004. if (-1 != c->sock_fd) {
  1005. c->ctrl->destroy_fd_watcher(c->data, &c->sock_watcher);
  1006. close(c->sock_fd);
  1007. c->sock_fd = -1;
  1008. c->sock_watcher.fd = -1;
  1009. }
  1010. if (NULL != c->con) {
  1011. c->con->close_cb = NULL;
  1012. con_close(c->con);
  1013. LEAVE(c->con, _con_free);
  1014. c->con = NULL;
  1015. }
  1016. if (NULL != c->backends) {
  1017. guint i;
  1018. for (i = 0; i < c->backends->len; i++) {
  1019. static const GString con_closed_msg = { CONST_STR_LEN("Connection closed"), 0 };
  1020. manda_client_backend *backend = g_ptr_array_index(c->backends, i);
  1021. if (NULL != backend) {
  1022. g_ptr_array_index(c->backends, i) = NULL;
  1023. backend->id = 0;
  1024. if (NULL != backend->callbacks) {
  1025. backend->callbacks->lost_backend(c->data, backend, &con_closed_msg);
  1026. }
  1027. g_string_free(backend->addr, TRUE);
  1028. g_slice_free(manda_client_backend, backend);
  1029. }
  1030. }
  1031. g_ptr_array_set_size(c->backends, 0);
  1032. }
  1033. if (!c->closed) {
  1034. /* TODO: timer? */
  1035. client_connect(c);
  1036. }
  1037. }
  1038. void manda_client_close(manda_client *c) {
  1039. ENTER(c);
  1040. c->closed = TRUE;
  1041. client_close(c);
  1042. LEAVE(c, mclient_free);
  1043. }
  1044. void manda_client_acquire(manda_client *c) {
  1045. ENTER(c);
  1046. }
  1047. void manda_client_release(manda_client *c) {
  1048. if (NULL == c) return;
  1049. LEAVE(c, mclient_free);
  1050. }
  1051. manda_client_backend* manda_client_bind_backend(manda_client *c, GString *name, gpointer data, const manda_client_backend_callbacks *callbacks) {
  1052. manda_client_backend *backend;
  1053. client_connect(c);
  1054. if (NULL == c->con) return NULL;
  1055. backend = g_slice_new(manda_client_backend);
  1056. backend->data = data;
  1057. backend->callbacks = callbacks;
  1058. backend->client = c;
  1059. backend->addr = NULL;
  1060. backend->id = 0;
  1061. if (!send_client_bind_backend(c->con, GSTR_LEN(name), backend, 5.0)) {
  1062. g_slice_free(manda_client_backend, backend);
  1063. return NULL;
  1064. }
  1065. return backend;
  1066. }
  1067. void manda_client_release_backend(manda_client_backend *backend) {
  1068. manda_client *c = backend->client;
  1069. if (backend->id != 0) {
  1070. /* "active" working backend */
  1071. send_client_release_backend(c->con, backend->id);
  1072. g_ptr_array_index(c->backends, backend->id) = NULL;
  1073. backend->id = 0;
  1074. g_string_free(backend->addr, TRUE);
  1075. g_slice_free(manda_client_backend, backend);
  1076. } else {
  1077. /* either "dead" backend or "waiting" backend */
  1078. /* in both cases free it somewhere else */
  1079. backend->callbacks = NULL;
  1080. }
  1081. }
  1082. void manda_client_update_backend(manda_client_backend *backend, guint32 load, guint32 workers) {
  1083. manda_client *c = backend->client;
  1084. if (backend->id != 0) {
  1085. send_client_update_backend(c->con, backend->id, load, workers);
  1086. }
  1087. }
  1088. static void client_message_cb(manda_connection *con, guint16 orig_command, gpointer orig_data, guint16 command, guint16 req_id, GByteArray *payload) {
  1089. manda_client *c = con->priv_data;
  1090. guint pos = 0;
  1091. UNUSED(req_id);
  1092. ENTER(c);
  1093. ENTER(con);
  1094. switch (command) {
  1095. case 0: /* timeout/request failed */
  1096. switch (orig_command) {
  1097. case MANDA_CMD_BIND_BACKEND: {
  1098. manda_client_backend *backend = orig_data;
  1099. backend->callbacks->lost_backend(c->data, backend, NULL);
  1100. g_string_free(backend->addr, TRUE);
  1101. g_slice_free(manda_client_backend, backend);
  1102. }
  1103. break;
  1104. default:
  1105. goto error;
  1106. }
  1107. break;
  1108. case MANDA_CMD_BIND_BACKEND:
  1109. switch (orig_command) {
  1110. case MANDA_CMD_BIND_BACKEND: {
  1111. manda_client_backend *backend = orig_data;
  1112. guint32 backend_id;
  1113. GString *addr = NULL;
  1114. if (NULL == payload) goto bind_backend_failed;
  1115. if (!read_net_uint32(payload, &pos, &backend_id)) goto bind_backend_failed;
  1116. addr = g_string_sized_new(payload->len - 2 - pos);
  1117. if (!read_net_string(payload, &pos, addr)) { g_string_free(addr, TRUE); addr = NULL; goto bind_backend_failed; }
  1118. if (pos != payload->len) { g_string_free(addr, TRUE); addr = NULL; goto bind_backend_failed; }
  1119. if (0 == backend_id) goto bind_backend_failed;
  1120. backend->addr = addr;
  1121. backend->id = backend_id;
  1122. if (backend_id >= c->backends->len) g_ptr_array_set_size(c->backends, backend_id+1);
  1123. g_ptr_array_index(c->backends, backend_id) = backend;
  1124. if (NULL != backend->callbacks) {
  1125. backend->callbacks->return_backend(c->data, backend);
  1126. } else {
  1127. manda_client_release_backend(backend);
  1128. }
  1129. goto out;
  1130. bind_backend_failed:
  1131. backend->callbacks->lost_backend(c->data, backend, addr);
  1132. g_string_free(backend->addr, TRUE);
  1133. g_slice_free(manda_client_backend, backend);
  1134. if (NULL == addr) goto error;
  1135. g_string_free(addr, TRUE);
  1136. }
  1137. break;
  1138. }
  1139. break;
  1140. case MANDA_CMD_RELEASE_BACKEND: {
  1141. manda_client_backend *backend = NULL;
  1142. guint32 backend_id;
  1143. GString *msg = NULL;
  1144. if (NULL == payload) goto error;
  1145. if (!read_net_uint32(payload, &pos, &backend_id)) goto error;
  1146. msg = g_string_sized_new(payload->len - 2 - pos);
  1147. if (!read_net_string(payload, &pos, msg)) { g_string_free(msg, TRUE); goto error; }
  1148. if (pos != payload->len) { g_string_free(msg, TRUE); goto error; }
  1149. if (0 == backend_id) { g_string_free(msg, TRUE); goto error; }
  1150. if (backend_id >= c->backends->len) { g_string_free(msg, TRUE); goto out; }
  1151. backend = g_ptr_array_index(c->backends, backend_id);
  1152. if (NULL == backend) { g_string_free(msg, TRUE); goto out; }
  1153. g_ptr_array_index(c->backends, backend_id) = NULL;
  1154. backend->id = 0;
  1155. if (NULL != backend->callbacks) {
  1156. backend->callbacks->lost_backend(c->data, backend, msg);
  1157. }
  1158. g_string_free(msg, TRUE);
  1159. g_string_free(backend->addr, TRUE);
  1160. g_slice_free(manda_client_backend, backend);
  1161. }
  1162. break;
  1163. case MANDA_CMD_UNKNOWN_COMMAND:
  1164. goto error;
  1165. }
  1166. goto out;
  1167. error:
  1168. client_close(c);
  1169. out:
  1170. LEAVE(con, _con_free);
  1171. LEAVE(c, mclient_free);
  1172. }
  1173. static void client_close_cb(manda_connection *con) {
  1174. manda_client *c = con->priv_data;
  1175. client_close(c);
  1176. }