#!/usr/bin/env python3

import ast
import collections
import mmap
import os
import pathlib
import random
import shutil
import string
import subprocess
import unittest

from datetime import datetime, timedelta

from regression import (
    HAS_SGX,
    RegressionTestCase,
    expectedFailureIf,
)

# Flags from /proc/cpuinfo that are kept when building the expected
# "CPU flags:" line in TC_01_Bootstrap.test_102_cpuinfo.
CPUINFO_FLAGS_WHITELIST = [
    'fpu', 'vme', 'de', 'pse', 'tsc', 'msr', 'pae', 'mce', 'cx8', 'apic', 'sep',
    'mtrr', 'pge', 'mca', 'cmov', 'pat', 'pse36', 'pn', 'clflush', 'dts',
    'acpi', 'mmx', 'fxsr', 'sse', 'sse2', 'ss', 'ht', 'tm', 'ia64', 'pbe',
]


class TC_00_Basic(RegressionTestCase):
    """Checks for the AtomicMath and normalize_path binaries."""

    def test_000_atomic_math(self):
        stdout, stderr = self.run_binary(['AtomicMath'])
        self.assertIn('Subtract INT_MIN: Both values match 2147483648', stderr)
        self.assertIn('Subtract INT_MAX: Both values match -2147483647', stderr)
        self.assertIn('Subtract LLONG_MIN: Both values match -9223372036854775808', stderr)
        self.assertIn('Subtract LLONG_MAX: Both values match -9223372036854775807', stderr)

    def test_001_path_normalization(self):
        stdout, stderr = self.run_binary(['normalize_path'])
        self.assertIn("Success!\n", stderr)


