|
- #!/usr/bin/env python3
- import ast
- import collections
- import mmap
- import os
- import pathlib
- import random
- import shutil
- import string
- import subprocess
- import unittest
- from datetime import datetime, timedelta
- from regression import (
- HAS_SGX,
- RegressionTestCase,
- expectedFailureIf,
- )
- CPUINFO_FLAGS_WHITELIST = [
- 'fpu', 'vme', 'de', 'pse', 'tsc', 'msr', 'pae', 'mce', 'cx8', 'apic', 'sep',
- 'mtrr', 'pge', 'mca', 'cmov', 'pat', 'pse36', 'pn', 'clflush', 'dts',
- 'acpi', 'mmx', 'fxsr', 'sse', 'sse2', 'ss', 'ht', 'tm', 'ia64', 'pbe',
- ]
- class TC_00_Basic(RegressionTestCase):
- def test_000_atomic_math(self):
- stdout, stderr = self.run_binary(['AtomicMath'])
- self.assertIn('Subtract INT_MIN: Both values match 2147483648', stderr)
- self.assertIn('Subtract INT_MAX: Both values match -2147483647', stderr)
- self.assertIn('Subtract LLONG_MIN: Both values match -9223372036854775808', stderr)
- self.assertIn('Subtract LLONG_MAX: Both values match -9223372036854775807', stderr)
- def test_001_path_normalization(self):
- stdout, stderr = self.run_binary(['normalize_path'])
- self.assertIn("Success!\n", stderr)
- class TC_01_Bootstrap(RegressionTestCase):
- def test_100_basic_boostrapping(self):
- stdout, stderr = self.run_binary(['Bootstrap'])
- # Basic Bootstrapping
- self.assertIn('User Program Started', stderr)
- # Control Block: Executable Name
- self.assertIn('Loaded Executable: file:Bootstrap', stderr)
- # One Argument Given
- self.assertIn('# of Arguments: 1', stderr)
- self.assertIn('argv[0] = Bootstrap', stderr)
- # Control Block: Debug Stream (Inline)
- self.assertIn('Written to Debug Stream', stdout)
- # Control Block: Allocation Alignment
- self.assertIn('Allocation Alignment: {}'.format(mmap.ALLOCATIONGRANULARITY), stderr)
- # Control Block: Executable Range
- self.assertIn('Executable Range OK', stderr)
- def test_101_basic_boostrapping_five_arguments(self):
- stdout, stderr = self.run_binary(['Bootstrap', 'a', 'b', 'c', 'd'])
- # Five Arguments Given
- self.assertIn('# of Arguments: 5', stderr)
- self.assertIn('argv[1] = a', stderr)
- self.assertIn('argv[2] = b', stderr)
- self.assertIn('argv[3] = c', stderr)
- self.assertIn('argv[4] = d', stderr)
- def test_102_cpuinfo(self):
- with open('/proc/cpuinfo') as file:
- cpuinfo = file.read().strip().split('\n\n')[-1]
- cpuinfo = dict(map(str.strip, line.split(':'))
- for line in cpuinfo.split('\n'))
- if 'flags' in cpuinfo:
- cpuinfo['flags'] = ' '.join(flag for flag in cpuinfo['flags']
- if flag in CPUINFO_FLAGS_WHITELIST)
- stdout, stderr = self.run_binary(['Bootstrap'])
- self.assertIn('CPU num: {}'.format(int(cpuinfo['processor']) + 1),
- stderr)
- self.assertIn('CPU vendor: {[vendor_id]}'.format(cpuinfo), stderr)
- self.assertIn('CPU brand: {[model name]}'.format(cpuinfo), stderr)
- self.assertIn('CPU family: {[cpu family]}'.format(cpuinfo), stderr)
- self.assertIn('CPU model: {[model]}'.format(cpuinfo), stderr)
- self.assertIn('CPU stepping: {[stepping]}'.format(cpuinfo), stderr)
- self.assertIn('CPU flags: {[flags]}'.format(cpuinfo), stderr)
- def test_103_dotdot(self):
- stdout, stderr = self.run_binary(['..Bootstrap'])
- self.assertIn('User Program Started', stderr)
- def test_104_manifest_as_executable_name(self):
- manifest = self.get_manifest('Bootstrap2')
- stdout, stderr = self.run_binary([manifest])
- self.assertIn('User Program Started', stderr)
- self.assertIn('Loaded Manifest: file:' + manifest, stderr)
- def test_105_manifest_as_argument(self):
- manifest = self.get_manifest('Bootstrap4')
- stdout, stderr = self.run_binary([manifest])
- self.assertIn('Loaded Manifest: file:' + manifest, stderr)
- self.assertIn('Loaded Executable: file:Bootstrap', stderr)
- def test_106_manifest_with_shebang(self):
- manifest = self.get_manifest('Bootstrap4')
- stdout, stderr = self.run_binary(['./' + manifest])
- self.assertIn('Loaded Manifest: file:' + manifest, stderr)
- self.assertIn('Loaded Executable: file:Bootstrap', stderr)
- self.assertIn('argv[0] = Bootstrap', stderr)
- @unittest.skipUnless(HAS_SGX, 'need SGX')
- def test_107_manifest_with_nonelf_binary(self):
- manifest = self.get_manifest('nonelf_binary')
- #Expect return code is -ENOEXEC(248 as unsigned char)
- with self.expect_returncode(248):
- self.run_binary([manifest])
- def test_110_preload_libraries(self):
- stdout, stderr = self.run_binary(['Bootstrap3'])
- self.assertIn('Binary 1 Preloaded', stderr)
- self.assertIn('Binary 2 Preloaded', stderr)
- self.assertIn('Preloaded Function 1 Called', stderr)
- self.assertIn('Preloaded Function 2 Called', stderr)
- def test_111_preload_libraries(self):
- # Bootstrap without Executable but Preload Libraries
- stdout, stderr = self.run_binary([self.get_manifest('Bootstrap5')])
- self.assertIn('Binary 1 Preloaded', stderr)
- self.assertIn('Binary 2 Preloaded', stderr)
- @unittest.skipUnless(HAS_SGX, 'this test requires SGX')
- def test_120_8gb_enclave(self):
- manifest = self.get_manifest('Bootstrap6')
- stdout, stderr = self.run_binary([manifest], timeout=360)
- self.assertIn('Loaded Manifest: file:' + manifest, stderr)
- self.assertIn('Executable Range OK', stderr)
- def test_130_large_number_of_items_in_manifest(self):
- stdout, stderr = self.run_binary([self.get_manifest('Bootstrap7')])
- self.assertIn('key1000=na', stderr)
- self.assertIn('key1=na', stderr)
- @unittest.skip('this is broken on non-SGX, see #860')
- def test_140_missing_executable_and_manifest(self):
- try:
- stdout, stderr = self.run_binary(['fakenews'])
- self.fail(
- 'expected non-zero returncode, stderr: {!r}'.format(stderr))
- except subprocess.CalledProcessError as e:
- self.assertIn('USAGE: ', e.stderr.decode())
- class TC_02_Symbols(RegressionTestCase):
- ALL_SYMBOLS = [
- 'DkVirtualMemoryAlloc',
- 'DkVirtualMemoryFree',
- 'DkVirtualMemoryProtect',
- 'DkProcessCreate',
- 'DkProcessExit',
- 'DkStreamOpen',
- 'DkStreamWaitForClient',
- 'DkStreamRead',
- 'DkStreamWrite',
- 'DkStreamDelete',
- 'DkStreamMap',
- 'DkStreamUnmap',
- 'DkStreamSetLength',
- 'DkStreamFlush',
- 'DkSendHandle',
- 'DkReceiveHandle',
- 'DkStreamAttributesQuery',
- 'DkStreamAttributesQueryByHandle',
- 'DkStreamAttributesSetByHandle',
- 'DkStreamGetName',
- 'DkStreamChangeName',
- 'DkThreadCreate',
- 'DkThreadDelayExecution',
- 'DkThreadYieldExecution',
- 'DkThreadExit',
- 'DkThreadResume',
- 'DkSetExceptionHandler',
- 'DkExceptionReturn',
- 'DkMutexCreate',
- 'DkMutexRelease',
- 'DkNotificationEventCreate',
- 'DkSynchronizationEventCreate',
- 'DkEventSet',
- 'DkEventClear',
- 'DkObjectsWaitAny',
- 'DkObjectClose',
- 'DkSystemTimeQuery',
- 'DkRandomBitsRead',
- 'DkInstructionCacheFlush',
- 'DkSegmentRegister',
- 'DkMemoryAvailableQuota',
- ]
- def test_000_symbols(self):
- stdout, stderr = self.run_binary(['Symbols'])
- found_symbols = dict(line.split(' = ')
- for line in stderr.strip().split('\n') if line.startswith('Dk'))
- self.assertCountEqual(found_symbols, self.ALL_SYMBOLS)
- for k, v in found_symbols.items():
- v = ast.literal_eval(v)
- self.assertNotEqual(v, 0, 'symbol {} has value 0'.format(k))
- class TC_10_Exception(RegressionTestCase):
- def test_000_exception(self):
- stdout, stderr = self.run_binary(['Exception'])
- # Exception Handling (Div-by-Zero)
- self.assertIn('Arithmetic Exception Handler', stderr)
- # Exception Handling (Memory Fault)
- self.assertIn('Memory Fault Exception Handler', stderr)
- # Exception Handler Swap
- self.assertIn('Arithmetic Exception Handler 1', stderr)
- self.assertIn('Arithmetic Exception Handler 2', stderr)
- # Exception Handling (Set Context)
- self.assertIn('Arithmetic Exception Handler 1', stderr)
- # Exception Handling (Red zone)
- self.assertIn('Red zone test ok.', stderr)
- class TC_20_SingleProcess(RegressionTestCase):
- def test_000_exit_code(self):
- with self.expect_returncode(112):
- self.run_binary(['Exit'])
- def test_100_file(self):
- try:
- pathlib.Path('file_nonexist.tmp').unlink()
- except FileNotFoundError:
- pass
- pathlib.Path('file_delete.tmp').touch()
- with open('File', 'rb') as file:
- file_exist = file.read()
- stdout, stderr = self.run_binary(['File'])
- # Basic File Opening
- self.assertIn('File Open Test 1 OK', stderr)
- self.assertIn('File Open Test 2 OK', stderr)
- self.assertIn('File Open Test 3 OK', stderr)
- # Basic File Creation
- self.assertIn('File Creation Test 1 OK', stderr)
- self.assertIn('File Creation Test 2 OK', stderr)
- self.assertIn('File Creation Test 3 OK', stderr)
- # File Reading
- self.assertIn('Read Test 1 (0th - 40th): {}'.format(
- file_exist[0:40].hex()), stderr)
- self.assertIn('Read Test 2 (0th - 40th): {}'.format(
- file_exist[0:40].hex()), stderr)
- self.assertIn('Read Test 3 (200th - 240th): {}'.format(
- file_exist[200:240].hex()), stderr)
- # File Writing
- with open('file_nonexist.tmp', 'rb') as file:
- file_nonexist = file.read()
- self.assertEqual(file_exist[0:40], file_nonexist[200:240])
- self.assertEqual(file_exist[200:240], file_nonexist[0:40])
- # File Attribute Query
- self.assertIn('Query: type = ', stderr)
- self.assertIn(', size = {}'.format(len(file_exist)), stderr)
- # File Attribute Query by Handle
- self.assertIn('Query by Handle: type = ', stderr)
- self.assertIn(', size = {}'.format(len(file_exist)), stderr)
- # File Mapping
- self.assertIn(
- 'Map Test 1 (0th - 40th): {}'.format(file_exist[0:40].hex()),
- stderr)
- self.assertIn(
- 'Map Test 2 (200th - 240th): {}'.format(file_exist[200:240].hex()),
- stderr)
- self.assertIn(
- 'Map Test 3 (4096th - 4136th): {}'.format(file_exist[4096:4136].hex()),
- stderr)
- self.assertIn(
- 'Map Test 4 (4296th - 4336th): {}'.format(file_exist[4296:4336].hex()),
- stderr)
- # Set File Length
- self.assertEqual(
- pathlib.Path('file_nonexist.tmp').stat().st_size,
- mmap.ALLOCATIONGRANULARITY)
- # File Deletion
- self.assertFalse(pathlib.Path('file_delete.tmp').exists())
- @unittest.skipUnless(HAS_SGX, 'this test requires SGX')
- def test_101_nonexist_file(self):
- # Explicitly remove the file file_nonexist_disallowed.tmp before
- # running binary. Otherwise this test will fail if these tests are
- # run repeatedly.
- os.remove('file_nonexist_disallowed.tmp')
- stdout, stderr = self.run_binary(['File'])
- # Run file creation for non-existing file. This behavior is
- # disallowed unless sgx.allow_file_creation is explicitly set to 1.
- self.assertIn('File Creation Test 4 OK', stderr)
- def test_110_directory(self):
- for path in ['dir_exist.tmp', 'dir_nonexist.tmp', 'dir_delete.tmp']:
- try:
- shutil.rmtree(path)
- except FileNotFoundError:
- pass
- path = pathlib.Path('dir_exist.tmp')
- files = [path / ''.join(random.choice(string.ascii_letters)
- for j in range(8))
- for i in range(5)]
- path.mkdir()
- for p in files:
- p.touch()
- pathlib.Path('dir_delete.tmp').mkdir()
- stdout, stderr = self.run_binary(['Directory'])
- # Basic Directory Opening
- self.assertIn('Directory Open Test 1 OK', stderr)
- self.assertIn('Directory Open Test 2 OK', stderr)
- self.assertIn('Directory Open Test 3 OK', stderr)
- # Basic Directory Creation
- self.assertIn('Directory Creation Test 1 OK', stderr)
- self.assertIn('Directory Creation Test 2 OK', stderr)
- self.assertIn('Directory Creation Test 3 OK', stderr)
- # Directory Reading
- for p in files:
- self.assertIn('Read Directory: {}'.format(p.name), stderr)
- # Directory Attribute Query
- self.assertIn('Query: type = ', stderr)
- # Directory Attribute Query by Handle
- self.assertIn('Query by Handle: type = ', stderr)
- # Directory Deletion
- self.assertFalse(pathlib.Path('dir_delete.tmp').exists())
- def test_200_event(self):
- stdout, stderr = self.run_binary(['Event'])
- self.assertIn('Wait with too short timeout ok.', stderr)
- self.assertIn('Wait with long enough timeout ok.', stderr)
- def test_210_semaphore(self):
- stdout, stderr = self.run_binary(['Semaphore'])
- # Semaphore: Timeout on Locked Semaphores
- self.assertIn('Locked binary semaphore timed out (1000).', stderr)
- self.assertIn('Locked binary semaphore timed out (0).', stderr)
- # Semaphore: Acquire Unlocked Semaphores
- self.assertIn('Locked binary semaphore successfully (-1).', stderr)
- self.assertIn('Locked binary semaphore successfully (0).', stderr)
- def test_300_memory(self):
- stdout, stderr = self.run_binary(['Memory'])
- # Memory Allocation
- self.assertIn('Memory Allocation OK', stderr)
- # Memory Allocation with Address
- self.assertIn('Memory Allocation with Address OK', stderr)
- # Get Memory Total Quota
- self.assertIn('Total Memory:', stderr)
- for line in stderr.split('\n'):
- if line.startswith('Total Memory:'):
- self.assertNotEqual(line, 'Total Memory: 0')
- # Get Memory Available Quota
- self.assertIn('Get Memory Available Quota OK', stderr)
- @expectedFailureIf(HAS_SGX)
- def test_301_memory_nosgx(self):
- stdout, stderr = self.run_binary(['Memory'])
- # SGX1 does not support unmapping a page or changing its permission
- # after enclave init. Therefore the memory protection and deallocation
- # tests will fail. By utilizing SGX2 it's possibile to fix this.
- # Memory Protection
- self.assertIn('Memory Allocation Protection (RW) OK', stderr)
- self.assertIn('Memory Protection (R) OK', stderr)
- # Memory Deallocation
- self.assertIn('Memory Deallocation OK', stderr)
- def test_400_pipe(self):
- stdout, stderr = self.run_binary(['Pipe'])
- # Pipe Creation
- self.assertIn('Pipe Creation 1 OK', stderr)
- # Pipe Attributes
- self.assertIn('Pipe Attribute Query 1 on pipesrv returned OK', stderr)
- # Pipe Connection
- self.assertIn('Pipe Connection 1 OK', stderr)
- # Pipe Transmission
- self.assertIn('Pipe Write 1 OK', stderr)
- self.assertIn('Pipe Read 1: Hello World 1', stderr)
- self.assertIn('Pipe Write 2 OK', stderr)
- self.assertIn('Pipe Read 2: Hello World 2', stderr)
- def test_410_socket(self):
- stdout, stderr = self.run_binary(['Socket'])
- # TCP Socket Creation
- self.assertIn('TCP Creation 1 OK', stderr)
- # TCP Socket Connection
- self.assertIn('TCP Connection 1 OK', stderr)
- # TCP Socket Transmission
- self.assertIn('TCP Write 1 OK', stderr)
- self.assertIn('TCP Read 1: Hello World 1', stderr)
- self.assertIn('TCP Write 2 OK', stderr)
- self.assertIn('TCP Read 2: Hello World 2', stderr)
- # UDP Socket Creation
- self.assertIn('UDP Creation 1 OK', stderr)
- # UDP Socket Connection
- self.assertIn('UDP Connection 1 OK', stderr)
- # UDP Socket Transmission
- self.assertIn('UDP Write 1 OK', stderr)
- self.assertIn('UDP Read 1: Hello World 1', stderr)
- self.assertIn('UDP Write 2 OK', stderr)
- self.assertIn('UDP Read 2: Hello World 2', stderr)
- # Bound UDP Socket Transmission
- self.assertIn('UDP Write 3 OK', stderr)
- self.assertIn('UDP Read 3: Hello World 1', stderr)
- self.assertIn('UDP Write 4 OK', stderr)
- self.assertIn('UDP Read 4: Hello World 2', stderr)
- def test_500_thread(self):
- stdout, stderr = self.run_binary(['Thread'])
- # Thread Creation
- self.assertIn('Child Thread Created', stderr)
- self.assertIn('Run in Child Thread: Hello World', stderr)
- # Multiple Threads Run in Parallel
- self.assertIn('Threads Run in Parallel OK', stderr)
- # Set Thread Private Segment Register
- self.assertIn('Private Message (FS Segment) 1: Hello World 1', stderr)
- self.assertIn('Private Message (FS Segment) 2: Hello World 2', stderr)
- # Thread Exit
- self.assertIn('Child Thread Exited', stderr)
- def test_510_thread2(self):
- stdout, stderr = self.run_binary(['Thread2'])
- # Thread Cleanup: Exit by return.
- self.assertIn('Thread 2 ok.', stderr)
- # Thread Cleanup: Exit by DkThreadExit.
- self.assertIn('Thread 3 ok.', stderr)
- self.assertNotIn('Exiting thread 3 failed.', stderr)
- # Thread Cleanup: Can still start threads.
- self.assertIn('Thread 4 ok.', stderr)
- def test_900_misc(self):
- stdout, stderr = self.run_binary(['Misc'])
- # Query System Time
- self.assertIn('Query System Time OK', stderr)
- # Delay Execution for 10000 Microseconds
- self.assertIn('Delay Execution for 10000 Microseconds OK', stderr)
- # Delay Execution for 3 Seconds
- self.assertIn('Delay Execution for 3 Seconds OK', stderr)
- # Generate Random Bits
- self.assertIn('Generate Random Bits OK', stderr)
- def test_910_hex(self):
- stdout, stderr = self.run_binary(['Hex'])
- # Hex 2 String Helper Function
- self.assertIn('Hex test 1 is deadbeef', stderr)
- self.assertIn('Hex test 2 is cdcdcdcdcdcdcdcd', stderr)
- class TC_21_ProcessCreation(RegressionTestCase):
- def test_100_process(self):
- stdout, stderr = self.run_binary(['Process'], timeout=8)
- counter = collections.Counter(stderr.split('\n'))
- # Process Creation
- self.assertEqual(counter['Child Process Created'], 3)
- # Process Creation Arguments
- self.assertEqual(counter['argv[0] = Process'], 3)
- self.assertEqual(counter['argv[1] = Child'], 3)
- # Process Channel Transmission
- self.assertEqual(counter['Process Write 1 OK'], 3)
- self.assertEqual(counter['Process Read 1: Hello World 1'], 3)
- self.assertEqual(counter['Process Write 2 OK'], 3)
- self.assertEqual(counter['Process Read 2: Hello World 2'], 3)
- def test_110_process_broadcast(self):
- stdout, stderr = self.run_binary(['Process'], timeout=8)
- counter = collections.Counter(stderr.split('\n'))
- # Multi-Process Broadcast Channel Transmission
- if ('Warning: broadcast stream is not open. '
- 'Do you have a multicast route configured?') in stderr:
- self.skipTest('Could not open broadcast stream. '
- 'Do you have a multicast route configured?')
- self.assertEqual(counter['Broadcast Write OK'], 1)
- self.assertEqual(counter['Broadcast Read: Hello World 1'], 3)
- def test_200_process2(self):
- # Process Creation with a Different Binary
- stdout, stderr = self.run_binary(['Process2'])
- counter = collections.Counter(stderr.split('\n'))
- self.assertEqual(counter['User Program Started'], 1)
- def test_300_process3(self):
- # Process Creation without Executable
- stdout, stderr = self.run_binary(['Process3'])
- counter = collections.Counter(stderr.split('\n'))
- self.assertEqual(counter['Binary 1 Preloaded'], 2)
- self.assertEqual(counter['Binary 2 Preloaded'], 2)
- class TC_23_SendHandle(RegressionTestCase):
- def test_000_send_handle(self):
- stdout, stderr = self.run_binary(['SendHandle'])
- counter = collections.Counter(stderr.split('\n'))
- # Send and Receive Handles across Processes
- self.assertEqual(counter['Send Handle OK'], 3)
- self.assertEqual(counter['Receive Handle OK'], 3)
- # Send Pipe Handle
- self.assertEqual(counter['Receive Pipe Handle: Hello World'], 1)
- # Send Socket Handle
- self.assertEqual(counter['Receive Socket Handle: Hello World'], 1)
- # Send File Handle
- self.assertEqual(counter['Receive File Handle: Hello World'], 1)
- @unittest.skipUnless(HAS_SGX, 'need SGX')
- class TC_40_AVXDisable(RegressionTestCase):
- @unittest.expectedFailure
- def test_000_avx_disable(self):
- # Disable AVX bit in XFRM
- stdout, stderr = self.run_binary(['AvxDisable'])
- self.assertIn('Illegal instruction executed in enclave', stderr)
- @unittest.skipUnless(HAS_SGX, 'need SGX')
- class TC_50_Attestation(RegressionTestCase):
- def test_000_remote_attestation(self):
- stdout, stderr = self.run_binary(["Attestation"])
- for line in stderr.split("\n"):
- # Check the attestation status
- if line.startswith("Attestation status:"):
- status = line[19:].strip()
- self.assertIn(status, ["OK", "GROUP_OUT_OF_DATE", "CONFIGURATION_NEEDED"])
- # Check the timestamp
- if line.startswith("Attestation timestamp:"):
- timestamp = datetime.strptime(line[22:].strip(), "%Y-%m-%dT%H:%M:%S.%f")
- # The timestamp may be in another time zone, but should be
- # within 24 hours of the current time.
- self.assertTrue(datetime.now() - timedelta(hours=24) <= timestamp and \
- datetime.now() + timedelta(hours=24) >= timestamp);
|