Skip to content

Commit

Permalink
allow querying from Censys Universal Dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
mandatran committed Dec 27, 2024
1 parent 93aee50 commit 9664f36
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
example_data/
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ python main.py

To collect data over time, schedule the pipeline to run automatically (e.g., in a [cron job](https://man7.org/linux/man-pages/man5/crontab.5.html)).

While there are other ways to collect exposed services, the pipeline uses Censys and provides two ways to query Censys data:
1. [Censys API](https://censys-python.readthedocs.io/en/stable/usage-v2.html)
2. [Censys Universal Dataset](https://support.censys.io/hc/en-us/articles/360038761891-Research-Access-to-Censys-Data) via BigQuery


## License and Copyright

Expand Down
57 changes: 54 additions & 3 deletions data_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from scamper import *
from search_censys import *
from bq_upload import *
from google.cloud import bigquery
from google.auth import default


class DataCollection:
Expand Down Expand Up @@ -82,12 +84,13 @@ def create_data_dirs(self) -> None:
}


def get_censys_exposed_services(self, asn: int, ipv: int = None) -> pd.DataFrame:
def get_censys_exposed_services(self, asn: int, ipv: int = None, bq: str = None) -> pd.DataFrame:
"""
Queries Censys for exposed services and returns the result as a dataframe.
:param asn: the autonomous system number to query
:param ipv: (optional) specify 4 or 6 to filter for IP version
:param bq: (optional) the BigQuery table to pull data from
:return: dataframe of exposed services information
"""
def stringified_list_to_list(x):
Expand All @@ -97,6 +100,15 @@ def stringified_list_to_list(x):
return ast.literal_eval(x)
except:
return [x]

def repeated_field_to_list(x):
    """Parse a stringified BigQuery repeated field into a typed Python list.

    Input looks like "[1 2 3]" or "[true false]" (space-separated values
    inside square brackets, as produced when a repeated column is stringified).

    :param x: stringified repeated field
    :return: list of ints if every element is a (possibly signed) integer,
             list of bools if every element is true/false, otherwise []
    """
    elements = x.strip('[]').split()
    # Allow an optional leading sign so negative integers parse too;
    # the old check rejected them and silently returned [].
    if all(e.lstrip('+-').isdigit() for e in elements):
        return [int(e) for e in elements]
    if all(e.lower() in ('true', 'false') for e in elements):
        return [e.lower() == 'true' for e in elements]
    # Mixed or unrecognized content: fall back to an empty list,
    # matching the original best-effort behavior.
    return []

# temp to appease bigquery json file requirements
def list_of_nulls_to_empty_list(x):
Expand All @@ -105,8 +117,44 @@ def list_of_nulls_to_empty_list(x):
df = pd.DataFrame()
exposed_services = {}
try:
exposed_services = search_censys(asn, ipv)
df = pd.DataFrame.from_dict(exposed_services)
exposed_services = pd.DataFrame()
if bq:
client = bigquery.Client()
ip_col = 'host_identifier.ipv6' if ipv == 6 else 'host_identifier.ipv4'
QUERY = (
'SELECT DISTINCT '
' {ip_col} as ip, '
' CURRENT_DATE() as date, '
' {asn} as asn, '
' dns.reverse_dns.names as dns_name, '
' ports_list as port, '
' ARRAY( '
' SELECT '
' CASE '
' WHEN LOWER(service.tls.certificates.leaf_data.subject_dn) LIKE "%peplink%" '
' THEN TRUE '
' ELSE FALSE '
' END '
' FROM UNNEST(services) AS service '
' ) AS pep_link '
'FROM `{table}` '
'WHERE '
' autonomous_system.asn={asn} AND '
' TIMESTAMP_TRUNC(snapshot_date, DAY) = TIMESTAMP(DATE_SUB(CURRENT_DATE, INTERVAL 2 DAY)) ' # we can only guarantee that censys's data from yesterday is available , reverse dns names take another day to populate in dataset
).format(ip_col=ip_col, asn=asn, table=bq)
query_job = client.query(QUERY) # API request
query_job.result() # Waits for query to finish
bq_df = query_job.to_dataframe()

# cleaning
bq_df['dns_name'] = bq_df['dns_name'].apply(stringified_list_to_list)
bq_df['port'] = bq_df['port'].apply(repeated_field_to_list)
bq_df['pep_link'] = bq_df['pep_link'].apply(repeated_field_to_list)

return bq_df
else:
exposed_services = search_censys(asn, ipv)
df = pd.DataFrame.from_dict(exposed_services)
except:
return df
df['dns_name'] = df['dns_name'].apply(str)
Expand Down Expand Up @@ -158,10 +206,13 @@ def paris_traceroute_exposed_services(self, df: pd.DataFrame, ip_col: str, uploa
right_on='dst'
)
df = df.drop(columns=['dst'])
# drop rows where either 'sec_last_ip' or 'sec_last_hop' is None
df = df.dropna(subset=['sec_last_ip', 'sec_last_hop'])

# output to json file
if upload_to_bq and not fallback_file:
with tempfile.NamedTemporaryFile(mode='w+', suffix=".json") as temp_json:
df['date'] = df['date'].astype(str)
df.to_json(temp_json.name, orient="records", lines=True)
temp_json.seek(0)
upload_exposed_services_file(self.bq_exposed_services_table_id, temp_json.name)
Expand Down
8 changes: 8 additions & 0 deletions data_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ def sort_hops(e):
return e['probe_ttl']

def get_sec_last_ip(hops):
    """Return the address of the second-to-last hop of a traceroute.

    :param hops: list of hop dicts, each carrying 'addr' and 'probe_ttl'
    :return: the 'addr' of the hop with the second-highest probe_ttl, or
             None when hops is not a list with at least two entries
    """
    if not isinstance(hops, list) or len(hops) < 2:
        # Fewer than two hops: no second-to-last entry exists.  The old
        # debug print ("not and instance of list ...") has been removed.
        return None
    # sorted() avoids mutating the caller's list, unlike in-place .sort().
    ordered = sorted(hops, key=lambda hop: hop['probe_ttl'])
    return ordered[-2]['addr']

def get_sec_last_probe_ttl(hops):
    """Return the probe TTL of the second-to-last hop of a traceroute.

    :param hops: list of hop dicts, each carrying 'probe_ttl'
    :return: the 'probe_ttl' of the hop with the second-highest probe_ttl,
             or None when hops is not a list with at least two entries
    """
    if not isinstance(hops, list) or len(hops) < 2:
        # Fewer than two hops: no second-to-last entry exists.  The old
        # debug print ("not and instance of list ...") has been removed.
        return None
    # sorted() avoids mutating the caller's list, unlike in-place .sort().
    ordered = sorted(hops, key=lambda hop: hop['probe_ttl'])
    return ordered[-2]['probe_ttl']

Expand Down
8 changes: 4 additions & 4 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@

def starlink_job():
    # Runs the Starlink (ASN 14593) collection pipeline: query Censys for
    # exposed services, traceroute them, then ping them.
    print("running starlink job")
    # NOTE(review): the next four lines are a before/after pair from a diff
    # view — the "MY_BQ_DATASET" pair is the removed version, the
    # "hitchhiking_sample" pair (which adds the BigQuery table argument) is
    # the added version; only one pair belongs in the real file.
    starlink_dc = DataCollection(bq_dataset_id="MY_BQ_DATASET") # FIXME: update with BQ dataset id
    starlink_df = starlink_dc.get_censys_exposed_services(14593, 4)
    starlink_dc = DataCollection(bq_dataset_id="hitchhiking_sample") # FIXME: update with BQ dataset id
    starlink_df = starlink_dc.get_censys_exposed_services(14593, 4, 'censys-io.universal_internet_dataset_v2.base')
    tr_df = starlink_dc.paris_traceroute_exposed_services(starlink_df, 'ip', True) # change to False to save to file instead of BQ
    # NOTE(review): another diff pair — old ping argument 600 vs. new 10;
    # keep exactly one of the two lines below.
    starlink_dc.ping_exposed_services(tr_df, 600, 1, True) # change to False to save to file instead of BQ
    starlink_dc.ping_exposed_services(tr_df, 10, 1, True) # change to False to save to file instead of BQ


def oneweb_job():
Expand All @@ -42,4 +42,4 @@ def oneweb_job():

# Script entry point: run the Starlink job (the OneWeb job was disabled in
# this commit).
if __name__ == "__main__":
    starlink_job()
    # NOTE(review): diff pair — the live call below was replaced by the
    # commented-out form on the next line; only one belongs in the file.
    oneweb_job()
    # oneweb_job()

0 comments on commit 9664f36

Please sign in to comment.