class TC_01_Bootstrap(RegressionTestCase):
    """Checks on the Bootstrap* binaries: arguments, manifests, preloads."""

    def test_100_basic_boostrapping(self):
        stdout, stderr = self.run_binary(['Bootstrap'])

        # Basic Bootstrapping
        self.assertIn('User Program Started', stderr)

        # Control Block: Executable Name
        self.assertIn('Loaded Executable: file:Bootstrap', stderr)

        # One Argument Given
        self.assertIn('# of Arguments: 1', stderr)
        self.assertIn('argv[0] = Bootstrap', stderr)

        # Control Block: Debug Stream (Inline)
        self.assertIn('Written to Debug Stream', stdout)

        # Control Block: Allocation Alignment
        self.assertIn('Allocation Alignment: {}'.format(mmap.ALLOCATIONGRANULARITY), stderr)

        # Control Block: Executable Range
        self.assertIn('Executable Range OK', stderr)

    def test_101_basic_boostrapping_five_arguments(self):
        stdout, stderr = self.run_binary(['Bootstrap', 'a', 'b', 'c', 'd'])

        # Five Arguments Given
        self.assertIn('# of Arguments: 5', stderr)
        self.assertIn('argv[1] = a', stderr)
        self.assertIn('argv[2] = b', stderr)
        self.assertIn('argv[3] = c', stderr)
        self.assertIn('argv[4] = d', stderr)

    def test_102_cpuinfo(self):
        # Use the last processor entry of /proc/cpuinfo: its 'processor'
        # index plus one is compared against the reported CPU count.
        with open('/proc/cpuinfo') as file:
            cpuinfo = file.read().strip().split('\n\n')[-1]

        # Split on the first ':' only, so a value that itself contains a
        # colon still yields a (key, value) pair instead of breaking dict().
        cpuinfo = dict(map(str.strip, line.split(':', 1))
            for line in cpuinfo.split('\n'))

        if 'flags' in cpuinfo:
            # The flags value is one whitespace-separated string; it has to
            # be split into words before filtering. Iterating the string
            # itself yields single characters, none of which is whitelisted.
            cpuinfo['flags'] = ' '.join(
                flag for flag in cpuinfo['flags'].split()
                if flag in CPUINFO_FLAGS_WHITELIST)

        stdout, stderr = self.run_binary(['Bootstrap'])

        self.assertIn('CPU num: {}'.format(int(cpuinfo['processor']) + 1),
            stderr)
        self.assertIn('CPU vendor: {[vendor_id]}'.format(cpuinfo), stderr)
        self.assertIn('CPU brand: {[model name]}'.format(cpuinfo), stderr)
        self.assertIn('CPU family: {[cpu family]}'.format(cpuinfo), stderr)
        self.assertIn('CPU model: {[model]}'.format(cpuinfo), stderr)
        self.assertIn('CPU stepping: {[stepping]}'.format(cpuinfo), stderr)
        self.assertIn('CPU flags: {[flags]}'.format(cpuinfo), stderr)

    def test_103_dotdot(self):
        stdout, stderr = self.run_binary(['..Bootstrap'])
        self.assertIn('User Program Started', stderr)

    def test_104_manifest_as_executable_name(self):
        manifest = self.get_manifest('Bootstrap2')
        stdout, stderr = self.run_binary([manifest])
        self.assertIn('User Program Started', stderr)
        self.assertIn('Loaded Manifest: file:' + manifest, stderr)

    def test_105_manifest_as_argument(self):
        manifest = self.get_manifest('Bootstrap4')
        stdout, stderr = self.run_binary([manifest])
        self.assertIn('Loaded Manifest: file:' + manifest, stderr)
        self.assertIn('Loaded Executable: file:Bootstrap', stderr)

    def test_106_manifest_with_shebang(self):
        manifest = self.get_manifest('Bootstrap4')
        stdout, stderr = self.run_binary(['./' + manifest])
        self.assertIn('Loaded Manifest: file:' + manifest, stderr)
        self.assertIn('Loaded Executable: file:Bootstrap', stderr)
        self.assertIn('argv[0] = Bootstrap', stderr)

    @unittest.skipUnless(HAS_SGX, 'need SGX')
    def test_107_manifest_with_nonelf_binary(self):
        manifest = self.get_manifest('nonelf_binary')
        # Expected return code is -ENOEXEC (248 as unsigned char)
        with self.expect_returncode(248):
            self.run_binary([manifest])

    def test_110_preload_libraries(self):
        stdout, stderr = self.run_binary(['Bootstrap3'])
        self.assertIn('Binary 1 Preloaded', stderr)
        self.assertIn('Binary 2 Preloaded', stderr)
        self.assertIn('Preloaded Function 1 Called', stderr)
        self.assertIn('Preloaded Function 2 Called', stderr)

    def test_111_preload_libraries(self):
        # Bootstrap without Executable but Preload Libraries
        stdout, stderr = self.run_binary([self.get_manifest('Bootstrap5')])
        self.assertIn('Binary 1 Preloaded', stderr)
        self.assertIn('Binary 2 Preloaded', stderr)

    @unittest.skipUnless(HAS_SGX, 'this test requires SGX')
    def test_120_8gb_enclave(self):
        manifest = self.get_manifest('Bootstrap6')
        stdout, stderr = self.run_binary([manifest], timeout=360)
        self.assertIn('Loaded Manifest: file:' + manifest, stderr)
        self.assertIn('Executable Range OK', stderr)

    def test_130_large_number_of_items_in_manifest(self):
        stdout, stderr = self.run_binary([self.get_manifest('Bootstrap7')])
        self.assertIn('key1000=na', stderr)
        self.assertIn('key1=na', stderr)

    @unittest.skip('this is broken on non-SGX, see #860')
    def test_140_missing_executable_and_manifest(self):
        try:
            stdout, stderr = self.run_binary(['fakenews'])
            self.fail(
                'expected non-zero returncode, stderr: {!r}'.format(stderr))
        except subprocess.CalledProcessError as e:
            self.assertIn('USAGE: ', e.stderr.decode())


