-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathgha.py
128 lines (100 loc) · 3.88 KB
/
gha.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
import requests
import gzip
import json
from core import *
import pandas as pd
from functools import reduce
from calendar import monthrange
class GHA:
    """Wrapper around a single GH Archive hourly dump.

    Downloads the gzipped JSON-lines file for (year, mounth, day, hour)
    into ./tmp, parses it into a list of event dicts plus a pandas
    DataFrame, and — when used as a context manager with clean=True —
    deletes the downloaded file on exit.
    """

    def __init__(self, year, mounth, day, hour, clean=True):
        self.year = year
        self.mounth = mounth  # sic: original spelling kept — public attribute
        self.day = day
        self.hour = hour
        self.data = None      # parsed events; populated lazily by read()
        self.clean = clean    # remove the .gz file on context-manager exit

    def get_gz_path(self):
        """Local path of the hourly archive.

        Month/day are zero-padded; the hour is NOT, matching GH Archive's
        own file naming (e.g. 2016-01-01-5.json.gz).
        """
        return f'./tmp/{self.year}-{self.mounth:02}-{self.day:02}-{self.hour}.json.gz'

    def clean_files(self):
        """Delete the downloaded archive; a missing file is not an error."""
        try:
            os.remove(self.get_gz_path())
        except FileNotFoundError:
            # Bug fix: __exit__ used to raise if the file was never
            # downloaded or was already removed.
            pass

    def download(self):
        """Fetch the hourly archive from data.gharchive.org into ./tmp.

        Raises requests.HTTPError on a non-2xx response — previously an
        HTML error page could be silently saved as a .gz file.
        """
        url = f'http://data.gharchive.org/{self.year}-{self.mounth:02}-{self.day:02}-{self.hour}.json.gz'
        r = requests.get(url)
        r.raise_for_status()
        with open(self.get_gz_path(), 'wb') as f:
            f.write(r.content)

    def is_dowloaded(self):  # sic: original (misspelled) name kept for callers
        return os.path.exists(self.get_gz_path())

    def __read_gz(self):
        # Download on demand, then decompress. A corrupt or truncated gz
        # raises OSError from gzip, which we map to the empty string so
        # read() produces an empty event list instead of crashing.
        if not self.is_dowloaded():
            self.download()
        try:
            with gzip.open(self.get_gz_path(), 'rb') as f:
                return f.read().decode("utf-8")
        except OSError:
            return ''

    def read(self):
        """Parse the archive into self.data (list of dicts) and self.df.

        Returns the list of parsed events.
        """
        self.raw_data = self.__read_gz()
        self.raw_lines = self.raw_data.split('\n')
        # len(line) >= 2 skips blank / trailing lines that are not JSON objects
        self.data = [json.loads(line) for line in self.raw_lines if len(line) >= 2]
        self.df = pd.DataFrame.from_dict(self.data)
        return self.data

    def __len__(self):
        if self.data is None:
            self.read()
        return len(self.data)

    def __iter__(self):
        if self.data is None:
            self.read()
        return iter(self.data)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if self.clean:
            self.clean_files()
def get_schema():
    """Return the list of field names declared in ./schema.json."""
    return [entry['name'] for entry in open_json('./schema.json')]
def download_day(year, month, day, java_projects):
    """Download all 24 hourly archives of one day, filtered to java_projects.

    Spawns 4 worker threads (via the project's multi_threading helper),
    each reading one hourly GH Archive dump and keeping only events whose
    repo name is in java_projects. Archives are kept on disk (clean=False)
    so a re-run can reuse them.

    Returns one DataFrame with the matching events of the whole day.
    """
    queue = [(year, month, day, hour) for hour in range(24)]

    def task(year, mounth, day, hour):
        # One worker handles one hourly archive.
        with GHA(year, mounth, day, hour, clean=False) as gha:
            gha.read()
            return gha.df[gha.df.repo.apply(lambda x: x['name'] in java_projects)]

    res = multi_threading(queue, 4, task)
    # Bug fix: the old reduce() over DataFrame.append breaks on pandas >= 2.0
    # (append was removed); a single concat is also O(n) instead of O(n^2).
    return pd.concat(list(res.values()), ignore_index=True)
def download_mounth(year, month, java_projects):
    """Download one whole month of filtered events, one CSV per day in ./raw/.

    Bug fix: the day tuples were hard-coded as (2016, 1, day), ignoring the
    year/month arguments — calling this for any other month silently
    downloaded January 2016 instead.
    """
    # monthrange returns (weekday of the first day, number of days)
    number_of_days = monthrange(year, month)[1]
    days_of_the_month = [(year, month, day) for day in range(1, number_of_days + 1)]
    create_dir('./raw/')
    for day in tqdm(days_of_the_month):
        data_of_the_day = download_day(*day, java_projects)
        data_of_the_day.to_csv('./raw/%d-%d-%d.csv' % day)
if __name__ == '__main__':
    # Working directory for the hourly .gz downloads.
    create_dir('./tmp')
    # Load the list of java projects whose events we keep.
    java_projects_df = pd.read_csv('../graph_data/projects.csv')
    java_projects = tuple(java_projects_df['full_name'])
    # NOTE(review): schema is currently unused downstream; the call is kept
    # because it validates that ./schema.json exists and parses.
    schema = get_schema()
    download_mounth(2016, 1, java_projects)