|
@@ -32,6 +32,15 @@
|
|
|
/* The valid-after time for a consensus (or for the target consensus of a
|
|
|
* diff), encoded as ISO UTC. */
|
|
|
#define LABEL_VALID_AFTER "consensus-valid-after"
|
|
|
+/* The fresh-until time for a consensus (or for the target consensus of a
|
|
|
+ * diff), encoded as ISO UTC. */
|
|
|
+#define LABEL_FRESH_UNTIL "consensus-fresh-until"
|
|
|
+/* The valid-until time for a consensus (or for the target consensus of a
|
|
|
+ * diff), encoded as ISO UTC. */
|
|
|
+#define LABEL_VALID_UNTIL "consensus-valid-until"
|
|
|
+/* Comma-separated list of hex-encoded identity digests for the voting
|
|
|
+ * authorities. */
|
|
|
+#define LABEL_SIGNATORIES "consensus-signatories"
|
|
|
/* A hex encoded SHA3 digest of the object, as compressed (if any) */
|
|
|
#define LABEL_SHA3_DIGEST "sha3-digest"
|
|
|
/* A hex encoded SHA3 digest of the object before compression. */
|
|
@@ -96,6 +105,36 @@ n_diff_compression_methods(void)
|
|
|
return ARRAY_LENGTH(compress_diffs_with);
|
|
|
}
|
|
|
|
|
|
+/** Which methods do we use for precompressing consensuses? */
|
|
|
+static const compress_method_t compress_consensus_with[] = {
|
|
|
+ ZLIB_METHOD,
|
|
|
+#ifdef HAVE_LZMA
|
|
|
+ LZMA_METHOD,
|
|
|
+#endif
|
|
|
+#ifdef HAVE_ZSTD
|
|
|
+ ZSTD_METHOD,
|
|
|
+#endif
|
|
|
+};
|
|
|
+
|
|
|
+/** How many different methods will we try to use for diff compression? */
|
|
|
+STATIC unsigned
|
|
|
+n_consensus_compression_methods(void)
|
|
|
+{
|
|
|
+ return ARRAY_LENGTH(compress_consensus_with);
|
|
|
+}
|
|
|
+
|
|
|
+/** For which compression method do we retain old consensuses? There's no
|
|
|
+ * need to keep all of them, since we won't be serving them. We'll
|
|
|
+ * go with ZLIB_METHOD because it's pretty fast and everyone has it.
|
|
|
+ */
|
|
|
+#define RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD ZLIB_METHOD
|
|
|
+
|
|
|
+/** Handles pointing to the latest consensus entries as compressed and
|
|
|
+ * stored. */
|
|
|
+static consensus_cache_entry_handle_t *
|
|
|
+ latest_consensus[N_CONSENSUS_FLAVORS]
|
|
|
+ [ARRAY_LENGTH(compress_consensus_with)];
|
|
|
+
|
|
|
/** Hashtable node used to remember the current status of the diff
|
|
|
* from a given sha3 digest to the current consensus. */
|
|
|
typedef struct cdm_diff_t {
|
|
@@ -135,13 +174,12 @@ static consdiff_cfg_t consdiff_cfg = {
|
|
|
};
|
|
|
|
|
|
static int consdiffmgr_ensure_space_for_files(int n);
|
|
|
+static int consensus_queue_compression_work(const char *consensus,
|
|
|
+ const networkstatus_t *as_parsed);
|
|
|
static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
|
|
|
consensus_cache_entry_t *diff_to);
|
|
|
static void consdiffmgr_set_cache_flags(void);
|
|
|
|
|
|
-/* Just gzip consensuses for now. */
|
|
|
-#define COMPRESS_CONSENSUS_WITH GZIP_METHOD
|
|
|
-
|
|
|
/* =====
|
|
|
* Hashtable setup
|
|
|
* ===== */
|
|
@@ -410,11 +448,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after)
|
|
|
consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
|
|
|
|
|
|
consensus_cache_entry_t *result = NULL;
|
|
|
- if (smartlist_len(matches) > 1) {
|
|
|
- log_warn(LD_BUG, "How odd; there appear to be two matching consensuses "
|
|
|
- "with flavor %s published at %s.",
|
|
|
- flavname, formatted_time);
|
|
|
- }
|
|
|
if (smartlist_len(matches)) {
|
|
|
result = smartlist_get(matches, 0);
|
|
|
}
|
|
@@ -458,59 +491,7 @@ consdiffmgr_add_consensus(const char *consensus,
|
|
|
}
|
|
|
|
|
|
/* We don't have it. Add it to the cache. */
|
|
|
- consdiffmgr_ensure_space_for_files(1);
|
|
|
-
|
|
|
- {
|
|
|
- size_t bodylen = strlen(consensus);
|
|
|
- config_line_t *labels = NULL;
|
|
|
- char formatted_time[ISO_TIME_LEN+1];
|
|
|
- format_iso_time_nospace(formatted_time, valid_after);
|
|
|
- const char *flavname = networkstatus_get_flavor_name(flavor);
|
|
|
-
|
|
|
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
|
|
|
- (const uint8_t *)consensus, bodylen);
|
|
|
- {
|
|
|
- const char *start, *end;
|
|
|
- if (router_get_networkstatus_v3_signed_boundaries(consensus,
|
|
|
- &start, &end) < 0) {
|
|
|
- start = consensus;
|
|
|
- end = consensus+bodylen;
|
|
|
- }
|
|
|
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
|
|
|
- (const uint8_t *)start,
|
|
|
- end - start);
|
|
|
- }
|
|
|
-
|
|
|
- char *body_compressed = NULL;
|
|
|
- size_t size_compressed = 0;
|
|
|
- if (tor_compress(&body_compressed, &size_compressed,
|
|
|
- consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) {
|
|
|
- config_free_lines(labels);
|
|
|
- return -1;
|
|
|
- }
|
|
|
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST,
|
|
|
- (const uint8_t *)body_compressed, size_compressed);
|
|
|
- config_line_prepend(&labels, LABEL_COMPRESSION_TYPE,
|
|
|
- compression_method_get_name(COMPRESS_CONSENSUS_WITH));
|
|
|
- config_line_prepend(&labels, LABEL_FLAVOR, flavname);
|
|
|
- config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
|
|
|
- config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
|
|
|
-
|
|
|
- entry = consensus_cache_add(cdm_cache_get(),
|
|
|
- labels,
|
|
|
- (const uint8_t *)body_compressed,
|
|
|
- size_compressed);
|
|
|
- tor_free(body_compressed);
|
|
|
- config_free_lines(labels);
|
|
|
- }
|
|
|
-
|
|
|
- if (entry) {
|
|
|
- consensus_cache_entry_mark_for_aggressive_release(entry);
|
|
|
- consensus_cache_entry_decref(entry);
|
|
|
- }
|
|
|
-
|
|
|
- cdm_cache_dirty = 1;
|
|
|
- return entry ? 0 : -1;
|
|
|
+ return consensus_queue_compression_work(consensus, as_parsed);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -543,6 +524,47 @@ sort_and_find_most_recent(smartlist_t *lst)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/** Return i such that compress_consensus_with[i] == method. Return
|
|
|
+ * -1 if no such i exists. */
|
|
|
+static int
|
|
|
+consensus_compression_method_pos(compress_method_t method)
|
|
|
+{
|
|
|
+ unsigned i;
|
|
|
+ for (i = 0; i < n_consensus_compression_methods(); ++i) {
|
|
|
+ if (compress_consensus_with[i] == method) {
|
|
|
+ return i;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * If we know a consensus with the flavor <b>flavor</b> compressed with
|
|
|
+ * <b>method</b>, set *<b>entry_out</b> to that value. Return values are as
|
|
|
+ * for consdiffmgr_find_diff_from().
|
|
|
+ */
|
|
|
+consdiff_status_t
|
|
|
+consdiffmgr_find_consensus(struct consensus_cache_entry_t **entry_out,
|
|
|
+ consensus_flavor_t flavor,
|
|
|
+ compress_method_t method)
|
|
|
+{
|
|
|
+ tor_assert((int)flavor < N_CONSENSUS_FLAVORS);
|
|
|
+
|
|
|
+ int pos = consensus_compression_method_pos(method);
|
|
|
+ if (pos < 0) {
|
|
|
+ // We don't compress consensuses with this method.
|
|
|
+ return CONSDIFF_NOT_FOUND;
|
|
|
+ }
|
|
|
+ consensus_cache_entry_handle_t *handle = latest_consensus[flavor][pos];
|
|
|
+ if (!handle)
|
|
|
+ return CONSDIFF_NOT_FOUND;
|
|
|
+ *entry_out = consensus_cache_entry_handle_get(handle);
|
|
|
+ if (entry_out)
|
|
|
+ return CONSDIFF_AVAILABLE;
|
|
|
+ else
|
|
|
+ return CONSDIFF_NOT_FOUND;
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* Look up consensus_cache_entry_t for the consensus of type <b>flavor</b>,
|
|
|
* from the source consensus with the specified digest (which must be SHA3).
|
|
@@ -684,6 +706,42 @@ consdiffmgr_cleanup(void)
|
|
|
smartlist_clear(diffs);
|
|
|
}
|
|
|
|
|
|
+ // 3. Delete all consensuses except the most recent that are compressed with
|
|
|
+ // an un-preferred method.
|
|
|
+ for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) {
|
|
|
+ const char *flavname = networkstatus_get_flavor_name(flav);
|
|
|
+ /* Determine the most recent consensus of this flavor */
|
|
|
+ consensus_cache_find_all(consensuses, cdm_cache_get(),
|
|
|
+ LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
|
|
|
+ consensus_cache_filter_list(consensuses, LABEL_FLAVOR, flavname);
|
|
|
+ consensus_cache_entry_t *most_recent =
|
|
|
+ sort_and_find_most_recent(consensuses);
|
|
|
+ if (most_recent == NULL)
|
|
|
+ continue;
|
|
|
+ const char *most_recent_sha3_uncompressed =
|
|
|
+ consensus_cache_entry_get_value(most_recent,
|
|
|
+ LABEL_SHA3_DIGEST_UNCOMPRESSED);
|
|
|
+ const char *retain_methodname = compression_method_get_name(
|
|
|
+ RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD);
|
|
|
+
|
|
|
+ if (BUG(most_recent_sha3_uncompressed == NULL))
|
|
|
+ continue;
|
|
|
+ SMARTLIST_FOREACH_BEGIN(consensuses, consensus_cache_entry_t *, ent) {
|
|
|
+ const char *lv_sha3_uncompressed =
|
|
|
+ consensus_cache_entry_get_value(ent, LABEL_SHA3_DIGEST_UNCOMPRESSED);
|
|
|
+ if (BUG(! lv_sha3_uncompressed))
|
|
|
+ continue;
|
|
|
+ if (!strcmp(lv_sha3_uncompressed, most_recent_sha3_uncompressed))
|
|
|
+ continue; // This _is_ the most recent.
|
|
|
+ const char *lv_methodname =
|
|
|
+ consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE);
|
|
|
+ if (! lv_methodname || strcmp(lv_methodname, retain_methodname)) {
|
|
|
+ consensus_cache_entry_mark_for_removal(ent);
|
|
|
+ ++n_to_delete;
|
|
|
+ }
|
|
|
+ } SMARTLIST_FOREACH_END(ent);
|
|
|
+ }
|
|
|
+
|
|
|
smartlist_free(objects);
|
|
|
smartlist_free(consensuses);
|
|
|
smartlist_free(diffs);
|
|
@@ -787,10 +845,14 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
|
|
|
|
|
|
// 1. find the most recent consensus, and the ones that we might want
|
|
|
// to diff to it.
|
|
|
+ const char *methodname = compression_method_get_name(
|
|
|
+ RETAIN_CONSENSUS_COMPRESSED_WITH_METHOD);
|
|
|
+
|
|
|
matches = smartlist_new();
|
|
|
consensus_cache_find_all(matches, cdm_cache_get(),
|
|
|
LABEL_FLAVOR, flavname);
|
|
|
consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
|
|
|
+ consensus_cache_filter_list(matches, LABEL_COMPRESSION_TYPE, methodname);
|
|
|
consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches);
|
|
|
if (!most_recent) {
|
|
|
log_info(LD_DIRSERV, "No 'most recent' %s consensus found; "
|
|
@@ -835,6 +897,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
|
|
|
if (strmap_get(have_diff_from, va) != NULL)
|
|
|
continue; /* we already have this one. */
|
|
|
smartlist_add(compute_diffs_from, ent);
|
|
|
+ /* Since we are not going to serve this as the most recent consensus
|
|
|
+ * any more, we should stop keeping it mmap'd when it's not in use.
|
|
|
+ */
|
|
|
+ consensus_cache_entry_mark_for_aggressive_release(ent);
|
|
|
} SMARTLIST_FOREACH_END(ent);
|
|
|
|
|
|
log_info(LD_DIRSERV,
|
|
@@ -879,6 +945,48 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
|
|
|
strmap_free(have_diff_from, NULL);
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Scan the cache for the latest consensuses and add their handles to
|
|
|
+ * latest_consensus
|
|
|
+ */
|
|
|
+static void
|
|
|
+consdiffmgr_consensus_load(void)
|
|
|
+{
|
|
|
+ smartlist_t *matches = smartlist_new();
|
|
|
+ for (int flav = 0; flav < N_CONSENSUS_FLAVORS; ++flav) {
|
|
|
+ const char *flavname = networkstatus_get_flavor_name(flav);
|
|
|
+ smartlist_clear(matches);
|
|
|
+ consensus_cache_find_all(matches, cdm_cache_get(),
|
|
|
+ LABEL_FLAVOR, flavname);
|
|
|
+ consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
|
|
|
+ consensus_cache_entry_t *most_recent = sort_and_find_most_recent(matches);
|
|
|
+ if (! most_recent)
|
|
|
+ continue; // no consensuses.
|
|
|
+ const char *most_recent_sha3 =
|
|
|
+ consensus_cache_entry_get_value(most_recent,
|
|
|
+ LABEL_SHA3_DIGEST_UNCOMPRESSED);
|
|
|
+ if (BUG(most_recent_sha3 == NULL))
|
|
|
+ continue; // LCOV_EXCL_LINE
|
|
|
+ consensus_cache_filter_list(matches, LABEL_SHA3_DIGEST_UNCOMPRESSED,
|
|
|
+ most_recent_sha3);
|
|
|
+
|
|
|
+ // Everything that remains matches the most recent consensus of this
|
|
|
+ // flavor.
|
|
|
+ SMARTLIST_FOREACH_BEGIN(matches, consensus_cache_entry_t *, ent) {
|
|
|
+ const char *lv_compression =
|
|
|
+ consensus_cache_entry_get_value(ent, LABEL_COMPRESSION_TYPE);
|
|
|
+ compress_method_t method =
|
|
|
+ compression_method_get_by_name(lv_compression);
|
|
|
+ int pos = consensus_compression_method_pos(method);
|
|
|
+ if (pos < 0)
|
|
|
+ continue;
|
|
|
+ consensus_cache_entry_handle_free(latest_consensus[flav][pos]);
|
|
|
+ latest_consensus[flav][pos] = consensus_cache_entry_handle_new(ent);
|
|
|
+ } SMARTLIST_FOREACH_END(ent);
|
|
|
+ }
|
|
|
+ smartlist_free(matches);
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* Scan the cache for diffs, and add them to the hashtable.
|
|
|
*/
|
|
@@ -936,6 +1044,7 @@ consdiffmgr_rescan(void)
|
|
|
|
|
|
if (cdm_cache_loaded == 0) {
|
|
|
consdiffmgr_diffs_load();
|
|
|
+ consdiffmgr_consensus_load();
|
|
|
cdm_cache_loaded = 1;
|
|
|
}
|
|
|
|
|
@@ -1051,6 +1160,14 @@ consdiffmgr_free_all(void)
|
|
|
next = HT_NEXT_RMV(cdm_diff_ht, &cdm_diff_ht, diff);
|
|
|
cdm_diff_free(this);
|
|
|
}
|
|
|
+ int i;
|
|
|
+ unsigned j;
|
|
|
+ for (i = 0; i < N_CONSENSUS_FLAVORS; ++i) {
|
|
|
+ for (j = 0; j < n_consensus_compression_methods(); ++j) {
|
|
|
+ consensus_cache_entry_handle_free(latest_consensus[i][j]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ memset(latest_consensus, 0, sizeof(latest_consensus));
|
|
|
consensus_cache_free(cons_diff_cache);
|
|
|
cons_diff_cache = NULL;
|
|
|
}
|
|
@@ -1071,6 +1188,93 @@ typedef struct compressed_result_t {
|
|
|
size_t bodylen;
|
|
|
} compressed_result_t;
|
|
|
|
|
|
+/**
|
|
|
+ * Compress the bytestring <b>input</b> of length <b>len</b> using the
|
|
|
+ * <n>n_methods</b> compression methods listed in the array <b>methods</b>.
|
|
|
+ *
|
|
|
+ * For each successful compression, set the fields in the <b>results_out</b>
|
|
|
+ * array in the position corresponding to the compression method. Use
|
|
|
+ * <b>labels_in</b> as a basis for the labels of the result.
|
|
|
+ *
|
|
|
+ * Return 0 if all compression succeeded; -1 if any failed.
|
|
|
+ */
|
|
|
+static int
|
|
|
+compress_multiple(compressed_result_t *results_out, int n_methods,
|
|
|
+ const compress_method_t *methods,
|
|
|
+ const uint8_t *input, size_t len,
|
|
|
+ const config_line_t *labels_in)
|
|
|
+{
|
|
|
+ int rv = 0;
|
|
|
+ int i;
|
|
|
+ for (i = 0; i < n_methods; ++i) {
|
|
|
+ compress_method_t method = methods[i];
|
|
|
+ const char *methodname = compression_method_get_name(method);
|
|
|
+ char *result;
|
|
|
+ size_t sz;
|
|
|
+ if (0 == tor_compress(&result, &sz, (const char*)input, len, method)) {
|
|
|
+ results_out[i].body = (uint8_t*)result;
|
|
|
+ results_out[i].bodylen = sz;
|
|
|
+ results_out[i].labels = config_lines_dup(labels_in);
|
|
|
+ cdm_labels_prepend_sha3(&results_out[i].labels, LABEL_SHA3_DIGEST,
|
|
|
+ results_out[i].body,
|
|
|
+ results_out[i].bodylen);
|
|
|
+ config_line_prepend(&results_out[i].labels,
|
|
|
+ LABEL_COMPRESSION_TYPE,
|
|
|
+ methodname);
|
|
|
+ } else {
|
|
|
+ rv = -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return rv;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Given an array of <b>n</b> compressed_result_t in <b>results</b>,
|
|
|
+ * as produced by compress_multiple, store them all into the
|
|
|
+ * consdiffmgr, and store handles to them in the <b>handles_out</b>
|
|
|
+ * array.
|
|
|
+ *
|
|
|
+ * Return CDM_DIFF_PRESENT if any was stored, and CDM_DIFF_ERROR if none
|
|
|
+ * was stored.
|
|
|
+ */
|
|
|
+static cdm_diff_status_t
|
|
|
+store_multiple(consensus_cache_entry_handle_t **handles_out,
|
|
|
+ int n,
|
|
|
+ const compress_method_t *methods,
|
|
|
+ const compressed_result_t *results,
|
|
|
+ const char *description)
|
|
|
+{
|
|
|
+ cdm_diff_status_t status = CDM_DIFF_ERROR;
|
|
|
+ consdiffmgr_ensure_space_for_files(n);
|
|
|
+
|
|
|
+ int i;
|
|
|
+ for (i = 0; i < n; ++i) {
|
|
|
+ compress_method_t method = methods[i];
|
|
|
+ uint8_t *body_out = results[i].body;
|
|
|
+ size_t bodylen_out = results[i].bodylen;
|
|
|
+ config_line_t *labels = results[i].labels;
|
|
|
+ const char *methodname = compression_method_get_name(method);
|
|
|
+ if (body_out && bodylen_out && labels) {
|
|
|
+ /* Success! Store the results */
|
|
|
+ log_info(LD_DIRSERV, "Adding %s, compressed with %s",
|
|
|
+ description, methodname);
|
|
|
+
|
|
|
+ consensus_cache_entry_t *ent =
|
|
|
+ consensus_cache_add(cdm_cache_get(),
|
|
|
+ labels,
|
|
|
+ body_out,
|
|
|
+ bodylen_out);
|
|
|
+ if (BUG(ent == NULL))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ status = CDM_DIFF_PRESENT;
|
|
|
+ handles_out[i] = consensus_cache_entry_handle_new(ent);
|
|
|
+ consensus_cache_entry_decref(ent);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return status;
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* An object passed to a worker thread that will try to produce a consensus
|
|
|
* diff.
|
|
@@ -1144,6 +1348,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
|
|
|
|
|
|
const char *lv_to_valid_after =
|
|
|
consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_AFTER);
|
|
|
+ const char *lv_to_fresh_until =
|
|
|
+ consensus_cache_entry_get_value(job->diff_to, LABEL_FRESH_UNTIL);
|
|
|
+ const char *lv_to_valid_until =
|
|
|
+ consensus_cache_entry_get_value(job->diff_to, LABEL_VALID_UNTIL);
|
|
|
+ const char *lv_to_signatories =
|
|
|
+ consensus_cache_entry_get_value(job->diff_to, LABEL_SIGNATORIES);
|
|
|
const char *lv_from_valid_after =
|
|
|
consensus_cache_entry_get_value(job->diff_from, LABEL_VALID_AFTER);
|
|
|
const char *lv_from_digest =
|
|
@@ -1213,6 +1423,12 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
|
|
|
job->out[0].bodylen = difflen;
|
|
|
|
|
|
config_line_t *common_labels = NULL;
|
|
|
+ if (lv_to_valid_until)
|
|
|
+ config_line_prepend(&common_labels, LABEL_VALID_UNTIL, lv_to_valid_until);
|
|
|
+ if (lv_to_fresh_until)
|
|
|
+ config_line_prepend(&common_labels, LABEL_FRESH_UNTIL, lv_to_fresh_until);
|
|
|
+ if (lv_to_signatories)
|
|
|
+ config_line_prepend(&common_labels, LABEL_SIGNATORIES, lv_to_signatories);
|
|
|
cdm_labels_prepend_sha3(&common_labels,
|
|
|
LABEL_SHA3_DIGEST_UNCOMPRESSED,
|
|
|
job->out[0].body,
|
|
@@ -1235,24 +1451,10 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
|
|
|
job->out[0].body,
|
|
|
job->out[0].bodylen);
|
|
|
|
|
|
- unsigned u;
|
|
|
- for (u = 1; u < n_diff_compression_methods(); ++u) {
|
|
|
- compress_method_t method = compress_diffs_with[u];
|
|
|
- const char *methodname = compression_method_get_name(method);
|
|
|
- char *result;
|
|
|
- size_t sz;
|
|
|
- if (0 == tor_compress(&result, &sz, consensus_diff, difflen, method)) {
|
|
|
- job->out[u].body = (uint8_t*)result;
|
|
|
- job->out[u].bodylen = sz;
|
|
|
- job->out[u].labels = config_lines_dup(common_labels);
|
|
|
- cdm_labels_prepend_sha3(&job->out[u].labels, LABEL_SHA3_DIGEST,
|
|
|
- job->out[u].body,
|
|
|
- job->out[u].bodylen);
|
|
|
- config_line_prepend(&job->out[u].labels,
|
|
|
- LABEL_COMPRESSION_TYPE,
|
|
|
- methodname);
|
|
|
- }
|
|
|
- }
|
|
|
+ compress_multiple(job->out+1,
|
|
|
+ n_diff_compression_methods()-1,
|
|
|
+ compress_diffs_with+1,
|
|
|
+ (const uint8_t*)consensus_diff, difflen, common_labels);
|
|
|
|
|
|
config_free_lines(common_labels);
|
|
|
return WQ_RPL_REPLY;
|
|
@@ -1318,36 +1520,20 @@ consensus_diff_worker_replyfn(void *work_)
|
|
|
cache = 0;
|
|
|
}
|
|
|
|
|
|
- int status = CDM_DIFF_ERROR;
|
|
|
consensus_cache_entry_handle_t *handles[ARRAY_LENGTH(compress_diffs_with)];
|
|
|
memset(handles, 0, sizeof(handles));
|
|
|
|
|
|
- consdiffmgr_ensure_space_for_files(n_diff_compression_methods());
|
|
|
-
|
|
|
- unsigned u;
|
|
|
- for (u = 0; u < n_diff_compression_methods(); ++u) {
|
|
|
- compress_method_t method = compress_diffs_with[u];
|
|
|
- uint8_t *body_out = job->out[u].body;
|
|
|
- size_t bodylen_out = job->out[u].bodylen;
|
|
|
- config_line_t *labels = job->out[u].labels;
|
|
|
- const char *methodname = compression_method_get_name(method);
|
|
|
- if (body_out && bodylen_out && labels) {
|
|
|
- /* Success! Store the results */
|
|
|
- log_info(LD_DIRSERV, "Adding consensus diff from %s to %s, "
|
|
|
- "compressed with %s",
|
|
|
- lv_from_digest, lv_to_digest, methodname);
|
|
|
+ char description[128];
|
|
|
+ tor_snprintf(description, sizeof(description),
|
|
|
+ "consensus diff from %s to %s",
|
|
|
+ lv_from_digest, lv_to_digest);
|
|
|
|
|
|
- consensus_cache_entry_t *ent =
|
|
|
- consensus_cache_add(cdm_cache_get(),
|
|
|
- labels,
|
|
|
- body_out,
|
|
|
- bodylen_out);
|
|
|
+ int status = store_multiple(handles,
|
|
|
+ n_diff_compression_methods(),
|
|
|
+ compress_diffs_with,
|
|
|
+ job->out,
|
|
|
+ description);
|
|
|
|
|
|
- status = CDM_DIFF_PRESENT;
|
|
|
- handles[u] = consensus_cache_entry_handle_new(ent);
|
|
|
- consensus_cache_entry_decref(ent);
|
|
|
- }
|
|
|
- }
|
|
|
if (status != CDM_DIFF_PRESENT) {
|
|
|
/* Failure! Nothing to do but complain */
|
|
|
log_warn(LD_DIRSERV,
|
|
@@ -1357,6 +1543,7 @@ consensus_diff_worker_replyfn(void *work_)
|
|
|
status = CDM_DIFF_ERROR;
|
|
|
}
|
|
|
|
|
|
+ unsigned u;
|
|
|
for (u = 0; u < ARRAY_LENGTH(handles); ++u) {
|
|
|
compress_method_t method = compress_diffs_with[u];
|
|
|
if (cache) {
|
|
@@ -1408,3 +1595,229 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Holds requests and replies for consensus_compress_workers.
|
|
|
+ */
|
|
|
+typedef struct consensus_compress_worker_job_t {
|
|
|
+ char *consensus;
|
|
|
+ size_t consensus_len;
|
|
|
+ consensus_flavor_t flavor;
|
|
|
+ config_line_t *labels_in;
|
|
|
+ compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)];
|
|
|
+} consensus_compress_worker_job_t;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Free all resources held in <b>job</b>
|
|
|
+ */
|
|
|
+static void
|
|
|
+consensus_compress_worker_job_free(consensus_compress_worker_job_t *job)
|
|
|
+{
|
|
|
+ if (!job)
|
|
|
+ return;
|
|
|
+ tor_free(job->consensus);
|
|
|
+ config_free_lines(job->labels_in);
|
|
|
+ unsigned u;
|
|
|
+ for (u = 0; u < n_consensus_compression_methods(); ++u) {
|
|
|
+ config_free_lines(job->out[u].labels);
|
|
|
+ tor_free(job->out[u].body);
|
|
|
+ }
|
|
|
+ tor_free(job);
|
|
|
+}
|
|
|
+/**
|
|
|
+ * Worker function. This function runs inside a worker thread and receives
|
|
|
+ * a consensus_compress_worker_job_t as its input.
|
|
|
+ */
|
|
|
+static workqueue_reply_t
|
|
|
+consensus_compress_worker_threadfn(void *state_, void *work_)
|
|
|
+{
|
|
|
+ (void)state_;
|
|
|
+ consensus_compress_worker_job_t *job = work_;
|
|
|
+ consensus_flavor_t flavor = job->flavor;
|
|
|
+ const char *consensus = job->consensus;
|
|
|
+ size_t bodylen = job->consensus_len;
|
|
|
+
|
|
|
+ config_line_t *labels = config_lines_dup(job->labels_in);
|
|
|
+ const char *flavname = networkstatus_get_flavor_name(flavor);
|
|
|
+
|
|
|
+ cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
|
|
|
+ (const uint8_t *)consensus, bodylen);
|
|
|
+ {
|
|
|
+ const char *start, *end;
|
|
|
+ if (router_get_networkstatus_v3_signed_boundaries(consensus,
|
|
|
+ &start, &end) < 0) {
|
|
|
+ start = consensus;
|
|
|
+ end = consensus+bodylen;
|
|
|
+ }
|
|
|
+ cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
|
|
|
+ (const uint8_t *)start,
|
|
|
+ end - start);
|
|
|
+ }
|
|
|
+ config_line_prepend(&labels, LABEL_FLAVOR, flavname);
|
|
|
+ config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
|
|
|
+
|
|
|
+ compress_multiple(job->out,
|
|
|
+ n_consensus_compression_methods(),
|
|
|
+ compress_consensus_with,
|
|
|
+ (const uint8_t*)consensus, bodylen, labels);
|
|
|
+ config_free_lines(labels);
|
|
|
+ return WQ_RPL_REPLY;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Worker function: This function runs in the main thread, and receives
|
|
|
+ * a consensus_diff_compress_job_t that the worker thread has already
|
|
|
+ * processed.
|
|
|
+ */
|
|
|
+static void
|
|
|
+consensus_compress_worker_replyfn(void *work_)
|
|
|
+{
|
|
|
+ consensus_compress_worker_job_t *job = work_;
|
|
|
+
|
|
|
+ consensus_cache_entry_handle_t *handles[
|
|
|
+ ARRAY_LENGTH(compress_consensus_with)];
|
|
|
+ memset(handles, 0, sizeof(handles));
|
|
|
+
|
|
|
+ store_multiple(handles,
|
|
|
+ n_consensus_compression_methods(),
|
|
|
+ compress_consensus_with,
|
|
|
+ job->out,
|
|
|
+ "consensus");
|
|
|
+ cdm_cache_dirty = 1;
|
|
|
+
|
|
|
+ unsigned u;
|
|
|
+ consensus_flavor_t f = job->flavor;
|
|
|
+ tor_assert((int)f < N_CONSENSUS_FLAVORS);
|
|
|
+ for (u = 0; u < ARRAY_LENGTH(handles); ++u) {
|
|
|
+ if (handles[u] == NULL)
|
|
|
+ continue;
|
|
|
+ consensus_cache_entry_handle_free(latest_consensus[f][u]);
|
|
|
+ latest_consensus[f][u] = handles[u];
|
|
|
+ }
|
|
|
+
|
|
|
+ consensus_compress_worker_job_free(job);
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * If true, we compress in worker threads.
|
|
|
+ */
|
|
|
+static int background_compression = 0;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Queue a job to compress <b>consensus</b> and store its compressed
|
|
|
+ * text in the cache.
|
|
|
+ */
|
|
|
+static int
|
|
|
+consensus_queue_compression_work(const char *consensus,
|
|
|
+ const networkstatus_t *as_parsed)
|
|
|
+{
|
|
|
+ tor_assert(consensus);
|
|
|
+ tor_assert(as_parsed);
|
|
|
+
|
|
|
+ consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job));
|
|
|
+ job->consensus = tor_strdup(consensus);
|
|
|
+ job->consensus_len = strlen(consensus);
|
|
|
+ job->flavor = as_parsed->flavor;
|
|
|
+
|
|
|
+ char va_str[ISO_TIME_LEN+1];
|
|
|
+ char vu_str[ISO_TIME_LEN+1];
|
|
|
+ char fu_str[ISO_TIME_LEN+1];
|
|
|
+ format_iso_time_nospace(va_str, as_parsed->valid_after);
|
|
|
+ format_iso_time_nospace(fu_str, as_parsed->fresh_until);
|
|
|
+ format_iso_time_nospace(vu_str, as_parsed->valid_until);
|
|
|
+ config_line_append(&job->labels_in, LABEL_VALID_AFTER, va_str);
|
|
|
+ config_line_append(&job->labels_in, LABEL_FRESH_UNTIL, fu_str);
|
|
|
+ config_line_append(&job->labels_in, LABEL_VALID_UNTIL, vu_str);
|
|
|
+ if (as_parsed->voters) {
|
|
|
+ smartlist_t *hexvoters = smartlist_new();
|
|
|
+ SMARTLIST_FOREACH_BEGIN(as_parsed->voters,
|
|
|
+ networkstatus_voter_info_t *, vi) {
|
|
|
+ if (smartlist_len(vi->sigs) == 0)
|
|
|
+ continue; // didn't sign.
|
|
|
+ char d[HEX_DIGEST_LEN+1];
|
|
|
+ base16_encode(d, sizeof(d), vi->identity_digest, DIGEST_LEN);
|
|
|
+ smartlist_add_strdup(hexvoters, d);
|
|
|
+ } SMARTLIST_FOREACH_END(vi);
|
|
|
+ char *signers = smartlist_join_strings(hexvoters, ",", 0, NULL);
|
|
|
+ config_line_prepend(&job->labels_in, LABEL_SIGNATORIES, signers);
|
|
|
+ tor_free(signers);
|
|
|
+ SMARTLIST_FOREACH(hexvoters, char *, cp, tor_free(cp));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (background_compression) {
|
|
|
+ workqueue_entry_t *work;
|
|
|
+ work = cpuworker_queue_work(consensus_compress_worker_threadfn,
|
|
|
+ consensus_compress_worker_replyfn,
|
|
|
+ job);
|
|
|
+ if (!work) {
|
|
|
+ consensus_compress_worker_job_free(job);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ consensus_compress_worker_threadfn(NULL, job);
|
|
|
+ consensus_compress_worker_replyfn(job);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Tell the consdiffmgr backend to compress consensuses in worker threads.
|
|
|
+ */
|
|
|
+void
|
|
|
+consdiffmgr_enable_background_compression(void)
|
|
|
+{
|
|
|
+ // This isn't the default behavior because it would break unit tests.
|
|
|
+ background_compression = 1;
|
|
|
+}
|
|
|
+
|
|
|
+/** Read the set of voters from the cached object <b>ent</b> into
|
|
|
+ * <b>out</b>, as a list of hex-encoded digests. Return 0 on success,
|
|
|
+ * -1 if no signatories were recorded. */
|
|
|
+int
|
|
|
+consensus_cache_entry_get_voter_id_digests(const consensus_cache_entry_t *ent,
|
|
|
+ smartlist_t *out)
|
|
|
+{
|
|
|
+ tor_assert(ent);
|
|
|
+ tor_assert(out);
|
|
|
+ const char *s;
|
|
|
+ s = consensus_cache_entry_get_value(ent, LABEL_SIGNATORIES);
|
|
|
+ if (s == NULL)
|
|
|
+ return -1;
|
|
|
+ smartlist_split_string(out, s, ",", SPLIT_SKIP_SPACE|SPLIT_STRIP_SPACE, 0);
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+/** Read the fresh-until time of cached object <b>ent</b> into *<b>out</b>
|
|
|
+ * and return 0, or return -1 if no such time was recorded. */
|
|
|
+int
|
|
|
+consensus_cache_entry_get_fresh_until(const consensus_cache_entry_t *ent,
|
|
|
+ time_t *out)
|
|
|
+{
|
|
|
+ tor_assert(ent);
|
|
|
+ tor_assert(out);
|
|
|
+ const char *s;
|
|
|
+ s = consensus_cache_entry_get_value(ent, LABEL_FRESH_UNTIL);
|
|
|
+ if (s == NULL || parse_iso_time_nospace(s, out) < 0)
|
|
|
+ return -1;
|
|
|
+ else
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+/** Read the valid until timestamp from the cached object <b>ent</b> into
|
|
|
+ * *<b>out</b> and return 0, or return -1 if no such time was recorded. */
|
|
|
+int
|
|
|
+consensus_cache_entry_get_valid_until(const consensus_cache_entry_t *ent,
|
|
|
+ time_t *out)
|
|
|
+{
|
|
|
+ tor_assert(ent);
|
|
|
+ tor_assert(out);
|
|
|
+
|
|
|
+ const char *s;
|
|
|
+ s = consensus_cache_entry_get_value(ent, LABEL_VALID_UNTIL);
|
|
|
+ if (s == NULL || parse_iso_time_nospace(s, out) < 0)
|
|
|
+ return -1;
|
|
|
+ else
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|