NonBlockingUnixCommunicationSocket.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. /*
  2. * Copyright (C) 2011-2018 Intel Corporation. All rights reserved.
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions
  6. * are met:
  7. *
  8. * * Redistributions of source code must retain the above copyright
  9. * notice, this list of conditions and the following disclaimer.
  10. * * Redistributions in binary form must reproduce the above copyright
  11. * notice, this list of conditions and the following disclaimer in
  12. * the documentation and/or other materials provided with the
  13. * distribution.
  14. * * Neither the name of Intel Corporation nor the names of its
  15. * contributors may be used to endorse or promote products derived
  16. * from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. *
  30. */
  31. #ifndef __STDC_LIMIT_MACROS
  32. #define __STDC_LIMIT_MACROS
  33. #endif
  34. #include <stdint.h>
  35. #include <NonBlockingUnixCommunicationSocket.h>
  36. #include <arch.h>
  37. #include <sys/types.h>
  38. #include <unistd.h>
  39. #include <fcntl.h>
  40. #include <stdio.h>
  41. #include <stdlib.h>
  42. #include <errno.h>
  43. #include <sys/epoll.h>
  44. #include <string.h>
  45. #include <se_trace.h>
  46. NonBlockingUnixCommunicationSocket::~NonBlockingUnixCommunicationSocket()
  47. {
  48. if (mEvents != NULL)
  49. delete [] mEvents;
  50. close(mEpoll);
  51. close(mCommandPipe[0]);
  52. close(mCommandPipe[1]);
  53. }
  54. se_static_assert(MAX_EVENTS<=UINT32_MAX/sizeof(struct epoll_event));
  55. bool NonBlockingUnixCommunicationSocket::init()
  56. {
  57. //create the epoll structure
  58. mEpoll = epoll_create(1);
  59. if (mEpoll < 0)
  60. return false;
  61. //create the command pipe
  62. int ret;
  63. ret = pipe(mCommandPipe);
  64. if (ret != 0)
  65. {
  66. close(mEpoll);
  67. return false;
  68. }
  69. //place one end of the pipe in the epoll list
  70. struct epoll_event event;
  71. event.data.fd = mCommandPipe[0];
  72. event.events = EPOLLIN | EPOLLET;
  73. int registerCommand = epoll_ctl (mEpoll, EPOLL_CTL_ADD, mCommandPipe[0], &event);
  74. //connect to the AESM - blocking connect
  75. bool connectInit = UnixCommunicationSocket::init();
  76. //register the event
  77. event.data.fd = mSocket;
  78. event.events = EPOLLET;
  79. int registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_ADD, mSocket, &event);
  80. if (registerCommand != 0 || registerSocket != 0 || connectInit == false)
  81. {
  82. close(mEpoll);
  83. close(mCommandPipe[0]);
  84. close(mCommandPipe[1]);
  85. return false;
  86. }
  87. //create events buffer
  88. mEvents = new struct epoll_event[MAX_EVENTS];
  89. memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
  90. return MakeNonBlocking();
  91. }
  92. char* NonBlockingUnixCommunicationSocket::readRaw(ssize_t length)
  93. {
  94. if (mSocket == -1)
  95. return NULL;
  96. // Add read event
  97. struct epoll_event event;
  98. event.data.fd = mSocket;
  99. event.events = EPOLLIN | EPOLLET;
  100. int registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  101. if (registerSocket != 0)
  102. {
  103. return NULL;
  104. }
  105. ssize_t total_read = 0;
  106. ssize_t step = 0;
  107. char * recBuf = NULL;
  108. recBuf = new char[length];
  109. memset(recBuf, 0, length);
  110. int32_t epollTimeout = (mTimeoutMseconds > 0 ? mTimeoutMseconds : -1);
  111. int eventNum = 0;
  112. int i = 0;
  113. bool errorDetected = false;
  114. bool cancellationDetected = false;
  115. bool peerSocketClosed = false;
  116. MarkStartTime();
  117. do{
  118. //try a direct read (maybe all data is available already)
  119. step = read(mSocket, recBuf, length);
  120. if(step == -1 && errno == EINTR && CheckForTimeout() == false){
  121. SE_TRACE_WARNING("read is interrupted by signal\n");
  122. continue;
  123. }
  124. if (step == -1 && errno != EAGAIN)
  125. {
  126. errorDetected = true;
  127. }
  128. else
  129. {
  130. if (step != -1)
  131. {
  132. total_read += step;
  133. }
  134. if (total_read == length)
  135. {
  136. break; //we are finished here
  137. }
  138. }
  139. //wait for events
  140. do {
  141. eventNum = epoll_wait(mEpoll, mEvents, MAX_EVENTS, epollTimeout);
  142. } while (eventNum == -1 && errno == EINTR && CheckForTimeout() == false);
  143. if (eventNum == -1)
  144. {
  145. errorDetected = true;
  146. }
  147. for (i = 0;
  148. CheckForTimeout() == false && //need to be sure to check this first
  149. errorDetected == false &&
  150. cancellationDetected == false &&
  151. peerSocketClosed == false &&
  152. i < eventNum;
  153. i++)
  154. {
  155. if (mEvents[i].events & EPOLLHUP)
  156. {
  157. peerSocketClosed = true;
  158. //peer closed socket. one more reading all remaining data.
  159. }
  160. if ((mEvents[i].events & EPOLLERR) ||
  161. (!(mEvents[i].events & EPOLLIN)))
  162. {
  163. //error
  164. errorDetected = true;
  165. }
  166. else
  167. {
  168. if (mEvents[i].data.fd == mCommandPipe[0])
  169. {
  170. //cancellation -- in the case this logic would complicate by needing more commands, we will detach this into
  171. //a CommandManager of some sort
  172. cancellationDetected = true;
  173. }
  174. else
  175. {
  176. //read data
  177. step = partialRead(recBuf + total_read, length - total_read);
  178. if (step == -1)
  179. {
  180. errorDetected = true;
  181. }
  182. if (step == 0) //peer closed socket
  183. {
  184. //did this happen before getting the entire data ?
  185. if (total_read != length)
  186. errorDetected = true;
  187. }
  188. total_read += step;
  189. }
  190. }
  191. }
  192. if (total_read != length)
  193. {
  194. if (errorDetected || cancellationDetected || wasTimeoutDetected())
  195. {
  196. disconnect();
  197. delete [] recBuf;
  198. recBuf = NULL;
  199. break;
  200. }
  201. }
  202. //clear events
  203. memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
  204. }while (total_read < length);
  205. if(mSocket!=-1)
  206. {
  207. event.data.fd = mSocket;
  208. event.events = EPOLLET;
  209. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  210. if (registerSocket != 0)
  211. {
  212. disconnect();
  213. if (NULL != recBuf)
  214. delete [] recBuf;
  215. return NULL;
  216. }
  217. }else
  218. {
  219. // disconnected, recBuf is set NULL when disconnect() is called.
  220. }
  221. return recBuf;
  222. }
  223. /**
  224. * Read no more than maxLength bytes
  225. */
  226. ssize_t NonBlockingUnixCommunicationSocket::partialRead(char* buffer, ssize_t maxLength)
  227. {
  228. ssize_t step = 0;
  229. ssize_t chunkSize = (maxLength < 512 ? maxLength : 512);
  230. ssize_t totalRead = 0;
  231. ssize_t remaining = maxLength;
  232. while (totalRead < maxLength)
  233. {
  234. remaining = maxLength - totalRead;
  235. step = read(mSocket, buffer + totalRead, (remaining > chunkSize ? chunkSize : remaining));
  236. if(step == -1 && errno == EINTR && CheckForTimeout() == false){
  237. SE_TRACE_WARNING("read was interrupted by signal\n");
  238. continue;
  239. }
  240. if (step == -1)
  241. {
  242. if (errno != EAGAIN)
  243. return -1;
  244. break;
  245. }
  246. totalRead += step;
  247. if (step == 0)
  248. break;
  249. }
  250. return totalRead;
  251. }
  252. ssize_t NonBlockingUnixCommunicationSocket::writeRaw(const char* data, ssize_t length)
  253. {
  254. if (mSocket == -1)
  255. return -1;
  256. ssize_t total_write = 0;
  257. ssize_t step = 0;
  258. int32_t epollTimeout = (mTimeoutMseconds > 0 ? mTimeoutMseconds : -1);
  259. int eventNum = 0;
  260. int i = 0;
  261. bool errorDetected = false;
  262. bool cancellationDetected = false;
  263. bool peerSocketClosed = false;
  264. bool lastWriteSuccessful = false;
  265. struct epoll_event event;
  266. int registerSocket;
  267. MarkStartTime();
  268. do
  269. {
  270. step = write(mSocket, data + total_write, length - total_write);
  271. if(step == -1 && errno == EINTR && CheckForTimeout() == false){
  272. SE_TRACE_WARNING("write was interrupted by signal\n");
  273. continue;
  274. }
  275. if (step == -1 && errno != EAGAIN)
  276. {
  277. // an error occured
  278. errorDetected = true;
  279. }
  280. else
  281. {
  282. if (step == -1 && errno == EAGAIN)
  283. {
  284. // the internal buffer is full
  285. // EPOLLOUT is added so that an event is generated when there is
  286. // empty space in the buffer
  287. lastWriteSuccessful = false;
  288. event.data.fd = mSocket;
  289. event.events = EPOLLET | EPOLLOUT;
  290. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  291. if (registerSocket != 0)
  292. {
  293. return -1;
  294. }
  295. }
  296. else
  297. {
  298. // the write was successful
  299. if (!lastWriteSuccessful)
  300. {
  301. // remove EPOLLOUT
  302. lastWriteSuccessful = true;
  303. event.data.fd = mSocket;
  304. event.events = EPOLLET;
  305. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  306. if (registerSocket != 0)
  307. {
  308. return -1;
  309. }
  310. }
  311. total_write += step;
  312. if (total_write == length)
  313. {
  314. break;
  315. }
  316. continue;
  317. }
  318. }
  319. do {
  320. eventNum = epoll_wait(mEpoll, mEvents, MAX_EVENTS, epollTimeout);
  321. } while (eventNum == -1 && errno == EINTR && CheckForTimeout() == false);
  322. if (eventNum == -1)
  323. {
  324. errorDetected = true;
  325. }
  326. for (i = 0;
  327. CheckForTimeout() == false &&
  328. errorDetected == false &&
  329. cancellationDetected == false &&
  330. peerSocketClosed == false &&
  331. i < eventNum;
  332. i++)
  333. {
  334. if (mEvents[i].events & EPOLLHUP)
  335. {
  336. // the socket or that pipe have been closed
  337. peerSocketClosed = true;
  338. continue;
  339. }
  340. if ((mEvents[i].events & EPOLLERR) ||
  341. (!(mEvents[i].events & EPOLLOUT)))
  342. {
  343. // received an event other than EPOLLOUT
  344. errorDetected = true;
  345. }
  346. else
  347. {
  348. if (mEvents[i].data.fd == mCommandPipe[0])
  349. {
  350. cancellationDetected = true;
  351. }
  352. }
  353. }
  354. if (errorDetected || cancellationDetected || wasTimeoutDetected() || peerSocketClosed)
  355. {
  356. disconnect();
  357. break;
  358. }
  359. memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
  360. }
  361. while(total_write < length);
  362. if(mSocket!=-1){
  363. event.data.fd = mSocket;
  364. event.events = EPOLLET;
  365. registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
  366. if (registerSocket != 0)
  367. {
  368. return -1;
  369. }
  370. }else
  371. {//disconneded due to error.
  372. return -1;
  373. }
  374. return total_write;
  375. }
  376. int NonBlockingUnixCommunicationSocket::getSockDescriptor()
  377. {
  378. return UnixCommunicationSocket::getSockDescriptor();
  379. }
  380. bool NonBlockingUnixCommunicationSocket::wasTimeoutDetected()
  381. {
  382. return UnixCommunicationSocket::wasTimeoutDetected();
  383. }
  384. bool NonBlockingUnixCommunicationSocket::setTimeout(uint32_t milliseconds)
  385. {
  386. mTimeoutMseconds = milliseconds;
  387. return true;
  388. }
  389. bool NonBlockingUnixCommunicationSocket::MakeNonBlocking()
  390. {
  391. int flags, ret;
  392. flags = fcntl (mSocket, F_GETFL, 0);
  393. if (flags == -1)
  394. {
  395. return false;
  396. }
  397. flags |= (int)O_NONBLOCK;
  398. ret = fcntl (mSocket, F_SETFL, flags);
  399. if (ret == -1)
  400. {
  401. return false;
  402. }
  403. return true;
  404. }
  405. void NonBlockingUnixCommunicationSocket::Cancel() const
  406. {
  407. //write anything on the pipe
  408. char cmd = '1';
  409. if (write(mCommandPipe[0],&cmd,1) < 0)
  410. {
  411. // do nothing
  412. }
  413. }