123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- #!/usr/bin/env python3
- import filecmp
- import os
- import shutil
- import sys
- import unittest
- from regression import (
- HAS_SGX,
- RegressionTestCase,
- expectedFailureIf,
- )
- # Generic FS tests that mimic probable usage patterns in applications.
class TC_00_FileSystem(RegressionTestCase):
    """Generic FS tests that mimic probable usage patterns in applications.

    Every test runs one test program through ``self.run_binary`` (not defined
    in this class; it is unpacked here as a ``(stdout, stderr)`` pair), then
    checks that stderr carries no ``ERROR: `` marker, that stdout contains the
    expected ``<call>(<path>) ... OK`` progress messages and, where relevant,
    that the files on the host ended up in the expected state.

    Class attributes created by ``setUpClass``:
        FILE_SIZES    sizes (in bytes) of the generated input files
        INDEXES       ``range`` over the positions in ``FILE_SIZES``
        TEST_DIR      root of the scratch tree (``tmp``)
        INPUT_DIR     ``tmp/input``, filled once per class with random files
        INPUT_FILES   one path per size, named after the size
        OUTPUT_DIR    ``tmp/output``, recreated empty before every test
        OUTPUT_FILES  paths in OUTPUT_DIR mirroring INPUT_FILES
    """

    @classmethod
    def setUpClass(cls):
        """Create the scratch tree and one random input file per size."""
        cls.FILE_SIZES = [0, 1, 2, 15, 16, 17, 255, 256, 257, 1023, 1024, 1025,
                          65535, 65536, 65537, 1048575, 1048576, 1048577]
        cls.TEST_DIR = 'tmp'
        cls.INDEXES = range(len(cls.FILE_SIZES))
        cls.INPUT_DIR = os.path.join(cls.TEST_DIR, 'input')
        cls.INPUT_FILES = [os.path.join(cls.INPUT_DIR, str(x)) for x in cls.FILE_SIZES]
        cls.OUTPUT_DIR = os.path.join(cls.TEST_DIR, 'output')
        cls.OUTPUT_FILES = [os.path.join(cls.OUTPUT_DIR, str(x)) for x in cls.FILE_SIZES]

        # tearDownClass is skipped when setUpClass raises (and never runs if
        # the process is killed), so a stale tree may be left over from an
        # earlier run. Remove it first; otherwise os.mkdir() below would fail
        # with FileExistsError on every subsequent run.
        shutil.rmtree(cls.TEST_DIR, ignore_errors=True)

        # create directory structure and test files
        os.mkdir(cls.TEST_DIR)
        os.mkdir(cls.INPUT_DIR)
        for path, size in zip(cls.INPUT_FILES, cls.FILE_SIZES):
            with open(path, 'wb') as test_file:
                test_file.write(os.urandom(size))

    @classmethod
    def tearDownClass(cls):
        """Remove the whole scratch tree."""
        shutil.rmtree(cls.TEST_DIR)

    def setUp(self):
        """Give every test an empty output directory."""
        # clean output for each test
        shutil.rmtree(self.OUTPUT_DIR, ignore_errors=True)
        os.mkdir(self.OUTPUT_DIR)

    def _assert_steps(self, stdout, path, steps, prefix=''):
        """Assert that stdout reports every step for *path*.

        Each ``(func, detail)`` pair in *steps* must appear in *stdout* as
        ``<prefix><func>(<path>) <detail>``. *prefix* selects a variant of
        the call name, e.g. ``'f'`` turns ``open`` into ``fopen``.
        """
        for func, detail in steps:
            self.assertIn(prefix + func + '(' + path + ') ' + detail, stdout)

    # copy input file to output dir (for tests that alter the file so input remains untouched)
    def copy_input(self, input, output):
        """Copy the file *input* to *output* with ``shutil.copy``."""
        shutil.copy(input, output)

    def verify_open_close(self, stdout, stderr, input_path, output_path):
        """Check the output of the ``open_close`` run.

        Both the ``open``/``close`` and the ``fopen``/``fclose`` variants must
        be reported for the input and the output path, and the output file
        must exist on the host afterwards.
        """
        def steps(kind):
            return [
                ('open', kind + ' OK'),
                ('close', kind + ' OK'),
                ('open', kind + ' 1 OK'),
                ('open', kind + ' 2 OK'),
                ('close', kind + ' 1 OK'),
                ('close', kind + ' 2 OK'),
            ]

        self.assertNotIn('ERROR: ', stderr)
        for prefix in ('', 'f'):
            self._assert_steps(stdout, input_path, steps('input'), prefix=prefix)
        self.assertTrue(os.path.isfile(output_path))
        for prefix in ('', 'f'):
            self._assert_steps(stdout, output_path, steps('output'), prefix=prefix)

    def test_100_open_close(self):
        input_path = self.INPUT_FILES[-1]  # existing file
        output_path = os.path.join(self.OUTPUT_DIR, 'test_100')  # new file to be created
        stdout, stderr = self.run_binary(['open_close', input_path, output_path])
        self.verify_open_close(stdout, stderr, input_path, output_path)

    def test_110_read_write(self):
        file_path = os.path.join(self.OUTPUT_DIR, 'test_110')  # new file to be created
        stdout, stderr = self.run_binary(['read_write', file_path])
        self.assertNotIn('ERROR: ', stderr)
        self.assertTrue(os.path.isfile(file_path))
        calls = ('open', 'write', 'seek', 'read', 'compare', 'close')
        self._assert_steps(stdout, file_path, [(call, 'RW OK') for call in calls])

    def verify_seek_tell(self, stdout, stderr, input_path, output_path_1, output_path_2, size):
        """Check the output of the ``seek_tell`` run.

        *input_path* must be reported through both the ``seek``/``tell`` and
        the ``fseek``/``ftell`` families; *output_path_1* only through the
        former and *output_path_2* only through the latter. *size* is the
        expected size (an int) of all three files.
        """
        input_steps = [
            ('open', 'input OK'),
            ('seek', 'input start OK'),
            ('seek', 'input end OK'),
            ('tell', 'input end OK: ' + str(size)),
            ('seek', 'input rewind OK'),
            ('tell', 'input start OK: 0'),
            ('close', 'input OK'),
        ]
        output_steps = [
            ('open', 'output OK'),
            ('seek', 'output start OK'),
            ('seek', 'output end OK'),
            ('tell', 'output end OK: ' + str(size)),
            ('seek', 'output end 2 OK'),
            ('seek', 'output end 3 OK'),
            # NOTE(review): 4098 is presumably the distance the test program
            # seeks past the end of the file; confirm against its source.
            ('tell', 'output end 2 OK: ' + str(size + 4098)),
            ('close', 'output OK'),
        ]

        self.assertNotIn('ERROR: ', stderr)
        self._assert_steps(stdout, input_path, input_steps)
        self._assert_steps(stdout, input_path, input_steps, prefix='f')
        self._assert_steps(stdout, output_path_1, output_steps)
        self._assert_steps(stdout, output_path_2, output_steps, prefix='f')

    def test_115_seek_tell(self):
        input_path = self.INPUT_FILES[-1]  # existing file
        output_path_1 = os.path.join(self.OUTPUT_DIR, 'test_115a')  # writable files
        output_path_2 = os.path.join(self.OUTPUT_DIR, 'test_115b')
        self.copy_input(input_path, output_path_1)
        self.copy_input(input_path, output_path_2)
        stdout, stderr = self.run_binary(['seek_tell', input_path, output_path_1, output_path_2])
        self.verify_seek_tell(stdout, stderr, input_path, output_path_1, output_path_2,
                              self.FILE_SIZES[-1])

    def test_120_file_delete(self):
        name = 'test_120'
        source = self.INPUT_FILES[-1]  # existing file to be copied
        targets = [os.path.join(self.OUTPUT_DIR, name + suffix) for suffix in 'abcde']

        # 3 existing files, 2 new files
        for target in targets[:3]:
            self.copy_input(source, target)
        stdout, stderr = self.run_binary(['delete'] + targets)

        # verify
        self.assertNotIn('ERROR: ', stderr)
        for target in targets:
            self.assertFalse(os.path.isfile(target))

        self._assert_steps(stdout, targets[0], [('unlink', 'OK')])
        # (path, label, calls that must be reported for that path)
        expected = [
            (targets[1], 'input 1', ('open', 'close', 'unlink')),
            (targets[2], 'input 2', ('open', 'unlink', 'close')),
            (targets[3], 'output 1', ('open', 'close', 'unlink')),
            (targets[4], 'output 2', ('open', 'unlink', 'close')),
        ]
        for path, label, calls in expected:
            self._assert_steps(stdout, path, [(call, label + ' OK') for call in calls])

    def verify_stat(self, stdout, stderr, input_path, output_path, size):
        """Check the output of the ``stat`` run.

        *size* is the expected size of both files. It is normalised with
        ``str()``, so an int works as well as the string form used by
        ``test_130_file_stat``.
        """
        size = str(size)
        self.assertNotIn('ERROR: ', stderr)
        for path, kind in ((input_path, 'input'), (output_path, 'output')):
            self._assert_steps(stdout, path, [
                ('stat', kind + ' 1 OK'),
                ('open', kind + ' 2 OK'),
                ('stat', kind + ' 2 OK: ' + size),
                ('fstat', kind + ' 2 OK: ' + size),
                ('close', kind + ' 2 OK'),
            ])

    def test_130_file_stat(self):
        # running for every file separately so the process doesn't need to enumerate directory
        # (different code path, enumeration also performs stat)
        for i in self.INDEXES:
            input_path = self.INPUT_FILES[i]  # existing file
            output_path = self.OUTPUT_FILES[i]  # file that will be opened in write mode
            size = str(self.FILE_SIZES[i])
            self.copy_input(input_path, output_path)
            stdout, stderr = self.run_binary(['stat', input_path, output_path])
            self.verify_stat(stdout, stderr, input_path, output_path, size)

    def verify_size(self, file, size):
        """Assert that *file* is exactly *size* bytes long on the host."""
        self.assertEqual(os.stat(file).st_size, size)

    def do_truncate_test(self, size_in, size_out):
        """Resize two copies of the *size_in*-byte input file to *size_out*.

        *size_in* must be one of ``FILE_SIZES`` (``list.index`` raises
        ``ValueError`` otherwise). The first copy is expected to be reported
        via ``truncate``, the second via ``open``/``ftruncate``/``close``.
        """
        # prepare paths/files
        i = self.FILE_SIZES.index(size_in)
        source = self.INPUT_FILES[i]  # source file to be truncated
        out_1 = self.OUTPUT_FILES[i] + 'a'
        out_2 = self.OUTPUT_FILES[i] + 'b'
        self.copy_input(source, out_1)
        self.copy_input(source, out_2)

        # run test
        target = str(size_out)
        stdout, stderr = self.run_binary(['truncate', out_1, out_2, target])
        self.assertNotIn('ERROR: ', stderr)
        self._assert_steps(stdout, out_1, [('truncate', 'to ' + target + ' OK')])
        self._assert_steps(stdout, out_2, [
            ('open', 'output OK'),
            ('ftruncate', 'to ' + target + ' OK'),
            ('close', 'output OK'),
        ])
        self.verify_size(out_1, size_out)
        self.verify_size(out_2, size_out)

    def test_140_file_truncate(self):
        # Kept as explicit calls so a failure's traceback identifies the size pair.
        self.do_truncate_test(0, 1)
        self.do_truncate_test(0, 16)
        self.do_truncate_test(0, 65537)
        self.do_truncate_test(1, 0)
        self.do_truncate_test(1, 17)
        self.do_truncate_test(16, 0)
        self.do_truncate_test(16, 1048576)
        self.do_truncate_test(255, 15)
        self.do_truncate_test(255, 256)
        self.do_truncate_test(65537, 65535)
        self.do_truncate_test(65537, 65536)

    def verify_copy_content(self, input, output):
        """Assert that *input* and *output* are byte-for-byte identical."""
        self.assertTrue(filecmp.cmp(input, output, shallow=False))

    def verify_copy(self, stdout, stderr, input_dir, exec):
        """Check the output of a directory-copy run and compare the copies.

        *input_dir* is the directory path as passed to the test program and
        *exec* is the program name, which selects the extra steps expected.
        The content comparison always uses ``INPUT_FILES``/``OUTPUT_FILES``.
        """
        self.assertNotIn('ERROR: ', stderr)
        self.assertIn('opendir(' + input_dir + ') OK', stdout)
        self.assertIn('readdir(.) OK', stdout)
        # '..' is only expected when the directory was given as a relative path
        if not input_dir.startswith('/'):
            self.assertIn('readdir(..) OK', stdout)

        for i in self.INDEXES:
            size = str(self.FILE_SIZES[i])  # files are named after their size
            steps = [
                ('readdir', 'OK'),
                ('open', 'input OK'),
                ('fstat', 'input OK'),
                ('open', 'output OK'),
                ('fstat', 'output 1 OK'),
            ]
            if exec == 'copy_whole':
                steps += [('read_fd', 'input OK'), ('write_fd', 'output OK')]
            # the mmap/ftruncate steps are not expected for the empty file
            if size != '0':
                if 'copy_mmap' in exec:
                    steps += [
                        ('mmap_fd', 'input OK'),
                        ('mmap_fd', 'output OK'),
                        ('munmap_fd', 'input OK'),
                        ('munmap_fd', 'output OK'),
                    ]
                if exec == 'copy_mmap_rev':
                    steps.append(('ftruncate', 'output OK'))
            steps += [
                ('fstat', 'output 2 OK'),
                ('close', 'input OK'),
                ('close', 'output OK'),
            ]
            self._assert_steps(stdout, size, steps)

        # compare
        for i in self.INDEXES:
            self.verify_copy_content(self.INPUT_FILES[i], self.OUTPUT_FILES[i])

    def do_copy_test(self, exec, timeout):
        """Run *exec* on INPUT_DIR -> OUTPUT_DIR with *timeout* and verify it."""
        stdout, stderr = self.run_binary([exec, self.INPUT_DIR, self.OUTPUT_DIR], timeout=timeout)
        self.verify_copy(stdout, stderr, self.INPUT_DIR, exec)

    def test_200_copy_dir_whole(self):
        self.do_copy_test('copy_whole', 30)

    def test_201_copy_dir_seq(self):
        self.do_copy_test('copy_seq', 60)

    def test_202_copy_dir_rev(self):
        self.do_copy_test('copy_rev', 60)

    @expectedFailureIf(HAS_SGX)
    def test_203_copy_dir_mmap_whole(self):
        self.do_copy_test('copy_mmap_whole', 30)

    @expectedFailureIf(HAS_SGX)
    def test_204_copy_dir_mmap_seq(self):
        self.do_copy_test('copy_mmap_seq', 60)

    @expectedFailureIf(HAS_SGX)
    def test_205_copy_dir_mmap_rev(self):
        self.do_copy_test('copy_mmap_rev', 60)

    def test_210_copy_dir_mounted(self):
        # NOTE(review): verify_copy compares the host-side INPUT_FILES and
        # OUTPUT_FILES, so /mounted/input and /mounted/output are presumably
        # mapped onto INPUT_DIR and OUTPUT_DIR by the test manifest - confirm.
        exec = 'copy_whole'
        stdout, stderr = self.run_binary([exec, '/mounted/input', '/mounted/output'], timeout=30)
        self.verify_copy(stdout, stderr, '/mounted/input', exec)
|