123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- """
- Script to extract code churns.
- """
- __author__ = "Oscar Svensson"
- __copyright__ = "Copyright (c) 2018 Axis Communications AB"
- __license__ = "MIT"
- import csv
- import os
- import sys
- import time
- from argparse import ArgumentParser
- from multiprocessing import Process, Manager, cpu_count
- from pygit2 import Repository, GIT_SORT_REVERSE, GIT_SORT_TOPOLOGICAL
- from tqdm import tqdm
# Shared state for the worker processes: RES is a dict proxy served by
# MANAGER, so entries written by a child process are visible to the parent.
MANAGER = Manager()
RES = MANAGER.dict()
def parse_code_churns(pid, repo_path, branch, start, stop=-1):
    """
    Function that is intended to be run by a worker process. It extracts the
    code churns for a slice of commits and stores them in the RES dict.

    Args:
        pid: Index of the worker. Used as the key in RES and as the position
            of the tqdm progress bar.
        repo_path: Path passed to pygit2's ``Repository``.
        branch: Name of the reference whose history is walked.
        start: Index of the first commit of the slice in the walked history
            (walked with GIT_SORT_TOPOLOGICAL | GIT_SORT_REVERSE).
        stop: Exclusive end index of the slice; -1 means "until the end".

    The result is written to ``RES[pid]`` as a list of rows of strings
    ``[commit id, measure_one, measure_two, measure_three, line_of_code_old]``.
    The list has one slot per commit in the (extended) slice, but only the
    commits that have a predecessor inside the slice get a row, so the last
    slot is always left as an empty list.
    """
    repo = Repository(repo_path)
    head = repo.references.get(branch)
    commits = list(
        repo.walk(head.target, GIT_SORT_TOPOLOGICAL | GIT_SORT_REVERSE))

    # Pull in the commit just before the slice so that the first commit of
    # the slice has a predecessor to be diffed against. For start == 0 there
    # is no predecessor and the very first commit is skipped by the loop.
    start = start - 1 if start > 0 else start
    commits = commits[start:stop] if stop != -1 else commits[start:]

    code_churns = [[] for _ in commits]
    for i, commit in enumerate(tqdm(commits[1:], position=pid)):
        # commits[i] is the entry preceding `commit` in the sliced list.
        diff = repo.diff(commits[i], commit)
        tree = commit.tree
        patches = list(diff)
        stats = diff.stats

        # Sum the line counts of the changed files and find the biggest
        # file among the changed ones. Binary patches are ignored.
        total_tloc = 0
        line_of_code_old = 0
        for patch in patches:
            if patch.delta.is_binary:
                continue
            total_tloc += get_file_lines_of_code(
                repo, tree, patch.delta.new_file)
            # NOTE(review): the old file path is resolved in this commit's
            # tree, not in the previous commit's tree - confirm intended.
            line_of_code_old = max(
                line_of_code_old,
                get_file_lines_of_code(repo, tree, patch.delta.old_file))

        cloc = stats.insertions  # churned (inserted) lines
        dloc = stats.deletions  # deleted lines
        files_churned = len(patches)
        num_files = count_files(tree, repo)

        # Relative churn measures; fall back to the absolute value when the
        # denominator is zero.
        measure_one = float(cloc) / total_tloc if total_tloc > 0 else float(cloc)
        # The fallback must be the deleted-line count, not the inserted one.
        measure_two = float(dloc) / total_tloc if total_tloc > 0 else float(dloc)
        measure_three = (float(files_churned) / num_files
                         if num_files > 0 else float(files_churned))

        # str(commit.id) yields the hex id and does not depend on the
        # deprecated `hex` attribute.
        code_churns[i] = [
            str(commit.id),
            str(measure_one),
            str(measure_two),
            str(measure_three),
            str(float(line_of_code_old)),
        ]

    RES[pid] = code_churns
def count_files(tree, repo):
    """
    Return the number of non-tree entries reachable from ``tree``.

    Sub-trees are looked up in ``repo`` by their id and traversed
    iteratively; a tree id that has already been seen is neither traversed
    again nor counted.

    NOTE(review): entries are classified by comparing ``entry.type`` with the
    string "tree"; confirm that the installed pygit2 version exposes the type
    as a string rather than an integer constant.
    """
    seen = {tree.id}
    pending = [tree]
    file_total = 0

    while pending:
        for item in pending.pop():
            if item.type != "tree":
                file_total += 1
                continue
            if item.id in seen:
                continue
            pending.append(repo[item.id])
            seen.add(item.id)

    return file_total