class TC_02_Symbols(RegressionTestCase):
    """Checks the set of 'Dk*' symbols printed by the Symbols binary."""

    ALL_SYMBOLS = [
        'DkVirtualMemoryAlloc',
        'DkVirtualMemoryFree',
        'DkVirtualMemoryProtect',
        'DkProcessCreate',
        'DkProcessExit',
        'DkStreamOpen',
        'DkStreamWaitForClient',
        'DkStreamRead',
        'DkStreamWrite',
        'DkStreamDelete',
        'DkStreamMap',
        'DkStreamUnmap',
        'DkStreamSetLength',
        'DkStreamFlush',
        'DkSendHandle',
        'DkReceiveHandle',
        'DkStreamAttributesQuery',
        'DkStreamAttributesQueryByHandle',
        'DkStreamAttributesSetByHandle',
        'DkStreamGetName',
        'DkStreamChangeName',
        'DkThreadCreate',
        'DkThreadDelayExecution',
        'DkThreadYieldExecution',
        'DkThreadExit',
        'DkThreadResume',
        'DkSetExceptionHandler',
        'DkExceptionReturn',
        'DkMutexCreate',
        'DkMutexRelease',
        'DkNotificationEventCreate',
        'DkSynchronizationEventCreate',
        'DkEventSet',
        'DkEventClear',
        'DkObjectsWaitAny',
        'DkObjectClose',
        'DkSystemTimeQuery',
        'DkRandomBitsRead',
        'DkInstructionCacheFlush',
        'DkSegmentRegister',
        'DkMemoryAvailableQuota',
    ]

    def test_000_symbols(self):
        stdout, stderr = self.run_binary(['Symbols'])

        # Each relevant stderr line has the form '<symbol> = <literal>'.
        found_symbols = dict(line.split(' = ')
            for line in stderr.strip().split('\n') if line.startswith('Dk'))
        self.assertCountEqual(found_symbols, self.ALL_SYMBOLS)

        for k, v in found_symbols.items():
            v = ast.literal_eval(v)
            self.assertNotEqual(v, 0, 'symbol {} has value 0'.format(k))


class TC_10_Exception(RegressionTestCase):
    """Checks the handler messages printed by the Exception binary."""

    def test_000_exception(self):
        stdout, stderr = self.run_binary(['Exception'])

        # Exception Handling (Div-by-Zero)
        self.assertIn('Arithmetic Exception Handler', stderr)

        # Exception Handling (Memory Fault)
        self.assertIn('Memory Fault Exception Handler', stderr)

        # Exception Handler Swap
        self.assertIn('Arithmetic Exception Handler 1', stderr)
        self.assertIn('Arithmetic Exception Handler 2', stderr)

        # Exception Handling (Set Context)
        self.assertIn('Arithmetic Exception Handler 1', stderr)

        # Exception Handling (Red zone)
        self.assertIn('Red zone test ok.', stderr)


