|
@@ -0,0 +1,292 @@
|
|
|
+
|
|
|
+
|
|
|
+import filecmp
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import sys
|
|
|
+import unittest
|
|
|
+
|
|
|
+from regression import (
|
|
|
+ HAS_SGX,
|
|
|
+ RegressionTestCase,
|
|
|
+ expectedFailureIf,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+class TC_00_FileSystem(RegressionTestCase):
|
|
|
+ @classmethod
|
|
|
+ def setUpClass(c):
|
|
|
+ c.FILE_SIZES = [0, 1, 2, 15, 16, 17, 255, 256, 257, 1023, 1024, 1025, 65535, 65536, 65537, 1048575, 1048576, 1048577]
|
|
|
+ c.TEST_DIR = 'tmp'
|
|
|
+ c.INDEXES = range(len(c.FILE_SIZES))
|
|
|
+ c.INPUT_DIR = os.path.join(c.TEST_DIR, 'input')
|
|
|
+ c.INPUT_FILES = [os.path.join(c.INPUT_DIR, str(x)) for x in c.FILE_SIZES]
|
|
|
+ c.OUTPUT_DIR = os.path.join(c.TEST_DIR, 'output')
|
|
|
+ c.OUTPUT_FILES = [os.path.join(c.OUTPUT_DIR, str(x)) for x in c.FILE_SIZES]
|
|
|
+
|
|
|
+
|
|
|
+ os.mkdir(c.TEST_DIR)
|
|
|
+ os.mkdir(c.INPUT_DIR)
|
|
|
+ for i in c.INDEXES:
|
|
|
+ with open(c.INPUT_FILES[i], 'wb') as file:
|
|
|
+ file.write(os.urandom(c.FILE_SIZES[i]))
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def tearDownClass(c):
|
|
|
+ shutil.rmtree(c.TEST_DIR)
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+
|
|
|
+ shutil.rmtree(self.OUTPUT_DIR, ignore_errors=True)
|
|
|
+ os.mkdir(self.OUTPUT_DIR)
|
|
|
+
|
|
|
+
|
|
|
+ def copy_input(self, input, output):
|
|
|
+ shutil.copy(input, output)
|
|
|
+
|
|
|
+ def verify_open_close(self, stdout, stderr, input_path, output_path):
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertIn('open(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('close(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('open(' + input_path + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('open(' + input_path + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('close(' + input_path + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('close(' + input_path + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('fopen(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('fclose(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('fopen(' + input_path + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('fopen(' + input_path + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('fclose(' + input_path + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('fclose(' + input_path + ') input 2 OK', stdout)
|
|
|
+
|
|
|
+ self.assertIn('open(' + output_path + ') output OK', stdout)
|
|
|
+ self.assertIn('close(' + output_path + ') output OK', stdout)
|
|
|
+ self.assertTrue(os.path.isfile(output_path))
|
|
|
+ self.assertIn('open(' + output_path + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('open(' + output_path + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('close(' + output_path + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('close(' + output_path + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('fopen(' + output_path + ') output OK', stdout)
|
|
|
+ self.assertIn('fclose(' + output_path + ') output OK', stdout)
|
|
|
+ self.assertIn('fopen(' + output_path + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('fopen(' + output_path + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('fclose(' + output_path + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('fclose(' + output_path + ') output 2 OK', stdout)
|
|
|
+
|
|
|
+ def test_100_open_close(self):
|
|
|
+ input_path = self.INPUT_FILES[-1]
|
|
|
+ output_path = os.path.join(self.OUTPUT_DIR, 'test_100')
|
|
|
+ stdout, stderr = self.run_binary(['open_close', input_path, output_path])
|
|
|
+ self.verify_open_close(stdout, stderr, input_path, output_path)
|
|
|
+
|
|
|
+ def test_110_read_write(self):
|
|
|
+ file_path = os.path.join(self.OUTPUT_DIR, 'test_110')
|
|
|
+ stdout, stderr = self.run_binary(['read_write', file_path])
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertTrue(os.path.isfile(file_path))
|
|
|
+ self.assertIn('open(' + file_path + ') RW OK', stdout)
|
|
|
+ self.assertIn('write(' + file_path + ') RW OK', stdout)
|
|
|
+ self.assertIn('seek(' + file_path + ') RW OK', stdout)
|
|
|
+ self.assertIn('read(' + file_path + ') RW OK', stdout)
|
|
|
+ self.assertIn('compare(' + file_path + ') RW OK', stdout)
|
|
|
+ self.assertIn('close(' + file_path + ') RW OK', stdout)
|
|
|
+
|
|
|
+ def verify_seek_tell(self, stdout, stderr, input_path, output_path_1, output_path_2, size):
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertIn('open(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('seek(' + input_path + ') input start OK', stdout)
|
|
|
+ self.assertIn('seek(' + input_path + ') input end OK', stdout)
|
|
|
+ self.assertIn('tell(' + input_path + ') input end OK: ' + str(size), stdout)
|
|
|
+ self.assertIn('seek(' + input_path + ') input rewind OK', stdout)
|
|
|
+ self.assertIn('tell(' + input_path + ') input start OK: 0', stdout)
|
|
|
+ self.assertIn('close(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('fopen(' + input_path + ') input OK', stdout)
|
|
|
+ self.assertIn('fseek(' + input_path + ') input start OK', stdout)
|
|
|
+ self.assertIn('fseek(' + input_path + ') input end OK', stdout)
|
|
|
+ self.assertIn('ftell(' + input_path + ') input end OK: ' + str(size), stdout)
|
|
|
+ self.assertIn('fseek(' + input_path + ') input rewind OK', stdout)
|
|
|
+ self.assertIn('ftell(' + input_path + ') input start OK: 0', stdout)
|
|
|
+ self.assertIn('fclose(' + input_path + ') input OK', stdout)
|
|
|
+
|
|
|
+ self.assertIn('open(' + output_path_1 + ') output OK', stdout)
|
|
|
+ self.assertIn('seek(' + output_path_1 + ') output start OK', stdout)
|
|
|
+ self.assertIn('seek(' + output_path_1 + ') output end OK', stdout)
|
|
|
+ self.assertIn('tell(' + output_path_1 + ') output end OK: ' + str(size), stdout)
|
|
|
+ self.assertIn('seek(' + output_path_1 + ') output end 2 OK', stdout)
|
|
|
+ self.assertIn('seek(' + output_path_1 + ') output end 3 OK', stdout)
|
|
|
+ self.assertIn('tell(' + output_path_1 + ') output end 2 OK: ' + str(size + 4098), stdout)
|
|
|
+ self.assertIn('close(' + output_path_1 + ') output OK', stdout)
|
|
|
+ self.assertIn('fopen(' + output_path_2 + ') output OK', stdout)
|
|
|
+ self.assertIn('fseek(' + output_path_2 + ') output start OK', stdout)
|
|
|
+ self.assertIn('fseek(' + output_path_2 + ') output end OK', stdout)
|
|
|
+ self.assertIn('ftell(' + output_path_2 + ') output end OK: ' + str(size), stdout)
|
|
|
+ self.assertIn('fseek(' + output_path_2 + ') output end 2 OK', stdout)
|
|
|
+ self.assertIn('fseek(' + output_path_2 + ') output end 3 OK', stdout)
|
|
|
+ self.assertIn('ftell(' + output_path_2 + ') output end 2 OK: ' + str(size + 4098), stdout)
|
|
|
+ self.assertIn('fclose(' + output_path_2 + ') output OK', stdout)
|
|
|
+
|
|
|
+ def test_115_seek_tell(self):
|
|
|
+ input_path = self.INPUT_FILES[-1]
|
|
|
+ output_path_1 = os.path.join(self.OUTPUT_DIR, 'test_115a')
|
|
|
+ output_path_2 = os.path.join(self.OUTPUT_DIR, 'test_115b')
|
|
|
+ self.copy_input(input_path, output_path_1)
|
|
|
+ self.copy_input(input_path, output_path_2)
|
|
|
+ stdout, stderr = self.run_binary(['seek_tell', input_path, output_path_1, output_path_2])
|
|
|
+ self.verify_seek_tell(stdout, stderr, input_path, output_path_1, output_path_2, self.FILE_SIZES[-1])
|
|
|
+
|
|
|
+ def test_120_file_delete(self):
|
|
|
+ file_path = 'test_120'
|
|
|
+ file_in = self.INPUT_FILES[-1]
|
|
|
+ file_out_1 = os.path.join(self.OUTPUT_DIR, file_path + 'a')
|
|
|
+ file_out_2 = os.path.join(self.OUTPUT_DIR, file_path + 'b')
|
|
|
+ file_out_3 = os.path.join(self.OUTPUT_DIR, file_path + 'c')
|
|
|
+ file_out_4 = os.path.join(self.OUTPUT_DIR, file_path + 'd')
|
|
|
+ file_out_5 = os.path.join(self.OUTPUT_DIR, file_path + 'e')
|
|
|
+
|
|
|
+ self.copy_input(file_in, file_out_1)
|
|
|
+ self.copy_input(file_in, file_out_2)
|
|
|
+ self.copy_input(file_in, file_out_3)
|
|
|
+ stdout, stderr = self.run_binary(['delete', file_out_1, file_out_2, file_out_3, file_out_4, file_out_5])
|
|
|
+
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertFalse(os.path.isfile(file_out_1))
|
|
|
+ self.assertFalse(os.path.isfile(file_out_2))
|
|
|
+ self.assertFalse(os.path.isfile(file_out_3))
|
|
|
+ self.assertFalse(os.path.isfile(file_out_4))
|
|
|
+ self.assertFalse(os.path.isfile(file_out_5))
|
|
|
+ self.assertIn('unlink(' + file_out_1 + ') OK', stdout)
|
|
|
+ self.assertIn('open(' + file_out_2 + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('close(' + file_out_2 + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('unlink(' + file_out_2 + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('open(' + file_out_3 + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('unlink(' + file_out_3 + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('close(' + file_out_3 + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('open(' + file_out_4 + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('close(' + file_out_4 + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('unlink(' + file_out_4 + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('open(' + file_out_5 + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('unlink(' + file_out_5 + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('close(' + file_out_5 + ') output 2 OK', stdout)
|
|
|
+
|
|
|
+ def verify_stat(self, stdout, stderr, input_path, output_path, size):
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertIn('stat(' + input_path + ') input 1 OK', stdout)
|
|
|
+ self.assertIn('open(' + input_path + ') input 2 OK', stdout)
|
|
|
+ self.assertIn('stat(' + input_path + ') input 2 OK: ' + size, stdout)
|
|
|
+ self.assertIn('fstat(' + input_path + ') input 2 OK: ' + size, stdout)
|
|
|
+ self.assertIn('close(' + input_path + ') input 2 OK', stdout)
|
|
|
+
|
|
|
+ self.assertIn('stat(' + output_path + ') output 1 OK', stdout)
|
|
|
+ self.assertIn('open(' + output_path + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('stat(' + output_path + ') output 2 OK: ' + size, stdout)
|
|
|
+ self.assertIn('fstat(' + output_path + ') output 2 OK: ' + size, stdout)
|
|
|
+ self.assertIn('close(' + output_path + ') output 2 OK', stdout)
|
|
|
+
|
|
|
+ def test_130_file_stat(self):
|
|
|
+
|
|
|
+
|
|
|
+ for i in self.INDEXES:
|
|
|
+ input_path = self.INPUT_FILES[i]
|
|
|
+ output_path = self.OUTPUT_FILES[i]
|
|
|
+ size = str(self.FILE_SIZES[i])
|
|
|
+ self.copy_input(input_path, output_path)
|
|
|
+ stdout, stderr = self.run_binary(['stat', input_path, output_path])
|
|
|
+ self.verify_stat(stdout, stderr, input_path, output_path, size)
|
|
|
+
|
|
|
+ def verify_size(self, file, size):
|
|
|
+ self.assertEqual(os.stat(file).st_size, size)
|
|
|
+
|
|
|
+ def do_truncate_test(self, size_in, size_out):
|
|
|
+
|
|
|
+ i = self.FILE_SIZES.index(size_in)
|
|
|
+ input = self.INPUT_FILES[i]
|
|
|
+ out_1 = self.OUTPUT_FILES[i] + 'a'
|
|
|
+ out_2 = self.OUTPUT_FILES[i] + 'b'
|
|
|
+ self.copy_input(input, out_1)
|
|
|
+ self.copy_input(input, out_2)
|
|
|
+
|
|
|
+ stdout, stderr = self.run_binary(['truncate', out_1, out_2, str(size_out)])
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertIn('truncate(' + out_1 + ') to ' + str(size_out) + ' OK', stdout)
|
|
|
+ self.assertIn('open(' + out_2 + ') output OK', stdout)
|
|
|
+ self.assertIn('ftruncate(' + out_2 + ') to ' + str(size_out) + ' OK', stdout)
|
|
|
+ self.assertIn('close(' + out_2 + ') output OK', stdout)
|
|
|
+ self.verify_size(out_1, size_out)
|
|
|
+ self.verify_size(out_2, size_out)
|
|
|
+
|
|
|
+ def test_140_file_truncate(self):
|
|
|
+ self.do_truncate_test(0, 1)
|
|
|
+ self.do_truncate_test(0, 16)
|
|
|
+ self.do_truncate_test(0, 65537)
|
|
|
+ self.do_truncate_test(1, 0)
|
|
|
+ self.do_truncate_test(1, 17)
|
|
|
+ self.do_truncate_test(16, 0)
|
|
|
+ self.do_truncate_test(16, 1048576)
|
|
|
+ self.do_truncate_test(255, 15)
|
|
|
+ self.do_truncate_test(255, 256)
|
|
|
+ self.do_truncate_test(65537, 65535)
|
|
|
+ self.do_truncate_test(65537, 65536)
|
|
|
+
|
|
|
+ def verify_copy_content(self, input, output):
|
|
|
+ self.assertTrue(filecmp.cmp(input, output, shallow=False))
|
|
|
+
|
|
|
+ def verify_copy(self, stdout, stderr, input_dir, exec):
|
|
|
+ self.assertNotIn('ERROR: ', stderr)
|
|
|
+ self.assertIn('opendir(' + input_dir + ') OK', stdout)
|
|
|
+ self.assertIn('readdir(.) OK', stdout)
|
|
|
+ if input_dir[0] != '/':
|
|
|
+ self.assertIn('readdir(..) OK', stdout)
|
|
|
+ for i in self.INDEXES:
|
|
|
+ size = str(self.FILE_SIZES[i])
|
|
|
+ self.assertIn('readdir(' + size + ') OK', stdout)
|
|
|
+ self.assertIn('open(' + size + ') input OK', stdout)
|
|
|
+ self.assertIn('fstat(' + size + ') input OK', stdout)
|
|
|
+ self.assertIn('open(' + size + ') output OK', stdout)
|
|
|
+ self.assertIn('fstat(' + size + ') output 1 OK', stdout)
|
|
|
+ if exec == 'copy_whole':
|
|
|
+ self.assertIn('read_fd(' + size + ') input OK', stdout)
|
|
|
+ self.assertIn('write_fd(' + size + ') output OK', stdout)
|
|
|
+ if size != '0':
|
|
|
+ if 'copy_mmap' in exec:
|
|
|
+ self.assertIn('mmap_fd(' + size + ') input OK', stdout)
|
|
|
+ self.assertIn('mmap_fd(' + size + ') output OK', stdout)
|
|
|
+ self.assertIn('munmap_fd(' + size + ') input OK', stdout)
|
|
|
+ self.assertIn('munmap_fd(' + size + ') output OK', stdout)
|
|
|
+ if exec == 'copy_mmap_rev':
|
|
|
+ self.assertIn('ftruncate(' + size + ') output OK', stdout)
|
|
|
+ self.assertIn('fstat(' + size + ') output 2 OK', stdout)
|
|
|
+ self.assertIn('close(' + size + ') input OK', stdout)
|
|
|
+ self.assertIn('close(' + size + ') output OK', stdout)
|
|
|
+
|
|
|
+ for i in self.INDEXES:
|
|
|
+ self.verify_copy_content(self.INPUT_FILES[i], self.OUTPUT_FILES[i])
|
|
|
+
|
|
|
+ def do_copy_test(self, exec, timeout):
|
|
|
+ stdout, stderr = self.run_binary([exec, self.INPUT_DIR, self.OUTPUT_DIR], timeout=timeout)
|
|
|
+ self.verify_copy(stdout, stderr, self.INPUT_DIR, exec)
|
|
|
+
|
|
|
+ def test_200_copy_dir_whole(self):
|
|
|
+ self.do_copy_test('copy_whole', 30)
|
|
|
+
|
|
|
+ def test_201_copy_dir_seq(self):
|
|
|
+ self.do_copy_test('copy_seq', 60)
|
|
|
+
|
|
|
+ def test_202_copy_dir_rev(self):
|
|
|
+ self.do_copy_test('copy_rev', 60)
|
|
|
+
|
|
|
+ @expectedFailureIf(HAS_SGX)
|
|
|
+ def test_203_copy_dir_mmap_whole(self):
|
|
|
+ self.do_copy_test('copy_mmap_whole', 30)
|
|
|
+
|
|
|
+ @expectedFailureIf(HAS_SGX)
|
|
|
+ def test_204_copy_dir_mmap_seq(self):
|
|
|
+ self.do_copy_test('copy_mmap_seq', 60)
|
|
|
+
|
|
|
+ @expectedFailureIf(HAS_SGX)
|
|
|
+ def test_205_copy_dir_mmap_rev(self):
|
|
|
+ self.do_copy_test('copy_mmap_rev', 60)
|
|
|
+
|
|
|
+ def test_210_copy_dir_mounted(self):
|
|
|
+ exec = 'copy_whole'
|
|
|
+ stdout, stderr = self.run_binary([exec, '/mounted/input', '/mounted/output'], timeout=30)
|
|
|
+ self.verify_copy(stdout, stderr, '/mounted/input', exec)
|