// Communication.java
  1. package communication;
  2. import java.io.DataInputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.IOException;
  5. import java.io.StreamCorruptedException;
  6. import java.math.BigInteger;
  7. import java.net.InetSocketAddress;
  8. import java.net.ServerSocket;
  9. import java.net.Socket;
  10. import java.net.SocketException;
  11. import java.nio.charset.Charset;
  12. import java.util.ArrayList;
  13. import java.util.concurrent.BlockingQueue;
  14. import java.util.concurrent.LinkedBlockingQueue;
  15. import com.oblivm.backend.gc.GCSignal;
  16. import crypto.SimpleAES;
  17. import oram.Bucket;
  18. import oram.Tuple;
  19. import util.Bandwidth;
  20. import util.P;
  21. import util.StopWatch;
  22. import util.Util;
/**
 * Basic usage:
 *
 * <ol>
 *   <li>Call {@link #start(int)} or {@link #connect(InetSocketAddress)} to initiate a connection.
 *   <li>Wait for {@link #getState()} to return {@link #STATE_CONNECTED}.
 *   <li>Exchange messages with {@link #write(byte[])} and {@link #read()}.
 *   <li>Close the connection with {@link #stop()}. NOTE: this may invalidate unread data.
 * </ol>
 *
 * <p>Alternatively, you can always call {@code start}; the first side of the connection to call
 * {@code connect} will win.
 */
  35. public class Communication {
  36. public static boolean D = false;
  37. // Constants that indicate the current connection state
  38. public static final int STATE_NONE = 0; // we're doing nothing
  39. public static final int STATE_LISTEN = 1; // now listening for incoming
  40. // connections
  41. public static final int STATE_CONNECTING = 2; // now initiating an outgoing
  42. // connection
  43. public static final int STATE_CONNECTED = 3; // now connected to a remote
  44. // device
  45. public static final int STATE_STOPPED = 4; // we're shutting things down
  46. public static final int STATE_RETRY = 5; // we are going to retry, but first
  47. // we listen
  48. private AcceptThread mSecureAcceptThread;
  49. private ConnectThread mConnectThread;
  50. private ConnectedThread mConnectedThread;
  51. // Maximum reconnect attempts
  52. private static final int MAX_RETRY = 2;
  53. /***********************
  54. * Private Members
  55. **********************/
  56. // Current number of reconnect attempts
  57. private int mNumTries;
  58. private int mPort = 0;
  59. private boolean acceptMode = false;
  60. protected int mState;
  61. protected InetSocketAddress mAddress;
  62. public Bandwidth[] bandwidth;
  63. public boolean bandSwitch = true; // TODO: change this to static (or to
  64. // Global)
  65. private static SimpleAES aes = new SimpleAES();
  66. public StopWatch comEnc = new StopWatch("CE_online_comp");
  67. public Communication() {
  68. mState = STATE_NONE;
  69. bandwidth = new Bandwidth[P.size];
  70. for (int i = 0; i < P.size; i++)
  71. bandwidth[i] = new Bandwidth(P.names[i]);
  72. }
  73. public void setTcpNoDelay(boolean on) {
  74. if (mConnectedThread != null)
  75. mConnectedThread.setTcpNoDelay(on);
  76. }
  77. /**
  78. * Set the current state of the connection
  79. *
  80. * @param state
  81. * An integer defining the current connection state
  82. */
  83. protected synchronized void setState(int state) {
  84. if (D)
  85. Util.debug("setState() " + mState + " -> " + state);
  86. mState = state;
  87. }
  88. /**
  89. * Return the current connection state.
  90. */
  91. public synchronized int getState() {
  92. return mState;
  93. }
  94. /**
  95. * Start the communication service. Specifically start AcceptThread to begin
  96. * a session in listening (server) mode.
  97. */
  98. public synchronized void start(int port) {
  99. if (D)
  100. Util.debug("start");
  101. acceptMode = true;
  102. startAcceptThread(port);
  103. mPort = port;
  104. mNumTries = 0;
  105. setState(STATE_LISTEN);
  106. }
  107. private synchronized void startAcceptThread(int port) {
  108. // Cancel any thread attempting to make a connection
  109. if (mConnectThread != null) {
  110. mConnectThread.cancel();
  111. mConnectThread = null;
  112. }
  113. // Cancel any thread currently running a connection
  114. if (mConnectedThread != null) {
  115. mConnectedThread.cancel();
  116. mConnectedThread = null;
  117. }
  118. // Start the thread to listen on a ServerSocket
  119. if (mSecureAcceptThread == null) {
  120. mSecureAcceptThread = new AcceptThread(port);
  121. mSecureAcceptThread.start();
  122. }
  123. }
  124. protected synchronized void retry() {
  125. if (D)
  126. Util.debug("retry");
  127. if (D)
  128. Util.debug("Retrying in state: " + getState());
  129. if (mState == STATE_CONNECTED)
  130. return;
  131. // TODO: Does this logic belong here
  132. if (mNumTries >= MAX_RETRY) {
  133. signalFailed();
  134. if (acceptMode)
  135. start(mPort);
  136. return;
  137. }
  138. startAcceptThread(mPort);
  139. setState(STATE_RETRY);
  140. int sleep = (int) (Math.random() * 1000 + 100);
  141. if (D)
  142. Util.debug("Sleeping: " + sleep);
  143. try {
  144. Thread.sleep(sleep);
  145. } catch (InterruptedException e) {
  146. Util.debug("Sleep interupted");
  147. } // TODO: This may block the main thread?
  148. if (D)
  149. Util.debug("Waking up: " + getState());
  150. // TODO: make this less strict
  151. if (mState != STATE_CONNECTING && mState != STATE_CONNECTED && mConnectedThread == null
  152. && mConnectThread == null)
  153. connect(mAddress);
  154. }
  155. /**
  156. * Start the ConnectThread to initiate a connection to a remote device.
  157. *
  158. * @param address
  159. * The address of the server
  160. * @param secure
  161. * Socket Security type - Secure (true) , Insecure (false)
  162. */
  163. public synchronized void connect(InetSocketAddress address) {
  164. if (D)
  165. Util.disp("connect to: " + address);
  166. // Don't throw out connections if we are already connected
  167. /*
  168. * if (mState == STATE_CONNECTING || mConnectedThread != null) { return;
  169. * }
  170. */
  171. mNumTries++;
  172. mAddress = address;
  173. // Cancel any thread attempting to make a connection
  174. if (mState == STATE_CONNECTING) {
  175. if (mConnectThread != null) {
  176. mConnectThread.cancel();
  177. mConnectThread = null;
  178. }
  179. }
  180. // Cancel any thread currently running a connection
  181. if (mConnectedThread != null) {
  182. mConnectedThread.cancel();
  183. mConnectedThread = null;
  184. }
  185. // Start the thread to connect with the given device
  186. mConnectThread = new ConnectThread(address);
  187. mConnectThread.start();
  188. setState(STATE_CONNECTING);
  189. }
  190. /**
  191. * Start the ConnectedThread to begin managing a connection
  192. *
  193. * @param socket
  194. * The Socket on which the connection was made
  195. */
  196. public synchronized void connected(Socket socket) {
  197. if (D)
  198. Util.debug("connected");
  199. // Cancel the thread that completed the connection
  200. if (mConnectThread != null) {
  201. mConnectThread.cancel();
  202. mConnectThread = null;
  203. }
  204. // Cancel any thread currently running a connection
  205. if (mConnectedThread != null) {
  206. mConnectedThread.cancel();
  207. mConnectedThread = null;
  208. }
  209. // Cancel the accept thread because we only want to connect to one
  210. // device
  211. if (mSecureAcceptThread != null) {
  212. mSecureAcceptThread.cancel();
  213. mSecureAcceptThread = null;
  214. }
  215. // Start the thread to manage the connection and perform transmissions
  216. mConnectedThread = new ConnectedThread(socket);
  217. mConnectedThread.start();
  218. setState(STATE_CONNECTED);
  219. }
  220. protected void connectionFailed() {
  221. Util.error("Connection to the device failed");
  222. // Start the service over to restart listening mode
  223. if (getState() != STATE_STOPPED)
  224. retry();
  225. }
  226. /**
  227. * Indicate that the connection was lost and notify the UI Activity.
  228. */
  229. protected void connectionLost() {
  230. if (D)
  231. Util.error("Connection to the device lost");
  232. // Start the service over to restart listening mode
  233. if (getState() != STATE_STOPPED && acceptMode) {
  234. start(mPort);
  235. }
  236. }
  237. protected void signalFailed() {
  238. // TODO:
  239. }
  240. /**
  241. * Stop all threads
  242. */
  243. public synchronized void stop() {
  244. if (D)
  245. Util.debug("stop");
  246. setState(STATE_STOPPED);
  247. if (mConnectedThread != null) {
  248. mConnectedThread.cancel();
  249. mConnectedThread = null;
  250. }
  251. if (mConnectThread != null) {
  252. mConnectThread.cancel();
  253. mConnectThread = null;
  254. }
  255. if (mSecureAcceptThread != null) {
  256. mSecureAcceptThread.cancel();
  257. mSecureAcceptThread = null;
  258. }
  259. }
  260. /**
  261. * Write to the ConnectedThread in an unsynchronized manner
  262. *
  263. * This does not add message boundries!!
  264. *
  265. * @param out
  266. * The bytes to write
  267. * @see ConnectedThread#write(byte[])
  268. */
  269. public void write(byte[] out) {
  270. // Create temporary object
  271. ConnectedThread r;
  272. // Synchronize a copy of the ConnectedThread
  273. synchronized (this) {
  274. if (mState != STATE_CONNECTED)
  275. return;
  276. r = mConnectedThread;
  277. }
  278. // Perform the write unsynchronized
  279. r.write(out);
  280. }
  281. public void write(int pid, byte[] out) {
  282. comEnc.start();
  283. out = aes.encrypt(out);
  284. comEnc.stop();
  285. write(out);
  286. if (bandSwitch)
  287. bandwidth[pid].add(out.length);
  288. }
  289. /**
  290. * Write a length encoded byte array.
  291. *
  292. * @param out
  293. */
  294. public void writeLengthEncoded(byte[] out) {
  295. write("" + out.length);
  296. write(out);
  297. }
  298. /*
  299. * public <T> void write(T out) {
  300. * write(SerializationUtils.serialize((Serializable) out)); }
  301. *
  302. * public <T> void write(int pid, T out) { write(pid,
  303. * SerializationUtils.serialize((Serializable) out)); }
  304. */
  305. public void write(BigInteger b) {
  306. write(b.toByteArray());
  307. }
  308. public void write(int pid, BigInteger b) {
  309. write(pid, b.toByteArray());
  310. }
  311. public void write(int n) {
  312. write(BigInteger.valueOf(n));
  313. }
  314. public void write(int pid, int n) {
  315. write(pid, BigInteger.valueOf(n));
  316. }
  317. public void write(long n) {
  318. write(BigInteger.valueOf(n));
  319. }
  320. public void write(int pid, long n) {
  321. write(pid, BigInteger.valueOf(n));
  322. }
  323. public void write(byte[][] arr) {
  324. write(ComUtil.serialize(arr));
  325. }
  326. public void write(int pid, byte[][] arr) {
  327. write(pid, ComUtil.serialize(arr));
  328. }
  329. public void write(byte[][][] arr) {
  330. write(ComUtil.serialize(arr));
  331. }
  332. public void write(int pid, byte[][][] arr) {
  333. write(pid, ComUtil.serialize(arr));
  334. }
  335. public void write(int[] arr) {
  336. write(ComUtil.serialize(arr));
  337. }
  338. public void write(int pid, int[] arr) {
  339. write(pid, ComUtil.serialize(arr));
  340. }
  341. public void write(int[][] arr) {
  342. write(ComUtil.serialize(arr));
  343. }
  344. public void write(int pid, int[][] arr) {
  345. write(pid, ComUtil.serialize(arr));
  346. }
  347. public void write(Tuple t) {
  348. write(ComUtil.serialize(t));
  349. }
  350. public void write(int pid, Tuple t) {
  351. write(pid, ComUtil.serialize(t));
  352. }
  353. public void write(Tuple[] arr) {
  354. write(ComUtil.serialize(arr));
  355. }
  356. public void write(int pid, Tuple[] arr) {
  357. write(pid, ComUtil.serialize(arr));
  358. }
  359. public void write(Bucket b) {
  360. write(b.getTuples());
  361. }
  362. public void write(int pid, Bucket b) {
  363. write(pid, b.getTuples());
  364. }
  365. public void write(Bucket[] arr) {
  366. write(ComUtil.serialize(arr));
  367. }
  368. public void write(int pid, Bucket[] arr) {
  369. write(pid, ComUtil.serialize(arr));
  370. }
  371. public void write(GCSignal key) {
  372. write(key.bytes);
  373. }
  374. public void write(int pid, GCSignal key) {
  375. write(pid, key.bytes);
  376. }
  377. public void write(GCSignal[] arr) {
  378. write(ComUtil.serialize(arr));
  379. }
  380. public void write(int pid, GCSignal[] arr) {
  381. write(pid, ComUtil.serialize(arr));
  382. }
  383. public void write(GCSignal[][] arr) {
  384. write(ComUtil.serialize(arr));
  385. }
  386. public void write(int pid, GCSignal[][] arr) {
  387. write(pid, ComUtil.serialize(arr));
  388. }
  389. public void write(GCSignal[][][] arr) {
  390. write(ComUtil.serialize(arr));
  391. }
  392. public void write(int pid, GCSignal[][][] arr) {
  393. write(pid, ComUtil.serialize(arr));
  394. }
  395. public void write(GCSignal[][][][] arr) {
  396. write(ComUtil.serialize(arr));
  397. }
  398. public void write(int pid, GCSignal[][][][] arr) {
  399. write(pid, ComUtil.serialize(arr));
  400. }
  401. public void write(ArrayList<byte[]> arr) {
  402. write(ComUtil.serialize(arr));
  403. }
  404. public void write(int pid, ArrayList<byte[]> arr) {
  405. write(pid, ComUtil.serialize(arr));
  406. }
  407. public static final Charset defaultCharset = Charset.forName("ASCII");
  408. // TODO: Rather than having millions of write/read methods can we take
  409. // advantage of DataStreams?
  410. public void write(String buffer) {
  411. write(buffer, defaultCharset);
  412. }
  413. /*
  414. * This was added to allow backwords compaitibility with older versions
  415. * which used the default charset (usually utf-8) instead of asc-ii. This is
  416. * almost never what we want to do
  417. */
  418. public void write(String buffer, Charset charset) {
  419. write(buffer.getBytes(charset));
  420. if (D)
  421. Util.debug("Write: " + buffer);
  422. }
  423. /**
  424. * Read a string from Connected Thread
  425. *
  426. * @see #read()
  427. */
  428. public String readString() {
  429. return new String(read());
  430. }
  431. /**
  432. * Read from the ConnectedThread in an unsynchronized manner Note, this is a
  433. * blocking call
  434. *
  435. * @return the bytes read
  436. * @see ConnectedThread#read()
  437. */
  438. public byte[] read() {
  439. // Create temporary object
  440. ConnectedThread r;
  441. // Synchronize a copy of the ConnectedThread
  442. synchronized (this) {
  443. if (mState != STATE_CONNECTED)
  444. return null;
  445. r = mConnectedThread;
  446. }
  447. // Perform the read unsynchronized and parse
  448. byte[] readMessage = r.read();
  449. if (D)
  450. Util.debug("Read: " + new String(readMessage));
  451. return readMessage;
  452. }
  453. public byte[] read(int pid) {
  454. byte[] msg = read();
  455. comEnc.start();
  456. msg = aes.decrypt(msg);
  457. comEnc.stop();
  458. return msg;
  459. }
  460. /**
  461. * Read a specific number of bytes from the ConnectedThread in an
  462. * unsynchronized manner Note, this is a blocking call
  463. *
  464. * @return the bytes read
  465. * @see ConnectedThread#read()
  466. */
  467. public byte[] readLengthEncoded() {
  468. int len = Integer.parseInt(readString());
  469. ArrayList<byte[]> bytes = new ArrayList<byte[]>();
  470. byte[] data = read();
  471. if (data.length != len) {
  472. bytes.add(data);
  473. data = read();
  474. }
  475. byte[] total = new byte[len];
  476. int offset = 0;
  477. for (byte[] b : bytes) {
  478. for (int i = 0; i < b.length; i++) {
  479. total[offset++] = b[i];
  480. }
  481. }
  482. return total;
  483. }
  484. /*
  485. * public <T> T readObject() { T object =
  486. * SerializationUtils.deserialize(read()); return object; }
  487. */
  488. public BigInteger readBigInteger() {
  489. return new BigInteger(read());
  490. }
  491. public BigInteger readBigInteger(int pid) {
  492. return new BigInteger(read(pid));
  493. }
  494. public int readInt() {
  495. return readBigInteger().intValue();
  496. }
  497. public int readInt(int pid) {
  498. return readBigInteger(pid).intValue();
  499. }
  500. public long readLong() {
  501. return readBigInteger().longValue();
  502. }
  503. public long readLong(int pid) {
  504. return readBigInteger(pid).longValue();
  505. }
  506. public byte[][] readDoubleByteArray() {
  507. return ComUtil.toDoubleByteArray(read());
  508. }
  509. public byte[][] readDoubleByteArray(int pid) {
  510. return ComUtil.toDoubleByteArray(read(pid));
  511. }
  512. public byte[][][] readTripleByteArray() {
  513. return ComUtil.toTripleByteArray(read());
  514. }
  515. public byte[][][] readTripleByteArray(int pid) {
  516. return ComUtil.toTripleByteArray(read(pid));
  517. }
  518. public int[] readIntArray() {
  519. return ComUtil.toIntArray(read());
  520. }
  521. public int[] readIntArray(int pid) {
  522. return ComUtil.toIntArray(read(pid));
  523. }
  524. public int[][] readDoubleIntArray() {
  525. return ComUtil.toDoubleIntArray(read());
  526. }
  527. public int[][] readDoubleIntArray(int pid) {
  528. return ComUtil.toDoubleIntArray(read(pid));
  529. }
  530. public Tuple readTuple() {
  531. return ComUtil.toTuple(read());
  532. }
  533. public Tuple readTuple(int pid) {
  534. return ComUtil.toTuple(read(pid));
  535. }
  536. public Tuple[] readTupleArray() {
  537. return ComUtil.toTupleArray(read());
  538. }
  539. public Tuple[] readTupleArray(int pid) {
  540. return ComUtil.toTupleArray(read(pid));
  541. }
  542. public Bucket readBucket() {
  543. return new Bucket(readTupleArray());
  544. }
  545. public Bucket readBucket(int pid) {
  546. return new Bucket(readTupleArray(pid));
  547. }
  548. public Bucket[] readBucketArray() {
  549. return ComUtil.toBucketArray(read());
  550. }
  551. public Bucket[] readBucketArray(int pid) {
  552. return ComUtil.toBucketArray(read(pid));
  553. }
  554. public GCSignal readGCSignal() {
  555. return new GCSignal(read());
  556. }
  557. public GCSignal readGCSignal(int pid) {
  558. return new GCSignal(read(pid));
  559. }
  560. public GCSignal[] readGCSignalArray() {
  561. return ComUtil.toGCSignalArray(read());
  562. }
  563. public GCSignal[] readGCSignalArray(int pid) {
  564. return ComUtil.toGCSignalArray(read(pid));
  565. }
  566. public GCSignal[][] readDoubleGCSignalArray() {
  567. return ComUtil.toDoubleGCSignalArray(read());
  568. }
  569. public GCSignal[][] readDoubleGCSignalArray(int pid) {
  570. return ComUtil.toDoubleGCSignalArray(read(pid));
  571. }
  572. public GCSignal[][][] readTripleGCSignalArray() {
  573. return ComUtil.toTripleGCSignalArray(read());
  574. }
  575. public GCSignal[][][] readTripleGCSignalArray(int pid) {
  576. return ComUtil.toTripleGCSignalArray(read(pid));
  577. }
  578. public GCSignal[][][][] readQuadGCSignalArray() {
  579. return ComUtil.toQuadGCSignalArray(read());
  580. }
  581. public GCSignal[][][][] readQuadGCSignalArray(int pid) {
  582. return ComUtil.toQuadGCSignalArray(read(pid));
  583. }
  584. public ArrayList<byte[]> readArrayList() {
  585. return ComUtil.toArrayList(read());
  586. }
  587. public ArrayList<byte[]> readArrayList(int pid) {
  588. return ComUtil.toArrayList(read(pid));
  589. }
  590. /**
  591. * This thread runs while listening for incoming connections. It behaves
  592. * like a server-side client. It runs until a connection is accepted (or
  593. * until cancelled).
  594. */
  595. private class AcceptThread extends Thread {
  596. // The local server socket
  597. private final ServerSocket mmServerSocket;
  598. public AcceptThread(int port) {
  599. ServerSocket tmp = null;
  600. try {
  601. tmp = new ServerSocket(port);
  602. } catch (IOException e) {
  603. Util.error("ServerSocket unable to start", e);
  604. }
  605. mmServerSocket = tmp;
  606. }
  607. public void run() {
  608. if (D)
  609. Util.disp("BEGIN mAcceptThread ");
  610. setName("AcceptThread");
  611. Socket socket = null;
  612. // Listen to the server socket if we're not connected
  613. while (mState != STATE_CONNECTED) {
  614. try {
  615. // This is a blocking call and will only return on a
  616. // successful connection or an exception
  617. socket = mmServerSocket.accept();
  618. // socket.setTcpNoDelay(true);
  619. } catch (IOException e) {
  620. Util.error("accept() failed", e);
  621. break;
  622. }
  623. // If a connection was accepted
  624. if (socket != null) {
  625. synchronized (Communication.this) {
  626. switch (mState) {
  627. case STATE_LISTEN:
  628. case STATE_CONNECTING:
  629. // Situation normal. Start the connected thread.
  630. connected(socket);
  631. break;
  632. case STATE_NONE:
  633. case STATE_CONNECTED:
  634. // Either not ready or already connected.
  635. // Terminate new socket.
  636. try {
  637. socket.close();
  638. } catch (IOException e) {
  639. Util.error("Could not close unwanted socket", e);
  640. }
  641. // TODO: Should we really be returning here?
  642. return;
  643. }
  644. }
  645. }
  646. }
  647. if (D)
  648. Util.disp("END mAcceptThread");
  649. }
  650. public void cancel() {
  651. if (D)
  652. Util.debug("AcceptThread canceled " + this);
  653. try {
  654. mmServerSocket.close();
  655. } catch (IOException e) {
  656. Util.error("close() of server failed", e);
  657. }
  658. }
  659. }
  660. /**
  661. * This thread runs while attempting to make an outgoing connection with a
  662. * device. It runs straight through; the connection either succeeds or
  663. * fails.
  664. */
  665. private class ConnectThread extends Thread {
  666. private final Socket mmSocket;
  667. private final InetSocketAddress mmAddress;
  668. public ConnectThread(InetSocketAddress address) {
  669. mmAddress = address;
  670. mmSocket = new Socket();
  671. /*
  672. * try { mmSocket.setTcpNoDelay(true); } catch (SocketException e) {
  673. * e.printStackTrace(); }
  674. */
  675. }
  676. public void run() {
  677. Util.debug("BEGIN mConnectThread");
  678. setName("ConnectThread");
  679. try {
  680. // This is a blocking call and will only return on a
  681. // successful connection or an exception
  682. mmSocket.connect(mmAddress);
  683. } catch (IOException e) {
  684. // Close the socket
  685. try {
  686. mmSocket.close();
  687. } catch (IOException e2) {
  688. Util.error("unable to close() socket during connection failure", e2);
  689. }
  690. connectionFailed();
  691. return;
  692. }
  693. // Reset the ConnectThread because we're done
  694. synchronized (Communication.this) {
  695. mConnectThread = null;
  696. }
  697. // Start the connected thread
  698. connected(mmSocket);
  699. }
  700. public void cancel() {
  701. try {
  702. mmSocket.close();
  703. } catch (IOException e) {
  704. Util.error("close() of connect socket failed", e);
  705. }
  706. }
  707. }
  708. /**
  709. * This thread runs during a connection with a remote device. It handles all
  710. * incoming and outgoing transmissions.
  711. */
  712. private class ConnectedThread extends Thread {
  713. private final Socket mmSocket;
  714. private final DataInputStream mmInStream;
  715. private final DataOutputStream mmOutStream;
  716. private BlockingQueue<byte[]> mMessageBuffer;
  717. public ConnectedThread(Socket socket) {
  718. Util.debug("create ConnectedThread");
  719. mmSocket = socket;
  720. DataInputStream tmpIn = null;
  721. DataOutputStream tmpOut = null;
  722. mMessageBuffer = new LinkedBlockingQueue<byte[]>(); // TODO: add a
  723. // capacity here
  724. // to prevent
  725. // doS
  726. // Get the Socket input and output streams
  727. try {
  728. tmpIn = new DataInputStream(socket.getInputStream());
  729. tmpOut = new DataOutputStream(socket.getOutputStream());
  730. } catch (StreamCorruptedException e) {
  731. Util.error("object streams corrupt", e);
  732. } catch (IOException e) {
  733. Util.error("temp sockets not created", e);
  734. }
  735. mmInStream = tmpIn;
  736. mmOutStream = tmpOut;
  737. }
  738. public void setTcpNoDelay(boolean on) {
  739. if (mmSocket != null)
  740. try {
  741. mmSocket.setTcpNoDelay(on);
  742. } catch (SocketException e) {
  743. e.printStackTrace();
  744. }
  745. }
  746. /**
  747. * Read from the ConnectedThread in an unsynchronized manner
  748. *
  749. * This is a blocking call and will only return data if the readLoop
  750. * flag is false
  751. *
  752. * @return the bytes read
  753. * @see ConnectedThread#read()
  754. */
  755. public byte[] read() {
  756. try {
  757. return mMessageBuffer.take();
  758. } catch (InterruptedException e) {
  759. Util.error("Message Read Interupted");
  760. return null;
  761. }
  762. }
  763. /**
  764. * Write to the connected OutStream.
  765. *
  766. * @param buffer
  767. * The bytes to write
  768. */
  769. public void write(byte[] buffer) {
  770. try {
  771. mmOutStream.writeInt(buffer.length);
  772. mmOutStream.write(buffer);
  773. mmOutStream.flush();
  774. } catch (IOException e) {
  775. Util.error("Exception during write", e);
  776. }
  777. }
  778. public void run() {
  779. Util.disp("BEGIN mConnectedThread");
  780. int bytes;
  781. // Keep listening to the InputStream while connected
  782. while (true) {
  783. try {
  784. // Read from the InputStream
  785. bytes = mmInStream.readInt();
  786. byte[] buffer = new byte[bytes]; // TODO: This is a little
  787. // dangerous
  788. mmInStream.readFully(buffer, 0, bytes);
  789. try {
  790. mMessageBuffer.put(buffer);
  791. } catch (InterruptedException e) {
  792. Util.error("Message add interupted.");
  793. // TODO: possibly move this catch elsewhere
  794. }
  795. } catch (IOException e) {
  796. if (D)
  797. Util.debug("Device disconnected");
  798. connectionLost();
  799. break;
  800. }
  801. }
  802. }
  803. public void cancel() {
  804. try {
  805. mmInStream.close();
  806. mmOutStream.close();
  807. mmSocket.close();
  808. } catch (IOException e) {
  809. Util.error("close() of connect socket failed", e);
  810. }
  811. }
  812. }
  813. }