class TC_20_SingleProcess(RegressionTestCase):
    """Single-process checks: files, directories, events, memory, IPC, threads."""

    def test_000_exit_code(self):
        with self.expect_returncode(112):
            self.run_binary(['Exit'])

    def test_100_file(self):
        # Start from a known state: file_nonexist.tmp absent,
        # file_delete.tmp present.
        try:
            pathlib.Path('file_nonexist.tmp').unlink()
        except FileNotFoundError:
            pass
        pathlib.Path('file_delete.tmp').touch()

        with open('File', 'rb') as file:
            file_exist = file.read()

        stdout, stderr = self.run_binary(['File'])

        # Basic File Opening
        self.assertIn('File Open Test 1 OK', stderr)
        self.assertIn('File Open Test 2 OK', stderr)
        self.assertIn('File Open Test 3 OK', stderr)

        # Basic File Creation
        self.assertIn('File Creation Test 1 OK', stderr)
        self.assertIn('File Creation Test 2 OK', stderr)
        self.assertIn('File Creation Test 3 OK', stderr)

        # File Reading
        self.assertIn('Read Test 1 (0th - 40th): {}'.format(
            file_exist[0:40].hex()), stderr)
        self.assertIn('Read Test 2 (0th - 40th): {}'.format(
            file_exist[0:40].hex()), stderr)
        self.assertIn('Read Test 3 (200th - 240th): {}'.format(
            file_exist[200:240].hex()), stderr)

        # File Writing
        with open('file_nonexist.tmp', 'rb') as file:
            file_nonexist = file.read()

        self.assertEqual(file_exist[0:40], file_nonexist[200:240])
        self.assertEqual(file_exist[200:240], file_nonexist[0:40])

        # File Attribute Query
        self.assertIn('Query: type = ', stderr)
        self.assertIn(', size = {}'.format(len(file_exist)), stderr)

        # File Attribute Query by Handle
        self.assertIn('Query by Handle: type = ', stderr)
        self.assertIn(', size = {}'.format(len(file_exist)), stderr)

        # File Mapping
        self.assertIn(
            'Map Test 1 (0th - 40th): {}'.format(file_exist[0:40].hex()),
            stderr)
        self.assertIn(
            'Map Test 2 (200th - 240th): {}'.format(file_exist[200:240].hex()),
            stderr)
        self.assertIn(
            'Map Test 3 (4096th - 4136th): {}'.format(file_exist[4096:4136].hex()),
            stderr)
        self.assertIn(
            'Map Test 4 (4296th - 4336th): {}'.format(file_exist[4296:4336].hex()),
            stderr)

        # Set File Length
        self.assertEqual(
            pathlib.Path('file_nonexist.tmp').stat().st_size,
            mmap.ALLOCATIONGRANULARITY)

        # File Deletion
        self.assertFalse(pathlib.Path('file_delete.tmp').exists())

    @unittest.skipUnless(HAS_SGX, 'this test requires SGX')
    def test_101_nonexist_file(self):
        # Explicitly remove the file file_nonexist_disallowed.tmp before
        # running binary. Otherwise this test will fail if these tests are
        # run repeatedly. The file is absent on a clean tree, so a missing
        # file must not be treated as an error.
        try:
            os.remove('file_nonexist_disallowed.tmp')
        except FileNotFoundError:
            pass

        stdout, stderr = self.run_binary(['File'])

        # Run file creation for non-existing file. This behavior is
        # disallowed unless sgx.allow_file_creation is explicitly set to 1.
        self.assertIn('File Creation Test 4 OK', stderr)

    def test_110_directory(self):
        # Remove leftovers from previous runs.
        for path in ['dir_exist.tmp', 'dir_nonexist.tmp', 'dir_delete.tmp']:
            try:
                shutil.rmtree(path)
            except FileNotFoundError:
                pass

        # Populate dir_exist.tmp with five files with random 8-letter names.
        path = pathlib.Path('dir_exist.tmp')
        files = [path / ''.join(random.choice(string.ascii_letters)
                for j in range(8))
            for i in range(5)]

        path.mkdir()
        for p in files:
            p.touch()
        pathlib.Path('dir_delete.tmp').mkdir()

        stdout, stderr = self.run_binary(['Directory'])

        # Basic Directory Opening
        self.assertIn('Directory Open Test 1 OK', stderr)
        self.assertIn('Directory Open Test 2 OK', stderr)
        self.assertIn('Directory Open Test 3 OK', stderr)

        # Basic Directory Creation
        self.assertIn('Directory Creation Test 1 OK', stderr)
        self.assertIn('Directory Creation Test 2 OK', stderr)
        self.assertIn('Directory Creation Test 3 OK', stderr)

        # Directory Reading
        for p in files:
            self.assertIn('Read Directory: {}'.format(p.name), stderr)

        # Directory Attribute Query
        self.assertIn('Query: type = ', stderr)

        # Directory Attribute Query by Handle
        self.assertIn('Query by Handle: type = ', stderr)

        # Directory Deletion
        self.assertFalse(pathlib.Path('dir_delete.tmp').exists())

    def test_200_event(self):
        stdout, stderr = self.run_binary(['Event'])
        self.assertIn('Wait with too short timeout ok.', stderr)
        self.assertIn('Wait with long enough timeout ok.', stderr)

    def test_210_semaphore(self):
        stdout, stderr = self.run_binary(['Semaphore'])

        # Semaphore: Timeout on Locked Semaphores
        self.assertIn('Locked binary semaphore timed out (1000).', stderr)
        self.assertIn('Locked binary semaphore timed out (0).', stderr)

        # Semaphore: Acquire Unlocked Semaphores
        self.assertIn('Locked binary semaphore successfully (-1).', stderr)
        self.assertIn('Locked binary semaphore successfully (0).', stderr)

    def test_300_memory(self):
        stdout, stderr = self.run_binary(['Memory'])

        # Memory Allocation
        self.assertIn('Memory Allocation OK', stderr)

        # Memory Allocation with Address
        self.assertIn('Memory Allocation with Address OK', stderr)

        # Get Memory Total Quota
        self.assertIn('Total Memory:', stderr)
        for line in stderr.split('\n'):
            if line.startswith('Total Memory:'):
                self.assertNotEqual(line, 'Total Memory: 0')

        # Get Memory Available Quota
        self.assertIn('Get Memory Available Quota OK', stderr)

    @expectedFailureIf(HAS_SGX)
    def test_301_memory_nosgx(self):
        stdout, stderr = self.run_binary(['Memory'])

        # SGX1 does not support unmapping a page or changing its permission
        # after enclave init. Therefore the memory protection and deallocation
        # tests will fail. By utilizing SGX2 it's possible to fix this.

        # Memory Protection
        self.assertIn('Memory Allocation Protection (RW) OK', stderr)
        self.assertIn('Memory Protection (R) OK', stderr)

        # Memory Deallocation
        self.assertIn('Memory Deallocation OK', stderr)

    def test_400_pipe(self):
        stdout, stderr = self.run_binary(['Pipe'])

        # Pipe Creation
        self.assertIn('Pipe Creation 1 OK', stderr)

        # Pipe Attributes
        self.assertIn('Pipe Attribute Query 1 on pipesrv returned OK', stderr)

        # Pipe Connection
        self.assertIn('Pipe Connection 1 OK', stderr)

        # Pipe Transmission
        self.assertIn('Pipe Write 1 OK', stderr)
        self.assertIn('Pipe Read 1: Hello World 1', stderr)
        self.assertIn('Pipe Write 2 OK', stderr)
        self.assertIn('Pipe Read 2: Hello World 2', stderr)

    def test_410_socket(self):
        stdout, stderr = self.run_binary(['Socket'])

        # TCP Socket Creation
        self.assertIn('TCP Creation 1 OK', stderr)

        # TCP Socket Connection
        self.assertIn('TCP Connection 1 OK', stderr)

        # TCP Socket Transmission
        self.assertIn('TCP Write 1 OK', stderr)
        self.assertIn('TCP Read 1: Hello World 1', stderr)
        self.assertIn('TCP Write 2 OK', stderr)
        self.assertIn('TCP Read 2: Hello World 2', stderr)

        # UDP Socket Creation
        self.assertIn('UDP Creation 1 OK', stderr)

        # UDP Socket Connection
        self.assertIn('UDP Connection 1 OK', stderr)

        # UDP Socket Transmission
        self.assertIn('UDP Write 1 OK', stderr)
        self.assertIn('UDP Read 1: Hello World 1', stderr)
        self.assertIn('UDP Write 2 OK', stderr)
        self.assertIn('UDP Read 2: Hello World 2', stderr)

        # Bound UDP Socket Transmission
        self.assertIn('UDP Write 3 OK', stderr)
        self.assertIn('UDP Read 3: Hello World 1', stderr)
        self.assertIn('UDP Write 4 OK', stderr)
        self.assertIn('UDP Read 4: Hello World 2', stderr)

    def test_500_thread(self):
        stdout, stderr = self.run_binary(['Thread'])

        # Thread Creation
        self.assertIn('Child Thread Created', stderr)
        self.assertIn('Run in Child Thread: Hello World', stderr)

        # Multiple Threads Run in Parallel
        self.assertIn('Threads Run in Parallel OK', stderr)

        # Set Thread Private Segment Register
        self.assertIn('Private Message (FS Segment) 1: Hello World 1', stderr)
        self.assertIn('Private Message (FS Segment) 2: Hello World 2', stderr)

        # Thread Exit
        self.assertIn('Child Thread Exited', stderr)

    def test_510_thread2(self):
        stdout, stderr = self.run_binary(['Thread2'])

        # Thread Cleanup: Exit by return.
        self.assertIn('Thread 2 ok.', stderr)

        # Thread Cleanup: Exit by DkThreadExit.
        self.assertIn('Thread 3 ok.', stderr)
        self.assertNotIn('Exiting thread 3 failed.', stderr)

        # Thread Cleanup: Can still start threads.
        self.assertIn('Thread 4 ok.', stderr)

    def test_900_misc(self):
        stdout, stderr = self.run_binary(['Misc'])

        # Query System Time
        self.assertIn('Query System Time OK', stderr)

        # Delay Execution for 10000 Microseconds
        self.assertIn('Delay Execution for 10000 Microseconds OK', stderr)

        # Delay Execution for 3 Seconds
        self.assertIn('Delay Execution for 3 Seconds OK', stderr)

        # Generate Random Bits
        self.assertIn('Generate Random Bits OK', stderr)

    def test_910_hex(self):
        stdout, stderr = self.run_binary(['Hex'])

        # Hex 2 String Helper Function
        self.assertIn('Hex test 1 is deadbeef', stderr)
        self.assertIn('Hex test 2 is cdcdcdcdcdcdcdcd', stderr)


