import threading

# Multipliers for the binary size suffixes understood by parse_bytes().
_BYTE_UNITS = {
    'B': 1,
    'KiB': 1024,
    'MiB': 1024**2,
    'GiB': 1024**3,
    'TiB': 1024**4,
}


def parse_bytes(bytes_str):
    """
    Convert a size string such as '10B', '5KiB' or '2GiB' to a number of bytes.

    A string without a recognised suffix is parsed as a plain integer.
    Raises ValueError (from int()) if the numeric part is not an integer.
    """
    matching_units = [unit for unit in _BYTE_UNITS if bytes_str.endswith(unit)]
    if not matching_units:
        return int(bytes_str)

    # 'KiB' also ends with 'B', so the longest matching suffix is the right one.
    unit = max(matching_units, key=len)
    return int(bytes_str[:-len(unit)]) * _BYTE_UNITS[unit]


def parse_range_list(range_list_str):
    """
    Take an input like '1-3,5,7-10' and return a list like [1,2,3,5,7,8,9,10].

    Items are returned in the order they appear in the string; ranges are
    inclusive on both ends. A range whose end is smaller than its start
    contributes no items. Negative numbers are not supported because '-' is
    the range separator.

    Raises ValueError if an item is not an integer or a range does not have
    exactly two ends.
    """
    all_items = []
    for range_str in range_list_str.split(','):
        if '-' in range_str:
            # in the form '12-34'
            range_ends = [int(x) for x in range_str.split('-')]
            # Explicit check rather than `assert`, which is removed under -O
            # and would let '1-2-3' be silently read as '1-2'.
            if len(range_ends) != 2:
                raise ValueError(
                    'Invalid range {!r}: expected the form START-END'.format(range_str))
            all_items.extend(range(range_ends[0], range_ends[1] + 1))
        else:
            # just a number
            all_items.append(int(range_str))
    return all_items


def generate_range_list(l):
    """
    Take a list like [1,2,3,5,7,8,9,10] and return a string like '1-3,5,7-10'.

    The input may be unsorted and may contain duplicates. An empty input
    produces an empty string.
    """
    # De-duplicate first and sort last: iterating a set has no guaranteed
    # order, so sorting before building the set could yield unsorted output.
    values = sorted(set(l))
    if not values:
        return ''

    ranges = []
    range_start = previous = values[0]
    for value in values[1:]:
        if value != previous + 1:
            # Gap found: close the current run and start a new one.
            ranges.append((range_start, previous))
            range_start = value
        previous = value
    ranges.append((range_start, previous))

    return ','.join(
        str(first) if first == last else '{}-{}'.format(first, last)
        for first, last in ranges
    )


class QueueGetter:
    """
    Drain a queue on a background thread, passing each item to a callback.

    The worker thread is started by the constructor and runs until it reads
    None from the queue; None therefore cannot be delivered to the callback.
    """

    def __init__(self, queue, callback):
        """
        queue: object with blocking get() and put() methods.
        callback: called on the worker thread with each item taken from the queue.
        """
        self.queue = queue
        self.callback = callback

        self.t = threading.Thread(target=self._unqueue)
        self.t.start()

    def _unqueue(self):
        """Worker loop: forward items to the callback until None is received."""
        while True:
            val = self.queue.get()
            if val is None:
                break
            self.callback(val)

    def stop(self):
        """Ask the worker to exit by queueing the None sentinel.

        Items already in the queue ahead of the sentinel are still delivered.
        """
        self.queue.put(None)

    def join(self, timeout=None):
        """Wait for the worker thread to finish.

        Raises TimeoutError if the thread is still alive after `timeout`
        seconds.
        """
        self.t.join(timeout=timeout)
        if self.t.is_alive():
            raise TimeoutError('Could not join QueueGetter thread')