Browse Source

reshuffle and postprocesst executed in parallel now

Boyoung- 9 years ago
parent
commit
7b94055205

+ 49 - 26
src/protocols/Pipeline.java

@@ -16,8 +16,7 @@ import util.Util;
 
 public class Pipeline extends Thread {
 
-	private Communication con1;
-	private Communication con2;
+	private Communication[] cons;
 	private Party party;
 	private PreData[] predata;
 	private Tree OTi;
@@ -28,10 +27,9 @@ public class Pipeline extends Thread {
 	private byte[] Li;
 	private OutAccess outaccess;
 
-	public Pipeline(Communication con1, Communication con2, Party party, PreData[] predata, Tree OTi, int h,
-			Timer timer, Metadata md, int treeIndex, byte[] Li, OutAccess outaccess) {
-		this.con1 = con1;
-		this.con2 = con2;
+	public Pipeline(Communication[] cons, Party party, PreData[] predata, Tree OTi, int h, Timer timer, Metadata md,
+			int treeIndex, byte[] Li, OutAccess outaccess) {
+		this.cons = cons;
 		this.party = party;
 		this.predata = predata;
 		this.OTi = OTi;
@@ -45,14 +43,25 @@ public class Pipeline extends Thread {
 
 	public void runE(PreData[] predata, Tree OTi, int h, Timer timer) {
 		// 1st eviction
-		Access access = new Access(con1, con2);
-		Reshuffle reshuffle = new Reshuffle(con1, con2);
-		PostProcessT postprocesst = new PostProcessT(con1, con2);
-		UpdateRoot updateroot = new UpdateRoot(con1, con2);
-		Eviction eviction = new Eviction(con1, con2);
-
-		Tuple[] path = reshuffle.runE(predata[0], outaccess.E_P, OTi.getTreeIndex() == 0, timer);
+		Access access = new Access(cons[0], cons[1]);
+		Reshuffle reshuffle = new Reshuffle(cons[2], cons[3]);
+		PostProcessT postprocesst = new PostProcessT(cons[0], cons[1]);
+		UpdateRoot updateroot = new UpdateRoot(cons[0], cons[1]);
+		Eviction eviction = new Eviction(cons[0], cons[1]);
+
+		Timer t = new Timer();
+		reshuffle.setArgs(Party.Eddie, predata[0], outaccess.E_P, OTi.getTreeIndex() == 0, t);
+		Thread thread = new Thread(reshuffle);
+		thread.start();
 		Tuple Ti = postprocesst.runE(predata[0], outaccess.E_Ti, OTi.getTreeIndex() == h - 1, timer);
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+		Tuple[] path = reshuffle.getReturn();
+		timer.add(t);
+
 		Tuple[] root = Arrays.copyOfRange(path, 0, OTi.getStashSize());
 		root = updateroot.runE(predata[0], OTi.getTreeIndex() == 0, outaccess.Li, root, Ti, timer);
 		System.arraycopy(root, 0, path, 0, root.length);
@@ -72,14 +81,17 @@ public class Pipeline extends Thread {
 
 	public void runD(PreData predata[], Tree OTi, Timer timer) {
 		// 1st eviction
-		Access access = new Access(con1, con2);
-		Reshuffle reshuffle = new Reshuffle(con1, con2);
-		PostProcessT postprocesst = new PostProcessT(con1, con2);
-		UpdateRoot updateroot = new UpdateRoot(con1, con2);
-		Eviction eviction = new Eviction(con1, con2);
-
+		Access access = new Access(cons[0], cons[1]);
+		Reshuffle reshuffle = new Reshuffle(cons[2], cons[3]);
+		PostProcessT postprocesst = new PostProcessT(cons[0], cons[1]);
+		UpdateRoot updateroot = new UpdateRoot(cons[0], cons[1]);
+		Eviction eviction = new Eviction(cons[0], cons[1]);
+
+		// no extra thread for D's reshuffle and postprocesst
+		// because D does nothing online in these two protocols
 		reshuffle.runD();
 		postprocesst.runD();
+
 		updateroot.runD(predata[0], OTi.getTreeIndex() == 0, Li, OTi.getW(), timer);
 		eviction.runD(predata[0], OTi.getTreeIndex() == 0, Li, OTi, timer);
 
@@ -91,15 +103,26 @@ public class Pipeline extends Thread {
 
 	public OutAccess runC(PreData[] predata, Metadata md, int ti, byte[] Li, Timer timer) {
 		// 1st eviction
-		Access access = new Access(con1, con2);
-		Reshuffle reshuffle = new Reshuffle(con1, con2);
-		PostProcessT postprocesst = new PostProcessT(con1, con2);
-		UpdateRoot updateroot = new UpdateRoot(con1, con2);
-		Eviction eviction = new Eviction(con1, con2);
-
-		Tuple[] path = reshuffle.runC(predata[0], outaccess.C_P, ti == 0, timer);
+		Access access = new Access(cons[0], cons[1]);
+		Reshuffle reshuffle = new Reshuffle(cons[2], cons[3]);
+		PostProcessT postprocesst = new PostProcessT(cons[0], cons[1]);
+		UpdateRoot updateroot = new UpdateRoot(cons[0], cons[1]);
+		Eviction eviction = new Eviction(cons[0], cons[1]);
+
+		Timer t = new Timer();
+		reshuffle.setArgs(Party.Charlie, predata[0], outaccess.C_P, ti == 0, t);
+		Thread thread = new Thread(reshuffle);
+		thread.start();
 		Tuple Ti = postprocesst.runC(predata[0], outaccess.C_Ti, Li, outaccess.C_Lip1, outaccess.C_j2,
 				ti == md.getNumTrees() - 1, timer);
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+		Tuple[] path = reshuffle.getReturn();
+		timer.add(t);
+
 		Tuple[] root = Arrays.copyOfRange(path, 0, md.getStashSizeOfTree(ti));
 		root = updateroot.runC(predata[0], ti == 0, root, Ti, timer);
 		System.arraycopy(root, 0, path, 0, root.length);

+ 35 - 1
src/protocols/Reshuffle.java

@@ -21,14 +21,32 @@ import util.P;
 import util.Timer;
 import util.Util;
 
-public class Reshuffle extends Protocol {
+public class Reshuffle extends Protocol implements Runnable {
 
 	private int pid = P.RSF;
 
+	private Party party;
+	private PreData predata;
+	private Tuple[] path;
+	private boolean firstTree;
+	private Timer timer;
+
 	public Reshuffle(Communication con1, Communication con2) {
 		super(con1, con2);
 	}
 
+	public void setArgs(Party party, PreData predata, Tuple[] path, boolean firstTree, Timer timer) {
+		this.party = party;
+		this.predata = predata;
+		this.path = path;
+		this.firstTree = firstTree;
+		this.timer = timer;
+	}
+
+	public Tuple[] getReturn() {
+		return path;
+	}
+
 	public Tuple[] runE(PreData predata, Tuple[] path, boolean firstTree, Timer timer) {
 		if (firstTree)
 			return path;
@@ -72,6 +90,22 @@ public class Reshuffle extends Protocol {
 		return predata.reshuffle_a_prime;
 	}
 
+	@Override
+	public void run() {
+		if (party == Party.Eddie) {
+			path = runE(predata, path, firstTree, timer);
+
+		} else if (party == Party.Debbie) {
+			runD();
+
+		} else if (party == Party.Charlie) {
+			path = runC(predata, path, firstTree, timer);
+
+		} else {
+			throw new NoSuchPartyException(party + "");
+		}
+	}
+
 	// for testing correctness
 	@Override
 	public void run(Party party, Metadata md, Forest forest) {

+ 14 - 9
src/protocols/Retrieve.java

@@ -122,20 +122,23 @@ public class Retrieve extends Protocol {
 		OutAccess outaccess = access.runE(predata[0], OTi, Ni, Nip1_pr, timer[0]);
 
 		int ti = OTi.getTreeIndex();
-		Pipeline pipeline = new Pipeline(cons1[ti + 1], cons2[ti + 1], Party.Eddie, predata, OTi, h, timer[ti + 1],
-				null, ti, outaccess.Li, outaccess);
+		Communication[] cons = new Communication[] { cons1[ti + 1], cons2[ti + 1], cons1[ti + h + 1],
+				cons2[ti + h + 1] };
+		Pipeline pipeline = new Pipeline(cons, Party.Eddie, predata, OTi, h, timer[ti + 1], null, ti, outaccess.Li,
+				outaccess);
 		pipeline.start();
 
 		return pipeline;
 	}
 
-	public Pipeline pipelineD(PreData predata[], Tree OTi, byte[] Ni, byte[] Nip1_pr, Timer[] timer) {
+	public Pipeline pipelineD(PreData predata[], Tree OTi, byte[] Ni, byte[] Nip1_pr, int h, Timer[] timer) {
 		Access access = new Access(con1, con2);
 		byte[] Li = access.runD(predata[0], OTi, Ni, Nip1_pr, timer[0]);
 
 		int ti = OTi.getTreeIndex();
-		Pipeline pipeline = new Pipeline(cons1[ti + 1], cons2[ti + 1], Party.Debbie, predata, OTi, 0, timer[ti + 1],
-				null, ti, Li, null);
+		Communication[] cons = new Communication[] { cons1[ti + 1], cons2[ti + 1], cons1[ti + h + 1],
+				cons2[ti + h + 1] };
+		Pipeline pipeline = new Pipeline(cons, Party.Debbie, predata, OTi, h, timer[ti + 1], null, ti, Li, null);
 		pipeline.start();
 
 		return pipeline;
@@ -145,8 +148,10 @@ public class Retrieve extends Protocol {
 		Access access = new Access(con1, con2);
 		OutAccess outaccess = access.runC(md, ti, Li, timer[0]);
 
-		Pipeline pipeline = new Pipeline(cons1[ti + 1], cons2[ti + 1], Party.Charlie, predata, null, 0, timer[ti + 1],
-				md, ti, Li, outaccess);
+		int h = md.getNumTrees();
+		Communication[] cons = new Communication[] { cons1[ti + 1], cons2[ti + 1], cons1[ti + h + 1],
+				cons2[ti + h + 1] };
+		Pipeline pipeline = new Pipeline(cons, Party.Charlie, predata, null, 0, timer[ti + 1], md, ti, Li, outaccess);
 		pipeline.start();
 
 		return new OutRetrieve(outaccess, pipeline);
@@ -286,7 +291,7 @@ public class Retrieve extends Protocol {
 						} else {
 							if (ti == 0)
 								ete_on.start();
-							threads[ti] = pipelineD(predata[ti], OTi, sD_Ni, sD_Nip1_pr, timer);
+							threads[ti] = pipelineD(predata[ti], OTi, sD_Ni, sD_Nip1_pr, numTrees, timer);
 						}
 
 					} else if (party == Party.Charlie) {
@@ -341,7 +346,7 @@ public class Retrieve extends Protocol {
 
 		Timer sum = new Timer();
 		for (int i = 0; i < timer.length; i++)
-			sum = sum.add(timer[i]);
+			sum.add(timer[i]);
 		sum.noPrePrint();
 		System.out.println();
 

+ 1 - 1
src/ui/CLI.java

@@ -108,7 +108,7 @@ public class CLI {
 		System.out.println("Starting " + party + "...");
 
 		Metadata md = new Metadata(configFile);
-		int numComs = Global.pipeline ? md.getNumTrees() + 1 : 1;
+		int numComs = Global.pipeline ? md.getNumTrees() * 2 + 1 : 1;
 		Communication[] con1 = new Communication[numComs];
 		Communication[] con2 = new Communication[numComs];
 

+ 16 - 5
src/util/StopWatch.java

@@ -108,7 +108,20 @@ public class StopWatch {
 		return sw;
 	}
 
-	public StopWatch add(StopWatch s) {
+	/*
+	 * public StopWatch add(StopWatch s) { if (isOn || s.isOn) { try { throw new
+	 * StopWatchException("StopWatch is still running"); } catch
+	 * (StopWatchException e) { e.printStackTrace(); } }
+	 * 
+	 * if (!task.equals(s.task)) { try { throw new StopWatchException(
+	 * "Tasks don't match: " + task + " != " + s.task); } catch
+	 * (StopWatchException e) { e.printStackTrace(); } }
+	 * 
+	 * StopWatch sw = new StopWatch(task); sw.elapsedWC = elapsedWC +
+	 * s.elapsedWC; sw.elapsedCPU = elapsedCPU + s.elapsedCPU; return sw; }
+	 */
+
+	public void add(StopWatch s) {
 		if (isOn || s.isOn) {
 			try {
 				throw new StopWatchException("StopWatch is still running");
@@ -125,9 +138,7 @@ public class StopWatch {
 			}
 		}
 
-		StopWatch sw = new StopWatch(task);
-		sw.elapsedWC = elapsedWC + s.elapsedWC;
-		sw.elapsedCPU = elapsedCPU + s.elapsedCPU;
-		return sw;
+		elapsedWC = elapsedWC + s.elapsedWC;
+		elapsedCPU = elapsedCPU + s.elapsedCPU;
 	}
 }

+ 11 - 4
src/util/Timer.java

@@ -74,14 +74,21 @@ public class Timer {
 		return new Timer(sws);
 	}
 
-	public Timer add(Timer t) {
+	/*
+	 * public Timer add(Timer t) { if (!stack.empty() || !t.stack.empty()) throw
+	 * new TimerException("Stack not empty");
+	 * 
+	 * StopWatch[][] sws = new StopWatch[P.size][M.size]; for (int i = 0; i <
+	 * watches.length; i++) for (int j = 0; j < watches[i].length; j++)
+	 * sws[i][j] = watches[i][j].add(t.watches[i][j]); return new Timer(sws); }
+	 */
+
+	public void add(Timer t) {
 		if (!stack.empty() || !t.stack.empty())
 			throw new TimerException("Stack not empty");
 
-		StopWatch[][] sws = new StopWatch[P.size][M.size];
 		for (int i = 0; i < watches.length; i++)
 			for (int j = 0; j < watches[i].length; j++)
-				sws[i][j] = watches[i][j].add(t.watches[i][j]);
-		return new Timer(sws);
+				watches[i][j].add(t.watches[i][j]);
 	}
 }