Skip to content

Commit

Permalink
Merge pull request #80 from kbase/develop
Browse files Browse the repository at this point in the history
Develop -> Master
  • Loading branch information
Tianhao-Gu authored Mar 24, 2020
2 parents 44361b1 + 28d33b9 commit 2042fb8
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 18 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ RUN pip install -r /requirements.txt

COPY ./ /kb/module
COPY ./globus.cfg /etc/globus.cfg

RUN touch /var/log/globus.log && chmod 777 /var/log/globus.log
RUN cp -r /kb/module/staging_service /kb/deployment/lib
RUN cp -r /kb/module/deployment /kb


EXPOSE 3000

WORKDIR /kb/deployment/lib
Expand Down
7 changes: 7 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@

### Version 1.1.3
- Added an `add-acl-concierge` endpoint
- Added configs for that endpoint
- Added options to dockerfile/docker-compose

### Version 1.1.2
- Added capability to check 'kbase_session_backup' cookie
- Added `add-acl` and `remove-acl` endpoints for Globus endpoint access
- Change logging to STDOUT


### Version 1.1.0
- Added a `download` endpoint for files
3 changes: 2 additions & 1 deletion deployment/conf/deployment.cfg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[staging_service]
META_DIR = /kb/deployment/lib/src/data/metadata/
DATA_DIR = /kb/deployment/lib/src/data/bulk/
AUTH_URL = https://ci.kbase.us/services/auth/api/V2/token
AUTH_URL = https://ci.kbase.us/services/auth/api/V2/token
CONCIERGE_PATH = /kbaseconcierge
3 changes: 2 additions & 1 deletion deployment/conf/local.cfg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[staging_service]
META_DIR = ./data/metadata/
DATA_DIR = ./data/bulk/
AUTH_URL = https://ci.kbase.us/services/auth/api/V2/token
AUTH_URL = https://ci.kbase.us/services/auth/api/V2/token
CONCIERGE_PATH = /kbaseconcierge
3 changes: 2 additions & 1 deletion deployment/conf/testing.cfg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[staging_service]
META_DIR = ./data/metadata/
DATA_DIR = ./data/bulk/
AUTH_URL = https://ci.kbase.us/services/auth/api/V2/token
AUTH_URL = https://ci.kbase.us/services/auth/api/V2/token
CONCIERGE_PATH = /kbaseconcierge
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ services:
# it further assumes that there is a pre-existing /data/metadata directory
volumes:
- "./data:/kb/deployment/lib/src/data"
- "./:/staging_service"

environment:
- KB_DEPLOYMENT_CONFIG=/kb/deployment/conf/deployment.cfg
- FILE_LIFETIME="90"
- FILE_LIFETIME="90"
129 changes: 129 additions & 0 deletions prune_acls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/root/bulk/acl_manager/py3globus/bin/python

"""
Deletes ACLS from globus, and then clears out directories older than THRESHOLD (60) days
"""
from __future__ import print_function # for python 2

import logging
import time
import shutil
from collections import namedtuple

from os.path import getmtime

import globus_sdk
from globus_sdk import TransferAPIError
import configparser

# Module-level setup: read globus.cfg and build the Globus transfer/auth clients.

# Captured once at import time; every age check and log line in this run uses it.
current_time = time.time()
THRESHOLD_DAYS = 60

# ACL rule ids that get_old_acls() never selects for removal.
admin_acls = [
    '9cb619d0-4417-11e8-8e06-0a6d4e044368',
    '580118b2-dc53-11e6-9d02-22000a1e3b52',
]
admin_names = ['dolsonadmin', 'dolson']

config = configparser.ConfigParser()
config.read("globus.cfg")
cf = config['globus']
endpoint_id = cf['endpoint_id']

client = globus_sdk.NativeAppAuthClient(cf['client_id'])
try:
    transfer_authorizer = globus_sdk.RefreshTokenAuthorizer(cf['transfer_token'], client)
    globus_transfer_client = globus_sdk.TransferClient(authorizer=transfer_authorizer)
    auth_authorizer = globus_sdk.RefreshTokenAuthorizer(cf['auth_token'], client)
    globus_auth_client = globus_sdk.AuthClient(authorizer=auth_authorizer)
