Network.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package com.oblivm.backend.network;
  2. import java.io.IOException;
  3. import java.io.InputStream;
  4. import java.io.OutputStream;
  5. import java.math.BigInteger;
  6. import java.net.ServerSocket;
  7. import java.net.Socket;
  8. import java.nio.ByteBuffer;
  9. import com.oblivm.backend.flexsc.CompEnv;
  10. import com.oblivm.backend.flexsc.Mode;
  11. import com.oblivm.backend.gc.GCSignal;
  12. import communication.Communication;
  13. public class Network {
  14. protected Socket sock;
  15. protected ServerSocket serverSock;
  16. public CustomizedConcurrentQueue queue;
  17. ThreadedIO threadedio;
  18. public InputStream is;
  19. public OutputStream os;
  20. Thread thd;
  21. boolean THREADEDIO = true;
  22. static int NetworkThreadedQueueSize = 1024 * 256;
  23. public Communication sender;
  24. public Communication receiver;
  25. public void setUpThread() {
  26. if (THREADEDIO) {
  27. queue = new CustomizedConcurrentQueue(NetworkThreadedQueueSize);
  28. threadedio = new ThreadedIO(queue, os);
  29. thd = new Thread(threadedio);
  30. thd.start();
  31. }
  32. }
  33. public Network() {
  34. }
  35. public Network(Communication s, Communication r) {
  36. sender = s;
  37. receiver = r;
  38. }
  39. public Network(InputStream is, OutputStream os, Socket sock) {
  40. this.is = is;
  41. this.os = os;
  42. this.sock = sock;
  43. }
  44. public void disconnect() {
  45. try {
  46. if (THREADEDIO) {
  47. queue.destory();
  48. os.flush();
  49. thd.join();
  50. }
  51. os.flush();
  52. // protocol payloads are received.
  53. if (sock != null) {
  54. sock.close();
  55. }
  56. if (serverSock != null) {
  57. serverSock.close();
  58. }
  59. } catch (IOException e) {
  60. // TODO Auto-generated catch block
  61. e.printStackTrace();
  62. } catch (InterruptedException e) {
  63. // TODO Auto-generated catch block
  64. e.printStackTrace();
  65. }
  66. }
  67. public void flush() {
  68. try {
  69. os.flush();
  70. } catch (IOException e) {
  71. // TODO Auto-generated catch block
  72. e.printStackTrace();
  73. }
  74. }
  75. public byte[] readBytes(int len) {
  76. byte[] temp = new byte[len];
  77. try {
  78. int remain = len;
  79. while (0 < remain) {
  80. int readBytes;
  81. readBytes = is.read(temp, len - remain, remain);
  82. if (readBytes != -1) {
  83. remain -= readBytes;
  84. }
  85. }
  86. } catch (IOException e) {
  87. e.printStackTrace();
  88. }
  89. return temp;
  90. }
  91. public void readBytes(byte[] temp) {
  92. // byte[] temp = new byte[len];
  93. try {
  94. int remain = temp.length;
  95. while (0 < remain) {
  96. int readBytes;
  97. readBytes = is.read(temp, temp.length - remain, remain);
  98. if (readBytes != -1) {
  99. remain -= readBytes;
  100. }
  101. }
  102. } catch (IOException e) {
  103. e.printStackTrace();
  104. }
  105. }
  106. public byte[] readBytes() {
  107. byte[] lenBytes = readBytes(4);
  108. int len = ByteBuffer.wrap(lenBytes).getInt();
  109. return readBytes(len);
  110. }
  111. public void writeByte(byte[] data) {
  112. writeByte(ByteBuffer.allocate(4).putInt(data.length).array(), 4);
  113. writeByte(data, data.length);
  114. }
  115. public void writeByte(byte[] data, int length) {
  116. try {
  117. if (THREADEDIO) {
  118. queue.insert(data);
  119. // System.out.println(data.length);
  120. } else {
  121. os.write(data);
  122. }
  123. } catch (Exception e) {
  124. e.printStackTrace();
  125. System.exit(1);
  126. }
  127. }
  128. public void writeByte(byte data) {
  129. try {
  130. if (THREADEDIO)
  131. queue.insert(new byte[] { data });
  132. else {
  133. os.write(data);
  134. }
  135. } catch (Exception e) {
  136. // TODO Auto-generated catch block
  137. e.printStackTrace();
  138. }
  139. }
  140. public void writeBI(BigInteger bi) {
  141. writeByte(bi.toByteArray());
  142. }
  143. public BigInteger readBI() {
  144. byte[] rep = readBytes();
  145. return new BigInteger(rep);
  146. }
  147. public void writeInt(int i) {
  148. writeByte(ByteBuffer.allocate(4).putInt(i).array(), 4);
  149. }
  150. public int readInt() {
  151. return ByteBuffer.wrap(readBytes(4)).getInt();
  152. }
  153. public <T> void send(T[][][] data, CompEnv<T> env) throws IOException {
  154. for (int i = 0; i < data.length; i++) {
  155. send(data[i], env);
  156. }
  157. }
  158. public <T> void send(T[][] data, CompEnv<T> env) throws IOException {
  159. for (int i = 0; i < data.length; i++) {
  160. send(data[i], env);
  161. }
  162. }
  163. public <T> T[][] read(int length1, int length2, CompEnv<T> env) throws IOException {
  164. T[][] ret = env.newTArray(length1, 1);
  165. for (int i = 0; i < length1; i++) {
  166. ret[i] = read(length2, env);
  167. }
  168. return ret;
  169. }
  170. public <T> T[][][] read(int length1, int length2, int length3, CompEnv<T> env) throws IOException {
  171. T[][][] ret = env.newTArray(length1, 1, 1);
  172. for (int i = 0; i < length1; i++) {
  173. ret[i] = read(length2, length3, env);
  174. }
  175. return ret;
  176. }
  177. public <T> void send(T[] data, CompEnv<T> env) throws IOException {
  178. for (int i = 0; i < data.length; i++) {
  179. send(data[i], env);
  180. }
  181. }
  182. public <T> void send(T data, CompEnv<T> env) throws IOException {
  183. Mode mode = env.getMode();
  184. if (mode == Mode.REAL) {
  185. GCSignal gcData = (GCSignal) data;
  186. gcData.send(this);
  187. } else if (mode == Mode.VERIFY) {
  188. writeBoolean((Boolean) data);
  189. } else if (mode == Mode.COUNT) {
  190. }
  191. }
  192. public <T> T[] read(int length, CompEnv<T> env) throws IOException {
  193. T[] ret = env.newTArray(length);
  194. for (int i = 0; i < length; i++) {
  195. ret[i] = read(env);
  196. }
  197. return ret;
  198. }
  199. @SuppressWarnings("unchecked")
  200. public <T> T read(CompEnv<T> env) throws IOException {
  201. Mode mode = env.getMode();
  202. if (mode == Mode.REAL || mode == Mode.OPT || mode == Mode.OFFLINE) {
  203. GCSignal signal = GCSignal.receive(this);
  204. return (T) signal;
  205. } else if (mode == Mode.VERIFY) {
  206. Boolean vData = readBoolean();
  207. return (T) vData;
  208. } else if (mode == Mode.COUNT) {
  209. return env.ZERO();
  210. }
  211. // shouldn't happen;
  212. return null;
  213. }
  214. public boolean readBoolean() throws IOException {
  215. int read = readInt();
  216. return read == 1;
  217. }
  218. public void writeBoolean(boolean data) throws IOException {
  219. int sen = data ? 1 : 0;
  220. writeInt(sen);
  221. }
  222. }