123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- /*
- * Copyright (C) 2011-2018 Intel Corporation. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in
- * the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Intel Corporation nor the names of its
- * contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- #ifndef __STDC_LIMIT_MACROS
- #define __STDC_LIMIT_MACROS
- #endif
- #include <stdint.h>
- #include <NonBlockingUnixCommunicationSocket.h>
- #include <arch.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <errno.h>
- #include <sys/epoll.h>
- #include <string.h>
- #include <se_trace.h>
- NonBlockingUnixCommunicationSocket::~NonBlockingUnixCommunicationSocket()
- {
- if (mEvents != NULL)
- delete [] mEvents;
- close(mEpoll);
- close(mCommandPipe[0]);
- close(mCommandPipe[1]);
- }
- se_static_assert(MAX_EVENTS<=UINT32_MAX/sizeof(struct epoll_event));
- bool NonBlockingUnixCommunicationSocket::init()
- {
- //create the epoll structure
- mEpoll = epoll_create(1);
- if (mEpoll < 0)
- return false;
-
- //create the command pipe
- int ret;
- ret = pipe(mCommandPipe);
- if (ret != 0)
- {
- close(mEpoll);
- return false;
- }
- //place one end of the pipe in the epoll list
- struct epoll_event event;
- event.data.fd = mCommandPipe[0];
- event.events = EPOLLIN | EPOLLET;
- int registerCommand = epoll_ctl (mEpoll, EPOLL_CTL_ADD, mCommandPipe[0], &event);
- //connect to the AESM - blocking connect
- bool connectInit = UnixCommunicationSocket::init();
- //register the event
- event.data.fd = mSocket;
- event.events = EPOLLET;
- int registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_ADD, mSocket, &event);
- if (registerCommand != 0 || registerSocket != 0 || connectInit == false)
- {
- close(mEpoll);
- close(mCommandPipe[0]);
- close(mCommandPipe[1]);
-
- return false;
- }
- //create events buffer
- mEvents = new struct epoll_event[MAX_EVENTS];
- memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
- return MakeNonBlocking();
- }
- char* NonBlockingUnixCommunicationSocket::readRaw(ssize_t length)
- {
- if (mSocket == -1)
- return NULL;
- // Add read event
- struct epoll_event event;
- event.data.fd = mSocket;
- event.events = EPOLLIN | EPOLLET;
- int registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
- if (registerSocket != 0)
- {
- return NULL;
- }
- ssize_t total_read = 0;
- ssize_t step = 0;
- char * recBuf = NULL;
- recBuf = new char[length];
- memset(recBuf, 0, length);
- int32_t epollTimeout = (mTimeoutMseconds > 0 ? mTimeoutMseconds : -1);
- int eventNum = 0;
- int i = 0;
- bool errorDetected = false;
- bool cancellationDetected = false;
- bool peerSocketClosed = false;
- MarkStartTime();
-
- do{
- //try a direct read (maybe all data is available already)
- step = read(mSocket, recBuf, length);
- if(step == -1 && errno == EINTR && CheckForTimeout() == false){
- SE_TRACE_WARNING("read is interrupted by signal\n");
- continue;
- }
- if (step == -1 && errno != EAGAIN)
- {
- errorDetected = true;
- }
- else
- {
- if (step != -1)
- {
- total_read += step;
- }
- if (total_read == length)
- {
- break; //we are finished here
- }
- }
- //wait for events
- do {
- eventNum = epoll_wait(mEpoll, mEvents, MAX_EVENTS, epollTimeout);
- } while (eventNum == -1 && errno == EINTR && CheckForTimeout() == false);
- if (eventNum == -1)
- {
- errorDetected = true;
- }
-
- for (i = 0;
- CheckForTimeout() == false && //need to be sure to check this first
- errorDetected == false &&
- cancellationDetected == false &&
- peerSocketClosed == false &&
- i < eventNum;
- i++)
- {
- if (mEvents[i].events & EPOLLHUP)
- {
- peerSocketClosed = true;
- //peer closed socket. one more reading all remaining data.
- }
- if ((mEvents[i].events & EPOLLERR) ||
- (!(mEvents[i].events & EPOLLIN)))
- {
- //error
- errorDetected = true;
- }
- else
- {
- if (mEvents[i].data.fd == mCommandPipe[0])
- {
- //cancellation -- in the case this logic would complicate by needing more commands, we will detach this into
- //a CommandManager of some sort
- cancellationDetected = true;
- }
- else
- {
- //read data
- step = partialRead(recBuf + total_read, length - total_read);
- if (step == -1)
- {
- errorDetected = true;
- }
- if (step == 0) //peer closed socket
- {
- //did this happen before getting the entire data ?
- if (total_read != length)
- errorDetected = true;
- }
- total_read += step;
- }
- }
- }
-
- if (total_read != length)
- {
- if (errorDetected || cancellationDetected || wasTimeoutDetected())
- {
- disconnect();
- delete [] recBuf;
- recBuf = NULL;
- break;
- }
- }
- //clear events
- memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
- }while (total_read < length);
- if(mSocket!=-1)
- {
- event.data.fd = mSocket;
- event.events = EPOLLET;
- registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
- if (registerSocket != 0)
- {
- disconnect();
- if (NULL != recBuf)
- delete [] recBuf;
- return NULL;
- }
- }else
- {
- // disconnected, recBuf is set NULL when disconnect() is called.
- }
- return recBuf;
- }
- /**
- * Read no more than maxLength bytes
- */
- ssize_t NonBlockingUnixCommunicationSocket::partialRead(char* buffer, ssize_t maxLength)
- {
- ssize_t step = 0;
- ssize_t chunkSize = (maxLength < 512 ? maxLength : 512);
- ssize_t totalRead = 0;
- ssize_t remaining = maxLength;
- while (totalRead < maxLength)
- {
- remaining = maxLength - totalRead;
- step = read(mSocket, buffer + totalRead, (remaining > chunkSize ? chunkSize : remaining));
- if(step == -1 && errno == EINTR && CheckForTimeout() == false){
- SE_TRACE_WARNING("read was interrupted by signal\n");
- continue;
- }
- if (step == -1)
- {
- if (errno != EAGAIN)
- return -1;
- break;
- }
-
- totalRead += step;
- if (step == 0)
- break;
- }
- return totalRead;
- }
- ssize_t NonBlockingUnixCommunicationSocket::writeRaw(const char* data, ssize_t length)
- {
- if (mSocket == -1)
- return -1;
- ssize_t total_write = 0;
- ssize_t step = 0;
- int32_t epollTimeout = (mTimeoutMseconds > 0 ? mTimeoutMseconds : -1);
- int eventNum = 0;
- int i = 0;
- bool errorDetected = false;
- bool cancellationDetected = false;
- bool peerSocketClosed = false;
- bool lastWriteSuccessful = false;
- struct epoll_event event;
- int registerSocket;
- MarkStartTime();
- do
- {
- step = write(mSocket, data + total_write, length - total_write);
- if(step == -1 && errno == EINTR && CheckForTimeout() == false){
- SE_TRACE_WARNING("write was interrupted by signal\n");
- continue;
- }
- if (step == -1 && errno != EAGAIN)
- {
- // an error occured
- errorDetected = true;
- }
- else
- {
- if (step == -1 && errno == EAGAIN)
- {
- // the internal buffer is full
- // EPOLLOUT is added so that an event is generated when there is
- // empty space in the buffer
- lastWriteSuccessful = false;
- event.data.fd = mSocket;
- event.events = EPOLLET | EPOLLOUT;
- registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
- if (registerSocket != 0)
- {
- return -1;
- }
- }
- else
- {
- // the write was successful
- if (!lastWriteSuccessful)
- {
- // remove EPOLLOUT
- lastWriteSuccessful = true;
- event.data.fd = mSocket;
- event.events = EPOLLET;
- registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
- if (registerSocket != 0)
- {
- return -1;
- }
- }
- total_write += step;
- if (total_write == length)
- {
- break;
- }
- continue;
- }
- }
- do {
- eventNum = epoll_wait(mEpoll, mEvents, MAX_EVENTS, epollTimeout);
- } while (eventNum == -1 && errno == EINTR && CheckForTimeout() == false);
- if (eventNum == -1)
- {
- errorDetected = true;
- }
- for (i = 0;
- CheckForTimeout() == false &&
- errorDetected == false &&
- cancellationDetected == false &&
- peerSocketClosed == false &&
- i < eventNum;
- i++)
- {
- if (mEvents[i].events & EPOLLHUP)
- {
- // the socket or that pipe have been closed
- peerSocketClosed = true;
- continue;
- }
- if ((mEvents[i].events & EPOLLERR) ||
- (!(mEvents[i].events & EPOLLOUT)))
- {
- // received an event other than EPOLLOUT
- errorDetected = true;
- }
- else
- {
- if (mEvents[i].data.fd == mCommandPipe[0])
- {
- cancellationDetected = true;
- }
- }
- }
- if (errorDetected || cancellationDetected || wasTimeoutDetected() || peerSocketClosed)
- {
- disconnect();
- break;
- }
- memset((char*)mEvents, 0, MAX_EVENTS * sizeof(struct epoll_event));
- }
- while(total_write < length);
- if(mSocket!=-1){
- event.data.fd = mSocket;
- event.events = EPOLLET;
- registerSocket = epoll_ctl (mEpoll, EPOLL_CTL_MOD, mSocket, &event);
- if (registerSocket != 0)
- {
- return -1;
- }
- }else
- {//disconneded due to error.
- return -1;
- }
- return total_write;
- }
- int NonBlockingUnixCommunicationSocket::getSockDescriptor()
- {
- return UnixCommunicationSocket::getSockDescriptor();
- }
- bool NonBlockingUnixCommunicationSocket::wasTimeoutDetected()
- {
- return UnixCommunicationSocket::wasTimeoutDetected();
- }
- bool NonBlockingUnixCommunicationSocket::setTimeout(uint32_t milliseconds)
- {
- mTimeoutMseconds = milliseconds;
- return true;
- }
- bool NonBlockingUnixCommunicationSocket::MakeNonBlocking()
- {
- int flags, ret;
- flags = fcntl (mSocket, F_GETFL, 0);
- if (flags == -1)
- {
- return false;
- }
- flags |= (int)O_NONBLOCK;
- ret = fcntl (mSocket, F_SETFL, flags);
- if (ret == -1)
- {
- return false;
- }
- return true;
- }
- void NonBlockingUnixCommunicationSocket::Cancel() const
- {
- //write anything on the pipe
- char cmd = '1';
- if (write(mCommandPipe[0],&cmd,1) < 0)
- {
- // do nothing
- }
- }
|