Communication.java 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. package communication;
  2. import java.io.DataInputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.IOException;
  5. import java.io.Serializable;
  6. import java.io.StreamCorruptedException;
  7. import java.math.BigInteger;
  8. import java.net.InetSocketAddress;
  9. import java.net.ServerSocket;
  10. import java.net.Socket;
  11. import java.net.SocketException;
  12. import java.nio.charset.Charset;
  13. import java.util.ArrayList;
  14. import java.util.concurrent.BlockingQueue;
  15. import java.util.concurrent.LinkedBlockingQueue;
  16. import org.apache.commons.lang3.SerializationUtils;
  17. import com.oblivm.backend.gc.GCSignal;
  18. import crypto.SimpleAES;
  19. import oram.Bucket;
  20. import oram.Global;
  21. import oram.Tuple;
  22. import util.Bandwidth;
  23. import util.StopWatch;
  24. import util.Util;
  25. /**
  26. * Basic Usage
  27. *
  28. * 1. Call {@link #start(int)} or {@link #connect(InetSocketAddress)} to
  29. * initiate a connection 2. Wait for {@link #getState()} to return
  30. * {@link #STATE_CONNECTED} 3. {@link #write(byte[])} and {@link #read()}
  31. * messages. 4. Close the connection with {@link #stop()}. NOTE: This may
  32. * invalidate unread data
  33. *
  34. * Alternatively, you can always call start. And the first side of the
  35. * connection to call connect will win.
  36. */
  37. public class Communication {
  38. public static boolean D = false;
  39. // Constants that indicate the current connection state
  40. public static final int STATE_NONE = 0; // we're doing nothing
  41. public static final int STATE_LISTEN = 1; // now listening for incoming
  42. // connections
  43. public static final int STATE_CONNECTING = 2; // now initiating an outgoing
  44. // connection
  45. public static final int STATE_CONNECTED = 3; // now connected to a remote
  46. // device
  47. public static final int STATE_STOPPED = 4; // we're shutting things down
  48. public static final int STATE_RETRY = 5; // we are going to retry, but first
  49. // we listen
  50. private AcceptThread mSecureAcceptThread;
  51. private ConnectThread mConnectThread;
  52. private ConnectedThread mConnectedThread;
  53. // Maximum reconnect attempts
  54. private static final int MAX_RETRY = 2;
  55. /***********************
  56. * Private Members
  57. **********************/
  58. // Current number of reconnect attempts
  59. private int mNumTries;
  60. private int mPort = 0;
  61. private boolean acceptMode = false;
  62. protected int mState;
  63. protected InetSocketAddress mAddress;
  64. // TODO: enable link encryption and remove manual AES Enc
  65. private static SimpleAES aes = new SimpleAES();
  66. public StopWatch comEnc = new StopWatch("ComEnc_comp");
  67. public Communication() {
  68. mState = STATE_NONE;
  69. }
  70. public void setTcpNoDelay(boolean on) {
  71. if (mConnectedThread != null)
  72. mConnectedThread.setTcpNoDelay(on);
  73. }
  74. /**
  75. * Set the current state of the connection
  76. *
  77. * @param state
  78. * An integer defining the current connection state
  79. */
  80. protected synchronized void setState(int state) {
  81. if (D)
  82. Util.debug("setState() " + mState + " -> " + state);
  83. mState = state;
  84. }
  85. /**
  86. * Return the current connection state.
  87. */
  88. public synchronized int getState() {
  89. return mState;
  90. }
  91. /**
  92. * Start the communication service. Specifically start AcceptThread to begin a
  93. * session in listening (server) mode.
  94. */
  95. public synchronized void start(int port) {
  96. if (D)
  97. Util.debug("start");
  98. acceptMode = true;
  99. startAcceptThread(port);
  100. mPort = port;
  101. mNumTries = 0;
  102. setState(STATE_LISTEN);
  103. }
  104. private synchronized void startAcceptThread(int port) {
  105. // Cancel any thread attempting to make a connection
  106. if (mConnectThread != null) {
  107. mConnectThread.cancel();
  108. mConnectThread = null;
  109. }
  110. // Cancel any thread currently running a connection
  111. if (mConnectedThread != null) {
  112. mConnectedThread.cancel();
  113. mConnectedThread = null;
  114. }
  115. // Start the thread to listen on a ServerSocket
  116. if (mSecureAcceptThread == null) {
  117. mSecureAcceptThread = new AcceptThread(port);
  118. mSecureAcceptThread.start();
  119. }
  120. }
  121. protected synchronized void retry() {
  122. if (D)
  123. Util.debug("retry");
  124. if (D)
  125. Util.debug("Retrying in state: " + getState());
  126. if (mState == STATE_CONNECTED)
  127. return;
  128. // TODO: Does this logic belong here
  129. if (mNumTries >= MAX_RETRY) {
  130. signalFailed();
  131. if (acceptMode)
  132. start(mPort);
  133. return;
  134. }
  135. startAcceptThread(mPort);
  136. setState(STATE_RETRY);
  137. int sleep = (int) (Math.random() * 1000 + 100);
  138. if (D)
  139. Util.debug("Sleeping: " + sleep);
  140. try {
  141. Thread.sleep(sleep);
  142. } catch (InterruptedException e) {
  143. Util.debug("Sleep interupted");
  144. } // TODO: This may block the main thread?
  145. if (D)
  146. Util.debug("Waking up: " + getState());
  147. // TODO: make this less strict
  148. if (mState != STATE_CONNECTING && mState != STATE_CONNECTED && mConnectedThread == null
  149. && mConnectThread == null)
  150. connect(mAddress);
  151. }
  152. /**
  153. * Start the ConnectThread to initiate a connection to a remote device.
  154. *
  155. * @param address
  156. * The address of the server
  157. * @param secure
  158. * Socket Security type - Secure (true) , Insecure (false)
  159. */
  160. public synchronized void connect(InetSocketAddress address) {
  161. if (D)
  162. Util.disp("connect to: " + address);
  163. // Don't throw out connections if we are already connected
  164. /*
  165. * if (mState == STATE_CONNECTING || mConnectedThread != null) { return; }
  166. */
  167. mNumTries++;
  168. mAddress = address;
  169. // Cancel any thread attempting to make a connection
  170. if (mState == STATE_CONNECTING) {
  171. if (mConnectThread != null) {
  172. mConnectThread.cancel();
  173. mConnectThread = null;
  174. }
  175. }
  176. // Cancel any thread currently running a connection
  177. if (mConnectedThread != null) {
  178. mConnectedThread.cancel();
  179. mConnectedThread = null;
  180. }
  181. // Start the thread to connect with the given device
  182. mConnectThread = new ConnectThread(address);
  183. mConnectThread.start();
  184. setState(STATE_CONNECTING);
  185. }
  186. /**
  187. * Start the ConnectedThread to begin managing a connection
  188. *
  189. * @param socket
  190. * The Socket on which the connection was made
  191. */
  192. public synchronized void connected(Socket socket) {
  193. if (D)
  194. Util.debug("connected");
  195. // Cancel the thread that completed the connection
  196. if (mConnectThread != null) {
  197. mConnectThread.cancel();
  198. mConnectThread = null;
  199. }
  200. // Cancel any thread currently running a connection
  201. if (mConnectedThread != null) {
  202. mConnectedThread.cancel();
  203. mConnectedThread = null;
  204. }
  205. // Cancel the accept thread because we only want to connect to one
  206. // device
  207. if (mSecureAcceptThread != null) {
  208. mSecureAcceptThread.cancel();
  209. mSecureAcceptThread = null;
  210. }
  211. // Start the thread to manage the connection and perform transmissions
  212. mConnectedThread = new ConnectedThread(socket);
  213. mConnectedThread.start();
  214. setState(STATE_CONNECTED);
  215. }
  216. protected void connectionFailed() {
  217. Util.error("Connection to the device failed");
  218. // Start the service over to restart listening mode
  219. if (getState() != STATE_STOPPED)
  220. retry();
  221. }
  222. /**
  223. * Indicate that the connection was lost and notify the UI Activity.
  224. */
  225. protected void connectionLost() {
  226. if (D)
  227. Util.error("Connection to the device lost");
  228. // Start the service over to restart listening mode
  229. if (getState() != STATE_STOPPED && acceptMode) {
  230. start(mPort);
  231. }
  232. }
  233. protected void signalFailed() {
  234. // TODO:
  235. }
  236. /**
  237. * Stop all threads
  238. */
  239. public synchronized void stop() {
  240. if (D)
  241. Util.debug("stop");
  242. setState(STATE_STOPPED);
  243. if (mConnectedThread != null) {
  244. mConnectedThread.cancel();
  245. mConnectedThread = null;
  246. }
  247. if (mConnectThread != null) {
  248. mConnectThread.cancel();
  249. mConnectThread = null;
  250. }
  251. if (mSecureAcceptThread != null) {
  252. mSecureAcceptThread.cancel();
  253. mSecureAcceptThread = null;
  254. }
  255. }
  256. /**
  257. * Write to the ConnectedThread in an unsynchronized manner
  258. *
  259. * This does not add message boundries!!
  260. *
  261. * @param out
  262. * The bytes to write
  263. * @see ConnectedThread#write(byte[])
  264. */
  265. public void write(byte[] out) {
  266. // Create temporary object
  267. ConnectedThread r;
  268. // Synchronize a copy of the ConnectedThread
  269. synchronized (this) {
  270. if (mState != STATE_CONNECTED)
  271. return;
  272. r = mConnectedThread;
  273. }
  274. // Perform the write unsynchronized
  275. r.write(out);
  276. }
  277. public void write(Bandwidth bandwidth, byte[] out) {
  278. comEnc.start();
  279. out = aes.encrypt(out);
  280. comEnc.stop();
  281. write(out);
  282. if (Global.bandSwitch)
  283. bandwidth.add(out.length);
  284. }
  285. /**
  286. * Write a length encoded byte array.
  287. *
  288. * @param out
  289. */
  290. public void writeLengthEncoded(byte[] out) {
  291. write("" + out.length);
  292. write(out);
  293. }
  294. public <T> void write(T out) {
  295. write(SerializationUtils.serialize((Serializable) out));
  296. }
  297. public <T> void write(Bandwidth bandwidth, T out) {
  298. write(bandwidth, SerializationUtils.serialize((Serializable) out));
  299. }
  300. public void write(BigInteger b) {
  301. write(b.toByteArray());
  302. }
  303. public void write(Bandwidth bandwidth, BigInteger b) {
  304. write(bandwidth, b.toByteArray());
  305. }
  306. public void write(int n) {
  307. write(BigInteger.valueOf(n));
  308. }
  309. public void write(Bandwidth bandwidth, int n) {
  310. write(bandwidth, BigInteger.valueOf(n));
  311. }
  312. public void write(long n) {
  313. write(BigInteger.valueOf(n));
  314. }
  315. public void write(Bandwidth bandwidth, long n) {
  316. write(bandwidth, BigInteger.valueOf(n));
  317. }
  318. public void write(byte[][] arr) {
  319. write(ComUtil.serialize(arr));
  320. }
  321. public void write(Bandwidth bandwidth, byte[][] arr) {
  322. write(bandwidth, ComUtil.serialize(arr));
  323. }
  324. public void write(byte[][][] arr) {
  325. write(ComUtil.serialize(arr));
  326. }
  327. public void write(Bandwidth bandwidth, byte[][][] arr) {
  328. write(bandwidth, ComUtil.serialize(arr));
  329. }
  330. public void write(int[] arr) {
  331. write(ComUtil.serialize(arr));
  332. }
  333. public void write(Bandwidth bandwidth, int[] arr) {
  334. write(bandwidth, ComUtil.serialize(arr));
  335. }
  336. public void write(int[][] arr) {
  337. write(ComUtil.serialize(arr));
  338. }
  339. public void write(Bandwidth bandwidth, int[][] arr) {
  340. write(bandwidth, ComUtil.serialize(arr));
  341. }
  342. public void write(Tuple t) {
  343. write(ComUtil.serialize(t));
  344. }
  345. public void write(Bandwidth bandwidth, Tuple t) {
  346. write(bandwidth, ComUtil.serialize(t));
  347. }
  348. public void write(Tuple[] arr) {
  349. write(ComUtil.serialize(arr));
  350. }
  351. public void write(Bandwidth bandwidth, Tuple[] arr) {
  352. write(bandwidth, ComUtil.serialize(arr));
  353. }
  354. public void write(Bucket b) {
  355. write(b.getTuples());
  356. }
  357. public void write(Bandwidth bandwidth, Bucket b) {
  358. write(bandwidth, b.getTuples());
  359. }
  360. public void write(Bucket[] arr) {
  361. write(ComUtil.serialize(arr));
  362. }
  363. public void write(Bandwidth bandwidth, Bucket[] arr) {
  364. write(bandwidth, ComUtil.serialize(arr));
  365. }
  366. public void write(GCSignal key) {
  367. write(key.bytes);
  368. }
  369. public void write(Bandwidth bandwidth, GCSignal key) {
  370. write(bandwidth, key.bytes);
  371. }
  372. public void write(GCSignal[] arr) {
  373. write(ComUtil.serialize(arr));
  374. }
  375. public void write(Bandwidth bandwidth, GCSignal[] arr) {
  376. write(bandwidth, ComUtil.serialize(arr));
  377. }
  378. public void write(GCSignal[][] arr) {
  379. write(ComUtil.serialize(arr));
  380. }
  381. public void write(Bandwidth bandwidth, GCSignal[][] arr) {
  382. write(bandwidth, ComUtil.serialize(arr));
  383. }
  384. public void write(GCSignal[][][] arr) {
  385. write(ComUtil.serialize(arr));
  386. }
  387. public void write(Bandwidth bandwidth, GCSignal[][][] arr) {
  388. write(bandwidth, ComUtil.serialize(arr));
  389. }
  390. public void write(GCSignal[][][][] arr) {
  391. write(ComUtil.serialize(arr));
  392. }
  393. public void write(Bandwidth bandwidth, GCSignal[][][][] arr) {
  394. write(bandwidth, ComUtil.serialize(arr));
  395. }
  396. public void write(ArrayList<byte[]> arr) {
  397. write(ComUtil.serialize(arr));
  398. }
  399. public void write(Bandwidth bandwidth, ArrayList<byte[]> arr) {
  400. write(bandwidth, ComUtil.serialize(arr));
  401. }
  402. public static final Charset defaultCharset = Charset.forName("ASCII");
  403. // TODO: Rather than having millions of write/read methods can we take
  404. // advantage of DataStreams?
  405. public void write(String buffer) {
  406. write(buffer, defaultCharset);
  407. }
  408. /*
  409. * This was added to allow backwords compaitibility with older versions which
  410. * used the default charset (usually utf-8) instead of asc-ii. This is almost
  411. * never what we want to do
  412. */
  413. public void write(String buffer, Charset charset) {
  414. write(buffer.getBytes(charset));
  415. if (D)
  416. Util.debug("Write: " + buffer);
  417. }
  418. /**
  419. * Read a string from Connected Thread
  420. *
  421. * @see #read()
  422. */
  423. public String readString() {
  424. return new String(read());
  425. }
  426. /**
  427. * Read from the ConnectedThread in an unsynchronized manner Note, this is a
  428. * blocking call
  429. *
  430. * @return the bytes read
  431. * @see ConnectedThread#read()
  432. */
  433. public byte[] read() {
  434. // Create temporary object
  435. ConnectedThread r;
  436. // Synchronize a copy of the ConnectedThread
  437. synchronized (this) {
  438. if (mState != STATE_CONNECTED)
  439. return null;
  440. r = mConnectedThread;
  441. }
  442. // Perform the read unsynchronized and parse
  443. byte[] readMessage = r.read();
  444. if (D)
  445. Util.debug("Read: " + new String(readMessage));
  446. return readMessage;
  447. }
  448. public byte[] readAndDec() {
  449. byte[] msg = read();
  450. comEnc.start();
  451. msg = aes.decrypt(msg);
  452. comEnc.stop();
  453. return msg;
  454. }
  455. /**
  456. * Read a specific number of bytes from the ConnectedThread in an unsynchronized
  457. * manner Note, this is a blocking call
  458. *
  459. * @return the bytes read
  460. * @see ConnectedThread#read()
  461. */
  462. public byte[] readLengthEncoded() {
  463. int len = Integer.parseInt(readString());
  464. ArrayList<byte[]> bytes = new ArrayList<byte[]>();
  465. byte[] data = read();
  466. if (data.length != len) {
  467. bytes.add(data);
  468. data = read();
  469. }
  470. byte[] total = new byte[len];
  471. int offset = 0;
  472. for (byte[] b : bytes) {
  473. for (int i = 0; i < b.length; i++) {
  474. total[offset++] = b[i];
  475. }
  476. }
  477. return total;
  478. }
  479. public <T> T readObject() {
  480. T object = SerializationUtils.deserialize(read());
  481. return object;
  482. }
  483. public <T> T readObjectAndDec() {
  484. T object = SerializationUtils.deserialize(readAndDec());
  485. return object;
  486. }
  487. public BigInteger readBigInteger() {
  488. return new BigInteger(read());
  489. }
  490. public BigInteger readBigIntegerAndDec() {
  491. return new BigInteger(readAndDec());
  492. }
  493. public int readInt() {
  494. return readBigInteger().intValue();
  495. }
  496. public int readIntAndDec() {
  497. return readBigIntegerAndDec().intValue();
  498. }
  499. public long readLong() {
  500. return readBigInteger().longValue();
  501. }
  502. public long readLongAndDec() {
  503. return readBigIntegerAndDec().longValue();
  504. }
  505. public byte[][] readDoubleByteArray() {
  506. return ComUtil.toDoubleByteArray(read());
  507. }
  508. public byte[][] readDoubleByteArrayAndDec() {
  509. return ComUtil.toDoubleByteArray(readAndDec());
  510. }
  511. public byte[][][] readTripleByteArray() {
  512. return ComUtil.toTripleByteArray(read());
  513. }
  514. public byte[][][] readTripleByteArrayAndDec() {
  515. return ComUtil.toTripleByteArray(readAndDec());
  516. }
  517. public int[] readIntArray() {
  518. return ComUtil.toIntArray(read());
  519. }
  520. public int[] readIntArrayAndDec() {
  521. return ComUtil.toIntArray(readAndDec());
  522. }
  523. public int[][] readDoubleIntArray() {
  524. return ComUtil.toDoubleIntArray(read());
  525. }
  526. public int[][] readDoubleIntArrayAndDec() {
  527. return ComUtil.toDoubleIntArray(readAndDec());
  528. }
  529. public Tuple readTuple() {
  530. return ComUtil.toTuple(read());
  531. }
  532. public Tuple readTupleAndDec() {
  533. return ComUtil.toTuple(readAndDec());
  534. }
  535. public Tuple[] readTupleArray() {
  536. return ComUtil.toTupleArray(read());
  537. }
  538. public Tuple[] readTupleArrayAndDec() {
  539. return ComUtil.toTupleArray(readAndDec());
  540. }
  541. public Bucket readBucket() {
  542. return new Bucket(readTupleArray());
  543. }
  544. public Bucket readBucketAndDec() {
  545. return new Bucket(readTupleArrayAndDec());
  546. }
  547. public Bucket[] readBucketArray() {
  548. return ComUtil.toBucketArray(read());
  549. }
  550. public Bucket[] readBucketArrayAndDec() {
  551. return ComUtil.toBucketArray(readAndDec());
  552. }
  553. public GCSignal readGCSignal() {
  554. return new GCSignal(read());
  555. }
  556. public GCSignal readGCSignalAndDec() {
  557. return new GCSignal(readAndDec());
  558. }
  559. public GCSignal[] readGCSignalArray() {
  560. return ComUtil.toGCSignalArray(read());
  561. }
  562. public GCSignal[] readGCSignalArrayAndDec() {
  563. return ComUtil.toGCSignalArray(readAndDec());
  564. }
  565. public GCSignal[][] readDoubleGCSignalArray() {
  566. return ComUtil.toDoubleGCSignalArray(read());
  567. }
  568. public GCSignal[][] readDoubleGCSignalArrayAndDec() {
  569. return ComUtil.toDoubleGCSignalArray(readAndDec());
  570. }
  571. public GCSignal[][][] readTripleGCSignalArray() {
  572. return ComUtil.toTripleGCSignalArray(read());
  573. }
  574. public GCSignal[][][] readTripleGCSignalArrayAndDec() {
  575. return ComUtil.toTripleGCSignalArray(readAndDec());
  576. }
  577. public GCSignal[][][][] readQuadGCSignalArray() {
  578. return ComUtil.toQuadGCSignalArray(read());
  579. }
  580. public GCSignal[][][][] readQuadGCSignalArrayAndDec() {
  581. return ComUtil.toQuadGCSignalArray(readAndDec());
  582. }
  583. public ArrayList<byte[]> readArrayList() {
  584. return ComUtil.toArrayList(read());
  585. }
  586. public ArrayList<byte[]> readArrayListAndDec() {
  587. return ComUtil.toArrayList(readAndDec());
  588. }
  589. /**
  590. * This thread runs while listening for incoming connections. It behaves like a
  591. * server-side client. It runs until a connection is accepted (or until
  592. * cancelled).
  593. */
  594. private class AcceptThread extends Thread {
  595. // The local server socket
  596. private final ServerSocket mmServerSocket;
  597. public AcceptThread(int port) {
  598. ServerSocket tmp = null;
  599. try {
  600. tmp = new ServerSocket(port);
  601. } catch (IOException e) {
  602. Util.error("ServerSocket unable to start", e);
  603. }
  604. mmServerSocket = tmp;
  605. }
  606. public void run() {
  607. if (D)
  608. Util.disp("BEGIN mAcceptThread ");
  609. setName("AcceptThread");
  610. Socket socket = null;
  611. // Listen to the server socket if we're not connected
  612. while (mState != STATE_CONNECTED) {
  613. try {
  614. // This is a blocking call and will only return on a
  615. // successful connection or an exception
  616. socket = mmServerSocket.accept();
  617. // socket.setTcpNoDelay(true);
  618. } catch (IOException e) {
  619. Util.error("accept() failed", e);
  620. break;
  621. }
  622. // If a connection was accepted
  623. if (socket != null) {
  624. synchronized (Communication.this) {
  625. switch (mState) {
  626. case STATE_LISTEN:
  627. case STATE_CONNECTING:
  628. // Situation normal. Start the connected thread.
  629. connected(socket);
  630. break;
  631. case STATE_NONE:
  632. case STATE_CONNECTED:
  633. // Either not ready or already connected.
  634. // Terminate new socket.
  635. try {
  636. socket.close();
  637. } catch (IOException e) {
  638. Util.error("Could not close unwanted socket", e);
  639. }
  640. // TODO: Should we really be returning here?
  641. return;
  642. }
  643. }
  644. }
  645. }
  646. if (D)
  647. Util.disp("END mAcceptThread");
  648. }
  649. public void cancel() {
  650. if (D)
  651. Util.debug("AcceptThread canceled " + this);
  652. try {
  653. mmServerSocket.close();
  654. } catch (IOException e) {
  655. Util.error("close() of server failed", e);
  656. }
  657. }
  658. }
  659. /**
  660. * This thread runs while attempting to make an outgoing connection with a
  661. * device. It runs straight through; the connection either succeeds or fails.
  662. */
  663. private class ConnectThread extends Thread {
  664. private final Socket mmSocket;
  665. private final InetSocketAddress mmAddress;
  666. public ConnectThread(InetSocketAddress address) {
  667. mmAddress = address;
  668. mmSocket = new Socket();
  669. /*
  670. * try { mmSocket.setTcpNoDelay(true); } catch (SocketException e) {
  671. * e.printStackTrace(); }
  672. */
  673. }
  674. public void run() {
  675. Util.debug("BEGIN mConnectThread");
  676. setName("ConnectThread");
  677. try {
  678. // This is a blocking call and will only return on a
  679. // successful connection or an exception
  680. mmSocket.connect(mmAddress);
  681. } catch (IOException e) {
  682. // Close the socket
  683. try {
  684. mmSocket.close();
  685. } catch (IOException e2) {
  686. Util.error("unable to close() socket during connection failure", e2);
  687. }
  688. connectionFailed();
  689. return;
  690. }
  691. // Reset the ConnectThread because we're done
  692. synchronized (Communication.this) {
  693. mConnectThread = null;
  694. }
  695. // Start the connected thread
  696. connected(mmSocket);
  697. }
  698. public void cancel() {
  699. try {
  700. mmSocket.close();
  701. } catch (IOException e) {
  702. Util.error("close() of connect socket failed", e);
  703. }
  704. }
  705. }
  706. /**
  707. * This thread runs during a connection with a remote device. It handles all
  708. * incoming and outgoing transmissions.
  709. */
  710. private class ConnectedThread extends Thread {
  711. private final Socket mmSocket;
  712. private final DataInputStream mmInStream;
  713. private final DataOutputStream mmOutStream;
  714. private BlockingQueue<byte[]> mMessageBuffer;
  715. public ConnectedThread(Socket socket) {
  716. Util.debug("create ConnectedThread");
  717. mmSocket = socket;
  718. DataInputStream tmpIn = null;
  719. DataOutputStream tmpOut = null;
  720. mMessageBuffer = new LinkedBlockingQueue<byte[]>(); // TODO: add a
  721. // capacity here
  722. // to prevent
  723. // doS
  724. // Get the Socket input and output streams
  725. try {
  726. tmpIn = new DataInputStream(socket.getInputStream());
  727. tmpOut = new DataOutputStream(socket.getOutputStream());
  728. } catch (StreamCorruptedException e) {
  729. Util.error("object streams corrupt", e);
  730. } catch (IOException e) {
  731. Util.error("temp sockets not created", e);
  732. }
  733. mmInStream = tmpIn;
  734. mmOutStream = tmpOut;
  735. }
  736. public void setTcpNoDelay(boolean on) {
  737. if (mmSocket != null)
  738. try {
  739. mmSocket.setTcpNoDelay(on);
  740. } catch (SocketException e) {
  741. e.printStackTrace();
  742. }
  743. }
  744. /**
  745. * Read from the ConnectedThread in an unsynchronized manner
  746. *
  747. * This is a blocking call and will only return data if the readLoop flag is
  748. * false
  749. *
  750. * @return the bytes read
  751. * @see ConnectedThread#read()
  752. */
  753. public byte[] read() {
  754. try {
  755. return mMessageBuffer.take();
  756. } catch (InterruptedException e) {
  757. Util.error("Message Read Interupted");
  758. return null;
  759. }
  760. }
  761. /**
  762. * Write to the connected OutStream.
  763. *
  764. * @param buffer
  765. * The bytes to write
  766. */
  767. public void write(byte[] buffer) {
  768. try {
  769. mmOutStream.writeInt(buffer.length);
  770. mmOutStream.write(buffer);
  771. mmOutStream.flush();
  772. } catch (IOException e) {
  773. Util.error("Exception during write", e);
  774. }
  775. }
  776. public void run() {
  777. Util.disp("BEGIN mConnectedThread");
  778. int bytes;
  779. // Keep listening to the InputStream while connected
  780. while (true) {
  781. try {
  782. // Read from the InputStream
  783. bytes = mmInStream.readInt();
  784. byte[] buffer = new byte[bytes]; // TODO: This is a little
  785. // dangerous
  786. mmInStream.readFully(buffer, 0, bytes);
  787. try {
  788. mMessageBuffer.put(buffer);
  789. } catch (InterruptedException e) {
  790. Util.error("Message add interupted.");
  791. // TODO: possibly move this catch elsewhere
  792. }
  793. } catch (IOException e) {
  794. if (D)
  795. Util.debug("Device disconnected");
  796. connectionLost();
  797. break;
  798. }
  799. }
  800. }
  801. public void cancel() {
  802. try {
  803. mmInStream.close();
  804. mmOutStream.close();
  805. mmSocket.close();
  806. } catch (IOException e) {
  807. Util.error("close() of connect socket failed", e);
  808. }
  809. }
  810. }
  811. }