NonBlockingUnixCommunicationSocket.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*
  2. * Copyright (C) 2011-2017 Intel Corporation. All rights reserved.
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions
  6. * are met:
  7. *
  8. * * Redistributions of source code must retain the above copyright
  9. * notice, this list of conditions and the following disclaimer.
  10. * * Redistributions in binary form must reproduce the above copyright
  11. * notice, this list of conditions and the following disclaimer in
  12. * the documentation and/or other materials provided with the
  13. * distribution.
  14. * * Neither the name of Intel Corporation nor the names of its
  15. * contributors may be used to endorse or promote products derived
  16. * from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. *
  30. */
  31. #ifndef __STDC_LIMIT_MACROS
  32. #define __STDC_LIMIT_MACROS
  33. #endif
  34. #include <stdint.h>
  35. #include <NonBlockingUnixCommunicationSocket.h>
  36. #include <arch.h>
  37. #include <sys/types.h>
  38. #include <unistd.h>
  39. #include <fcntl.h>
  40. #include <stdio.h>
  41. #include <stdlib.h>
  42. #include <errno.h>
  43. #include <sys/epoll.h>
  44. #include <string.h>
  45. #include <se_trace.h>
  46. NonBlockingUnixCommunicationSocket::~NonBlockingUnixCommunicationSocket()
  47. {
  48. if (mEvents != NULL)
  49. delete [] mEvents;
  50. close(mEpoll);
  51. close(mCommandPipe[0]);
  52. close(mCommandPipe[1]);
  53. }
  54. se_static_assert(MAX_EVENTS<=UINT32_MAX/sizeof(struct epoll_event));
  55. bool NonBlockingUnixCommunicationSocket::init()
  56. {
  57. //create the epoll structure
  58. mEpoll = epoll_create(1);
  59. if (mEpoll < 0)
  60. return false;
  61. //create the command pipe
  62. int ret;
  63. ret = pipe(mCommandPipe);
  64. if (ret != 0)
  65. {
  66. close(mEpoll);
  67. return false;
  68. }
  69. //place one end of the pipe in the epoll list
  70. struct epoll_event event;
  71. event.data.fd = mCommandPipe[0];
  72. event.events = EPOLLIN | EPOLLET;
  73. int registerCommand = epoll_ctl (mEpoll, EPOLL_CTL_ADD, mCommandPipe[0], &event);
  74. //connect to the AESM - blocking connect
  75. bool connectInit = UnixCommunicationSocket::init();
  76. //register the event
  77. event.data.fd = mSocket;
  78. event.events = EPOLLET;
  79. int registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_ADD, mSocket, &event);
  80. if (registerCommand != 0 || registerSocket != 0 || connectInit == false)
  81. {
  82. close(mEpoll);
  83. close(mCommandPipe[0]);
  84. close(mCommandPipe[1]);
  85. return false;
  86. }
  87. //create events buffer
  88. mEvents = new struct epoll_event[MAX_EVENTS];
  89. memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
  90. return MakeNonBlocking();
  91. }
  92. char* NonBlockingUnixCommunicationSocket::readRaw(ssize_t length)
  93. {
  94. if (mSocket == -1)
  95. return NULL;
  96. // Add read event
  97. struct epoll_event event;
  98. event.data.fd = mSocket;
  99. event.events = EPOLLIN | EPOLLET;
  100. int registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  101. if (registerSocket != 0)
  102. {
  103. return NULL;
  104. }
  105. ssize_t total_read = 0;
  106. ssize_t step = 0;
  107. char * recBuf = NULL;
  108. recBuf = new char[length];
  109. memset(recBuf, 0, length);
  110. int32_t epollTimeout = (mTimeoutMseconds > 0 ? mTimeoutMseconds : -1);
  111. int eventNum = 0;
  112. int i = 0;
  113. bool errorDetected = false;
  114. bool cancellationDetected = false;
  115. bool peerSocketClosed = false;
  116. MarkStartTime();
  117. do{
  118. //try a direct read (maybe all data is available already)
  119. step = read(mSocket, recBuf, length);
  120. if(step == -1 && errno == EINTR && CheckForTimeout() == false){
  121. SE_TRACE_WARNING("read is interrupted by signal\n");
  122. continue;
  123. }
  124. if (step == -1 && errno != EAGAIN)
  125. {
  126. errorDetected = true;
  127. }
  128. else
  129. {
  130. if (step != -1)
  131. {
  132. total_read += step;
  133. }
  134. if (total_read == length)
  135. {
  136. break; //we are finished here
  137. }
  138. }
  139. //wait for events
  140. do {
  141. eventNum = epoll_wait(mEpoll, mEvents, MAX_EVENTS, epollTimeout);
  142. } while (eventNum == -1 && errno == EINTR && CheckForTimeout() == false);
  143. if (eventNum == -1)
  144. {
  145. errorDetected = true;
  146. }
  147. for (i = 0;
  148. CheckForTimeout() == false && //need to be sure to check this first
  149. errorDetected == false &&
  150. cancellationDetected == false &&
  151. peerSocketClosed == false &&
  152. i < eventNum;
  153. i++)
  154. {
  155. if (mEvents[i].events & EPOLLHUP)
  156. {
  157. peerSocketClosed = true;
  158. //peer closed socket. one more reading all remaining data.
  159. }
  160. if ((mEvents[i].events & EPOLLERR) ||
  161. (!(mEvents[i].events & EPOLLIN)))
  162. {
  163. //error
  164. errorDetected = true;
  165. }
  166. else
  167. {
  168. if (mEvents[i].data.fd == mCommandPipe[0])
  169. {
  170. //cancellation -- in the case this logic would complicate by needing more commands, we will detach this into
  171. //a CommandManager of some sort
  172. cancellationDetected = true;
  173. }
  174. else
  175. {
  176. //read data
  177. step = partialRead(recBuf + total_read, length - total_read);
  178. if (step == -1)
  179. {
  180. errorDetected = true;
  181. }
  182. if (step == 0) //peer closed socket
  183. {
  184. //did this happen before getting the entire data ?
  185. if (total_read != length)
  186. errorDetected = true;
  187. }
  188. total_read += step;
  189. }
  190. }
  191. }
  192. if (total_read != length)
  193. {
  194. if (errorDetected || cancellationDetected || wasTimeoutDetected())
  195. {
  196. disconnect();
  197. delete [] recBuf;
  198. recBuf = NULL;
  199. break;
  200. }
  201. }
  202. //clear events
  203. memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
  204. }while (total_read < length);
  205. event.data.fd = mSocket;
  206. event.events = EPOLLET;
  207. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  208. if (registerSocket != 0)
  209. {
  210. disconnect();
  211. if (NULL != recBuf)
  212. delete [] recBuf;
  213. return NULL;
  214. }
  215. return recBuf;
  216. }
  217. /**
  218. * Read no more than maxLength bytes
  219. */
  220. ssize_t NonBlockingUnixCommunicationSocket::partialRead(char* buffer, ssize_t maxLength)
  221. {
  222. ssize_t step = 0;
  223. ssize_t chunkSize = (maxLength < 512 ? maxLength : 512);
  224. ssize_t totalRead = 0;
  225. ssize_t remaining = maxLength;
  226. while (totalRead < maxLength)
  227. {
  228. remaining = maxLength - totalRead;
  229. step = read(mSocket, buffer + totalRead, (remaining > chunkSize ? chunkSize : remaining));
  230. if(step == -1 && errno == EINTR && CheckForTimeout() == false){
  231. SE_TRACE_WARNING("read was interrupted by signal\n");
  232. continue;
  233. }
  234. if (step == -1)
  235. {
  236. if (errno != EAGAIN)
  237. return -1;
  238. break;
  239. }
  240. totalRead += step;
  241. if (step == 0)
  242. break;
  243. }
  244. return totalRead;
  245. }
  246. ssize_t NonBlockingUnixCommunicationSocket::writeRaw(const char* data, ssize_t length)
  247. {
  248. if (mSocket == -1)
  249. return -1;
  250. ssize_t total_write = 0;
  251. ssize_t step = 0;
  252. int32_t epollTimeout = (mTimeoutMseconds > 0 ? mTimeoutMseconds : -1);
  253. int eventNum = 0;
  254. int i = 0;
  255. bool errorDetected = false;
  256. bool cancellationDetected = false;
  257. bool peerSocketClosed = false;
  258. bool lastWriteSuccessful = false;
  259. struct epoll_event event;
  260. int registerSocket;
  261. MarkStartTime();
  262. do
  263. {
  264. step = write(mSocket, data + total_write, length - total_write);
  265. if(step == -1 && errno == EINTR && CheckForTimeout() == false){
  266. SE_TRACE_WARNING("write was interrupted by signal\n");
  267. continue;
  268. }
  269. if (step == -1 && errno != EAGAIN)
  270. {
  271. // an error occured
  272. errorDetected = true;
  273. }
  274. else
  275. {
  276. if (step == -1 && errno == EAGAIN)
  277. {
  278. // the internal buffer is full
  279. // EPOLLOUT is added so that an event is generated when there is
  280. // empty space in the buffer
  281. lastWriteSuccessful = false;
  282. event.data.fd = mSocket;
  283. event.events = EPOLLET | EPOLLOUT;
  284. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  285. if (registerSocket != 0)
  286. {
  287. return -1;
  288. }
  289. }
  290. else
  291. {
  292. // the write was successful
  293. if (!lastWriteSuccessful)
  294. {
  295. // remove EPOLLOUT
  296. lastWriteSuccessful = true;
  297. event.data.fd = mSocket;
  298. event.events = EPOLLET;
  299. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  300. if (registerSocket != 0)
  301. {
  302. return -1;
  303. }
  304. }
  305. total_write += step;
  306. if (total_write == length)
  307. {
  308. break;
  309. }
  310. continue;
  311. }
  312. }
  313. do {
  314. eventNum = epoll_wait(mEpoll, mEvents, MAX_EVENTS, epollTimeout);
  315. } while (eventNum == -1 && errno == EINTR && CheckForTimeout() == false);
  316. if (eventNum == -1)
  317. {
  318. errorDetected = true;
  319. }
  320. for (i = 0;
  321. CheckForTimeout() == false &&
  322. errorDetected == false &&
  323. cancellationDetected == false &&
  324. peerSocketClosed == false &&
  325. i < eventNum;
  326. i++)
  327. {
  328. if (mEvents[i].events & EPOLLHUP)
  329. {
  330. // the socket or that pipe have been closed
  331. peerSocketClosed = true;
  332. continue;
  333. }
  334. if ((mEvents[i].events & EPOLLERR) ||
  335. (!(mEvents[i].events & EPOLLOUT)))
  336. {
  337. // received an event other than EPOLLOUT
  338. errorDetected = true;
  339. }
  340. else
  341. {
  342. if (mEvents[i].data.fd == mCommandPipe[0])
  343. {
  344. cancellationDetected = true;
  345. }
  346. }
  347. }
  348. if (errorDetected || cancellationDetected || wasTimeoutDetected() || peerSocketClosed)
  349. {
  350. disconnect();
  351. break;
  352. }
  353. memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
  354. }
  355. while(total_write < length);
  356. event.data.fd = mSocket;
  357. event.events = EPOLLET;
  358. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  359. if (registerSocket != 0)
  360. {
  361. return -1;
  362. }
  363. return total_write;
  364. }
  365. int NonBlockingUnixCommunicationSocket::getSockDescriptor()
  366. {
  367. return UnixCommunicationSocket::getSockDescriptor();
  368. }
  369. bool NonBlockingUnixCommunicationSocket::wasTimeoutDetected()
  370. {
  371. return UnixCommunicationSocket::wasTimeoutDetected();
  372. }
  373. bool NonBlockingUnixCommunicationSocket::setTimeout(uint32_t milliseconds)
  374. {
  375. mTimeoutMseconds = milliseconds;
  376. return true;
  377. }
  378. bool NonBlockingUnixCommunicationSocket::MakeNonBlocking()
  379. {
  380. int flags, ret;
  381. flags = fcntl (mSocket, F_GETFL, 0);
  382. if (flags == -1)
  383. {
  384. return false;
  385. }
  386. flags |= (int)O_NONBLOCK;
  387. ret = fcntl (mSocket, F_SETFL, flags);
  388. if (ret == -1)
  389. {
  390. return false;
  391. }
  392. return true;
  393. }
  394. void NonBlockingUnixCommunicationSocket::Cancel() const
  395. {
  396. //write anything on the pipe
  397. char cmd = '1';
  398. if (write(mCommandPipe[0],&cmd,1) < 0)
  399. {
  400. // do nothing
  401. }
  402. }