Network.java 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package com.oblivm.backend.network;
  2. import java.io.IOException;
  3. import java.io.InputStream;
  4. import java.io.OutputStream;
  5. import java.math.BigInteger;
  6. import java.net.ServerSocket;
  7. import java.net.Socket;
  8. import java.nio.ByteBuffer;
  9. import com.oblivm.backend.flexsc.CompEnv;
  10. import com.oblivm.backend.flexsc.Mode;
  11. import com.oblivm.backend.gc.GCSignal;
  12. public class Network {
  13. protected Socket sock;
  14. protected ServerSocket serverSock;
  15. public CustomizedConcurrentQueue queue;
  16. ThreadedIO threadedio;
  17. public InputStream is;
  18. public OutputStream os;
  19. Thread thd;
  20. boolean THREADEDIO = true;
  21. static int NetworkThreadedQueueSize = 1024 * 256;
  22. public void setUpThread() {
  23. if (THREADEDIO) {
  24. queue = new CustomizedConcurrentQueue(NetworkThreadedQueueSize);
  25. threadedio = new ThreadedIO(queue, os);
  26. thd = new Thread(threadedio);
  27. thd.start();
  28. }
  29. }
  30. public Network() {
  31. }
  32. public Network(InputStream is, OutputStream os, Socket sock) {
  33. this.is = is;
  34. this.os = os;
  35. this.sock = sock;
  36. }
  37. public void disconnect() {
  38. try {
  39. if (THREADEDIO) {
  40. queue.destory();
  41. os.flush();
  42. thd.join();
  43. }
  44. os.flush();
  45. // protocol payloads are received.
  46. if (sock != null) {
  47. sock.close();
  48. }
  49. if (serverSock != null) {
  50. serverSock.close();
  51. }
  52. } catch (IOException e) {
  53. // TODO Auto-generated catch block
  54. e.printStackTrace();
  55. } catch (InterruptedException e) {
  56. // TODO Auto-generated catch block
  57. e.printStackTrace();
  58. }
  59. }
  60. public void flush() {
  61. try {
  62. os.flush();
  63. } catch (IOException e) {
  64. // TODO Auto-generated catch block
  65. e.printStackTrace();
  66. }
  67. }
  68. public byte[] readBytes(int len) {
  69. byte[] temp = new byte[len];
  70. try {
  71. int remain = len;
  72. while (0 < remain) {
  73. int readBytes;
  74. readBytes = is.read(temp, len - remain, remain);
  75. if (readBytes != -1) {
  76. remain -= readBytes;
  77. }
  78. }
  79. } catch (IOException e) {
  80. e.printStackTrace();
  81. }
  82. return temp;
  83. }
  84. public void readBytes(byte[] temp) {
  85. // byte[] temp = new byte[len];
  86. try {
  87. int remain = temp.length;
  88. while (0 < remain) {
  89. int readBytes;
  90. readBytes = is.read(temp, temp.length - remain, remain);
  91. if (readBytes != -1) {
  92. remain -= readBytes;
  93. }
  94. }
  95. } catch (IOException e) {
  96. e.printStackTrace();
  97. }
  98. }
  99. public byte[] readBytes() {
  100. byte[] lenBytes = readBytes(4);
  101. int len = ByteBuffer.wrap(lenBytes).getInt();
  102. return readBytes(len);
  103. }
  104. public void writeByte(byte[] data) {
  105. writeByte(ByteBuffer.allocate(4).putInt(data.length).array(), 4);
  106. writeByte(data, data.length);
  107. }
  108. public void writeByte(byte[] data, int length) {
  109. try {
  110. if (THREADEDIO) {
  111. queue.insert(data);
  112. // System.out.println(data.length);
  113. } else {
  114. os.write(data);
  115. }
  116. } catch (Exception e) {
  117. e.printStackTrace();
  118. System.exit(1);
  119. }
  120. }
  121. public void writeByte(byte data) {
  122. try {
  123. if (THREADEDIO)
  124. queue.insert(new byte[] { data });
  125. else {
  126. os.write(data);
  127. }
  128. } catch (Exception e) {
  129. // TODO Auto-generated catch block
  130. e.printStackTrace();
  131. }
  132. }
  133. public void writeBI(BigInteger bi) {
  134. writeByte(bi.toByteArray());
  135. }
  136. public BigInteger readBI() {
  137. byte[] rep = readBytes();
  138. return new BigInteger(rep);
  139. }
  140. public void writeInt(int i) {
  141. writeByte(ByteBuffer.allocate(4).putInt(i).array(), 4);
  142. }
  143. public int readInt() {
  144. return ByteBuffer.wrap(readBytes(4)).getInt();
  145. }
  146. public <T> void send(T[][][] data, CompEnv<T> env) throws IOException {
  147. for (int i = 0; i < data.length; i++) {
  148. send(data[i], env);
  149. }
  150. }
  151. public <T> void send(T[][] data, CompEnv<T> env) throws IOException {
  152. for (int i = 0; i < data.length; i++) {
  153. send(data[i], env);
  154. }
  155. }
  156. public <T> T[][] read(int length1, int length2, CompEnv<T> env) throws IOException {
  157. T[][] ret = env.newTArray(length1, 1);
  158. for (int i = 0; i < length1; i++) {
  159. ret[i] = read(length2, env);
  160. }
  161. return ret;
  162. }
  163. public <T> T[][][] read(int length1, int length2, int length3, CompEnv<T> env) throws IOException {
  164. T[][][] ret = env.newTArray(length1, 1, 1);
  165. for (int i = 0; i < length1; i++) {
  166. ret[i] = read(length2, length3, env);
  167. }
  168. return ret;
  169. }
  170. public <T> void send(T[] data, CompEnv<T> env) throws IOException {
  171. for (int i = 0; i < data.length; i++) {
  172. send(data[i], env);
  173. }
  174. }
  175. public <T> void send(T data, CompEnv<T> env) throws IOException {
  176. Mode mode = env.getMode();
  177. if (mode == Mode.REAL) {
  178. GCSignal gcData = (GCSignal) data;
  179. gcData.send(this);
  180. } else if (mode == Mode.VERIFY) {
  181. writeBoolean((Boolean) data);
  182. } else if (mode == Mode.COUNT) {
  183. }
  184. }
  185. public <T> T[] read(int length, CompEnv<T> env) throws IOException {
  186. T[] ret = env.newTArray(length);
  187. for (int i = 0; i < length; i++) {
  188. ret[i] = read(env);
  189. }
  190. return ret;
  191. }
  192. @SuppressWarnings("unchecked")
  193. public <T> T read(CompEnv<T> env) throws IOException {
  194. Mode mode = env.getMode();
  195. if (mode == Mode.REAL || mode == Mode.OPT || mode == Mode.OFFLINE) {
  196. GCSignal signal = GCSignal.receive(this);
  197. return (T) signal;
  198. } else if (mode == Mode.VERIFY) {
  199. Boolean vData = readBoolean();
  200. return (T) vData;
  201. } else if (mode == Mode.COUNT) {
  202. return env.ZERO();
  203. }
  204. // shouldn't happen;
  205. return null;
  206. }
  207. public boolean readBoolean() throws IOException {
  208. int read = readInt();
  209. return read == 1;
  210. }
  211. public void writeBoolean(boolean data) throws IOException {
  212. int sen = data ? 1 : 0;
  213. writeInt(sen);
  214. }
  215. }