class TC_21_ProcessCreation(RegressionTestCase):
    """Checks on the Process* binaries, counting exact stderr lines."""

    def test_100_process(self):
        stdout, stderr = self.run_binary(['Process'], timeout=8)
        counter = collections.Counter(stderr.split('\n'))

        # Process Creation
        self.assertEqual(counter['Child Process Created'], 3)

        # Process Creation Arguments
        self.assertEqual(counter['argv[0] = Process'], 3)
        self.assertEqual(counter['argv[1] = Child'], 3)

        # Process Channel Transmission
        self.assertEqual(counter['Process Write 1 OK'], 3)
        self.assertEqual(counter['Process Read 1: Hello World 1'], 3)
        self.assertEqual(counter['Process Write 2 OK'], 3)
        self.assertEqual(counter['Process Read 2: Hello World 2'], 3)

    def test_110_process_broadcast(self):
        stdout, stderr = self.run_binary(['Process'], timeout=8)
        counter = collections.Counter(stderr.split('\n'))

        # Multi-Process Broadcast Channel Transmission
        if ('Warning: broadcast stream is not open. '
                'Do you have a multicast route configured?') in stderr:
            self.skipTest('Could not open broadcast stream. '
                'Do you have a multicast route configured?')

        self.assertEqual(counter['Broadcast Write OK'], 1)
        self.assertEqual(counter['Broadcast Read: Hello World 1'], 3)

    def test_200_process2(self):
        # Process Creation with a Different Binary
        stdout, stderr = self.run_binary(['Process2'])
        counter = collections.Counter(stderr.split('\n'))
        self.assertEqual(counter['User Program Started'], 1)

    def test_300_process3(self):
        # Process Creation without Executable
        stdout, stderr = self.run_binary(['Process3'])
        counter = collections.Counter(stderr.split('\n'))
        self.assertEqual(counter['Binary 1 Preloaded'], 2)
        self.assertEqual(counter['Binary 2 Preloaded'], 2)