except globus_sdk.GlobusAPIError as error:
    # Log the API status code and raw response body, then abort the script.
    logging.error(str(error.code) + error.raw_text)
    raise Exception("Invalid Token Specified in globus.cfg file")


def remove_directory(directory):
    """
    Log the intended removal of a directory.

    The ``shutil.rmtree`` call is currently commented out, so nothing is
    removed from disk; only the intent is written to the log.

    :param directory: Directory to DELETE
    :return: None; success or failure is reported through the log
    """
    try:
        logging.info("About to delete {}".format(directory))
        # NOTE(review): the actual deletion is disabled in this version of the script.
        # shutil.rmtree(directory)
    except OSError as error:
        # Python 3 exceptions have no ``message`` attribute; ``strerror`` holds the
        # OS error text, so the handler no longer fails with AttributeError.
        logging.error("Couldn't delete {} {} {}".format(directory, error.strerror, error.filename))


def remove_acl(acl):
    """
    Log the pending removal of a single ACL rule.

    The Globus delete call is currently commented out, so this function only
    records which rule would be removed.

    :param acl: ACL To Delete; its 'id' and 'path' entries are read
    :return: None; the outcome is reported through the log
    """
    template = "{}:About to remove ACL {} for {} (> {} days)"
    message = template.format(current_time, acl['id'], acl['path'], THRESHOLD_DAYS)
    logging.info(message)
    # Disabled delete call, kept for reference:
    # try:
    #     resp = globus_transfer_client.delete_endpoint_acl_rule(endpoint_id, acl['id'])
    # except TransferAPIError as error:
    #     logging.error(error.raw_text)


def main():
    """
    Script entry point: find the old ACLs, then process each one.

    Logging goes to 'prune_acl.log'. For every (acl, dir) entry returned by
    get_old_acls(), remove_acl() is called first and remove_directory() second.
    """
    logging.basicConfig(filename='prune_acl.log', level=logging.INFO)
    logging.info("{}:BEGIN RUN".format(current_time))

    stale_entries = get_old_acls()
    stale_count = len(stale_entries)
    logging.info("{}:ATTEMPTING TO DELETE {} OLD ACLS".format(current_time, stale_count))

    for entry in stale_entries:
        remove_acl(entry.acl)
        remove_directory(entry.dir)

    logging.info("{}:END RUN".format(current_time))


def get_endpoint_acls():
    """
    Fetch the ACL rules of the configured endpoint using the Globus API.

    :return: The 'DATA' entry of the endpoint ACL listing, or an empty list
             when the Globus call fails (the failure is logged)
    """
    try:
        return globus_transfer_client.endpoint_acl_list(endpoint_id)['DATA']
    except TransferAPIError as error:
        # Falling through used to return None, which made get_old_acls() crash on
        # len(None). Report "no ACLs" instead so the run ends cleanly and selects
        # nothing for removal.
        logging.error("Couldn't list ACLs for endpoint {}: {}".format(endpoint_id, error))
        return []


def directory_is_old(directory):
    """
    Decide whether a directory's modification time is older than the threshold.

    :param directory: Path whose modification time is inspected
    :return: True when the directory was last modified more than THRESHOLD_DAYS
             days before current_time; False otherwise, including when the
             modification time cannot be read (OSError)
    """
    try:
        age_seconds = current_time - getmtime(directory)
    except OSError:
        return False

    # seconds -> minutes -> hours -> days
    age_days = age_seconds / 60 / 60 / 24
    return age_days > THRESHOLD_DAYS


