123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- #!/usr/bin/python
- # Copyright 2017-2019, The Tor Project, Inc
- # See LICENSE for licensing information
- """
- hs_ntor_ref.py
- This module is a reference implementation of the modified ntor protocol
- proposed for Tor hidden services in proposal 224 (Next Generation Hidden
- Services) in section [NTOR-WITH-EXTRA-DATA].
- The modified ntor protocol is a single-round protocol, with three steps in total:
- 1: Client generates keys and sends them to service via INTRODUCE cell
- 2: Service computes key material based on client's keys, and sends its own
- keys to client via RENDEZVOUS cell
- 3: Client computes key material as well.
- It's meant to be used to validate Tor's HS ntor implementation by conducting
- various integration tests. Specifically it conducts the following three tests:
- - Tests our Python implementation by running the whole protocol in Python and
- making sure that results are consistent.
- - Tests little-t-tor ntor implementation. We use this Python code to instrument
- little-t-tor and carry out the handshake by using little-t-tor code. The
- small C wrapper at src/test/test-hs-ntor-cl is used for this Python module to
- interface with little-t-tor.
- - Cross-tests Python and little-t-tor implementation by running half of the
- protocol in Python code and the other in little-t-tor. This is actually two
- tests so that all parts of the protocol are run both by little-t-tor and
- Python.
- It requires the curve25519 python module from the curve25519-donna package.
- The whole logic and concept for this test suite was taken from ntor_ref.py.
- *** DO NOT USE THIS IN PRODUCTION. ***
- """
- import struct
- import os, sys
- import binascii
- import subprocess
- try:
- import curve25519
- curve25519mod = curve25519.keys
- except ImportError:
- curve25519 = None
- import slownacl_curve25519
- curve25519mod = slownacl_curve25519
- import hashlib
- try:
- import sha3
- except ImportError:
- # In python 3.6, the sha3 functions are in hashlib whether we
- # import sha3 or not.
- sha3 = None
- try:
- # Pull the sha3 functions in.
- from hashlib import sha3_256, shake_256
- shake_squeeze = shake_256.digest
- except ImportError:
- if hasattr(sha3, "SHA3256"):
- # If this happens, then we have the old "sha3" module which
- # hashlib and pysha3 superseded.
- sha3_256 = sha3.SHA3256
- shake_256 = sha3.SHAKE256
- shake_squeeze = shake_256.squeeze
- else:
- # error code 77 tells automake to skip this test
- sys.exit(77)
- # Import Nick's ntor reference implementation in Python
- # We are gonna use a few of its utilities.
- from ntor_ref import hash_nil
- from ntor_ref import PrivateKey
- # String constants used in this protocol
- PROTOID = b"tor-hs-ntor-curve25519-sha3-256-1"
- T_HSENC = PROTOID + b":hs_key_extract"
- T_HSVERIFY = PROTOID + b":hs_verify"
- T_HSMAC = PROTOID + b":hs_mac"
- M_HSEXPAND = PROTOID + b":hs_key_expand"
- INTRO_SECRET_LEN = 161
- REND_SECRET_LEN = 225
- AUTH_INPUT_LEN = 199
- # Implements MAC(k,m) = H(htonll(len(k)) | k | m)
- def mac(k,m):
- def htonll(num):
- return struct.pack('!q', num)
- s = sha3_256()
- s.update(htonll(len(k)))
- s.update(k)
- s.update(m)
- return s.digest()
- ######################################################################
- # Functions that implement the modified HS ntor protocol
- """As client compute key material for INTRODUCE cell as follows:
- intro_secret_hs_input = EXP(B,x) | AUTH_KEY | X | B | PROTOID
- info = m_hsexpand | subcredential
- hs_keys = KDF(intro_secret_hs_input | t_hsenc | info, S_KEY_LEN+MAC_LEN)
- ENC_KEY = hs_keys[0:S_KEY_LEN]
- MAC_KEY = hs_keys[S_KEY_LEN:S_KEY_LEN+MAC_KEY_LEN]
- """
- def intro2_ntor_client(intro_auth_pubkey_str, intro_enc_pubkey,
- client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey, subcredential):
- dh_result = client_ephemeral_enc_privkey.get_shared_key(intro_enc_pubkey, hash_nil)
- secret = dh_result + intro_auth_pubkey_str + client_ephemeral_enc_pubkey.serialize() + intro_enc_pubkey.serialize() + PROTOID
- assert(len(secret) == INTRO_SECRET_LEN)
- info = M_HSEXPAND + subcredential
- kdf = shake_256()
- kdf.update(secret + T_HSENC + info)
- key_material = shake_squeeze(kdf, 64*8)
- enc_key = key_material[0:32]
- mac_key = key_material[32:64]
- return enc_key, mac_key
- """Wrapper over intro2_ntor_client()"""
- def client_part1(intro_auth_pubkey_str, intro_enc_pubkey,
- client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey, subcredential):
- enc_key, mac_key = intro2_ntor_client(intro_auth_pubkey_str, intro_enc_pubkey, client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey, subcredential)
- assert(enc_key)
- assert(mac_key)
- return enc_key, mac_key
- """As service compute key material for INTRODUCE cell as follows:
- intro_secret_hs_input = EXP(X,b) | AUTH_KEY | X | B | PROTOID
- info = m_hsexpand | subcredential
- hs_keys = KDF(intro_secret_hs_input | t_hsenc | info, S_KEY_LEN+MAC_LEN)
- HS_DEC_KEY = hs_keys[0:S_KEY_LEN]
- HS_MAC_KEY = hs_keys[S_KEY_LEN:S_KEY_LEN+MAC_KEY_LEN]
- """
- def intro2_ntor_service(intro_auth_pubkey_str, client_enc_pubkey, service_enc_privkey, service_enc_pubkey, subcredential):
- dh_result = service_enc_privkey.get_shared_key(client_enc_pubkey, hash_nil)
- secret = dh_result + intro_auth_pubkey_str + client_enc_pubkey.serialize() + service_enc_pubkey.serialize() + PROTOID
- assert(len(secret) == INTRO_SECRET_LEN)
- info = M_HSEXPAND + subcredential
- kdf = shake_256()
- kdf.update(secret + T_HSENC + info)
- key_material = shake_squeeze(kdf, 64*8)
- enc_key = key_material[0:32]
- mac_key = key_material[32:64]
- return enc_key, mac_key
- """As service compute key material for INTRODUCE and REDNEZVOUS cells.
- Use intro2_ntor_service() to calculate the INTRODUCE key material, and use
- the following computations to do the RENDEZVOUS ones:
- rend_secret_hs_input = EXP(X,y) | EXP(X,b) | AUTH_KEY | B | X | Y | PROTOID
- NTOR_KEY_SEED = MAC(rend_secret_hs_input, t_hsenc)
- verify = MAC(rend_secret_hs_input, t_hsverify)
- auth_input = verify | AUTH_KEY | B | Y | X | PROTOID | "Server"
- AUTH_INPUT_MAC = MAC(auth_input, t_hsmac)
- """
- def service_part1(intro_auth_pubkey_str, client_enc_pubkey, intro_enc_privkey, intro_enc_pubkey, subcredential):
- intro_enc_key, intro_mac_key = intro2_ntor_service(intro_auth_pubkey_str, client_enc_pubkey, intro_enc_privkey, intro_enc_pubkey, subcredential)
- assert(intro_enc_key)
- assert(intro_mac_key)
- service_ephemeral_privkey = PrivateKey()
- service_ephemeral_pubkey = service_ephemeral_privkey.get_public()
- dh_result1 = service_ephemeral_privkey.get_shared_key(client_enc_pubkey, hash_nil)
- dh_result2 = intro_enc_privkey.get_shared_key(client_enc_pubkey, hash_nil)
- rend_secret_hs_input = dh_result1 + dh_result2 + intro_auth_pubkey_str + intro_enc_pubkey.serialize() + client_enc_pubkey.serialize() + service_ephemeral_pubkey.serialize() + PROTOID
- assert(len(rend_secret_hs_input) == REND_SECRET_LEN)
- ntor_key_seed = mac(rend_secret_hs_input, T_HSENC)
- verify = mac(rend_secret_hs_input, T_HSVERIFY)
- auth_input = verify + intro_auth_pubkey_str + intro_enc_pubkey.serialize() + service_ephemeral_pubkey.serialize() + client_enc_pubkey.serialize() + PROTOID + b"Server"
- assert(len(auth_input) == AUTH_INPUT_LEN)
- auth_input_mac = mac(auth_input, T_HSMAC)
- assert(ntor_key_seed)
- assert(auth_input_mac)
- assert(service_ephemeral_pubkey)
- return intro_enc_key, intro_mac_key, ntor_key_seed, auth_input_mac, service_ephemeral_pubkey
- """As client compute key material for rendezvous cells as follows:
- rend_secret_hs_input = EXP(Y,x) | EXP(B,x) | AUTH_KEY | B | X | Y | PROTOID
- NTOR_KEY_SEED = MAC(ntor_secret_input, t_hsenc)
- verify = MAC(ntor_secret_input, t_hsverify)
- auth_input = verify | AUTH_KEY | B | Y | X | PROTOID | "Server"
- AUTH_INPUT_MAC = MAC(auth_input, t_hsmac)
- """
- def client_part2(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey,
- intro_enc_pubkey, service_ephemeral_rend_pubkey):
- dh_result1 = client_ephemeral_enc_privkey.get_shared_key(service_ephemeral_rend_pubkey, hash_nil)
- dh_result2 = client_ephemeral_enc_privkey.get_shared_key(intro_enc_pubkey, hash_nil)
- rend_secret_hs_input = dh_result1 + dh_result2 + intro_auth_pubkey_str + intro_enc_pubkey.serialize() + client_ephemeral_enc_pubkey.serialize() + service_ephemeral_rend_pubkey.serialize() + PROTOID
- assert(len(rend_secret_hs_input) == REND_SECRET_LEN)
- ntor_key_seed = mac(rend_secret_hs_input, T_HSENC)
- verify = mac(rend_secret_hs_input, T_HSVERIFY)
- auth_input = verify + intro_auth_pubkey_str + intro_enc_pubkey.serialize() + service_ephemeral_rend_pubkey.serialize() + client_ephemeral_enc_pubkey.serialize() + PROTOID + b"Server"
- assert(len(auth_input) == AUTH_INPUT_LEN)
- auth_input_mac = mac(auth_input, T_HSMAC)
- assert(ntor_key_seed)
- assert(auth_input_mac)
- return ntor_key_seed, auth_input_mac
- #################################################################################
- """
- Utilities for communicating with the little-t-tor ntor wrapper to conduct the
- integration tests
- """
- PROG = "./src/test/test-hs-ntor-cl"
- if sys.version_info[0] >= 3:
- enhex=lambda s: binascii.b2a_hex(s).decode("ascii")
- else:
- enhex=lambda s: binascii.b2a_hex(s)
- dehex=lambda s: binascii.a2b_hex(s.strip())
- def tor_client1(intro_auth_pubkey_str, intro_enc_pubkey,
- client_ephemeral_enc_privkey, subcredential):
- p = subprocess.Popen([PROG, "client1",
- enhex(intro_auth_pubkey_str),
- enhex(intro_enc_pubkey.serialize()),
- enhex(client_ephemeral_enc_privkey.serialize()),
- enhex(subcredential)],
- stdout=subprocess.PIPE)
- return map(dehex, p.stdout.readlines())
- def tor_server1(intro_auth_pubkey_str, intro_enc_privkey,
- client_ephemeral_enc_pubkey, subcredential):
- p = subprocess.Popen([PROG, "server1",
- enhex(intro_auth_pubkey_str),
- enhex(intro_enc_privkey.serialize()),
- enhex(client_ephemeral_enc_pubkey.serialize()),
- enhex(subcredential)],
- stdout=subprocess.PIPE)
- return map(dehex, p.stdout.readlines())
- def tor_client2(intro_auth_pubkey_str, client_ephemeral_enc_privkey,
- intro_enc_pubkey, service_ephemeral_rend_pubkey, subcredential):
- p = subprocess.Popen([PROG, "client2",
- enhex(intro_auth_pubkey_str),
- enhex(client_ephemeral_enc_privkey.serialize()),
- enhex(intro_enc_pubkey.serialize()),
- enhex(service_ephemeral_rend_pubkey.serialize()),
- enhex(subcredential)],
- stdout=subprocess.PIPE)
- return map(dehex, p.stdout.readlines())
- ##################################################################################
- # Perform a pure python ntor test
- def do_pure_python_ntor_test():
- # Initialize all needed key material
- client_ephemeral_enc_privkey = PrivateKey()
- client_ephemeral_enc_pubkey = client_ephemeral_enc_privkey.get_public()
- intro_enc_privkey = PrivateKey()
- intro_enc_pubkey = intro_enc_privkey.get_public()
- intro_auth_pubkey_str = os.urandom(32)
- subcredential = os.urandom(32)
- client_enc_key, client_mac_key = client_part1(intro_auth_pubkey_str, intro_enc_pubkey, client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey, subcredential)
- service_enc_key, service_mac_key, service_ntor_key_seed, service_auth_input_mac, service_ephemeral_pubkey = service_part1(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, intro_enc_privkey, intro_enc_pubkey, subcredential)
- assert(client_enc_key == service_enc_key)
- assert(client_mac_key == service_mac_key)
- client_ntor_key_seed, client_auth_input_mac = client_part2(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey,
- intro_enc_pubkey, service_ephemeral_pubkey)
- assert(client_ntor_key_seed == service_ntor_key_seed)
- assert(client_auth_input_mac == service_auth_input_mac)
- print("DONE: python dance [%s]" % repr(client_auth_input_mac))
- # Perform a pure little-t-tor integration test.
- def do_little_t_tor_ntor_test():
- # Initialize all needed key material
- subcredential = os.urandom(32)
- client_ephemeral_enc_privkey = PrivateKey()
- client_ephemeral_enc_pubkey = client_ephemeral_enc_privkey.get_public()
- intro_enc_privkey = PrivateKey()
- intro_enc_pubkey = intro_enc_privkey.get_public() # service-side enc key
- intro_auth_pubkey_str = os.urandom(32)
- client_enc_key, client_mac_key = tor_client1(intro_auth_pubkey_str, intro_enc_pubkey,
- client_ephemeral_enc_privkey, subcredential)
- assert(client_enc_key)
- assert(client_mac_key)
- service_enc_key, service_mac_key, service_ntor_auth_mac, service_ntor_key_seed, service_eph_pubkey = tor_server1(intro_auth_pubkey_str,
- intro_enc_privkey,
- client_ephemeral_enc_pubkey,
- subcredential)
- assert(service_enc_key)
- assert(service_mac_key)
- assert(service_ntor_auth_mac)
- assert(service_ntor_key_seed)
- assert(client_enc_key == service_enc_key)
- assert(client_mac_key == service_mac_key)
- # Turn from bytes to key
- service_eph_pubkey = curve25519mod.Public(service_eph_pubkey)
- client_ntor_auth_mac, client_ntor_key_seed = tor_client2(intro_auth_pubkey_str, client_ephemeral_enc_privkey,
- intro_enc_pubkey, service_eph_pubkey, subcredential)
- assert(client_ntor_auth_mac)
- assert(client_ntor_key_seed)
- assert(client_ntor_key_seed == service_ntor_key_seed)
- assert(client_ntor_auth_mac == service_ntor_auth_mac)
- print("DONE: tor dance [%s]" % repr(client_ntor_auth_mac))
- """
- Do mixed test as follows:
- 1. C -> S (python mode)
- 2. C <- S (tor mode)
- 3. Client computes keys (python mode)
- """
- def do_first_mixed_test():
- subcredential = os.urandom(32)
- client_ephemeral_enc_privkey = PrivateKey()
- client_ephemeral_enc_pubkey = client_ephemeral_enc_privkey.get_public()
- intro_enc_privkey = PrivateKey()
- intro_enc_pubkey = intro_enc_privkey.get_public() # service-side enc key
- intro_auth_pubkey_str = os.urandom(32)
- # Let's do mixed
- client_enc_key, client_mac_key = client_part1(intro_auth_pubkey_str, intro_enc_pubkey,
- client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey,
- subcredential)
- service_enc_key, service_mac_key, service_ntor_auth_mac, service_ntor_key_seed, service_eph_pubkey = tor_server1(intro_auth_pubkey_str,
- intro_enc_privkey,
- client_ephemeral_enc_pubkey,
- subcredential)
- assert(service_enc_key)
- assert(service_mac_key)
- assert(service_ntor_auth_mac)
- assert(service_ntor_key_seed)
- assert(service_eph_pubkey)
- assert(client_enc_key == service_enc_key)
- assert(client_mac_key == service_mac_key)
- # Turn from bytes to key
- service_eph_pubkey = curve25519mod.Public(service_eph_pubkey)
- client_ntor_key_seed, client_auth_input_mac = client_part2(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey,
- intro_enc_pubkey, service_eph_pubkey)
- assert(client_auth_input_mac == service_ntor_auth_mac)
- assert(client_ntor_key_seed == service_ntor_key_seed)
- print("DONE: 1st mixed dance [%s]" % repr(client_auth_input_mac))
- """
- Do mixed test as follows:
- 1. C -> S (tor mode)
- 2. C <- S (python mode)
- 3. Client computes keys (tor mode)
- """
- def do_second_mixed_test():
- subcredential = os.urandom(32)
- client_ephemeral_enc_privkey = PrivateKey()
- client_ephemeral_enc_pubkey = client_ephemeral_enc_privkey.get_public()
- intro_enc_privkey = PrivateKey()
- intro_enc_pubkey = intro_enc_privkey.get_public() # service-side enc key
- intro_auth_pubkey_str = os.urandom(32)
- # Let's do mixed
- client_enc_key, client_mac_key = tor_client1(intro_auth_pubkey_str, intro_enc_pubkey,
- client_ephemeral_enc_privkey, subcredential)
- assert(client_enc_key)
- assert(client_mac_key)
- service_enc_key, service_mac_key, service_ntor_key_seed, service_ntor_auth_mac, service_ephemeral_pubkey = service_part1(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, intro_enc_privkey, intro_enc_pubkey, subcredential)
- client_ntor_auth_mac, client_ntor_key_seed = tor_client2(intro_auth_pubkey_str, client_ephemeral_enc_privkey,
- intro_enc_pubkey, service_ephemeral_pubkey, subcredential)
- assert(client_ntor_auth_mac)
- assert(client_ntor_key_seed)
- assert(client_ntor_key_seed == service_ntor_key_seed)
- assert(client_ntor_auth_mac == service_ntor_auth_mac)
- print("DONE: 2nd mixed dance [%s]" % repr(client_ntor_auth_mac))
- def do_mixed_tests():
- do_first_mixed_test()
- do_second_mixed_test()
- if __name__ == '__main__':
- do_pure_python_ntor_test()
- do_little_t_tor_ntor_test()
- do_mixed_tests()
|