def get_file_lines_of_code(repo, tree, dfile):
    """
    Count how many lines there are in a file of a given tree.

    Args:
        repo: Mapping from object id to object (a pygit2 ``Repository``).
        tree: Mapping from path to tree entry; the entry's ``id`` is used to
            fetch the blob from ``repo``.
        dfile: Object whose ``path`` attribute names the file to measure.

    Returns:
        The number of newline bytes in the blob plus one, or 0 when the file
        cannot be resolved to a blob in ``tree``/``repo``.
    """
    try:
        blob = repo[tree[dfile.path].id]
        # Count real newline bytes. Splitting the repr of the bytes object on
        # a backslash-n sequence would also count literal "\n" text that
        # appears inside the file.
        return blob.data.count(b"\n") + 1
    except (KeyError, TypeError, AttributeError):
        # Best effort: the path is missing from this tree, or the entry is
        # not a blob (no ``data``); such files contribute zero lines.
        return 0
def get_code_churns(repo_path, branch):
    """
    General function for extracting code churns. It builds the row for the
    first commit itself and then starts one worker process per CPU core; each
    worker extracts the code churns for its share of the remaining commits.

    Args:
        repo_path: Path passed to pygit2's ``Repository``.
        branch: Name of the reference whose history is walked.

    Returns:
        A list of rows ``[commit id, measure_one, measure_two, measure_three,
        line_of_code_old]`` (all strings), newest commit first and the first
        commit of the history last. The list may contain empty placeholder
        rows left by the workers.

    Raises:
        ValueError: If ``branch`` cannot be found in the repository.
    """
    repo = Repository(repo_path)
    head = repo.references.get(branch)
    if head is None:
        raise ValueError(
            "Branch {} does not exist in {}".format(branch, repo_path))
    commits = list(
        repo.walk(head.target, GIT_SORT_TOPOLOGICAL | GIT_SORT_REVERSE))
    if not commits:
        return []

    # The first commit has no predecessor to diff against, so its relative
    # churn measures are fixed values.
    initial_row = [
        str(commits[0].id),
        str(0.0),  # measure_one
        str(0.0),  # measure_two
        str(1.0),  # measure_three
        str(0.0),  # line_of_code_old
    ]

    # Check how many processes that could be spawned
    cpus = cpu_count()
    print("Using {} cpus...".format(cpus))

    # Drop results left behind by an earlier call so that a failed worker
    # cannot silently contribute stale rows.
    RES.clear()

    # Split the commits into `cpus` contiguous parts whose sizes differ by at
    # most one; the first `remainder` parts get the extra commit.
    chunk_size, remainder = divmod(len(commits), cpus)
    processes = [
        Process(
            target=parse_code_churns,
            args=(i, repo_path, branch,
                  i * chunk_size + min(i, remainder),
                  (i + 1) * chunk_size + min(i + 1, remainder)))
        for i in range(cpus)
    ]

    # Start the clock before launching so the whole run is measured.
    start_time = time.time()
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    end_time = time.time()

    print("Done")
    print("Overall processing time {}".format(end_time - start_time))

    # Assemble the results in worker order. The dict itself is ordered by
    # whichever worker finished first, which would shuffle the chunks.
    churns = []
    for pid in sorted(RES.keys()):
        churns.extend(RES[pid])
    churns.reverse()
    churns.append(initial_row)
    return churns
def save_churns(churns, path="./results/code_churns_features_multithread.csv"):
    """
    Save the code churns to a csv file.

    Args:
        churns: Iterable of rows; each non-empty row contributes its first
            five fields (commit, added, deleted, files churned, old line
            count). Empty rows are skipped.
        path: Destination file. Missing parent directories are created.
    """
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # newline="" lets the csv module control line endings; without it extra
    # blank rows appear on platforms that translate "\n" on write.
    with open(path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow([
            "commit", "lines_of_code_added", "lines_of_code_deleted",
            "files_churned", "line_of_code_old"
        ])
        writer.writerows(row[:5] for row in churns if row)
if __name__ == "__main__":
    # Command-line entry point: parse the options, verify that the
    # repository path exists, extract the churns and write them to disk.
    PARSER = ArgumentParser(
        description=("Utility to extract code churns from"
                     " a repository or a single commit."))
    PARSER.add_argument(
        "--repository", "-r",
        type=str,
        default="./repos/jenkins",
        help="Path to local git repository.")
    PARSER.add_argument(
        "--branch", "-b",
        type=str,
        default="refs/heads/master",
        help="Which branch to use.")
    ARGS = PARSER.parse_args()
    REPOPATH, BRANCH = ARGS.repository, ARGS.branch

    if os.path.exists(REPOPATH):
        CHURNS = get_code_churns(REPOPATH, BRANCH)
        save_churns(CHURNS)
    else:
        print("The repository path does not exist!")
        sys.exit(1)
|