#!/usr/bin/python3
"""Sample per-CPU counters from /proc/stat and log them to a gzipped JSON file."""

import gzip
import json
import threading
import time

# Column names for the 'cpu' lines in /proc/stat, in the order the kernel emits them.
PROC_STAT_HEADERS = ('user', 'nice', 'system', 'idle', 'iowait',
                     'irq', 'softirq', 'steal', 'guest', 'guest_nice')

# Counters that contribute to "busy" time in calculate_cpu_usage().
_NON_IDLE_FIELDS = ('user', 'nice', 'system', 'irq', 'softirq', 'steal')


def get_cpu_stats(path='/proc/stat'):
    """
    Get CPU statistics from /proc/stat.

    Output is returned for the system and each CPU.

    Ex:
        {'system': {'user': 8696397, 'nice': 22431, ...},
         'cpus': {0: {'user': 4199206, 'nice': 11615, ...},
                  1: {'user': 4199308, 'nice': 10642, ...}}}

    path: file to read (default /proc/stat; overridable for testing)
    """
    with open(path, 'r') as f:
        cpu_lines = [line for line in f if line.startswith('cpu')]

    stats = {'system': None, 'cpus': {}}
    for line in cpu_lines:
        fields = line.split()
        # The aggregate line is labelled 'cpu'; per-core lines are 'cpu0', 'cpu1', ...
        cpu_index = fields[0][3:]
        cpu_stats = {name: int(value)
                     for name, value in zip(PROC_STAT_HEADERS, fields[1:])}
        if cpu_index == '':
            stats['system'] = cpu_stats
        else:
            stats['cpus'][int(cpu_index)] = cpu_stats

    return stats


def calculate_cpu_usage(initial, current):
    """
    Return the fraction (0.0-1.0) of non-idle CPU time between two snapshots.

    Calculation adapted from:
    https://stackoverflow.com/questions/23367857/accurate-calculation-of-cpu-usage-given-in-percentage-in-linux/

    initial, current: dicts keyed by PROC_STAT_HEADERS entries, as produced by
        get_cpu_stats() for the system or for a single CPU.
    """
    initial_idle = initial['idle'] + initial['iowait']
    current_idle = current['idle'] + current['iowait']

    initial_non_idle = sum(initial[k] for k in _NON_IDLE_FIELDS)
    current_non_idle = sum(current[k] for k in _NON_IDLE_FIELDS)

    total_delta = ((current_idle + current_non_idle)
                   - (initial_idle + initial_non_idle))
    # Identical snapshots (no elapsed ticks) would otherwise raise
    # ZeroDivisionError; report zero usage instead.
    if total_delta == 0:
        return 0.0
    return (current_non_idle - initial_non_idle) / total_delta


def log_cpu_stats(path, interval, stop_event):
    """
    Log the cpu stats to a gz compressed JSON file.

    Storing JSON seems to only use about 10% more disk space than storing
    bytes directly (4 bytes per value), so JSON is used for simplicity.

    path: file to save to
    interval: how many seconds to wait before getting more data
    stop_event: a threading.Event which stops the function
    """
    stats = {'timestamps': [], 'stats': {'system': [], 'cpus': {}}}
    while not stop_event.is_set():
        stats['timestamps'].append(time.time())
        current_stats = get_cpu_stats()
        stats['stats']['system'].append(current_stats['system'])
        for cpu, cpu_stats in current_stats['cpus'].items():
            stats['stats']['cpus'].setdefault(cpu, []).append(cpu_stats)
        # wait() (instead of time.sleep) returns early when stop_event is set,
        # so shutdown is responsive even with a long interval.
        stop_event.wait(interval)

    # 'wt' opens the gzip stream in text mode, so json.dump can write
    # straight to it -- no need to monkey-patch write() to encode bytes.
    with gzip.open(path, 'wt', encoding='utf-8') as f:
        json.dump(stats, f)


def load_cpu_stats(path):
    """
    Load stats previously written by log_cpu_stats().

    Note: JSON object keys are strings, so CPU indices come back as str.
    """
    with gzip.open(path, 'rt', encoding='utf-8') as f:
        return json.load(f)


if __name__ == '__main__':
    stop_event = threading.Event()
    t = threading.Thread(target=log_cpu_stats,
                         args=('/tmp/cpu_stats.json.gz', 0.5, stop_event))
    t.start()

    try:
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        # Signal the logger to stop and flush; the print tidies up after ^C.
        stop_event.set()
        print()

    t.join()