Communication.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902
  1. package communication;
  2. import java.io.DataInputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.IOException;
  5. import java.io.StreamCorruptedException;
  6. import java.math.BigInteger;
  7. import java.net.InetSocketAddress;
  8. import java.net.ServerSocket;
  9. import java.net.Socket;
  10. import java.net.SocketException;
  11. import java.nio.charset.Charset;
  12. import java.util.ArrayList;
  13. import java.util.concurrent.BlockingQueue;
  14. import java.util.concurrent.LinkedBlockingQueue;
  15. import com.oblivm.backend.gc.GCSignal;
  16. import oram.Bucket;
  17. import oram.Tuple;
  18. import util.Bandwidth;
  19. import util.P;
  20. import util.Util;
  21. /**
  22. * Basic Usage
  23. *
  24. * 1. Call {@link #start(int)} or {@link #connect(InetSocketAddress)} to
  25. * initiate a connection 2. Wait for {@link #getState()} to return
  26. * {@link #STATE_CONNECTED} 3. {@link #write(byte[])} and {@link #read()}
  27. * messages. 4. Close the connection with {@link #stop()}. NOTE: This may
  28. * invalidate unread data
  29. *
  30. * Alternatively, you can always call start. And the first side of the
  31. * connection to call connect will win.
  32. */
  33. public class Communication {
  34. public static boolean D = false;
  35. // Constants that indicate the current connection state
  36. public static final int STATE_NONE = 0; // we're doing nothing
  37. public static final int STATE_LISTEN = 1; // now listening for incoming
  38. // connections
  39. public static final int STATE_CONNECTING = 2; // now initiating an outgoing
  40. // connection
  41. public static final int STATE_CONNECTED = 3; // now connected to a remote
  42. // device
  43. public static final int STATE_STOPPED = 4; // we're shutting things down
  44. public static final int STATE_RETRY = 5; // we are going to retry, but first
  45. // we listen
  46. private AcceptThread mSecureAcceptThread;
  47. private ConnectThread mConnectThread;
  48. private ConnectedThread mConnectedThread;
  49. // Maximum reconnect attempts
  50. private static final int MAX_RETRY = 2;
  51. /***********************
  52. * Private Members
  53. **********************/
  54. // Current number of reconnect attempts
  55. private int mNumTries;
  56. private int mPort = 0;
  57. private boolean acceptMode = false;
  58. protected int mState;
  59. protected InetSocketAddress mAddress;
  60. public Bandwidth[] bandwidth;
  61. public boolean bandSwitch = true;
  62. public Communication() {
  63. mState = STATE_NONE;
  64. bandwidth = new Bandwidth[P.size];
  65. for (int i = 0; i < P.size; i++)
  66. bandwidth[i] = new Bandwidth(P.names[i]);
  67. }
  68. public void setTcpNoDelay(boolean on) {
  69. if (mConnectedThread != null)
  70. mConnectedThread.setTcpNoDelay(on);
  71. }
  72. /**
  73. * Set the current state of the connection
  74. *
  75. * @param state
  76. * An integer defining the current connection state
  77. */
  78. protected synchronized void setState(int state) {
  79. if (D)
  80. Util.debug("setState() " + mState + " -> " + state);
  81. mState = state;
  82. }
  83. /**
  84. * Return the current connection state.
  85. */
  86. public synchronized int getState() {
  87. return mState;
  88. }
  89. /**
  90. * Start the communication service. Specifically start AcceptThread to begin
  91. * a session in listening (server) mode.
  92. */
  93. public synchronized void start(int port) {
  94. if (D)
  95. Util.debug("start");
  96. acceptMode = true;
  97. startAcceptThread(port);
  98. mPort = port;
  99. mNumTries = 0;
  100. setState(STATE_LISTEN);
  101. }
  102. private synchronized void startAcceptThread(int port) {
  103. // Cancel any thread attempting to make a connection
  104. if (mConnectThread != null) {
  105. mConnectThread.cancel();
  106. mConnectThread = null;
  107. }
  108. // Cancel any thread currently running a connection
  109. if (mConnectedThread != null) {
  110. mConnectedThread.cancel();
  111. mConnectedThread = null;
  112. }
  113. // Start the thread to listen on a ServerSocket
  114. if (mSecureAcceptThread == null) {
  115. mSecureAcceptThread = new AcceptThread(port);
  116. mSecureAcceptThread.start();
  117. }
  118. }
  119. protected synchronized void retry() {
  120. if (D)
  121. Util.debug("retry");
  122. if (D)
  123. Util.debug("Retrying in state: " + getState());
  124. if (mState == STATE_CONNECTED)
  125. return;
  126. // TODO: Does this logic belong here
  127. if (mNumTries >= MAX_RETRY) {
  128. signalFailed();
  129. if (acceptMode)
  130. start(mPort);
  131. return;
  132. }
  133. startAcceptThread(mPort);
  134. setState(STATE_RETRY);
  135. int sleep = (int) (Math.random() * 1000 + 100);
  136. if (D)
  137. Util.debug("Sleeping: " + sleep);
  138. try {
  139. Thread.sleep(sleep);
  140. } catch (InterruptedException e) {
  141. Util.debug("Sleep interupted");
  142. } // TODO: This may block the main thread?
  143. if (D)
  144. Util.debug("Waking up: " + getState());
  145. // TODO: make this less strict
  146. if (mState != STATE_CONNECTING && mState != STATE_CONNECTED && mConnectedThread == null
  147. && mConnectThread == null)
  148. connect(mAddress);
  149. }
  150. /**
  151. * Start the ConnectThread to initiate a connection to a remote device.
  152. *
  153. * @param address
  154. * The address of the server
  155. * @param secure
  156. * Socket Security type - Secure (true) , Insecure (false)
  157. */
  158. public synchronized void connect(InetSocketAddress address) {
  159. if (D)
  160. Util.disp("connect to: " + address);
  161. // Don't throw out connections if we are already connected
  162. if (mState == STATE_CONNECTING || mConnectedThread != null) {
  163. return;
  164. }
  165. mNumTries++;
  166. mAddress = address;
  167. // Cancel any thread attempting to make a connection
  168. if (mState == STATE_CONNECTING) {
  169. if (mConnectThread != null) {
  170. mConnectThread.cancel();
  171. mConnectThread = null;
  172. }
  173. }
  174. // Cancel any thread currently running a connection
  175. if (mConnectedThread != null) {
  176. mConnectedThread.cancel();
  177. mConnectedThread = null;
  178. }
  179. // Start the thread to connect with the given device
  180. mConnectThread = new ConnectThread(address);
  181. mConnectThread.start();
  182. setState(STATE_CONNECTING);
  183. }
  184. /**
  185. * Start the ConnectedThread to begin managing a connection
  186. *
  187. * @param socket
  188. * The Socket on which the connection was made
  189. */
  190. public synchronized void connected(Socket socket) {
  191. if (D)
  192. Util.debug("connected");
  193. // Cancel the thread that completed the connection
  194. if (mConnectThread != null) {
  195. mConnectThread.cancel();
  196. mConnectThread = null;
  197. }
  198. // Cancel any thread currently running a connection
  199. if (mConnectedThread != null) {
  200. mConnectedThread.cancel();
  201. mConnectedThread = null;
  202. }
  203. // Cancel the accept thread because we only want to connect to one
  204. // device
  205. if (mSecureAcceptThread != null) {
  206. mSecureAcceptThread.cancel();
  207. mSecureAcceptThread = null;
  208. }
  209. // Start the thread to manage the connection and perform transmissions
  210. mConnectedThread = new ConnectedThread(socket);
  211. mConnectedThread.start();
  212. setState(STATE_CONNECTED);
  213. }
  214. protected void connectionFailed() {
  215. Util.error("Connection to the device failed");
  216. // Start the service over to restart listening mode
  217. if (getState() != STATE_STOPPED)
  218. retry();
  219. }
  220. /**
  221. * Indicate that the connection was lost and notify the UI Activity.
  222. */
  223. protected void connectionLost() {
  224. if (D)
  225. Util.error("Connection to the device lost");
  226. // Start the service over to restart listening mode
  227. if (getState() != STATE_STOPPED && acceptMode) {
  228. start(mPort);
  229. }
  230. }
  231. protected void signalFailed() {
  232. // TODO:
  233. }
  234. /**
  235. * Stop all threads
  236. */
  237. public synchronized void stop() {
  238. if (D)
  239. Util.debug("stop");
  240. setState(STATE_STOPPED);
  241. if (mConnectedThread != null) {
  242. mConnectedThread.cancel();
  243. mConnectedThread = null;
  244. }
  245. if (mConnectThread != null) {
  246. mConnectThread.cancel();
  247. mConnectThread = null;
  248. }
  249. if (mSecureAcceptThread != null) {
  250. mSecureAcceptThread.cancel();
  251. mSecureAcceptThread = null;
  252. }
  253. }
  254. /**
  255. * Write to the ConnectedThread in an unsynchronized manner
  256. *
  257. * This does not add message boundries!!
  258. *
  259. * @param out
  260. * The bytes to write
  261. * @see ConnectedThread#write(byte[])
  262. */
  263. public void write(byte[] out) {
  264. // Create temporary object
  265. ConnectedThread r;
  266. // Synchronize a copy of the ConnectedThread
  267. synchronized (this) {
  268. if (mState != STATE_CONNECTED)
  269. return;
  270. r = mConnectedThread;
  271. }
  272. // Perform the write unsynchronized
  273. r.write(out);
  274. }
  275. public void write(int pid, byte[] out) {
  276. write(out);
  277. if (bandSwitch)
  278. bandwidth[pid].add(out.length);
  279. }
  280. /**
  281. * Write a length encoded byte array.
  282. *
  283. * @param out
  284. */
  285. public void writeLengthEncoded(byte[] out) {
  286. write("" + out.length);
  287. write(out);
  288. }
  289. /*
  290. * public <T> void write(T out) {
  291. * write(SerializationUtils.serialize((Serializable) out)); }
  292. *
  293. * public <T> void write(int pid, T out) { write(pid,
  294. * SerializationUtils.serialize((Serializable) out)); }
  295. */
  296. public void write(BigInteger b) {
  297. write(b.toByteArray());
  298. }
  299. public void write(int pid, BigInteger b) {
  300. write(pid, b.toByteArray());
  301. }
  302. public void write(int n) {
  303. write(BigInteger.valueOf(n));
  304. }
  305. public void write(int pid, int n) {
  306. write(pid, BigInteger.valueOf(n));
  307. }
  308. public void write(long n) {
  309. write(BigInteger.valueOf(n));
  310. }
  311. public void write(int pid, long n) {
  312. write(pid, BigInteger.valueOf(n));
  313. }
  314. public void write(byte[][] arr) {
  315. write(ComUtil.serialize(arr));
  316. }
  317. public void write(int pid, byte[][] arr) {
  318. write(pid, ComUtil.serialize(arr));
  319. }
  320. public void write(byte[][][] arr) {
  321. write(ComUtil.serialize(arr));
  322. }
  323. public void write(int pid, byte[][][] arr) {
  324. write(pid, ComUtil.serialize(arr));
  325. }
  326. public void write(int[] arr) {
  327. write(ComUtil.serialize(arr));
  328. }
  329. public void write(int pid, int[] arr) {
  330. write(pid, ComUtil.serialize(arr));
  331. }
  332. public void write(int[][] arr) {
  333. write(ComUtil.serialize(arr));
  334. }
  335. public void write(int pid, int[][] arr) {
  336. write(pid, ComUtil.serialize(arr));
  337. }
  338. public void write(Tuple t) {
  339. write(ComUtil.serialize(t));
  340. }
  341. public void write(int pid, Tuple t) {
  342. write(pid, ComUtil.serialize(t));
  343. }
  344. public void write(Tuple[] arr) {
  345. write(ComUtil.serialize(arr));
  346. }
  347. public void write(int pid, Tuple[] arr) {
  348. write(pid, ComUtil.serialize(arr));
  349. }
  350. public void write(Bucket b) {
  351. write(b.getTuples());
  352. }
  353. public void write(int pid, Bucket b) {
  354. write(pid, b.getTuples());
  355. }
  356. public void write(Bucket[] arr) {
  357. write(ComUtil.serialize(arr));
  358. }
  359. public void write(int pid, Bucket[] arr) {
  360. write(ComUtil.serialize(arr));
  361. }
  362. public void write(GCSignal key) {
  363. write(key.bytes);
  364. }
  365. public void write(int pid, GCSignal key) {
  366. write(pid, key.bytes);
  367. }
  368. public void write(GCSignal[] arr) {
  369. write(ComUtil.serialize(arr));
  370. }
  371. public void write(int pid, GCSignal[] arr) {
  372. write(pid, ComUtil.serialize(arr));
  373. }
  374. public void write(GCSignal[][] arr) {
  375. write(ComUtil.serialize(arr));
  376. }
  377. public void write(int pid, GCSignal[][] arr) {
  378. write(pid, ComUtil.serialize(arr));
  379. }
  380. public void write(GCSignal[][][] arr) {
  381. write(ComUtil.serialize(arr));
  382. }
  383. public void write(int pid, GCSignal[][][] arr) {
  384. write(pid, ComUtil.serialize(arr));
  385. }
  386. public void write(GCSignal[][][][] arr) {
  387. write(ComUtil.serialize(arr));
  388. }
  389. public void write(int pid, GCSignal[][][][] arr) {
  390. write(pid, ComUtil.serialize(arr));
  391. }
  392. public void write(ArrayList<byte[]> arr) {
  393. write(ComUtil.serialize(arr));
  394. }
  395. public void write(int pid, ArrayList<byte[]> arr) {
  396. write(pid, ComUtil.serialize(arr));
  397. }
  398. public static final Charset defaultCharset = Charset.forName("ASCII");
  399. // TODO: Rather than having millions of write/read methods can we take
  400. // advantage of DataStreams?
  401. public void write(String buffer) {
  402. write(buffer, defaultCharset);
  403. }
  404. /*
  405. * This was added to allow backwords compaitibility with older versions
  406. * which used the default charset (usually utf-8) instead of asc-ii. This is
  407. * almost never what we want to do
  408. */
  409. public void write(String buffer, Charset charset) {
  410. write(buffer.getBytes(charset));
  411. if (D)
  412. Util.debug("Write: " + buffer);
  413. }
  414. /**
  415. * Read a string from Connected Thread
  416. *
  417. * @see #read()
  418. */
  419. public String readString() {
  420. return new String(read());
  421. }
  422. /**
  423. * Read from the ConnectedThread in an unsynchronized manner Note, this is a
  424. * blocking call
  425. *
  426. * @return the bytes read
  427. * @see ConnectedThread#read()
  428. */
  429. public byte[] read() {
  430. // Create temporary object
  431. ConnectedThread r;
  432. // Synchronize a copy of the ConnectedThread
  433. synchronized (this) {
  434. if (mState != STATE_CONNECTED)
  435. return null;
  436. r = mConnectedThread;
  437. }
  438. // Perform the read unsynchronized and parse
  439. byte[] readMessage = r.read();
  440. if (D)
  441. Util.debug("Read: " + new String(readMessage));
  442. return readMessage;
  443. }
  444. /**
  445. * Read a specific number of bytes from the ConnectedThread in an
  446. * unsynchronized manner Note, this is a blocking call
  447. *
  448. * @return the bytes read
  449. * @see ConnectedThread#read()
  450. */
  451. public byte[] readLengthEncoded() {
  452. int len = Integer.parseInt(readString());
  453. ArrayList<byte[]> bytes = new ArrayList<byte[]>();
  454. byte[] data = read();
  455. if (data.length != len) {
  456. bytes.add(data);
  457. data = read();
  458. }
  459. byte[] total = new byte[len];
  460. int offset = 0;
  461. for (byte[] b : bytes) {
  462. for (int i = 0; i < b.length; i++) {
  463. total[offset++] = b[i];
  464. }
  465. }
  466. return total;
  467. }
  468. /*
  469. * public <T> T readObject() { T object =
  470. * SerializationUtils.deserialize(read()); return object; }
  471. */
  472. public BigInteger readBigInteger() {
  473. return new BigInteger(read());
  474. }
  475. public int readInt() {
  476. return readBigInteger().intValue();
  477. }
  478. public long readLong() {
  479. return readBigInteger().longValue();
  480. }
  481. public byte[][] readDoubleByteArray() {
  482. return ComUtil.toDoubleByteArray(read());
  483. }
  484. public byte[][][] readTripleByteArray() {
  485. return ComUtil.toTripleByteArray(read());
  486. }
  487. public int[] readIntArray() {
  488. return ComUtil.toIntArray(read());
  489. }
  490. public int[][] readDoubleIntArray() {
  491. return ComUtil.toDoubleIntArray(read());
  492. }
  493. public Tuple readTuple() {
  494. return ComUtil.toTuple(read());
  495. }
  496. public Tuple[] readTupleArray() {
  497. return ComUtil.toTupleArray(read());
  498. }
  499. public Bucket readBucket() {
  500. return new Bucket(readTupleArray());
  501. }
  502. public Bucket[] readBucketArray() {
  503. return ComUtil.toBucketArray(read());
  504. }
  505. public GCSignal readGCSignal() {
  506. return new GCSignal(read());
  507. }
  508. public GCSignal[] readGCSignalArray() {
  509. return ComUtil.toGCSignalArray(read());
  510. }
  511. public GCSignal[][] readDoubleGCSignalArray() {
  512. return ComUtil.toDoubleGCSignalArray(read());
  513. }
  514. public GCSignal[][][] readTripleGCSignalArray() {
  515. return ComUtil.toTripleGCSignalArray(read());
  516. }
  517. public GCSignal[][][][] readQuadGCSignalArray() {
  518. return ComUtil.toQuadGCSignalArray(read());
  519. }
  520. public ArrayList<byte[]> readArrayList() {
  521. return ComUtil.toArrayList(read());
  522. }
  523. /**
  524. * This thread runs while listening for incoming connections. It behaves
  525. * like a server-side client. It runs until a connection is accepted (or
  526. * until cancelled).
  527. */
  528. private class AcceptThread extends Thread {
  529. // The local server socket
  530. private final ServerSocket mmServerSocket;
  531. public AcceptThread(int port) {
  532. ServerSocket tmp = null;
  533. try {
  534. tmp = new ServerSocket(port);
  535. } catch (IOException e) {
  536. Util.error("ServerSocket unable to start", e);
  537. }
  538. mmServerSocket = tmp;
  539. }
  540. public void run() {
  541. if (D)
  542. Util.disp("BEGIN mAcceptThread ");
  543. setName("AcceptThread");
  544. Socket socket = null;
  545. // Listen to the server socket if we're not connected
  546. while (mState != STATE_CONNECTED) {
  547. try {
  548. // This is a blocking call and will only return on a
  549. // successful connection or an exception
  550. socket = mmServerSocket.accept();
  551. // socket.setTcpNoDelay(true);
  552. } catch (IOException e) {
  553. Util.error("accept() failed", e);
  554. break;
  555. }
  556. // If a connection was accepted
  557. if (socket != null) {
  558. synchronized (Communication.this) {
  559. switch (mState) {
  560. case STATE_LISTEN:
  561. case STATE_CONNECTING:
  562. // Situation normal. Start the connected thread.
  563. connected(socket);
  564. break;
  565. case STATE_NONE:
  566. case STATE_CONNECTED:
  567. // Either not ready or already connected.
  568. // Terminate new socket.
  569. try {
  570. socket.close();
  571. } catch (IOException e) {
  572. Util.error("Could not close unwanted socket", e);
  573. }
  574. // TODO: Should we really be returning here?
  575. return;
  576. }
  577. }
  578. }
  579. }
  580. if (D)
  581. Util.disp("END mAcceptThread");
  582. }
  583. public void cancel() {
  584. if (D)
  585. Util.debug("AcceptThread canceled " + this);
  586. try {
  587. mmServerSocket.close();
  588. } catch (IOException e) {
  589. Util.error("close() of server failed", e);
  590. }
  591. }
  592. }
  593. /**
  594. * This thread runs while attempting to make an outgoing connection with a
  595. * device. It runs straight through; the connection either succeeds or
  596. * fails.
  597. */
  598. private class ConnectThread extends Thread {
  599. private final Socket mmSocket;
  600. private final InetSocketAddress mmAddress;
  601. public ConnectThread(InetSocketAddress address) {
  602. mmAddress = address;
  603. mmSocket = new Socket();
  604. /*
  605. * try { mmSocket.setTcpNoDelay(true); } catch (SocketException e) {
  606. * e.printStackTrace(); }
  607. */
  608. }
  609. public void run() {
  610. Util.debug("BEGIN mConnectThread");
  611. setName("ConnectThread");
  612. try {
  613. // This is a blocking call and will only return on a
  614. // successful connection or an exception
  615. mmSocket.connect(mmAddress);
  616. } catch (IOException e) {
  617. // Close the socket
  618. try {
  619. mmSocket.close();
  620. } catch (IOException e2) {
  621. Util.error("unable to close() socket during connection failure", e2);
  622. }
  623. connectionFailed();
  624. return;
  625. }
  626. // Reset the ConnectThread because we're done
  627. synchronized (Communication.this) {
  628. mConnectThread = null;
  629. }
  630. // Start the connected thread
  631. connected(mmSocket);
  632. }
  633. public void cancel() {
  634. try {
  635. mmSocket.close();
  636. } catch (IOException e) {
  637. Util.error("close() of connect socket failed", e);
  638. }
  639. }
  640. }
  641. /**
  642. * This thread runs during a connection with a remote device. It handles all
  643. * incoming and outgoing transmissions.
  644. */
  645. private class ConnectedThread extends Thread {
  646. private final Socket mmSocket;
  647. private final DataInputStream mmInStream;
  648. private final DataOutputStream mmOutStream;
  649. private BlockingQueue<byte[]> mMessageBuffer;
  650. public ConnectedThread(Socket socket) {
  651. Util.debug("create ConnectedThread");
  652. mmSocket = socket;
  653. DataInputStream tmpIn = null;
  654. DataOutputStream tmpOut = null;
  655. mMessageBuffer = new LinkedBlockingQueue<byte[]>(); // TODO: add a
  656. // capacity here
  657. // to prevent
  658. // doS
  659. // Get the Socket input and output streams
  660. try {
  661. tmpIn = new DataInputStream(socket.getInputStream());
  662. tmpOut = new DataOutputStream(socket.getOutputStream());
  663. } catch (StreamCorruptedException e) {
  664. Util.error("object streams corrupt", e);
  665. } catch (IOException e) {
  666. Util.error("temp sockets not created", e);
  667. }
  668. mmInStream = tmpIn;
  669. mmOutStream = tmpOut;
  670. }
  671. public void setTcpNoDelay(boolean on) {
  672. if (mmSocket != null)
  673. try {
  674. mmSocket.setTcpNoDelay(on);
  675. } catch (SocketException e) {
  676. e.printStackTrace();
  677. }
  678. }
  679. /**
  680. * Read from the ConnectedThread in an unsynchronized manner
  681. *
  682. * This is a blocking call and will only return data if the readLoop
  683. * flag is false
  684. *
  685. * @return the bytes read
  686. * @see ConnectedThread#read()
  687. */
  688. public byte[] read() {
  689. try {
  690. return mMessageBuffer.take();
  691. } catch (InterruptedException e) {
  692. Util.error("Message Read Interupted");
  693. return null;
  694. }
  695. }
  696. /**
  697. * Write to the connected OutStream.
  698. *
  699. * @param buffer
  700. * The bytes to write
  701. */
  702. public void write(byte[] buffer) {
  703. try {
  704. mmOutStream.writeInt(buffer.length);
  705. mmOutStream.write(buffer);
  706. mmOutStream.flush();
  707. } catch (IOException e) {
  708. Util.error("Exception during write", e);
  709. }
  710. }
  711. public void run() {
  712. Util.disp("BEGIN mConnectedThread");
  713. int bytes;
  714. // Keep listening to the InputStream while connected
  715. while (true) {
  716. try {
  717. // Read from the InputStream
  718. bytes = mmInStream.readInt();
  719. byte[] buffer = new byte[bytes]; // TODO: This is a little
  720. // dangerous
  721. mmInStream.readFully(buffer, 0, bytes);
  722. try {
  723. mMessageBuffer.put(buffer);
  724. } catch (InterruptedException e) {
  725. Util.error("Message add interupted.");
  726. // TODO: possibly move this catch elsewhere
  727. }
  728. } catch (IOException e) {
  729. if (D)
  730. Util.debug("Device disconnected");
  731. connectionLost();
  732. break;
  733. }
  734. }
  735. }
  736. public void cancel() {
  737. try {
  738. mmInStream.close();
  739. mmOutStream.close();
  740. mmSocket.close();
  741. } catch (IOException e) {
  742. Util.error("close() of connect socket failed", e);
  743. }
  744. }
  745. }
  746. }