def get_old_acls():
    """
    Collect the ACLs whose backing directory has not been modified recently.

    Each ACL path is mapped to a directory under /dtn/disk0/bulk. An ACL is
    selected when that directory is old (see directory_is_old) and the ACL id
    is not listed in admin_acls.

    :return: A list of (acl, dir) named tuples to be removed
    """
    acls = get_endpoint_acls()
    logging.info("{}:FOUND {} acls".format(current_time, len(acls)))

    old_acl_and_dir = namedtuple("old_acl_and_dir", "acl dir")
    # Lazy pairing keeps the per-ACL evaluation order of a plain loop.
    acl_dir_pairs = ((acl, "/dtn/disk0/bulk" + acl['path']) for acl in acls)
    return [
        old_acl_and_dir(acl, directory)
        for acl, directory in acl_dir_pairs
        if directory_is_old(directory) and acl['id'] not in admin_acls
    ]


if __name__ == '__main__':
main()
60 changes: 48 additions & 12 deletions staging_service/app.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
from aiohttp import web
import aiohttp_cors
import os
from .metadata import stat_data, some_metadata, dir_info, add_upa, similar
import shutil
from .utils import Path, run_command, AclManager

import aiohttp_cors
from aiohttp import web

from .JGIMetadata import read_metadata_for, translate_for_importer
from .auth2Client import KBaseAuth2
from .globus import assert_globusid_exists, is_globusid
from .JGIMetadata import read_metadata_for, translate_for_importer
from .metadata import some_metadata, dir_info, add_upa, similar
from .utils import Path, run_command, AclManager

routes = web.RouteTableDef()
VERSION = '1.1.6'


@routes.get('/add-acl-concierge')
async def add_acl_concierge(request: web.Request):
    """
    Handle GET /add-acl-concierge.

    Authorizes the request, builds the caller's concierge path from
    Path._CONCIERGE_PATH and the username, and asks AclManager to add an ACL
    for it. The AclManager result is extended with a 'msg' entry and a Globus
    file-manager 'link' entry, then returned as JSON.
    """
    username = await authorize_request(request)
    shared_dir = Path.validate_path(username).full_path
    concierge_path = f"{Path._CONCIERGE_PATH}/{username}/"

    acl_manager = AclManager()
    payload = acl_manager.add_acl_concierge(
        shared_directory=shared_dir,
        concierge_path=concierge_path,
    )

    payload['msg'] = f'Requesting Globus Perms for the following globus dir: {concierge_path}'
    globus_link = f"https://app.globus.org/file-manager?destination_id={acl_manager.endpoint_id}&destination_path={concierge_path}"
    payload['link'] = globus_link
    return web.json_response(payload)


@routes.get('/add-acl')
async def add_acl(request: web.Request):
username = await authorize_request(request)
Expand Down Expand Up @@ -240,7 +256,7 @@ async def upload_files_chunked(request: web.Request):

if not os.path.exists(path.full_path):
error_msg = 'We are sorry but upload was interrupted. Please try again.'.format(
path=path.full_path)
path=path.full_path)
raise web.HTTPNotFound(text=error_msg)

response = await some_metadata(
Expand Down Expand Up @@ -271,7 +287,7 @@ async def define_UPA(request: web.Request):
await add_upa(path, UPA)
return web.Response(
text='succesfully updated UPA {UPA} for file {path}'.format(UPA=UPA, path=path.user_path)
)
)


@routes.delete('/delete/{path:.+}')
Expand Down Expand Up @@ -350,7 +366,7 @@ async def decompress(request: web.Request):
elif file_extension == '.zip' or file_extension == '.ZIP':
await run_command('unzip', path.full_path, '-d', destination)
elif file_extension == '.tar':
await run_command('tar', 'xf', path.full_path, '-C', destination)
await run_command('tar', 'xf', path.full_path, '-C', destination)
elif file_extension == '.gz':
await run_command('gzip', '-d', path.full_path)
elif file_extension == '.bz2' or file_extension == 'bzip2':
Expand Down Expand Up @@ -383,10 +399,10 @@ def app_factory(config):
app.router.add_routes(routes)
cors = aiohttp_cors.setup(app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
)
allow_credentials=True,
expose_headers="*",
allow_headers="*",
)
})
# Configure CORS on all routes.
for route in list(app.router.routes()):
Expand All @@ -395,12 +411,32 @@ def app_factory(config):
# potentially some type of code restructure would allow this without a bunch of globals
DATA_DIR = config['staging_service']['DATA_DIR']
META_DIR = config['staging_service']['META_DIR']
CONCIERGE_PATH = config['staging_service']['CONCIERGE_PATH']
if DATA_DIR.startswith('.'):
DATA_DIR = os.path.normpath(os.path.join(os.getcwd(), DATA_DIR))
if META_DIR.startswith('.'):
META_DIR = os.path.normpath(os.path.join(os.getcwd(), META_DIR))
if CONCIERGE_PATH.startswith('.'):
CONCIERGE_PATH = os.path.normpath(os.path.join(os.getcwd(), CONCIERGE_PATH))
Path._DATA_DIR = DATA_DIR
Path._META_DIR = META_DIR
Path._CONCIERGE_PATH = CONCIERGE_PATH

