|
@@ -121,37 +121,6 @@ static void epoch_websocket_close_handler(
|
|
|
sync->cv.notify_all();
|
|
|
}
|
|
|
|
|
|
-static int tally_websocket_data_handler(
|
|
|
- struct mg_connection *conn,
|
|
|
- int bits,
|
|
|
- char *data,
|
|
|
- size_t data_len,
|
|
|
- void *user_data)
|
|
|
-{
|
|
|
- if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE)
|
|
|
- return false;
|
|
|
-
|
|
|
- if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_DATACOMPLETE)
|
|
|
- {
|
|
|
- struct synchronization_tool *sync = (struct synchronization_tool *) user_data;
|
|
|
-
|
|
|
- unique_lock<mutex> lck(sync->mtx);
|
|
|
- sync->val++;
|
|
|
-
|
|
|
- sync->cv.notify_all();
|
|
|
-
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- if ((bits & 0xf) != MG_WEBSOCKET_OPCODE_BINARY && (bits & 0xf) != MG_WEBSOCKET_OPCODE_CONTINUATION)
|
|
|
- {
|
|
|
- std::cerr << "Unknown opcode: failing." << std::endl;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
Twistpoint get_generator(
|
|
|
default_random_engine& rng,
|
|
|
vector<Proof>& pi,
|
|
@@ -454,22 +423,30 @@ vector<Proof> epoch_build_up(
|
|
|
false);
|
|
|
|
|
|
struct synchronization_tool epochSync;
|
|
|
+ vector<struct mg_connection *> conns;
|
|
|
+ unique_lock<mutex> lck(epochSync.mtx);
|
|
|
epochSync.val = 1;
|
|
|
for (size_t j = 0; j < serverIPs.size(); j++)
|
|
|
{
|
|
|
if (i == j)
|
|
|
continue;
|
|
|
|
|
|
- distribute_epoch_updates(
|
|
|
- serverIPs[j],
|
|
|
- serverPorts[j],
|
|
|
- data,
|
|
|
- &epochSync);
|
|
|
+ struct mg_connection *currConn =
|
|
|
+ distribute_epoch_updates(
|
|
|
+ serverIPs[j],
|
|
|
+ serverPorts[j],
|
|
|
+ data,
|
|
|
+ &epochSync);
|
|
|
+
|
|
|
+ conns.push_back(currConn);
|
|
|
}
|
|
|
- unique_lock<mutex> lck(epochSync.mtx);
|
|
|
+
|
|
|
while (epochSync.val < serverIPs.size())
|
|
|
epochSync.cv.wait(lck);
|
|
|
|
|
|
+ for (size_t j = 0; j < conns.size(); j++)
|
|
|
+ mg_close_connection(conns[j]);
|
|
|
+
|
|
|
generatorProofHolder = pi[0];
|
|
|
}
|
|
|
else
|
|
@@ -555,21 +532,29 @@ void epoch_break_down(
|
|
|
true);
|
|
|
|
|
|
struct synchronization_tool epochSync;
|
|
|
+ vector<struct mg_connection *> conns;
|
|
|
epochSync.val = 1;
|
|
|
for (size_t j = 0; j < serverIPs.size(); j++)
|
|
|
{
|
|
|
if (i == j)
|
|
|
continue;
|
|
|
|
|
|
- distribute_epoch_updates(
|
|
|
- serverIPs[j],
|
|
|
- serverPorts[j],
|
|
|
- data,
|
|
|
- &epochSync);
|
|
|
+ struct mg_connection *currConn =
|
|
|
+ distribute_epoch_updates(
|
|
|
+ serverIPs[j],
|
|
|
+ serverPorts[j],
|
|
|
+ data,
|
|
|
+ &epochSync);
|
|
|
+
|
|
|
+ conns.push_back(currConn);
|
|
|
}
|
|
|
+
|
|
|
unique_lock<mutex> lck(epochSync.mtx);
|
|
|
while (epochSync.val < serverIPs.size())
|
|
|
epochSync.cv.wait(lck);
|
|
|
+
|
|
|
+ for (size_t j = 0; j < conns.size(); j++)
|
|
|
+ mg_close_connection(conns[j]);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -599,56 +584,49 @@ void tally_scores(
|
|
|
std::vector<CurveBipoint>& serverTallyScores)
|
|
|
{
|
|
|
struct synchronization_tool tallySync;
|
|
|
- tallySync.val = 0;
|
|
|
+ vector<struct mg_connection *> conns;
|
|
|
+
|
|
|
+ unique_lock<mutex> lck(tallySync.mtx);
|
|
|
+ tallySync.val = 1;
|
|
|
for (size_t i = 0; i < serverIPs.size(); i++)
|
|
|
{
|
|
|
if (serverIPs[i] == selfIP && serverPorts[i] == selfPort)
|
|
|
- {
|
|
|
- unique_lock<mutex> lck(tallySync.mtx);
|
|
|
- tallySync.val++;
|
|
|
continue;
|
|
|
- }
|
|
|
- else
|
|
|
+
|
|
|
+ struct mg_connection *currConn = NULL;
|
|
|
+ while (!currConn)
|
|
|
{
|
|
|
- bool flag = false;
|
|
|
- while (!flag)
|
|
|
- {
|
|
|
- struct mg_connection *conn =
|
|
|
- mg_connect_websocket_client(
|
|
|
- serverIPs[i].c_str(),
|
|
|
- serverPorts[i],
|
|
|
- USE_SSL,
|
|
|
- NULL,
|
|
|
- 0,
|
|
|
- GET_DECRYPTION_URI,
|
|
|
- "null",
|
|
|
- tally_websocket_data_handler,
|
|
|
- empty_websocket_close_handler,
|
|
|
- &tallySync);
|
|
|
-
|
|
|
- if (!conn)
|
|
|
- {
|
|
|
- std::cerr << "Trouble initiating epoch update with server at " << serverIPs[i] << ":" << serverPorts[i] << std::endl;
|
|
|
- continue;
|
|
|
- }
|
|
|
+ currConn = mg_connect_websocket_client(
|
|
|
+ serverIPs[i].c_str(),
|
|
|
+ serverPorts[i],
|
|
|
+ USE_SSL,
|
|
|
+ NULL,
|
|
|
+ 0,
|
|
|
+ GET_DECRYPTION_URI,
|
|
|
+ "null",
|
|
|
+ synchro_websocket_data_handler,
|
|
|
+ synchro_websocket_close_handler,
|
|
|
+ &tallySync);
|
|
|
|
|
|
- mg_websocket_client_write(
|
|
|
- conn,
|
|
|
- MG_WEBSOCKET_OPCODE_DATACOMPLETE,
|
|
|
- "",
|
|
|
- 0);
|
|
|
+ if (!currConn)
|
|
|
+ cerr << "Trouble getting partial decryption from server at " << serverIPs[i] << ":" << serverPorts[i] << endl;
|
|
|
+ }
|
|
|
|
|
|
- mg_close_connection(conn);
|
|
|
+ mg_websocket_client_write(
|
|
|
+ currConn,
|
|
|
+ MG_WEBSOCKET_OPCODE_DATACOMPLETE,
|
|
|
+ "",
|
|
|
+ 0);
|
|
|
|
|
|
- flag = true;
|
|
|
- }
|
|
|
- }
|
|
|
+ conns.push_back(currConn);
|
|
|
}
|
|
|
|
|
|
- unique_lock<mutex> lck(tallySync.mtx);
|
|
|
while (tallySync.val < serverIPs.size())
|
|
|
tallySync.cv.wait(lck);
|
|
|
|
|
|
+ for (size_t i = 0; i < conns.size(); i++)
|
|
|
+ mg_close_connection(conns[i]);
|
|
|
+
|
|
|
std::vector<EGCiphertext> retval;
|
|
|
std::vector<Twistpoint> currentPseudonyms = prsonaServer->get_current_pseudonyms();
|
|
|
std::vector<Scalar> decryptedTalliedScores = prsonaServer->tally_scores();
|
|
@@ -710,62 +688,55 @@ void distribute_tallied_scores(
|
|
|
data = buffer.str();
|
|
|
|
|
|
struct synchronization_tool tallySync;
|
|
|
- tallySync.val = 0;
|
|
|
+ vector<struct mg_connection *> conns;
|
|
|
+
|
|
|
+ unique_lock<mutex> lck(tallySync.mtx);
|
|
|
+ prsonaServer->receive_tallied_scores(userTallyScores, serverTallyScores);
|
|
|
+ tallySync.val = 1;
|
|
|
for (size_t i = 0; i < serverIPs.size(); i++)
|
|
|
{
|
|
|
if (serverIPs[i] == selfIP && serverPorts[i] == selfPort)
|
|
|
- {
|
|
|
- prsonaServer->receive_tallied_scores(userTallyScores, serverTallyScores);
|
|
|
- unique_lock<mutex> lck(tallySync.mtx);
|
|
|
- tallySync.val++;
|
|
|
continue;
|
|
|
- }
|
|
|
- else
|
|
|
+
|
|
|
+ struct mg_connection *currConn = NULL;
|
|
|
+ while (!currConn)
|
|
|
{
|
|
|
- bool flag = false;
|
|
|
- while (!flag)
|
|
|
- {
|
|
|
- struct mg_connection *conn =
|
|
|
- mg_connect_websocket_client(
|
|
|
- serverIPs[i].c_str(),
|
|
|
- serverPorts[i],
|
|
|
- USE_SSL,
|
|
|
- NULL,
|
|
|
- 0,
|
|
|
- GIVE_DECRYPTION_URI,
|
|
|
- "null",
|
|
|
- tally_websocket_data_handler,
|
|
|
- empty_websocket_close_handler,
|
|
|
- &tallySync);
|
|
|
-
|
|
|
- if (!conn)
|
|
|
- {
|
|
|
- std::cerr << "Trouble initiating epoch update with server at " << serverIPs[i] << ":" << serverPorts[i] << std::endl;
|
|
|
- continue;
|
|
|
- }
|
|
|
+ currConn = mg_connect_websocket_client(
|
|
|
+ serverIPs[i].c_str(),
|
|
|
+ serverPorts[i],
|
|
|
+ USE_SSL,
|
|
|
+ NULL,
|
|
|
+ 0,
|
|
|
+ GIVE_DECRYPTION_URI,
|
|
|
+ "null",
|
|
|
+ synchro_websocket_data_handler,
|
|
|
+ synchro_websocket_close_handler,
|
|
|
+ &tallySync);
|
|
|
|
|
|
- mg_websocket_client_write(
|
|
|
- conn,
|
|
|
- MG_WEBSOCKET_OPCODE_BINARY,
|
|
|
- data.c_str(),
|
|
|
- data.length());
|
|
|
+ if (!currConn)
|
|
|
+ cerr << "Trouble giving full re-encryption to server at " << serverIPs[i] << ":" << serverPorts[i] << endl;
|
|
|
+ }
|
|
|
|
|
|
- mg_websocket_client_write(
|
|
|
- conn,
|
|
|
- MG_WEBSOCKET_OPCODE_DATACOMPLETE,
|
|
|
- "",
|
|
|
- 0);
|
|
|
+ mg_websocket_client_write(
|
|
|
+ currConn,
|
|
|
+ MG_WEBSOCKET_OPCODE_BINARY,
|
|
|
+ data.c_str(),
|
|
|
+ data.length());
|
|
|
|
|
|
- mg_close_connection(conn);
|
|
|
+ mg_websocket_client_write(
|
|
|
+ currConn,
|
|
|
+ MG_WEBSOCKET_OPCODE_DATACOMPLETE,
|
|
|
+ "",
|
|
|
+ 0);
|
|
|
|
|
|
- flag = true;
|
|
|
- }
|
|
|
- }
|
|
|
+ conns.push_back(currConn);
|
|
|
}
|
|
|
|
|
|
- unique_lock<mutex> lck(tallySync.mtx);
|
|
|
while (tallySync.val < serverIPs.size())
|
|
|
tallySync.cv.wait(lck);
|
|
|
+
|
|
|
+ for (size_t i = 0; i < conns.size(); i++)
|
|
|
+ mg_close_connection(conns[i]);
|
|
|
}
|
|
|
|
|
|
void epoch(
|
|
@@ -781,7 +752,7 @@ void epoch(
|
|
|
Twistpoint nextGenerator = PrsonaServer::EL_GAMAL_GENERATOR;
|
|
|
|
|
|
struct synchronization_tool updateSync;
|
|
|
-
|
|
|
+
|
|
|
unique_lock<mutex> lck(*updateMtx, defer_lock);
|
|
|
obtain_update_locks(
|
|
|
lck,
|
|
@@ -848,13 +819,14 @@ void epoch(
|
|
|
class EpochReadyHandler : public CivetHandler
|
|
|
{
|
|
|
public:
|
|
|
- EpochReadyHandler(struct synchronization_tool *exitSync, struct synchronization_tool *readySync, size_t numServers)
|
|
|
- : exitSync(exitSync), readySync(readySync), numServers(numServers)
|
|
|
+ EpochReadyHandler(struct synchronization_tool *exitSync, struct synchronization_tool *readySync, mutex *updateMtx, size_t numServers)
|
|
|
+ : exitSync(exitSync), readySync(readySync), updateMtx(updateMtx), numServers(numServers)
|
|
|
{ /* */ }
|
|
|
|
|
|
bool handleGet(CivetServer *server, struct mg_connection *conn)
|
|
|
{
|
|
|
unique_lock<mutex> exitLock(exitSync->mtx, defer_lock);
|
|
|
+ unique_lock<mutex> updateLock(*updateMtx, defer_lock);
|
|
|
unique_lock<mutex> readyLock(readySync->mtx);
|
|
|
|
|
|
if (readySync->val < numServers)
|
|
@@ -864,19 +836,26 @@ class EpochReadyHandler : public CivetHandler
|
|
|
"text/plain\r\nConnection: close\r\n\r\n");
|
|
|
mg_printf(conn, "Server is waiting for other servers to begin.\n");
|
|
|
}
|
|
|
- else if (exitLock.try_lock())
|
|
|
+ else if (!exitLock.try_lock())
|
|
|
{
|
|
|
mg_printf(conn,
|
|
|
- "HTTP/1.1 200 OK\r\nContent-Type: "
|
|
|
+ "HTTP/1.1 503 Service Unavailable\r\nContent-Type: "
|
|
|
"text/plain\r\nConnection: close\r\n\r\n");
|
|
|
- mg_printf(conn, "Server is ready for epoch.\n");
|
|
|
+ mg_printf(conn, "Server is still in a previous epoch.\n");
|
|
|
}
|
|
|
- else
|
|
|
+ else if (!updateLock.try_lock())
|
|
|
{
|
|
|
mg_printf(conn,
|
|
|
"HTTP/1.1 503 Service Unavailable\r\nContent-Type: "
|
|
|
"text/plain\r\nConnection: close\r\n\r\n");
|
|
|
- mg_printf(conn, "Server is still in a previous epoch.\n");
|
|
|
+ mg_printf(conn, "Server is handling other updates.\n");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ mg_printf(conn,
|
|
|
+ "HTTP/1.1 200 OK\r\nContent-Type: "
|
|
|
+ "text/plain\r\nConnection: close\r\n\r\n");
|
|
|
+ mg_printf(conn, "Server is ready for epoch.\n");
|
|
|
}
|
|
|
|
|
|
return true;
|
|
@@ -884,6 +863,7 @@ class EpochReadyHandler : public CivetHandler
|
|
|
|
|
|
private:
|
|
|
struct synchronization_tool *exitSync, *readySync;
|
|
|
+ mutex *updateMtx;
|
|
|
const size_t numServers;
|
|
|
|
|
|
};
|
|
@@ -1243,7 +1223,7 @@ int main(int argc, char *argv[])
|
|
|
AltRemoteControlHandler triggerEpochHandler(1, &exitSync, "Server will initiate epoch!");
|
|
|
server.addHandler(TRIGGER_EPOCH_URI, triggerEpochHandler);
|
|
|
|
|
|
- EpochReadyHandler epochReadyHandler(&exitSync, &readySync, numServers);
|
|
|
+ EpochReadyHandler epochReadyHandler(&exitSync, &readySync, &updateMtx, numServers);
|
|
|
server.addHandler(EPOCH_READY_URI, epochReadyHandler);
|
|
|
|
|
|
EpochNumHandler epochNumHandler(&epochNum);
|
|
@@ -1256,10 +1236,14 @@ int main(int argc, char *argv[])
|
|
|
|
|
|
if (exitSync.val2)
|
|
|
{
|
|
|
- cout << "[" << seedStr << "] Executing epoch." << endl;
|
|
|
+ size_t currEpoch = epochNum.load();
|
|
|
+ cout << "[" << seedStr << "] Executing epoch calculations (going from t = " << currEpoch << " to " << currEpoch + 1 << ")." << endl;
|
|
|
|
|
|
epoch(&updateMtx, &epochNum, prsonaServer, rng, serverIPs, serverPorts, selfIP, selfPort);
|
|
|
|
|
|
+ currEpoch = epochNum.load();
|
|
|
+ cout << "[" << seedStr << "] Epoch calculations complete (now in t = " << currEpoch << ")." << endl;
|
|
|
+
|
|
|
exitSync.val2 = 0;
|
|
|
}
|
|
|
}
|