Communication.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677
  1. package communication;
  2. import java.io.DataInputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.IOException;
  5. import java.io.Serializable;
  6. import java.io.StreamCorruptedException;
  7. import java.net.InetSocketAddress;
  8. import java.net.ServerSocket;
  9. import java.net.Socket;
  10. import java.net.SocketException;
  11. import java.nio.charset.Charset;
  12. import java.util.ArrayList;
  13. import java.util.concurrent.BlockingQueue;
  14. import java.util.concurrent.LinkedBlockingQueue;
  15. import org.apache.commons.lang3.SerializationUtils;
  16. import util.Util;
  17. /**
  18. * Basic Usage
  19. *
  20. * 1. Call {@link #start(int)} or {@link #connect(InetSocketAddress)} to
  21. * initiate a connection 2. Wait for {@link #getState()} to return
  22. * {@link #STATE_CONNECTED} 3. {@link #write(byte[])} and {@link #read()}
  23. * messages. 4. Close the connection with {@link #stop()}. NOTE: This may
  24. * invalidate unread data
  25. *
  26. * Alternatively, you can always call start. And the first side of the
  27. * connection to call connect will win.
  28. */
  29. public class Communication {
  30. public static boolean D = false;
  31. // Constants that indicate the current connection state
  32. public static final int STATE_NONE = 0; // we're doing nothing
  33. public static final int STATE_LISTEN = 1; // now listening for incoming
  34. // connections
  35. public static final int STATE_CONNECTING = 2; // now initiating an outgoing
  36. // connection
  37. public static final int STATE_CONNECTED = 3; // now connected to a remote
  38. // device
  39. public static final int STATE_STOPPED = 4; // we're shutting things down
  40. public static final int STATE_RETRY = 5; // we are going to retry, but first
  41. // we listen
  42. private AcceptThread mSecureAcceptThread;
  43. private ConnectThread mConnectThread;
  44. private ConnectedThread mConnectedThread;
  45. // Maximum reconnect attempts
  46. private static final int MAX_RETRY = 2;
  47. /***********************
  48. * Private Members
  49. **********************/
  50. // Current number of reconnect attempts
  51. private int mNumTries;
  52. private int mPort = 0;
  53. private boolean acceptMode = false;
  54. protected int mState;
  55. protected InetSocketAddress mAddress;
  56. public Communication() {
  57. mState = STATE_NONE;
  58. }
  59. public void setTcpNoDelay(boolean on) {
  60. if (mConnectedThread != null)
  61. mConnectedThread.setTcpNoDelay(on);
  62. }
  63. /**
  64. * Set the current state of the connection
  65. *
  66. * @param state
  67. * An integer defining the current connection state
  68. */
  69. protected synchronized void setState(int state) {
  70. if (D)
  71. Util.debug("setState() " + mState + " -> " + state);
  72. mState = state;
  73. }
  74. /**
  75. * Return the current connection state.
  76. */
  77. public synchronized int getState() {
  78. return mState;
  79. }
  80. /**
  81. * Start the communication service. Specifically start AcceptThread to begin
  82. * a session in listening (server) mode.
  83. */
  84. public synchronized void start(int port) {
  85. if (D)
  86. Util.debug("start");
  87. acceptMode = true;
  88. startAcceptThread(port);
  89. mPort = port;
  90. mNumTries = 0;
  91. setState(STATE_LISTEN);
  92. }
  93. private synchronized void startAcceptThread(int port) {
  94. // Cancel any thread attempting to make a connection
  95. if (mConnectThread != null) {
  96. mConnectThread.cancel();
  97. mConnectThread = null;
  98. }
  99. // Cancel any thread currently running a connection
  100. if (mConnectedThread != null) {
  101. mConnectedThread.cancel();
  102. mConnectedThread = null;
  103. }
  104. // Start the thread to listen on a ServerSocket
  105. if (mSecureAcceptThread == null) {
  106. mSecureAcceptThread = new AcceptThread(port);
  107. mSecureAcceptThread.start();
  108. }
  109. }
  110. protected synchronized void retry() {
  111. if (D)
  112. Util.debug("retry");
  113. if (D)
  114. Util.debug("Retrying in state: " + getState());
  115. if (mState == STATE_CONNECTED)
  116. return;
  117. // TODO: Does this logic belong here
  118. if (mNumTries >= MAX_RETRY) {
  119. signalFailed();
  120. if (acceptMode)
  121. start(mPort);
  122. return;
  123. }
  124. startAcceptThread(mPort);
  125. setState(STATE_RETRY);
  126. int sleep = (int) (Math.random() * 1000 + 100);
  127. if (D)
  128. Util.debug("Sleeping: " + sleep);
  129. try {
  130. Thread.sleep(sleep);
  131. } catch (InterruptedException e) {
  132. Util.debug("Sleep interupted");
  133. } // TODO: This may block the main thread?
  134. if (D)
  135. Util.debug("Waking up: " + getState());
  136. // TODO: make this less strict
  137. if (mState != STATE_CONNECTING && mState != STATE_CONNECTED && mConnectedThread == null
  138. && mConnectThread == null)
  139. connect(mAddress);
  140. }
  141. /**
  142. * Start the ConnectThread to initiate a connection to a remote device.
  143. *
  144. * @param address
  145. * The address of the server
  146. * @param secure
  147. * Socket Security type - Secure (true) , Insecure (false)
  148. */
  149. public synchronized void connect(InetSocketAddress address) {
  150. if (D)
  151. Util.disp("connect to: " + address);
  152. // Don't throw out connections if we are already connected
  153. if (mState == STATE_CONNECTING || mConnectedThread != null) {
  154. return;
  155. }
  156. mNumTries++;
  157. mAddress = address;
  158. // Cancel any thread attempting to make a connection
  159. if (mState == STATE_CONNECTING) {
  160. if (mConnectThread != null) {
  161. mConnectThread.cancel();
  162. mConnectThread = null;
  163. }
  164. }
  165. // Cancel any thread currently running a connection
  166. if (mConnectedThread != null) {
  167. mConnectedThread.cancel();
  168. mConnectedThread = null;
  169. }
  170. // Start the thread to connect with the given device
  171. mConnectThread = new ConnectThread(address);
  172. mConnectThread.start();
  173. setState(STATE_CONNECTING);
  174. }
  175. /**
  176. * Start the ConnectedThread to begin managing a connection
  177. *
  178. * @param socket
  179. * The Socket on which the connection was made
  180. */
  181. public synchronized void connected(Socket socket) {
  182. if (D)
  183. Util.debug("connected");
  184. // Cancel the thread that completed the connection
  185. if (mConnectThread != null) {
  186. mConnectThread.cancel();
  187. mConnectThread = null;
  188. }
  189. // Cancel any thread currently running a connection
  190. if (mConnectedThread != null) {
  191. mConnectedThread.cancel();
  192. mConnectedThread = null;
  193. }
  194. // Cancel the accept thread because we only want to connect to one
  195. // device
  196. if (mSecureAcceptThread != null) {
  197. mSecureAcceptThread.cancel();
  198. mSecureAcceptThread = null;
  199. }
  200. // Start the thread to manage the connection and perform transmissions
  201. mConnectedThread = new ConnectedThread(socket);
  202. mConnectedThread.start();
  203. setState(STATE_CONNECTED);
  204. }
  205. protected void connectionFailed() {
  206. Util.error("Connection to the device failed");
  207. // Start the service over to restart listening mode
  208. if (getState() != STATE_STOPPED)
  209. retry();
  210. }
  211. /**
  212. * Indicate that the connection was lost and notify the UI Activity.
  213. */
  214. protected void connectionLost() {
  215. if (D)
  216. Util.error("Connection to the device lost");
  217. // Start the service over to restart listening mode
  218. if (getState() != STATE_STOPPED && acceptMode) {
  219. start(mPort);
  220. }
  221. }
  222. protected void signalFailed() {
  223. // TODO:
  224. }
  225. /**
  226. * Stop all threads
  227. */
  228. public synchronized void stop() {
  229. if (D)
  230. Util.debug("stop");
  231. setState(STATE_STOPPED);
  232. if (mConnectedThread != null) {
  233. mConnectedThread.cancel();
  234. mConnectedThread = null;
  235. }
  236. if (mConnectThread != null) {
  237. mConnectThread.cancel();
  238. mConnectThread = null;
  239. }
  240. if (mSecureAcceptThread != null) {
  241. mSecureAcceptThread.cancel();
  242. mSecureAcceptThread = null;
  243. }
  244. }
  245. /**
  246. * Write to the ConnectedThread in an unsynchronized manner
  247. *
  248. * This does not add message boundries!!
  249. *
  250. * @param out
  251. * The bytes to write
  252. * @see ConnectedThread#write(byte[])
  253. */
  254. public void write(byte[] out) {
  255. // Create temporary object
  256. ConnectedThread r;
  257. // Synchronize a copy of the ConnectedThread
  258. synchronized (this) {
  259. if (mState != STATE_CONNECTED)
  260. return;
  261. r = mConnectedThread;
  262. }
  263. // Perform the write unsynchronized
  264. r.write(out);
  265. }
  266. /**
  267. * Write a length encoded byte array.
  268. *
  269. * @param out
  270. */
  271. public void writeLengthEncoded(byte[] out) {
  272. write("" + out.length);
  273. write(out);
  274. }
  275. public <T> void write(T out) {
  276. write(SerializationUtils.serialize((Serializable) out));
  277. }
  278. public static final Charset defaultCharset = Charset.forName("ASCII");
  279. // TODO: Rather than having millions of write/read methods can we take
  280. // advantage of DataStreams?
  281. public void write(String buffer) {
  282. write(buffer, defaultCharset);
  283. }
  284. /*
  285. * This was added to allow backwords compaitibility with older versions
  286. * which used the default charset (usually utf-8) instead of asc-ii. This is
  287. * almost never what we want to do
  288. */
  289. public void write(String buffer, Charset charset) {
  290. write(buffer.getBytes(charset));
  291. if (D)
  292. Util.debug("Write: " + buffer);
  293. }
  294. /**
  295. * Read a string from Connected Thread
  296. *
  297. * @see #read()
  298. */
  299. public String readString() {
  300. return new String(read());
  301. }
  302. /**
  303. * Read from the ConnectedThread in an unsynchronized manner Note, this is a
  304. * blocking call
  305. *
  306. * @return the bytes read
  307. * @see ConnectedThread#read()
  308. */
  309. public byte[] read() {
  310. // Create temporary object
  311. ConnectedThread r;
  312. // Synchronize a copy of the ConnectedThread
  313. synchronized (this) {
  314. if (mState != STATE_CONNECTED)
  315. return null;
  316. r = mConnectedThread;
  317. }
  318. // Perform the read unsynchronized and parse
  319. byte[] readMessage = r.read();
  320. if (D)
  321. Util.debug("Read: " + new String(readMessage));
  322. return readMessage;
  323. }
  324. /**
  325. * Read a specific number of bytes from the ConnectedThread in an
  326. * unsynchronized manner Note, this is a blocking call
  327. *
  328. * @return the bytes read
  329. * @see ConnectedThread#read()
  330. */
  331. public byte[] readLengthEncoded() {
  332. int len = Integer.parseInt(readString());
  333. ArrayList<byte[]> bytes = new ArrayList<byte[]>();
  334. byte[] data = read();
  335. if (data.length != len) {
  336. bytes.add(data);
  337. data = read();
  338. }
  339. byte[] total = new byte[len];
  340. int offset = 0;
  341. for (byte[] b : bytes) {
  342. for (int i = 0; i < b.length; i++) {
  343. total[offset++] = b[i];
  344. }
  345. }
  346. return total;
  347. }
  348. public <T> T readObject() {
  349. T object = SerializationUtils.deserialize(read());
  350. return object;
  351. }
  352. /**
  353. * This thread runs while listening for incoming connections. It behaves
  354. * like a server-side client. It runs until a connection is accepted (or
  355. * until cancelled).
  356. */
  357. private class AcceptThread extends Thread {
  358. // The local server socket
  359. private final ServerSocket mmServerSocket;
  360. public AcceptThread(int port) {
  361. ServerSocket tmp = null;
  362. try {
  363. tmp = new ServerSocket(port);
  364. } catch (IOException e) {
  365. Util.error("ServerSocket unable to start", e);
  366. }
  367. mmServerSocket = tmp;
  368. }
  369. public void run() {
  370. if (D)
  371. Util.disp("BEGIN mAcceptThread ");
  372. setName("AcceptThread");
  373. Socket socket = null;
  374. // Listen to the server socket if we're not connected
  375. while (mState != STATE_CONNECTED) {
  376. try {
  377. // This is a blocking call and will only return on a
  378. // successful connection or an exception
  379. socket = mmServerSocket.accept();
  380. // socket.setTcpNoDelay(true);
  381. } catch (IOException e) {
  382. Util.error("accept() failed", e);
  383. break;
  384. }
  385. // If a connection was accepted
  386. if (socket != null) {
  387. synchronized (Communication.this) {
  388. switch (mState) {
  389. case STATE_LISTEN:
  390. case STATE_CONNECTING:
  391. // Situation normal. Start the connected thread.
  392. connected(socket);
  393. break;
  394. case STATE_NONE:
  395. case STATE_CONNECTED:
  396. // Either not ready or already connected.
  397. // Terminate new socket.
  398. try {
  399. socket.close();
  400. } catch (IOException e) {
  401. Util.error("Could not close unwanted socket", e);
  402. }
  403. // TODO: Should we really be returning here?
  404. return;
  405. }
  406. }
  407. }
  408. }
  409. if (D)
  410. Util.disp("END mAcceptThread");
  411. }
  412. public void cancel() {
  413. if (D)
  414. Util.debug("AcceptThread canceled " + this);
  415. try {
  416. mmServerSocket.close();
  417. } catch (IOException e) {
  418. Util.error("close() of server failed", e);
  419. }
  420. }
  421. }
  422. /**
  423. * This thread runs while attempting to make an outgoing connection with a
  424. * device. It runs straight through; the connection either succeeds or
  425. * fails.
  426. */
  427. private class ConnectThread extends Thread {
  428. private final Socket mmSocket;
  429. private final InetSocketAddress mmAddress;
  430. public ConnectThread(InetSocketAddress address) {
  431. mmAddress = address;
  432. mmSocket = new Socket();
  433. /*
  434. * try { mmSocket.setTcpNoDelay(true); } catch (SocketException e) {
  435. * e.printStackTrace(); }
  436. */
  437. }
  438. public void run() {
  439. Util.debug("BEGIN mConnectThread");
  440. setName("ConnectThread");
  441. try {
  442. // This is a blocking call and will only return on a
  443. // successful connection or an exception
  444. mmSocket.connect(mmAddress);
  445. } catch (IOException e) {
  446. // Close the socket
  447. try {
  448. mmSocket.close();
  449. } catch (IOException e2) {
  450. Util.error("unable to close() socket during connection failure", e2);
  451. }
  452. connectionFailed();
  453. return;
  454. }
  455. // Reset the ConnectThread because we're done
  456. synchronized (Communication.this) {
  457. mConnectThread = null;
  458. }
  459. // Start the connected thread
  460. connected(mmSocket);
  461. }
  462. public void cancel() {
  463. try {
  464. mmSocket.close();
  465. } catch (IOException e) {
  466. Util.error("close() of connect socket failed", e);
  467. }
  468. }
  469. }
  470. /**
  471. * This thread runs during a connection with a remote device. It handles all
  472. * incoming and outgoing transmissions.
  473. */
  474. private class ConnectedThread extends Thread {
  475. private final Socket mmSocket;
  476. private final DataInputStream mmInStream;
  477. private final DataOutputStream mmOutStream;
  478. private BlockingQueue<byte[]> mMessageBuffer;
  479. public ConnectedThread(Socket socket) {
  480. Util.debug("create ConnectedThread");
  481. mmSocket = socket;
  482. DataInputStream tmpIn = null;
  483. DataOutputStream tmpOut = null;
  484. mMessageBuffer = new LinkedBlockingQueue<byte[]>(); // TODO: add a
  485. // capacity here
  486. // to prevent
  487. // doS
  488. // Get the Socket input and output streams
  489. try {
  490. tmpIn = new DataInputStream(socket.getInputStream());
  491. tmpOut = new DataOutputStream(socket.getOutputStream());
  492. } catch (StreamCorruptedException e) {
  493. Util.error("object streams corrupt", e);
  494. } catch (IOException e) {
  495. Util.error("temp sockets not created", e);
  496. }
  497. mmInStream = tmpIn;
  498. mmOutStream = tmpOut;
  499. }
  500. public void setTcpNoDelay(boolean on) {
  501. if (mmSocket != null)
  502. try {
  503. mmSocket.setTcpNoDelay(on);
  504. } catch (SocketException e) {
  505. e.printStackTrace();
  506. }
  507. }
  508. /**
  509. * Read from the ConnectedThread in an unsynchronized manner
  510. *
  511. * This is a blocking call and will only return data if the readLoop
  512. * flag is false
  513. *
  514. * @return the bytes read
  515. * @see ConnectedThread#read()
  516. */
  517. public byte[] read() {
  518. try {
  519. return mMessageBuffer.take();
  520. } catch (InterruptedException e) {
  521. Util.error("Message Read Interupted");
  522. return null;
  523. }
  524. }
  525. /**
  526. * Write to the connected OutStream.
  527. *
  528. * @param buffer
  529. * The bytes to write
  530. */
  531. public void write(byte[] buffer) {
  532. try {
  533. mmOutStream.writeInt(buffer.length);
  534. mmOutStream.write(buffer);
  535. mmOutStream.flush();
  536. } catch (IOException e) {
  537. Util.error("Exception during write", e);
  538. }
  539. }
  540. public void run() {
  541. Util.disp("BEGIN mConnectedThread");
  542. int bytes;
  543. // Keep listening to the InputStream while connected
  544. while (true) {
  545. try {
  546. // Read from the InputStream
  547. bytes = mmInStream.readInt();
  548. byte[] buffer = new byte[bytes]; // TODO: This is a little
  549. // dangerous
  550. mmInStream.readFully(buffer, 0, bytes);
  551. try {
  552. mMessageBuffer.put(buffer);
  553. } catch (InterruptedException e) {
  554. Util.error("Message add interupted.");
  555. // TODO: possibly move this catch elsewhere
  556. }
  557. } catch (IOException e) {
  558. if (D)
  559. Util.debug("Device disconnected");
  560. connectionLost();
  561. break;
  562. }
  563. }
  564. }
  565. public void cancel() {
  566. try {
  567. mmInStream.close();
  568. mmOutStream.close();
  569. mmSocket.close();
  570. } catch (IOException e) {
  571. Util.error("close() of connect socket failed", e);
  572. }
  573. }
  574. }
  575. }