if Path._DATA_DIR is None:
raise Exception("Please provide DATA_DIR in the config file ")
else:
print("Setting DATA_DIR to", DATA_DIR)

if Path._META_DIR is None:
raise Exception("Please provide META_DIR in the config file ")
else:
print("Setting META_DIR dir to", META_DIR)

if Path._CONCIERGE_PATH is None:
raise Exception("Please provide CONCIERGE_PATH in the config file ")
else:
print("Setting CONCIERGE_PATH dir to", CONCIERGE_PATH)

global auth_client
auth_client = KBaseAuth2(config['staging_service']['AUTH_URL'])
return app
23 changes: 22 additions & 1 deletion staging_service/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
import configparser
import json
import logging
import os
import sys

import globus_sdk
from aiohttp.web import HTTPInternalServerError, HTTPOk
import json


async def run_command(*args):
Expand Down Expand Up @@ -41,6 +41,7 @@ async def run_command(*args):
class Path(object):
_META_DIR = None # expects to be set by config
_DATA_DIR = None # expects to be set by config
_CONCIERGE_PATH = None # expects to be set by config
__slots__ = ['full_path', 'metadata_path', 'user_path', 'name', 'jgi_metadata']

def __init__(self, full_path, metadata_path, user_path, name, jgi_metadata):
Expand All @@ -50,6 +51,7 @@ def __init__(self, full_path, metadata_path, user_path, name, jgi_metadata):
self.name = name
self.jgi_metadata = jgi_metadata


@staticmethod
def validate_path(username: str, path: str = ''):
"""
Expand All @@ -66,6 +68,7 @@ def validate_path(username: str, path: str = ''):
path = path[1:]
user_path = os.path.join(username, path)
full_path = os.path.join(Path._DATA_DIR, user_path)

metadata_path = os.path.join(Path._META_DIR, user_path)
name = os.path.basename(path)
jgi_metadata = os.path.join(os.path.dirname(full_path), '.' + name + '.jgi')
Expand Down Expand Up @@ -193,6 +196,24 @@ def _remove_acl(self, user_identity_id: str):
'user_identity_id': user_identity_id}
raise HTTPInternalServerError(text=json.dumps(response), content_type='application/json')

def add_acl_concierge(self, shared_directory: str, concierge_path: str):
    """
    Add ACL to the concierge globus share via the globus API
    :param shared_directory: Dir to get globus ID from and to generate id to create ACL for share
    :param concierge_path: KBase Concierge Dir to add acl for
    :return: Result of attempt to add acl
    """
    user_identity_id = self._get_globus_identity(shared_directory)
    cp_full = f"{Path._DATA_DIR}/{concierge_path}"
    print(f"Attempting to create concierge dir {cp_full}")
    try:
        # makedirs also creates missing parent directories; a bare mkdir raised an
        # unhandled FileNotFoundError when the parent of cp_full was absent.
        # exist_ok=True makes an already existing directory a no-op.
        os.makedirs(cp_full, exist_ok=True)
    except FileExistsError as e:
        # Still raised when the path exists but is not a directory; keep the
        # original best-effort behaviour of reporting it and carrying on.
        print(e)

    return self._add_acl(user_identity_id, concierge_path)


def add_acl(self, shared_directory: str):
"""
Add ACL to the globus share via the globus API
Expand Down

0 comments on commit 2042fb8

Please sign in to comment.