class TC_23_SendHandle(RegressionTestCase):
    """Checks the messages printed by the SendHandle binary."""

    def test_000_send_handle(self):
        stdout, stderr = self.run_binary(['SendHandle'])
        counter = collections.Counter(stderr.split('\n'))

        # Send and Receive Handles across Processes
        self.assertEqual(counter['Send Handle OK'], 3)
        self.assertEqual(counter['Receive Handle OK'], 3)

        # Send Pipe Handle
        self.assertEqual(counter['Receive Pipe Handle: Hello World'], 1)

        # Send Socket Handle
        self.assertEqual(counter['Receive Socket Handle: Hello World'], 1)

        # Send File Handle
        self.assertEqual(counter['Receive File Handle: Hello World'], 1)


@unittest.skipUnless(HAS_SGX, 'need SGX')
class TC_40_AVXDisable(RegressionTestCase):
    """SGX-only check on the AvxDisable binary (currently an expected failure)."""

    @unittest.expectedFailure
    def test_000_avx_disable(self):
        # Disable AVX bit in XFRM
        stdout, stderr = self.run_binary(['AvxDisable'])
        self.assertIn('Illegal instruction executed in enclave', stderr)


@unittest.skipUnless(HAS_SGX, 'need SGX')
class TC_50_Attestation(RegressionTestCase):
    """SGX-only check of the status and timestamp printed by Attestation."""

    def test_000_remote_attestation(self):
        stdout, stderr = self.run_binary(["Attestation"])

        for line in stderr.split("\n"):
            # Check the attestation status
            if line.startswith("Attestation status:"):
                status = line[19:].strip()
                self.assertIn(status, ["OK", "GROUP_OUT_OF_DATE", "CONFIGURATION_NEEDED"])

            # Check the timestamp
            if line.startswith("Attestation timestamp:"):
                timestamp = datetime.strptime(line[22:].strip(), "%Y-%m-%dT%H:%M:%S.%f")

                # The timestamp may be in another time zone, but should be
                # within 24 hours of the current time.
                now = datetime.now()
                self.assertTrue(
                    now - timedelta(hours=24) <= timestamp <= now + timedelta(hours=24))