// Communication.java
  1. package communication;
  2. import java.io.DataInputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.IOException;
  5. import java.io.StreamCorruptedException;
  6. import java.math.BigInteger;
  7. import java.net.InetSocketAddress;
  8. import java.net.ServerSocket;
  9. import java.net.Socket;
  10. import java.net.SocketException;
  11. import java.nio.charset.Charset;
  12. import java.util.ArrayList;
  13. import java.util.concurrent.BlockingQueue;
  14. import java.util.concurrent.LinkedBlockingQueue;
  15. import com.oblivm.backend.gc.GCSignal;
  16. import oram.Bucket;
  17. import oram.Tuple;
  18. import util.Bandwidth;
  19. import util.P;
  20. import util.Util;
/**
 * Basic usage:
 *
 * <ol>
 *   <li>Call {@link #start(int)} or {@link #connect(InetSocketAddress)} to
 *       initiate a connection.
 *   <li>Wait for {@link #getState()} to return {@link #STATE_CONNECTED}.
 *   <li>{@link #write(byte[])} and {@link #read()} messages.
 *   <li>Close the connection with {@link #stop()}. NOTE: This may invalidate
 *       unread data.
 * </ol>
 *
 * <p>Alternatively, you can always call start; the first side of the
 * connection to call connect will win.
 */
  33. public class Communication {
  34. public static boolean D = false;
  35. // Constants that indicate the current connection state
  36. public static final int STATE_NONE = 0; // we're doing nothing
  37. public static final int STATE_LISTEN = 1; // now listening for incoming
  38. // connections
  39. public static final int STATE_CONNECTING = 2; // now initiating an outgoing
  40. // connection
  41. public static final int STATE_CONNECTED = 3; // now connected to a remote
  42. // device
  43. public static final int STATE_STOPPED = 4; // we're shutting things down
  44. public static final int STATE_RETRY = 5; // we are going to retry, but first
  45. // we listen
  46. private AcceptThread mSecureAcceptThread;
  47. private ConnectThread mConnectThread;
  48. private ConnectedThread mConnectedThread;
  49. // Maximum reconnect attempts
  50. private static final int MAX_RETRY = 2;
  51. /***********************
  52. * Private Members
  53. **********************/
  54. // Current number of reconnect attempts
  55. private int mNumTries;
  56. private int mPort = 0;
  57. private boolean acceptMode = false;
  58. protected int mState;
  59. protected InetSocketAddress mAddress;
  60. public Bandwidth[] bandwidth;
  61. public boolean bandSwitch = true;
  62. public Communication() {
  63. mState = STATE_NONE;
  64. bandwidth = new Bandwidth[P.size];
  65. for (int i = 0; i < P.size; i++)
  66. bandwidth[i] = new Bandwidth(P.names[i]);
  67. }
  68. public void setTcpNoDelay(boolean on) {
  69. if (mConnectedThread != null)
  70. mConnectedThread.setTcpNoDelay(on);
  71. }
  72. /**
  73. * Set the current state of the connection
  74. *
  75. * @param state
  76. * An integer defining the current connection state
  77. */
  78. protected synchronized void setState(int state) {
  79. if (D)
  80. Util.debug("setState() " + mState + " -> " + state);
  81. mState = state;
  82. }
  83. /**
  84. * Return the current connection state.
  85. */
  86. public synchronized int getState() {
  87. return mState;
  88. }
  89. /**
  90. * Start the communication service. Specifically start AcceptThread to begin
  91. * a session in listening (server) mode.
  92. */
  93. public synchronized void start(int port) {
  94. if (D)
  95. Util.debug("start");
  96. acceptMode = true;
  97. startAcceptThread(port);
  98. mPort = port;
  99. mNumTries = 0;
  100. setState(STATE_LISTEN);
  101. }
  102. private synchronized void startAcceptThread(int port) {
  103. // Cancel any thread attempting to make a connection
  104. if (mConnectThread != null) {
  105. mConnectThread.cancel();
  106. mConnectThread = null;
  107. }
  108. // Cancel any thread currently running a connection
  109. if (mConnectedThread != null) {
  110. mConnectedThread.cancel();
  111. mConnectedThread = null;
  112. }
  113. // Start the thread to listen on a ServerSocket
  114. if (mSecureAcceptThread == null) {
  115. mSecureAcceptThread = new AcceptThread(port);
  116. mSecureAcceptThread.start();
  117. }
  118. }
  119. protected synchronized void retry() {
  120. if (D)
  121. Util.debug("retry");
  122. if (D)
  123. Util.debug("Retrying in state: " + getState());
  124. if (mState == STATE_CONNECTED)
  125. return;
  126. // TODO: Does this logic belong here
  127. if (mNumTries >= MAX_RETRY) {
  128. signalFailed();
  129. if (acceptMode)
  130. start(mPort);
  131. return;
  132. }
  133. startAcceptThread(mPort);
  134. setState(STATE_RETRY);
  135. int sleep = (int) (Math.random() * 1000 + 100);
  136. if (D)
  137. Util.debug("Sleeping: " + sleep);
  138. try {
  139. Thread.sleep(sleep);
  140. } catch (InterruptedException e) {
  141. Util.debug("Sleep interupted");
  142. } // TODO: This may block the main thread?
  143. if (D)
  144. Util.debug("Waking up: " + getState());
  145. // TODO: make this less strict
  146. if (mState != STATE_CONNECTING && mState != STATE_CONNECTED && mConnectedThread == null
  147. && mConnectThread == null)
  148. connect(mAddress);
  149. }
  150. /**
  151. * Start the ConnectThread to initiate a connection to a remote device.
  152. *
  153. * @param address
  154. * The address of the server
  155. * @param secure
  156. * Socket Security type - Secure (true) , Insecure (false)
  157. */
  158. public synchronized void connect(InetSocketAddress address) {
  159. if (D)
  160. Util.disp("connect to: " + address);
  161. // Don't throw out connections if we are already connected
  162. /*
  163. * if (mState == STATE_CONNECTING || mConnectedThread != null) { return;
  164. * }
  165. */
  166. mNumTries++;
  167. mAddress = address;
  168. // Cancel any thread attempting to make a connection
  169. if (mState == STATE_CONNECTING) {
  170. if (mConnectThread != null) {
  171. mConnectThread.cancel();
  172. mConnectThread = null;
  173. }
  174. }
  175. // Cancel any thread currently running a connection
  176. if (mConnectedThread != null) {
  177. mConnectedThread.cancel();
  178. mConnectedThread = null;
  179. }
  180. // Start the thread to connect with the given device
  181. mConnectThread = new ConnectThread(address);
  182. mConnectThread.start();
  183. setState(STATE_CONNECTING);
  184. }
  185. /**
  186. * Start the ConnectedThread to begin managing a connection
  187. *
  188. * @param socket
  189. * The Socket on which the connection was made
  190. */
  191. public synchronized void connected(Socket socket) {
  192. if (D)
  193. Util.debug("connected");
  194. // Cancel the thread that completed the connection
  195. if (mConnectThread != null) {
  196. mConnectThread.cancel();
  197. mConnectThread = null;
  198. }
  199. // Cancel any thread currently running a connection
  200. if (mConnectedThread != null) {
  201. mConnectedThread.cancel();
  202. mConnectedThread = null;
  203. }
  204. // Cancel the accept thread because we only want to connect to one
  205. // device
  206. if (mSecureAcceptThread != null) {
  207. mSecureAcceptThread.cancel();
  208. mSecureAcceptThread = null;
  209. }
  210. // Start the thread to manage the connection and perform transmissions
  211. mConnectedThread = new ConnectedThread(socket);
  212. mConnectedThread.start();
  213. setState(STATE_CONNECTED);
  214. }
  215. protected void connectionFailed() {
  216. Util.error("Connection to the device failed");
  217. // Start the service over to restart listening mode
  218. if (getState() != STATE_STOPPED)
  219. retry();
  220. }
  221. /**
  222. * Indicate that the connection was lost and notify the UI Activity.
  223. */
  224. protected void connectionLost() {
  225. if (D)
  226. Util.error("Connection to the device lost");
  227. // Start the service over to restart listening mode
  228. if (getState() != STATE_STOPPED && acceptMode) {
  229. start(mPort);
  230. }
  231. }
  232. protected void signalFailed() {
  233. // TODO:
  234. }
  235. /**
  236. * Stop all threads
  237. */
  238. public synchronized void stop() {
  239. if (D)
  240. Util.debug("stop");
  241. setState(STATE_STOPPED);
  242. if (mConnectedThread != null) {
  243. mConnectedThread.cancel();
  244. mConnectedThread = null;
  245. }
  246. if (mConnectThread != null) {
  247. mConnectThread.cancel();
  248. mConnectThread = null;
  249. }
  250. if (mSecureAcceptThread != null) {
  251. mSecureAcceptThread.cancel();
  252. mSecureAcceptThread = null;
  253. }
  254. }
  255. /**
  256. * Write to the ConnectedThread in an unsynchronized manner
  257. *
  258. * This does not add message boundries!!
  259. *
  260. * @param out
  261. * The bytes to write
  262. * @see ConnectedThread#write(byte[])
  263. */
  264. public void write(byte[] out) {
  265. // Create temporary object
  266. ConnectedThread r;
  267. // Synchronize a copy of the ConnectedThread
  268. synchronized (this) {
  269. if (mState != STATE_CONNECTED)
  270. return;
  271. r = mConnectedThread;
  272. }
  273. // Perform the write unsynchronized
  274. r.write(out);
  275. }
  276. public void write(int pid, byte[] out) {
  277. write(out);
  278. if (bandSwitch)
  279. bandwidth[pid].add(out.length);
  280. }
  281. /**
  282. * Write a length encoded byte array.
  283. *
  284. * @param out
  285. */
  286. public void writeLengthEncoded(byte[] out) {
  287. write("" + out.length);
  288. write(out);
  289. }
  290. /*
  291. * public <T> void write(T out) {
  292. * write(SerializationUtils.serialize((Serializable) out)); }
  293. *
  294. * public <T> void write(int pid, T out) { write(pid,
  295. * SerializationUtils.serialize((Serializable) out)); }
  296. */
  297. public void write(BigInteger b) {
  298. write(b.toByteArray());
  299. }
  300. public void write(int pid, BigInteger b) {
  301. write(pid, b.toByteArray());
  302. }
  303. public void write(int n) {
  304. write(BigInteger.valueOf(n));
  305. }
  306. public void write(int pid, int n) {
  307. write(pid, BigInteger.valueOf(n));
  308. }
  309. public void write(long n) {
  310. write(BigInteger.valueOf(n));
  311. }
  312. public void write(int pid, long n) {
  313. write(pid, BigInteger.valueOf(n));
  314. }
  315. public void write(byte[][] arr) {
  316. write(ComUtil.serialize(arr));
  317. }
  318. public void write(int pid, byte[][] arr) {
  319. write(pid, ComUtil.serialize(arr));
  320. }
  321. public void write(byte[][][] arr) {
  322. write(ComUtil.serialize(arr));
  323. }
  324. public void write(int pid, byte[][][] arr) {
  325. write(pid, ComUtil.serialize(arr));
  326. }
  327. public void write(int[] arr) {
  328. write(ComUtil.serialize(arr));
  329. }
  330. public void write(int pid, int[] arr) {
  331. write(pid, ComUtil.serialize(arr));
  332. }
  333. public void write(int[][] arr) {
  334. write(ComUtil.serialize(arr));
  335. }
  336. public void write(int pid, int[][] arr) {
  337. write(pid, ComUtil.serialize(arr));
  338. }
  339. public void write(Tuple t) {
  340. write(ComUtil.serialize(t));
  341. }
  342. public void write(int pid, Tuple t) {
  343. write(pid, ComUtil.serialize(t));
  344. }
  345. public void write(Tuple[] arr) {
  346. write(ComUtil.serialize(arr));
  347. }
  348. public void write(int pid, Tuple[] arr) {
  349. write(pid, ComUtil.serialize(arr));
  350. }
  351. public void write(Bucket b) {
  352. write(b.getTuples());
  353. }
  354. public void write(int pid, Bucket b) {
  355. write(pid, b.getTuples());
  356. }
  357. public void write(Bucket[] arr) {
  358. write(ComUtil.serialize(arr));
  359. }
  360. public void write(int pid, Bucket[] arr) {
  361. write(ComUtil.serialize(arr));
  362. }
  363. public void write(GCSignal key) {
  364. write(key.bytes);
  365. }
  366. public void write(int pid, GCSignal key) {
  367. write(pid, key.bytes);
  368. }
  369. public void write(GCSignal[] arr) {
  370. write(ComUtil.serialize(arr));
  371. }
  372. public void write(int pid, GCSignal[] arr) {
  373. write(pid, ComUtil.serialize(arr));
  374. }
  375. public void write(GCSignal[][] arr) {
  376. write(ComUtil.serialize(arr));
  377. }
  378. public void write(int pid, GCSignal[][] arr) {
  379. write(pid, ComUtil.serialize(arr));
  380. }
  381. public void write(GCSignal[][][] arr) {
  382. write(ComUtil.serialize(arr));
  383. }
  384. public void write(int pid, GCSignal[][][] arr) {
  385. write(pid, ComUtil.serialize(arr));
  386. }
  387. public void write(GCSignal[][][][] arr) {
  388. write(ComUtil.serialize(arr));
  389. }
  390. public void write(int pid, GCSignal[][][][] arr) {
  391. write(pid, ComUtil.serialize(arr));
  392. }
  393. public void write(ArrayList<byte[]> arr) {
  394. write(ComUtil.serialize(arr));
  395. }
  396. public void write(int pid, ArrayList<byte[]> arr) {
  397. write(pid, ComUtil.serialize(arr));
  398. }
  399. public static final Charset defaultCharset = Charset.forName("ASCII");
  400. // TODO: Rather than having millions of write/read methods can we take
  401. // advantage of DataStreams?
  402. public void write(String buffer) {
  403. write(buffer, defaultCharset);
  404. }
  405. /*
  406. * This was added to allow backwords compaitibility with older versions
  407. * which used the default charset (usually utf-8) instead of asc-ii. This is
  408. * almost never what we want to do
  409. */
  410. public void write(String buffer, Charset charset) {
  411. write(buffer.getBytes(charset));
  412. if (D)
  413. Util.debug("Write: " + buffer);
  414. }
  415. /**
  416. * Read a string from Connected Thread
  417. *
  418. * @see #read()
  419. */
  420. public String readString() {
  421. return new String(read());
  422. }
  423. /**
  424. * Read from the ConnectedThread in an unsynchronized manner Note, this is a
  425. * blocking call
  426. *
  427. * @return the bytes read
  428. * @see ConnectedThread#read()
  429. */
  430. public byte[] read() {
  431. // Create temporary object
  432. ConnectedThread r;
  433. // Synchronize a copy of the ConnectedThread
  434. synchronized (this) {
  435. if (mState != STATE_CONNECTED)
  436. return null;
  437. r = mConnectedThread;
  438. }
  439. // Perform the read unsynchronized and parse
  440. byte[] readMessage = r.read();
  441. if (D)
  442. Util.debug("Read: " + new String(readMessage));
  443. return readMessage;
  444. }
  445. /**
  446. * Read a specific number of bytes from the ConnectedThread in an
  447. * unsynchronized manner Note, this is a blocking call
  448. *
  449. * @return the bytes read
  450. * @see ConnectedThread#read()
  451. */
  452. public byte[] readLengthEncoded() {
  453. int len = Integer.parseInt(readString());
  454. ArrayList<byte[]> bytes = new ArrayList<byte[]>();
  455. byte[] data = read();
  456. if (data.length != len) {
  457. bytes.add(data);
  458. data = read();
  459. }
  460. byte[] total = new byte[len];
  461. int offset = 0;
  462. for (byte[] b : bytes) {
  463. for (int i = 0; i < b.length; i++) {
  464. total[offset++] = b[i];
  465. }
  466. }
  467. return total;
  468. }
  469. /*
  470. * public <T> T readObject() { T object =
  471. * SerializationUtils.deserialize(read()); return object; }
  472. */
  473. public BigInteger readBigInteger() {
  474. return new BigInteger(read());
  475. }
  476. public int readInt() {
  477. return readBigInteger().intValue();
  478. }
  479. public long readLong() {
  480. return readBigInteger().longValue();
  481. }
  482. public byte[][] readDoubleByteArray() {
  483. return ComUtil.toDoubleByteArray(read());
  484. }
  485. public byte[][][] readTripleByteArray() {
  486. return ComUtil.toTripleByteArray(read());
  487. }
  488. public int[] readIntArray() {
  489. return ComUtil.toIntArray(read());
  490. }
  491. public int[][] readDoubleIntArray() {
  492. return ComUtil.toDoubleIntArray(read());
  493. }
  494. public Tuple readTuple() {
  495. return ComUtil.toTuple(read());
  496. }
  497. public Tuple[] readTupleArray() {
  498. return ComUtil.toTupleArray(read());
  499. }
  500. public Bucket readBucket() {
  501. return new Bucket(readTupleArray());
  502. }
  503. public Bucket[] readBucketArray() {
  504. return ComUtil.toBucketArray(read());
  505. }
  506. public GCSignal readGCSignal() {
  507. return new GCSignal(read());
  508. }
  509. public GCSignal[] readGCSignalArray() {
  510. return ComUtil.toGCSignalArray(read());
  511. }
  512. public GCSignal[][] readDoubleGCSignalArray() {
  513. return ComUtil.toDoubleGCSignalArray(read());
  514. }
  515. public GCSignal[][][] readTripleGCSignalArray() {
  516. return ComUtil.toTripleGCSignalArray(read());
  517. }
  518. public GCSignal[][][][] readQuadGCSignalArray() {
  519. return ComUtil.toQuadGCSignalArray(read());
  520. }
  521. public ArrayList<byte[]> readArrayList() {
  522. return ComUtil.toArrayList(read());
  523. }
  524. /**
  525. * This thread runs while listening for incoming connections. It behaves
  526. * like a server-side client. It runs until a connection is accepted (or
  527. * until cancelled).
  528. */
  529. private class AcceptThread extends Thread {
  530. // The local server socket
  531. private final ServerSocket mmServerSocket;
  532. public AcceptThread(int port) {
  533. ServerSocket tmp = null;
  534. try {
  535. tmp = new ServerSocket(port);
  536. } catch (IOException e) {
  537. Util.error("ServerSocket unable to start", e);
  538. }
  539. mmServerSocket = tmp;
  540. }
  541. public void run() {
  542. if (D)
  543. Util.disp("BEGIN mAcceptThread ");
  544. setName("AcceptThread");
  545. Socket socket = null;
  546. // Listen to the server socket if we're not connected
  547. while (mState != STATE_CONNECTED) {
  548. try {
  549. // This is a blocking call and will only return on a
  550. // successful connection or an exception
  551. socket = mmServerSocket.accept();
  552. // socket.setTcpNoDelay(true);
  553. } catch (IOException e) {
  554. Util.error("accept() failed", e);
  555. break;
  556. }
  557. // If a connection was accepted
  558. if (socket != null) {
  559. synchronized (Communication.this) {
  560. switch (mState) {
  561. case STATE_LISTEN:
  562. case STATE_CONNECTING:
  563. // Situation normal. Start the connected thread.
  564. connected(socket);
  565. break;
  566. case STATE_NONE:
  567. case STATE_CONNECTED:
  568. // Either not ready or already connected.
  569. // Terminate new socket.
  570. try {
  571. socket.close();
  572. } catch (IOException e) {
  573. Util.error("Could not close unwanted socket", e);
  574. }
  575. // TODO: Should we really be returning here?
  576. return;
  577. }
  578. }
  579. }
  580. }
  581. if (D)
  582. Util.disp("END mAcceptThread");
  583. }
  584. public void cancel() {
  585. if (D)
  586. Util.debug("AcceptThread canceled " + this);
  587. try {
  588. mmServerSocket.close();
  589. } catch (IOException e) {
  590. Util.error("close() of server failed", e);
  591. }
  592. }
  593. }
  594. /**
  595. * This thread runs while attempting to make an outgoing connection with a
  596. * device. It runs straight through; the connection either succeeds or
  597. * fails.
  598. */
  599. private class ConnectThread extends Thread {
  600. private final Socket mmSocket;
  601. private final InetSocketAddress mmAddress;
  602. public ConnectThread(InetSocketAddress address) {
  603. mmAddress = address;
  604. mmSocket = new Socket();
  605. /*
  606. * try { mmSocket.setTcpNoDelay(true); } catch (SocketException e) {
  607. * e.printStackTrace(); }
  608. */
  609. }
  610. public void run() {
  611. Util.debug("BEGIN mConnectThread");
  612. setName("ConnectThread");
  613. try {
  614. // This is a blocking call and will only return on a
  615. // successful connection or an exception
  616. mmSocket.connect(mmAddress);
  617. } catch (IOException e) {
  618. // Close the socket
  619. try {
  620. mmSocket.close();
  621. } catch (IOException e2) {
  622. Util.error("unable to close() socket during connection failure", e2);
  623. }
  624. connectionFailed();
  625. return;
  626. }
  627. // Reset the ConnectThread because we're done
  628. synchronized (Communication.this) {
  629. mConnectThread = null;
  630. }
  631. // Start the connected thread
  632. connected(mmSocket);
  633. }
  634. public void cancel() {
  635. try {
  636. mmSocket.close();
  637. } catch (IOException e) {
  638. Util.error("close() of connect socket failed", e);
  639. }
  640. }
  641. }
  642. /**
  643. * This thread runs during a connection with a remote device. It handles all
  644. * incoming and outgoing transmissions.
  645. */
  646. private class ConnectedThread extends Thread {
  647. private final Socket mmSocket;
  648. private final DataInputStream mmInStream;
  649. private final DataOutputStream mmOutStream;
  650. private BlockingQueue<byte[]> mMessageBuffer;
  651. public ConnectedThread(Socket socket) {
  652. Util.debug("create ConnectedThread");
  653. mmSocket = socket;
  654. DataInputStream tmpIn = null;
  655. DataOutputStream tmpOut = null;
  656. mMessageBuffer = new LinkedBlockingQueue<byte[]>(); // TODO: add a
  657. // capacity here
  658. // to prevent
  659. // doS
  660. // Get the Socket input and output streams
  661. try {
  662. tmpIn = new DataInputStream(socket.getInputStream());
  663. tmpOut = new DataOutputStream(socket.getOutputStream());
  664. } catch (StreamCorruptedException e) {
  665. Util.error("object streams corrupt", e);
  666. } catch (IOException e) {
  667. Util.error("temp sockets not created", e);
  668. }
  669. mmInStream = tmpIn;
  670. mmOutStream = tmpOut;
  671. }
  672. public void setTcpNoDelay(boolean on) {
  673. if (mmSocket != null)
  674. try {
  675. mmSocket.setTcpNoDelay(on);
  676. } catch (SocketException e) {
  677. e.printStackTrace();
  678. }
  679. }
  680. /**
  681. * Read from the ConnectedThread in an unsynchronized manner
  682. *
  683. * This is a blocking call and will only return data if the readLoop
  684. * flag is false
  685. *
  686. * @return the bytes read
  687. * @see ConnectedThread#read()
  688. */
  689. public byte[] read() {
  690. try {
  691. return mMessageBuffer.take();
  692. } catch (InterruptedException e) {
  693. Util.error("Message Read Interupted");
  694. return null;
  695. }
  696. }
  697. /**
  698. * Write to the connected OutStream.
  699. *
  700. * @param buffer
  701. * The bytes to write
  702. */
  703. public void write(byte[] buffer) {
  704. try {
  705. mmOutStream.writeInt(buffer.length);
  706. mmOutStream.write(buffer);
  707. mmOutStream.flush();
  708. } catch (IOException e) {
  709. Util.error("Exception during write", e);
  710. }
  711. }
  712. public void run() {
  713. Util.disp("BEGIN mConnectedThread");
  714. int bytes;
  715. // Keep listening to the InputStream while connected
  716. while (true) {
  717. try {
  718. // Read from the InputStream
  719. bytes = mmInStream.readInt();
  720. byte[] buffer = new byte[bytes]; // TODO: This is a little
  721. // dangerous
  722. mmInStream.readFully(buffer, 0, bytes);
  723. try {
  724. mMessageBuffer.put(buffer);
  725. } catch (InterruptedException e) {
  726. Util.error("Message add interupted.");
  727. // TODO: possibly move this catch elsewhere
  728. }
  729. } catch (IOException e) {
  730. if (D)
  731. Util.debug("Device disconnected");
  732. connectionLost();
  733. break;
  734. }
  735. }
  736. }
  737. public void cancel() {
  738. try {
  739. mmInStream.close();
  740. mmOutStream.close();
  741. mmSocket.close();
  742. } catch (IOException e) {
  743. Util.error("close() of connect socket failed", e);
  744. }
  745. }
  746. }
  747. }