Communication.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729
  1. package communication;
  2. import java.io.DataInputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.IOException;
  5. import java.io.Serializable;
  6. import java.io.StreamCorruptedException;
  7. import java.math.BigInteger;
  8. import java.net.InetSocketAddress;
  9. import java.net.ServerSocket;
  10. import java.net.Socket;
  11. import java.net.SocketException;
  12. import java.nio.charset.Charset;
  13. import java.util.ArrayList;
  14. import java.util.concurrent.BlockingQueue;
  15. import java.util.concurrent.LinkedBlockingQueue;
  16. import org.apache.commons.lang3.SerializationUtils;
  17. import util.Util;
  18. /**
  19. * Basic Usage
  20. *
  21. * 1. Call {@link #start(int)} or {@link #connect(InetSocketAddress)} to
  22. * initiate a connection 2. Wait for {@link #getState()} to return
  23. * {@link #STATE_CONNECTED} 3. {@link #write(byte[])} and {@link #read()}
  24. * messages. 4. Close the connection with {@link #stop()}. NOTE: This may
  25. * invalidate unread data
  26. *
  27. * Alternatively, you can always call start. And the first side of the
  28. * connection to call connect will win.
  29. */
  30. public class Communication {
  31. public static boolean D = false;
  32. // Constants that indicate the current connection state
  33. public static final int STATE_NONE = 0; // we're doing nothing
  34. public static final int STATE_LISTEN = 1; // now listening for incoming
  35. // connections
  36. public static final int STATE_CONNECTING = 2; // now initiating an outgoing
  37. // connection
  38. public static final int STATE_CONNECTED = 3; // now connected to a remote
  39. // device
  40. public static final int STATE_STOPPED = 4; // we're shutting things down
  41. public static final int STATE_RETRY = 5; // we are going to retry, but first
  42. // we listen
  43. private AcceptThread mSecureAcceptThread;
  44. private ConnectThread mConnectThread;
  45. private ConnectedThread mConnectedThread;
  46. // Maximum reconnect attempts
  47. private static final int MAX_RETRY = 2;
  48. /***********************
  49. * Private Members
  50. **********************/
  51. // Current number of reconnect attempts
  52. private int mNumTries;
  53. private int mPort = 0;
  54. private boolean acceptMode = false;
  55. protected int mState;
  56. protected InetSocketAddress mAddress;
  57. public Communication() {
  58. mState = STATE_NONE;
  59. }
  60. public void setTcpNoDelay(boolean on) {
  61. if (mConnectedThread != null)
  62. mConnectedThread.setTcpNoDelay(on);
  63. }
  64. /**
  65. * Set the current state of the connection
  66. *
  67. * @param state
  68. * An integer defining the current connection state
  69. */
  70. protected synchronized void setState(int state) {
  71. if (D)
  72. Util.debug("setState() " + mState + " -> " + state);
  73. mState = state;
  74. }
  75. /**
  76. * Return the current connection state.
  77. */
  78. public synchronized int getState() {
  79. return mState;
  80. }
  81. /**
  82. * Start the communication service. Specifically start AcceptThread to begin
  83. * a session in listening (server) mode.
  84. */
  85. public synchronized void start(int port) {
  86. if (D)
  87. Util.debug("start");
  88. acceptMode = true;
  89. startAcceptThread(port);
  90. mPort = port;
  91. mNumTries = 0;
  92. setState(STATE_LISTEN);
  93. }
  94. private synchronized void startAcceptThread(int port) {
  95. // Cancel any thread attempting to make a connection
  96. if (mConnectThread != null) {
  97. mConnectThread.cancel();
  98. mConnectThread = null;
  99. }
  100. // Cancel any thread currently running a connection
  101. if (mConnectedThread != null) {
  102. mConnectedThread.cancel();
  103. mConnectedThread = null;
  104. }
  105. // Start the thread to listen on a ServerSocket
  106. if (mSecureAcceptThread == null) {
  107. mSecureAcceptThread = new AcceptThread(port);
  108. mSecureAcceptThread.start();
  109. }
  110. }
  111. protected synchronized void retry() {
  112. if (D)
  113. Util.debug("retry");
  114. if (D)
  115. Util.debug("Retrying in state: " + getState());
  116. if (mState == STATE_CONNECTED)
  117. return;
  118. // TODO: Does this logic belong here
  119. if (mNumTries >= MAX_RETRY) {
  120. signalFailed();
  121. if (acceptMode)
  122. start(mPort);
  123. return;
  124. }
  125. startAcceptThread(mPort);
  126. setState(STATE_RETRY);
  127. int sleep = (int) (Math.random() * 1000 + 100);
  128. if (D)
  129. Util.debug("Sleeping: " + sleep);
  130. try {
  131. Thread.sleep(sleep);
  132. } catch (InterruptedException e) {
  133. Util.debug("Sleep interupted");
  134. } // TODO: This may block the main thread?
  135. if (D)
  136. Util.debug("Waking up: " + getState());
  137. // TODO: make this less strict
  138. if (mState != STATE_CONNECTING && mState != STATE_CONNECTED && mConnectedThread == null
  139. && mConnectThread == null)
  140. connect(mAddress);
  141. }
  142. /**
  143. * Start the ConnectThread to initiate a connection to a remote device.
  144. *
  145. * @param address
  146. * The address of the server
  147. * @param secure
  148. * Socket Security type - Secure (true) , Insecure (false)
  149. */
  150. public synchronized void connect(InetSocketAddress address) {
  151. if (D)
  152. Util.disp("connect to: " + address);
  153. // Don't throw out connections if we are already connected
  154. if (mState == STATE_CONNECTING || mConnectedThread != null) {
  155. return;
  156. }
  157. mNumTries++;
  158. mAddress = address;
  159. // Cancel any thread attempting to make a connection
  160. if (mState == STATE_CONNECTING) {
  161. if (mConnectThread != null) {
  162. mConnectThread.cancel();
  163. mConnectThread = null;
  164. }
  165. }
  166. // Cancel any thread currently running a connection
  167. if (mConnectedThread != null) {
  168. mConnectedThread.cancel();
  169. mConnectedThread = null;
  170. }
  171. // Start the thread to connect with the given device
  172. mConnectThread = new ConnectThread(address);
  173. mConnectThread.start();
  174. setState(STATE_CONNECTING);
  175. }
  176. /**
  177. * Start the ConnectedThread to begin managing a connection
  178. *
  179. * @param socket
  180. * The Socket on which the connection was made
  181. */
  182. public synchronized void connected(Socket socket) {
  183. if (D)
  184. Util.debug("connected");
  185. // Cancel the thread that completed the connection
  186. if (mConnectThread != null) {
  187. mConnectThread.cancel();
  188. mConnectThread = null;
  189. }
  190. // Cancel any thread currently running a connection
  191. if (mConnectedThread != null) {
  192. mConnectedThread.cancel();
  193. mConnectedThread = null;
  194. }
  195. // Cancel the accept thread because we only want to connect to one
  196. // device
  197. if (mSecureAcceptThread != null) {
  198. mSecureAcceptThread.cancel();
  199. mSecureAcceptThread = null;
  200. }
  201. // Start the thread to manage the connection and perform transmissions
  202. mConnectedThread = new ConnectedThread(socket);
  203. mConnectedThread.start();
  204. setState(STATE_CONNECTED);
  205. }
  206. protected void connectionFailed() {
  207. Util.error("Connection to the device failed");
  208. // Start the service over to restart listening mode
  209. if (getState() != STATE_STOPPED)
  210. retry();
  211. }
  212. /**
  213. * Indicate that the connection was lost and notify the UI Activity.
  214. */
  215. protected void connectionLost() {
  216. if (D)
  217. Util.error("Connection to the device lost");
  218. // Start the service over to restart listening mode
  219. if (getState() != STATE_STOPPED && acceptMode) {
  220. start(mPort);
  221. }
  222. }
  223. protected void signalFailed() {
  224. // TODO:
  225. }
  226. /**
  227. * Stop all threads
  228. */
  229. public synchronized void stop() {
  230. if (D)
  231. Util.debug("stop");
  232. setState(STATE_STOPPED);
  233. if (mConnectedThread != null) {
  234. mConnectedThread.cancel();
  235. mConnectedThread = null;
  236. }
  237. if (mConnectThread != null) {
  238. mConnectThread.cancel();
  239. mConnectThread = null;
  240. }
  241. if (mSecureAcceptThread != null) {
  242. mSecureAcceptThread.cancel();
  243. mSecureAcceptThread = null;
  244. }
  245. }
  246. /**
  247. * Write to the ConnectedThread in an unsynchronized manner
  248. *
  249. * This does not add message boundries!!
  250. *
  251. * @param out
  252. * The bytes to write
  253. * @see ConnectedThread#write(byte[])
  254. */
  255. public void write(byte[] out) {
  256. // Create temporary object
  257. ConnectedThread r;
  258. // Synchronize a copy of the ConnectedThread
  259. synchronized (this) {
  260. if (mState != STATE_CONNECTED)
  261. return;
  262. r = mConnectedThread;
  263. }
  264. // Perform the write unsynchronized
  265. r.write(out);
  266. }
  267. /**
  268. * Write a length encoded byte array.
  269. *
  270. * @param out
  271. */
  272. public void writeLengthEncoded(byte[] out) {
  273. write("" + out.length);
  274. write(out);
  275. }
  276. public void write(int out) {
  277. write(BigInteger.valueOf(out).toByteArray());
  278. }
  279. public void write(byte[][] out) {
  280. write(out.length);
  281. for (int i = 0; i < out.length; i++)
  282. write(out[i]);
  283. }
  284. public void write(int[] out) {
  285. write(out.length);
  286. for (int i = 0; i < out.length; i++)
  287. write(out[i]);
  288. }
  289. public <T> void write(T out) {
  290. write(SerializationUtils.serialize((Serializable) out));
  291. }
  292. public <T> void write(T[] out) {
  293. write(out.length);
  294. for (int i = 0; i < out.length; i++)
  295. write(out[i]);
  296. }
  297. public static final Charset defaultCharset = Charset.forName("ASCII");
  298. // TODO: Rather than having millions of write/read methods can we take
  299. // advantage of DataStreams?
  300. public void write(String buffer) {
  301. write(buffer, defaultCharset);
  302. }
  303. /*
  304. * This was added to allow backwords compaitibility with older versions
  305. * which used the default charset (usually utf-8) instead of asc-ii. This is
  306. * almost never what we want to do
  307. */
  308. public void write(String buffer, Charset charset) {
  309. write(buffer.getBytes(charset));
  310. if (D)
  311. Util.debug("Write: " + buffer);
  312. }
  313. /**
  314. * Read a string from Connected Thread
  315. *
  316. * @see #read()
  317. */
  318. public String readString() {
  319. return new String(read());
  320. }
  321. /**
  322. * Read from the ConnectedThread in an unsynchronized manner Note, this is a
  323. * blocking call
  324. *
  325. * @return the bytes read
  326. * @see ConnectedThread#read()
  327. */
  328. public byte[] read() {
  329. // Create temporary object
  330. ConnectedThread r;
  331. // Synchronize a copy of the ConnectedThread
  332. synchronized (this) {
  333. if (mState != STATE_CONNECTED)
  334. return null;
  335. r = mConnectedThread;
  336. }
  337. // Perform the read unsynchronized and parse
  338. byte[] readMessage = r.read();
  339. if (D)
  340. Util.debug("Read: " + new String(readMessage));
  341. return readMessage;
  342. }
  343. /**
  344. * Read a specific number of bytes from the ConnectedThread in an
  345. * unsynchronized manner Note, this is a blocking call
  346. *
  347. * @return the bytes read
  348. * @see ConnectedThread#read()
  349. */
  350. public byte[] readLengthEncoded() {
  351. int len = Integer.parseInt(readString());
  352. ArrayList<byte[]> bytes = new ArrayList<byte[]>();
  353. byte[] data = read();
  354. if (data.length != len) {
  355. bytes.add(data);
  356. data = read();
  357. }
  358. byte[] total = new byte[len];
  359. int offset = 0;
  360. for (byte[] b : bytes) {
  361. for (int i = 0; i < b.length; i++) {
  362. total[offset++] = b[i];
  363. }
  364. }
  365. return total;
  366. }
  367. public int readInt() {
  368. return new BigInteger(read()).intValue();
  369. }
  370. public byte[][] readDoubleByteArray() {
  371. int len = readInt();
  372. byte[][] data = new byte[len][];
  373. for (int i = 0; i < len; i++)
  374. data[i] = read();
  375. return data;
  376. }
  377. public int[] readIntArray() {
  378. int len = readInt();
  379. int[] data = new int[len];
  380. for (int i = 0; i < len; i++)
  381. data[i] = readInt();
  382. return data;
  383. }
  384. public <T> T readObject() {
  385. T object = SerializationUtils.deserialize(read());
  386. return object;
  387. }
  388. public <T> T[] readObjectArray() {
  389. int len = readInt();
  390. @SuppressWarnings("unchecked")
  391. T[] data = (T[]) new Object[len];
  392. for (int i = 0; i < len; i++)
  393. data[i] = readObject();
  394. return data;
  395. }
  396. /**
  397. * This thread runs while listening for incoming connections. It behaves
  398. * like a server-side client. It runs until a connection is accepted (or
  399. * until cancelled).
  400. */
  401. private class AcceptThread extends Thread {
  402. // The local server socket
  403. private final ServerSocket mmServerSocket;
  404. public AcceptThread(int port) {
  405. ServerSocket tmp = null;
  406. try {
  407. tmp = new ServerSocket(port);
  408. } catch (IOException e) {
  409. Util.error("ServerSocket unable to start", e);
  410. }
  411. mmServerSocket = tmp;
  412. }
  413. public void run() {
  414. if (D)
  415. Util.disp("BEGIN mAcceptThread ");
  416. setName("AcceptThread");
  417. Socket socket = null;
  418. // Listen to the server socket if we're not connected
  419. while (mState != STATE_CONNECTED) {
  420. try {
  421. // This is a blocking call and will only return on a
  422. // successful connection or an exception
  423. socket = mmServerSocket.accept();
  424. // socket.setTcpNoDelay(true);
  425. } catch (IOException e) {
  426. Util.error("accept() failed", e);
  427. break;
  428. }
  429. // If a connection was accepted
  430. if (socket != null) {
  431. synchronized (Communication.this) {
  432. switch (mState) {
  433. case STATE_LISTEN:
  434. case STATE_CONNECTING:
  435. // Situation normal. Start the connected thread.
  436. connected(socket);
  437. break;
  438. case STATE_NONE:
  439. case STATE_CONNECTED:
  440. // Either not ready or already connected.
  441. // Terminate new socket.
  442. try {
  443. socket.close();
  444. } catch (IOException e) {
  445. Util.error("Could not close unwanted socket", e);
  446. }
  447. // TODO: Should we really be returning here?
  448. return;
  449. }
  450. }
  451. }
  452. }
  453. if (D)
  454. Util.disp("END mAcceptThread");
  455. }
  456. public void cancel() {
  457. if (D)
  458. Util.debug("AcceptThread canceled " + this);
  459. try {
  460. mmServerSocket.close();
  461. } catch (IOException e) {
  462. Util.error("close() of server failed", e);
  463. }
  464. }
  465. }
  466. /**
  467. * This thread runs while attempting to make an outgoing connection with a
  468. * device. It runs straight through; the connection either succeeds or
  469. * fails.
  470. */
  471. private class ConnectThread extends Thread {
  472. private final Socket mmSocket;
  473. private final InetSocketAddress mmAddress;
  474. public ConnectThread(InetSocketAddress address) {
  475. mmAddress = address;
  476. mmSocket = new Socket();
  477. /*
  478. * try { mmSocket.setTcpNoDelay(true); } catch (SocketException e) {
  479. * e.printStackTrace(); }
  480. */
  481. }
  482. public void run() {
  483. Util.debug("BEGIN mConnectThread");
  484. setName("ConnectThread");
  485. try {
  486. // This is a blocking call and will only return on a
  487. // successful connection or an exception
  488. mmSocket.connect(mmAddress);
  489. } catch (IOException e) {
  490. // Close the socket
  491. try {
  492. mmSocket.close();
  493. } catch (IOException e2) {
  494. Util.error("unable to close() socket during connection failure", e2);
  495. }
  496. connectionFailed();
  497. return;
  498. }
  499. // Reset the ConnectThread because we're done
  500. synchronized (Communication.this) {
  501. mConnectThread = null;
  502. }
  503. // Start the connected thread
  504. connected(mmSocket);
  505. }
  506. public void cancel() {
  507. try {
  508. mmSocket.close();
  509. } catch (IOException e) {
  510. Util.error("close() of connect socket failed", e);
  511. }
  512. }
  513. }
  514. /**
  515. * This thread runs during a connection with a remote device. It handles all
  516. * incoming and outgoing transmissions.
  517. */
  518. private class ConnectedThread extends Thread {
  519. private final Socket mmSocket;
  520. private final DataInputStream mmInStream;
  521. private final DataOutputStream mmOutStream;
  522. private BlockingQueue<byte[]> mMessageBuffer;
  523. public ConnectedThread(Socket socket) {
  524. Util.debug("create ConnectedThread");
  525. mmSocket = socket;
  526. DataInputStream tmpIn = null;
  527. DataOutputStream tmpOut = null;
  528. mMessageBuffer = new LinkedBlockingQueue<byte[]>(); // TODO: add a
  529. // capacity here
  530. // to prevent
  531. // doS
  532. // Get the Socket input and output streams
  533. try {
  534. tmpIn = new DataInputStream(socket.getInputStream());
  535. tmpOut = new DataOutputStream(socket.getOutputStream());
  536. } catch (StreamCorruptedException e) {
  537. Util.error("object streams corrupt", e);
  538. } catch (IOException e) {
  539. Util.error("temp sockets not created", e);
  540. }
  541. mmInStream = tmpIn;
  542. mmOutStream = tmpOut;
  543. }
  544. public void setTcpNoDelay(boolean on) {
  545. if (mmSocket != null)
  546. try {
  547. mmSocket.setTcpNoDelay(on);
  548. } catch (SocketException e) {
  549. e.printStackTrace();
  550. }
  551. }
  552. /**
  553. * Read from the ConnectedThread in an unsynchronized manner
  554. *
  555. * This is a blocking call and will only return data if the readLoop
  556. * flag is false
  557. *
  558. * @return the bytes read
  559. * @see ConnectedThread#read()
  560. */
  561. public byte[] read() {
  562. try {
  563. return mMessageBuffer.take();
  564. } catch (InterruptedException e) {
  565. Util.error("Message Read Interupted");
  566. return null;
  567. }
  568. }
  569. /**
  570. * Write to the connected OutStream.
  571. *
  572. * @param buffer
  573. * The bytes to write
  574. */
  575. public void write(byte[] buffer) {
  576. try {
  577. mmOutStream.writeInt(buffer.length);
  578. mmOutStream.write(buffer);
  579. mmOutStream.flush();
  580. } catch (IOException e) {
  581. Util.error("Exception during write", e);
  582. }
  583. }
  584. public void run() {
  585. Util.disp("BEGIN mConnectedThread");
  586. int bytes;
  587. // Keep listening to the InputStream while connected
  588. while (true) {
  589. try {
  590. // Read from the InputStream
  591. bytes = mmInStream.readInt();
  592. byte[] buffer = new byte[bytes]; // TODO: This is a little
  593. // dangerous
  594. mmInStream.readFully(buffer, 0, bytes);
  595. try {
  596. mMessageBuffer.put(buffer);
  597. } catch (InterruptedException e) {
  598. Util.error("Message add interupted.");
  599. // TODO: possibly move this catch elsewhere
  600. }
  601. } catch (IOException e) {
  602. if (D)
  603. Util.debug("Device disconnected");
  604. connectionLost();
  605. break;
  606. }
  607. }
  608. }
  609. public void cancel() {
  610. try {
  611. mmInStream.close();
  612. mmOutStream.close();
  613. mmSocket.close();
  614. } catch (IOException e) {
  615. Util.error("close() of connect socket failed", e);
  616. }
  617. }
  618. }
  619. }