diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml new file mode 100644 index 00000000..8acb1f45 --- /dev/null +++ b/.github/workflows/gh-pages.yml @@ -0,0 +1,50 @@ +name: Build and deploy gh-pages branch with Mkdocs + +on: + # Runs every time main branch is updated + push: + branches: ["main"] + # Runs every time a PR is open against main + pull_request: + branches: ["main"] + workflow_dispatch: + +concurrency: + # Prevent 2+ copies of this workflow from running concurrently + group: dts-docs-action + +jobs: + Build-and-Deploy-docs: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + show-progress: false + fetch-depth: 0 # Needed, or else gh-pages won't be fetched, and push rejected + submodules: false # speeds up clone and not building anything in submodules + - name: Show action trigger + run: echo "= The job was automatically triggered by a ${{github.event_name}} event." + - name: Set up Python 3.10 + uses: actions/setup-python@v4.7.0 + with: + python-version: "3.10" + - name: Install python deps + run: python3 -m pip install mkdocs-material pymdown-extensions mkdocs-monorepo-plugin mdutils + # build every time (PR or push to main) + - name: Build + run: mkdocs build --strict --verbose + # deploy only when it is a push + - if: ${{ github.event_name == 'push' }} + name: GitHub Pages action + uses: JamesIves/github-pages-deploy-action@v4 + with: + # Do not remove existing pr-preview pages + clean-exclude: pr-preview + folder: ./site/ + # If it's a PR from within the same repo, deploy to a preview page + # For security reasons, PRs from forks cannot write into gh-pages for now + - if: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository }} + name: Preview docs + uses: rossjrw/pr-preview-action@v1 + with: + source-dir: ./site/ diff --git a/.gitignore b/.gitignore index 2f7084d8..59097c6e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .DS_Store dts 
-dts.yaml + +site/ +data/ diff --git a/README.md b/README.md index 51d86b3a..1affd942 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,24 @@ to do: capabilities * `DTS_JDP_SECRET`: a string containing a shared secret that allows the DTS to authenticate with the JGI Data Portal -* `DTS_ON_LBL_VPN`: set this environment variable to any value (e.g. "1") to - indicate that the DTS is running on Lawrence Berkeley Lab's Virtual Private - Network. This enables the DTS to get information about files from JAMO that - are not available from the JGI Data Portal itself. +### Recording JAMO queries for testing in GitHub's CI environment +Currently, the JGI Data Portal does not provide a way to retrieve detailed +file information, so the DTS uses the JAMO service instead. This service is +only available from within LBNL's virtual private network, so the DTS provides +a way to "record" JAMO queries when it's run within this network. These recorded +queries can then be automatically played back in any testing environment in +which JAMO is unavailable. + +To record the JAMO queries needed by the testing environment, run the unit +tests for the JDP database with the `-record-jamo` argument: + +``` +go test ./databases/jdp/... -args -record-jamo +``` + +This places one or more "cassette" files in the `databases/jdp/fixtures` folder, +where they can be accessed by the testing system. Make sure to commit this +folder to the repository after recording the JAMO queries. You should also +delete any old fixture replaced by a new one. 
diff --git a/config/config.go b/config/config.go index b9c5c8c3..f304780f 100644 --- a/config/config.go +++ b/config/config.go @@ -44,11 +44,15 @@ type serviceConfig struct { // (for generating and transferring manifests) Endpoint string `json:"endpoint" yaml:"endpoint"` // name of existing directory in which DTS can store persistent data - // default: none (persistent storage disabled) - DataDirectory string `json:"data_dir,omitempty" yaml:"data_dir,omitempty"` + DataDirectory string `json:"data_dir" yaml:"data_dir,omitempty"` + // name of existing directory in which DTS writes manifest files (must be + // visible to endpoints) + ManifestDirectory string `json:"manifest_dir" yaml:"manifest_dir"` // time after which information about a completed transfer is deleted (seconds) // default: 7 days DeleteAfter int `json:"delete_after" yaml:"delete_after"` + // flag indicating whether debug logging and other tools are enabled + Debug bool `json:"debug" yaml:"debug"` } // global config variables @@ -86,7 +90,15 @@ func readConfig(bytes []byte) error { // copy the config data into place, performing any needed conversions Service = conf.Service + Endpoints = conf.Endpoints + for name, endpoint := range Endpoints { + if endpoint.Root == "" { + endpoint.Root = "/" + Endpoints[name] = endpoint + } + } + Databases = conf.Databases MessageQueues = conf.MessageQueues diff --git a/databases/jdp/database.go b/databases/jdp/database.go index 82ce7e25..ffb32d6b 100644 --- a/databases/jdp/database.go +++ b/databases/jdp/database.go @@ -61,7 +61,7 @@ var suffixToFormat = map[string]string{ "fasta.gz": "fasta", "fastq": "fastq", "fastq.gz": "fastq", - "fna": "fna", + "fna": "fasta", "gff": "gff", "gff3": "gff3", "gz": "gz", @@ -120,13 +120,17 @@ func formatFromFileName(fileName string) string { } } - // determine whether the file matches any of the supported suffixes + // determine whether the file matches any of the supported suffixes, + // selecting the longest matching suffix + 
format := "unknown" + longestSuffix := 0 for _, suffix := range supportedSuffixes { - if strings.HasSuffix(fileName, suffix) { - return suffixToFormat[suffix] + if strings.HasSuffix(fileName, suffix) && len(suffix) > longestSuffix { + format = suffixToFormat[suffix] + longestSuffix = len(suffix) } } - return "unknown" + return format } // extracts the file format from the name and type of the file @@ -219,7 +223,7 @@ func creditFromIdAndMetadata(id string, md Metadata) credit.CreditMetadata { func trimFileSuffix(filename string) string { for _, suffix := range supportedSuffixes { - trimmedFilename, trimmed := strings.CutSuffix(filename, suffix) + trimmedFilename, trimmed := strings.CutSuffix(filename, "."+suffix) if trimmed { return trimmedFilename } diff --git a/databases/jdp/database_test.go b/databases/jdp/database_test.go index d815bf2f..eba818ec 100644 --- a/databases/jdp/database_test.go +++ b/databases/jdp/database_test.go @@ -1,6 +1,7 @@ package jdp import ( + "flag" "os" "testing" @@ -39,6 +40,11 @@ func setup() { config.Init([]byte(jdpConfig)) databases.RegisterDatabase("jdp", NewDatabase) endpoints.RegisterEndpointProvider("globus", globus.NewEndpoint) + + // check for a "record-jamo" flag and stash the result in the recordJamo + // global package variable + flag.BoolVar(&recordJamo, "record-jamo", false, "records JAMO test queries for use in CI system") + flag.Parse() } // this function gets called after all tests have been run diff --git a/databases/jdp/fixtures/dts-jamo-cassette-a3d56e53c1e79718cb9bc6c9ebb2bb77.yaml b/databases/jdp/fixtures/dts-jamo-cassette-a3d56e53c1e79718cb9bc6c9ebb2bb77.yaml new file mode 100644 index 00000000..85b93fad --- /dev/null +++ b/databases/jdp/fixtures/dts-jamo-cassette-a3d56e53c1e79718cb9bc6c9ebb2bb77.yaml @@ -0,0 +1,56 @@ +--- +version: 2 +interactions: + - id: 0 + request: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + content_length: 389 + transfer_encoding: [] + trailer: {} + host: jamo-dev.jgi.doe.gov + 
remote_addr: "" + request_uri: "" + body: '{"query":"select _id, file_name, file_path, metadata.file_format, file_size, md5_sum where _id in ( 57f9e03f7ded5e3135bc069e, 57f9d2b57ded5e3135bc0612, 57f9bcb77ded5e3135bc05a5, 584486367ded5e2d305c9a76, 582511047ded5e2d305af605, 582511047ded5e2d305af606, 57f7320c7ded5e3135bbd14d, 584486377ded5e2d305c9a77, 584486397ded5e2d305c9a7a, 582511037ded5e2d305af603 )","requestor":"dts@kbase.us"}' + form: {} + headers: + Content-Type: + - application/json; charset=utf-8 + url: https://jamo-dev.jgi.doe.gov/api/metadata/pagequery + method: POST + response: + proto: HTTP/2.0 + proto_major: 2 + proto_minor: 0 + transfer_encoding: [] + trailer: {} + content_length: -1 + uncompressed: true + body: '{"start": 1, "end": 10, "cursor_id": "L5ZHYRZFNS", "record_count": 10, "records": [{"_id": "57f7320c7ded5e3135bbd14d", "file_name": "10914.1.183618.CTCTCTA-TACTCCT.fastq.gz", "file_size": 2113188802, "file_path": "/global/dna/dm_archive/sdm/illumina/01/09/14", "metadata": {}}, {"_id": "57f9bcb77ded5e3135bc05a5", "file_name": "10927.1.183804.CTCTCTA-AGGCTTA.fastq.gz", "file_size": 1966895000, "file_path": "/global/dna/dm_archive/sdm/illumina/01/09/27", "metadata": {}}, {"_id": "57f9d2b57ded5e3135bc0612", "file_name": "10927.1.183804.CTCTCTA-AGGCTTA.filter-SAG.fastq.gz", "file_size": 2225019092, "file_path": "/global/dna/dm_archive/rqc/filtered_seq_unit/00/01/09/27", "metadata": {}}, {"_id": "57f9e03f7ded5e3135bc069e", "file_name": "10927.1.183804.CTCTCTA-AGGCTTA.QC.pdf", "metadata": {}, "file_size": 227745, "file_path": "/global/dna/dm_archive/rqc"}, {"_id": "582511037ded5e2d305af603", "file_name": "sag_decontam_output_clean.fna", "file_size": 405150, "file_path": "/global/dna/dm_archive/qaqc/analyses/AUTO-33725", "metadata": {"file_format": "fasta"}}, {"_id": "582511047ded5e2d305af605", "file_name": "sag-BHAPS-screen.txt", "file_size": 4080, "file_path": "/global/dna/dm_archive/qaqc/analyses/AUTO-33725", "metadata": {"file_format": "txt"}}, 
{"_id": "582511047ded5e2d305af606", "file_name": "10914.1.183618.CTCTCTA-TACTCCT.filter-SAG.norm.subsample.fastq.gz", "file_size": 73043045, "file_path": "/global/dna/dm_archive/qaqc/analyses/AUTO-33725", "metadata": {"file_format": "fastq"}}, {"_id": "584486367ded5e2d305c9a76", "file_name": "101345.assembled.faa", "file_size": 131265, "file_path": "/global/dna/dm_archive/img/submissions/101345", "metadata": {}}, {"_id": "584486377ded5e2d305c9a77", "file_name": "101345.assembled.names_map", "file_size": 1470, "file_path": "/global/dna/dm_archive/img/submissions/101345", "metadata": {}}, {"_id": "584486397ded5e2d305c9a7a", "file_name": "101345.pipeline_version.info", "file_size": 673, "file_path": "/global/dna/dm_archive/img/submissions/101345", "metadata": {}}], "fields": ["_id", "file_name", "file_path", "metadata.file_format", "file_size", "md5_sum"], "timeout": 540}' + headers: + Access-Control-Allow-Headers: + - Content-Type, Authorization, X-Requested-With + Access-Control-Allow-Methods: + - GET, PUT, POST, DELETE, OPTIONS + Access-Control-Allow-Origin: + - '*' + Access-Control-Max-Age: + - "1000" + Cf-Cache-Status: + - DYNAMIC + Cf-Ray: + - 85fcf008182b983d-SJC + Content-Type: + - application/json;charset=utf-8 + Date: + - Tue, 05 Mar 2024 20:43:19 GMT + Server: + - cloudflare + Vary: + - Accept-Encoding + Via: + - 1.1 jamo-dev.jgi.doe.gov + status: 200 OK + code: 200 + duration: 94.148234ms diff --git a/databases/jdp/fixtures/dts-jamo-cassette.yaml b/databases/jdp/fixtures/dts-jamo-cassette.yaml deleted file mode 100644 index df48e796..00000000 --- a/databases/jdp/fixtures/dts-jamo-cassette.yaml +++ /dev/null @@ -1,56 +0,0 @@ ---- -version: 2 -interactions: - - id: 0 - request: - proto: HTTP/1.1 - proto_major: 1 - proto_minor: 1 - content_length: 389 - transfer_encoding: [] - trailer: {} - host: jamo-dev.jgi.doe.gov - remote_addr: "" - request_uri: "" - body: '{"query":"select _id, file_name, file_path, metadata.file_format, file_size, md5_sum where _id in ( 
57f9e03f7ded5e3135bc069e, 57f9d2b57ded5e3135bc0612, 57f9bcb77ded5e3135bc05a5, 57f7e5c47ded5e3135bbd21e, 582509de7ded5e2d305af5d2, 584486257ded5e2d305c9a5c, 582509dd7ded5e2d305af5d1, 584486227ded5e2d305c9a56, 584486237ded5e2d305c9a57, 584486237ded5e2d305c9a58 )","requestor":"dts@kbase.us"}' - form: {} - headers: - Content-Type: - - application/json; charset=utf-8 - url: https://jamo-dev.jgi.doe.gov/api/metadata/pagequery - method: POST - response: - proto: HTTP/2.0 - proto_major: 2 - proto_minor: 0 - transfer_encoding: [] - trailer: {} - content_length: -1 - uncompressed: true - body: '{"start": 1, "end": 10, "cursor_id": "8OAX28LMI7", "record_count": 10, "records": [{"_id": "57f7e5c47ded5e3135bbd21e", "file_name": "10914.1.183618.CTCTCTA-ATTAGAC.QC.pdf", "metadata": {}, "file_size": 229766, "file_path": "/global/dna/dm_archive/rqc"}, {"_id": "57f9bcb77ded5e3135bc05a5", "file_name": "10927.1.183804.CTCTCTA-AGGCTTA.fastq.gz", "file_size": 1966895000, "file_path": "/global/dna/dm_archive/sdm/illumina/01/09/27", "metadata": {}}, {"_id": "57f9d2b57ded5e3135bc0612", "file_name": "10927.1.183804.CTCTCTA-AGGCTTA.filter-SAG.fastq.gz", "file_size": 2225019092, "file_path": "/global/dna/dm_archive/rqc/filtered_seq_unit/00/01/09/27", "metadata": {}}, {"_id": "57f9e03f7ded5e3135bc069e", "file_name": "10927.1.183804.CTCTCTA-AGGCTTA.QC.pdf", "metadata": {}, "file_size": 227745, "file_path": "/global/dna/dm_archive/rqc"}, {"_id": "582509dd7ded5e2d305af5d1", "file_name": "sag-BHAPU-screen.pdf", "file_size": 692808, "file_path": "/global/dna/dm_archive/qaqc/analyses/AUTO-33721", "metadata": {"file_format": "pdf"}}, {"_id": "582509de7ded5e2d305af5d2", "file_name": "sag-BHAPU-screen.txt", "file_size": 4272, "file_path": "/global/dna/dm_archive/qaqc/analyses/AUTO-33721", "metadata": {"file_format": "txt"}}, {"_id": "584486227ded5e2d305c9a56", "file_name": "2706794829.tar.gz", "metadata": {}, "file_size": 1206871, "file_path": "/global/dna/dm_archive/img/submissions/101341"}, {"_id": 
"584486237ded5e2d305c9a57", "file_name": "101341.assembled.fna", "file_size": 1151157, "file_path": "/global/dna/dm_archive/img/submissions/101341", "metadata": {}}, {"_id": "584486237ded5e2d305c9a58", "file_name": "101341.assembled.faa", "file_size": 370266, "file_path": "/global/dna/dm_archive/img/submissions/101341", "metadata": {}}, {"_id": "584486257ded5e2d305c9a5c", "file_name": "101341.assembled.gbk", "file_size": 2282302, "file_path": "/global/dna/dm_archive/img/submissions/101341", "metadata": {}}], "fields": ["_id", "file_name", "file_path", "metadata.file_format", "file_size", "md5_sum"], "timeout": 180}' - headers: - Access-Control-Allow-Headers: - - Content-Type, Authorization, X-Requested-With - Access-Control-Allow-Methods: - - GET, PUT, POST, DELETE, OPTIONS - Access-Control-Allow-Origin: - - '*' - Access-Control-Max-Age: - - "1000" - Cf-Cache-Status: - - DYNAMIC - Cf-Ray: - - 84dd6135cefa96a7-SJC - Content-Type: - - application/json;charset=utf-8 - Date: - - Tue, 30 Jan 2024 23:08:56 GMT - Server: - - cloudflare - Vary: - - Accept-Encoding - Via: - - 1.1 jamo-dev.jgi.doe.gov - status: 200 OK - code: 200 - duration: 166.261801ms diff --git a/databases/jdp/jamo.go b/databases/jdp/jamo.go index af5f6083..2635f54d 100644 --- a/databases/jdp/jamo.go +++ b/databases/jdp/jamo.go @@ -7,12 +7,13 @@ package jdp import ( "bytes" + "crypto/md5" "encoding/json" "fmt" "io" "log/slog" "net/http" - "os" + "strings" "time" "gopkg.in/dnaeon/go-vcr.v3/recorder" @@ -76,26 +77,60 @@ type jamoPageQueryResponse struct { Records []jamoFileRecord `json:"records"` } +// this flag is true until the first query to JAMO +var jamoFirstQuery = true + +// this flag indicates whether JAMO is available (i.e. 
whether DTS is running +// in the proper domain), and can only be trusted when jamoFirstQuery is false +var jamoIsAvailable = false + +// this flag indicates whether we want to record JAMO queries (usually in a +// testing environment, where it's set) +var recordJamo = false + // This function gathers and returns all jamo file records that correspond to // the given list of file IDs. The list of files is returned in the same order // as the list of file IDs. func queryJamo(fileIds []string) ([]jamoFileRecord, error) { - // fire up a recorder based on our testing needs (since we rely on JAMO - // for fetching file paths, and since JAMO only works within LBL's VPN) - var recordingMode recorder.Mode - if _, onVPN := os.LookupEnv("DTS_ON_LBL_VPN"); onVPN { + const jamoBaseUrl = "https://jamo-dev.jgi.doe.gov/" + + if jamoFirstQuery { + // poke JAMO to see whether it's available in the current domain + resp, err := http.Get(jamoBaseUrl) + if err != nil { + return nil, err + } + if resp.StatusCode == http.StatusOK { // success! 
+ jamoIsAvailable = true + } + jamoFirstQuery = false + } + + // create a checksum that uniquely identifies the set of requested file IDs + checksum := md5.Sum([]byte(strings.Join(fileIds, ","))) + + // set up a "VCR" to manage the recording and playback of JAMO queries + vcrMode := recorder.ModePassthrough // no recording or playback by default + cassetteName := fmt.Sprintf("fixtures/dts-jamo-cassette-%x", checksum) + if jamoIsAvailable { slog.Debug("Querying JAMO for file resource info") - recordingMode = recorder.ModeRecordOnly - } else { - slog.Debug("Using pre-recorded JAMO results for query") - recordingMode = recorder.ModeReplayOnly + if recordJamo { + slog.Debug("Recording JAMO query") + vcrMode = recorder.ModeRecordOnly + } + } else { // JAMO not available -- playback + slog.Debug("JAMO unavailable -- using pre-recorded results for query") + vcrMode = recorder.ModeReplayOnly } - r, err := recorder.NewWithOptions(&recorder.Options{ - CassetteName: "fixtures/dts-jamo-cassette", - Mode: recordingMode, + vcr, err := recorder.NewWithOptions(&recorder.Options{ + CassetteName: cassetteName, + Mode: vcrMode, }) - defer r.Stop() - client := r.GetDefaultClient() + if err != nil { + return nil, fmt.Errorf("queryJamo: %s", err.Error()) + } + defer vcr.Stop() + client := vcr.GetDefaultClient() // prepare a JAMO query with the desired file IDs // (also record the indices of each file ID so we can preserve their order) @@ -120,9 +155,8 @@ func queryJamo(fileIds []string) ([]jamoFileRecord, error) { } // do the initial POST to JAMO and fetch results - const jamoBaseURL = "https://jamo-dev.jgi.doe.gov/api/metadata/" - - const jamoPageQueryURL = jamoBaseURL + "pagequery" + const jamoApiUrl = jamoBaseUrl + "api/metadata/" + const jamoPageQueryURL = jamoApiUrl + "pagequery" req, err := http.NewRequest(http.MethodPost, jamoPageQueryURL, bytes.NewReader(payload)) if err != nil { return nil, err @@ -161,8 +195,8 @@ func queryJamo(fileIds []string) ([]jamoFileRecord, error) { // 
go back for more records if results.End < results.RecordCount { - jamoNextPageURL := fmt.Sprintf("%snextpage/%s", jamoBaseURL, results.CursorId) - req, err = http.NewRequest(http.MethodGet, jamoNextPageURL, http.NoBody) + jamoNextPageUrl := fmt.Sprintf("%snextpage/%s", jamoApiUrl, results.CursorId) + req, err = http.NewRequest(http.MethodGet, jamoNextPageUrl, http.NoBody) if err != nil { break } diff --git a/databases/kbase/database.go b/databases/kbase/database.go index 61578601..ffbee243 100644 --- a/databases/kbase/database.go +++ b/databases/kbase/database.go @@ -25,6 +25,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "github.com/google/uuid" @@ -76,12 +77,13 @@ func (db *Database) Endpoint() (endpoints.Endpoint, error) { } // FIXME: currently we store a mapping of orcid IDs -> KBase user names -// FIXME: in a file called "kbase_users.json" in the DTS's working dir. +// FIXME: in a file called "kbase_users.json" in the DTS's data folder var kbaseUsers map[string]string func (db *Database) LocalUser(orcid string) (string, error) { if kbaseUsers == nil { - data, err := os.ReadFile("kbase_users.json") + kbaseUsersFile := filepath.Join(config.Service.DataDirectory, "kbase_users.json") + data, err := os.ReadFile(kbaseUsersFile) if err == nil { err = json.Unmarshal(data, &kbaseUsers) } else { diff --git a/deployment/Dockerfile b/deployment/Dockerfile new file mode 100644 index 00000000..f7ab1a9c --- /dev/null +++ b/deployment/Dockerfile @@ -0,0 +1,137 @@ +# This is the DTS Docker file. It's a multi-stage build that results in a +# minimal-esque image with a small number of files: +# +# 1. /bin/dts (statically linked) +# 2. /etc/dts.yaml +# +# Additionally, some useful operating system utilities are provided. +# +# The service runs on a port determined by the PORT environment variable. You +# can map this port to any port on the host using the -p flag of the +# `docker run` command. 
+# +# --------------- +# BUILD ARGUMENTS +# --------------- +# +# You can provide a username/UID pair and a group name/GID pair that identify +# a user/group on the host system that confer the privileges for the service +# to operate properly (e.g. file system access). Specify each of these as build +# arguments to `docker build` with `--build-arg NAME=VAL`. +# +# The following build variables are used when the Docker image is being built: +# CONTACT_NAME: The name of the point of contact for the service +# CONTACT_EMAIL: The email address for the point of contact +# SERVER_URL: The URL for the running service (with slashes escaped) +# TERMS_OF_SERVICE_URL: The URL at which the terms of service may be retrieved (with +# slashes escaped) +# +# The following build variables are used as environment variables during +# run-time: +# USERNAME: The name of the (non-root) user that runs the service +# (default: gsuser) +# UID: The ID corresponding to the user on the host system (default: $UID) +# GROUP: The name of the group to which the user belongs while running the +# service (default: gsgroup) +# GID: The ID corresponding to the group on the host system (default: $GID) +# +# All of these build arguments have defaults in case they are not needed. +# +# ------------------ +# CONFIGURATION FILE +# ------------------ +# +# The docker build process assumes that there exists a configuration file +# named dts.yaml in the deployment/ directory of the project. +# This file contains the configuration that the Docker image uses to run the +# service. +# +# --------------------- +# ENVIRONMENT VARIABLES +# --------------------- +# +# The docker image assumes that certain environment variables are defined at +# run-time. See `dts.yaml` for a list of these variables and an illustration +# of how they are used. +# +# ------------------ +# BUILDING THE IMAGE +# ------------------ +# +# To build the image, issue the following commands from the top-level +# directory: +# +# 1. 
docker build --network=host [--build-arg ARG1=VAL1 ...] -t dts . +# 2. docker image prune --filter label=stage=builder +# +# The first command builds the image using an intermediate "builder" image. +# The second command deletes the builder image. + +#------------------------------------------------------- +# Dockerfile content starts here +#------------------------------------------------------- + +# Here's a reasonable Go-equipped Docker image. +FROM golang:1.21-alpine AS builder + +# Process build arguments and pass them along to the environment. +ARG CONTACT_NAME=Unspecified +ARG CONTACT_EMAIL=dts@example.com +ARG TERMS_OF_SERVICE_URL=http://example.com +ARG SERVER_URL=http://example.com + +#------------------------------------------------------- +# Build the service executable. +#------------------------------------------------------- +# We label this image as "builder" so we can prune it afterward with +# docker image prune --filter label=stage=builder +LABEL stage=builder +# We need Git, Curl, Zlib, and npm. +RUN apk update && apk add --no-cache git curl build-base zlib-dev zlib-static npm +# Set the directory we work in within the image. +WORKDIR /go/dts +# Copy files from our context to the working directory. +COPY . . +# Use redoc-cli to generate API documentation +RUN npm install -g redoc-cli +RUN go mod vendor +RUN go generate +# Build dts as a static executable with documentation endpoints. +RUN env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -tags docs -a + +#-------------------------------------------------------- +# Now we create a smaller image containing only +# 1. the service executable +# 2. a generated configuration file (dts.yaml) +# 3. other UNIX stuff provided by the `alpine` base image +#-------------------------------------------------------- + +# Here's the alpine Docker container. If you ever want to peek at its contents, +# type `docker run -it alpine /bin/ls`. 
+FROM alpine:3.19 + +# Process environment variable build arguments +ARG USERNAME=gsuser +ENV USERNAME=$USERNAME +ARG UID=1000 +ENV UID=$UID +ARG GROUP=gsgroup +ENV GROUP=$GROUP +ARG GID=1001 +ENV GID=$GID + +# Copy essential stuff from the builder image. +COPY --from=builder /go/dts/dts /bin/dts +COPY --from=builder /go/dts/deployment/dts.yaml /etc/dts.yaml + +ENV PATH=/bin:/usr/bin:/usr/sbin + +# Add the user and group for this image and set it up. +RUN /usr/sbin/adduser -u $UID -H -D $USERNAME && \ + /usr/sbin/addgroup -g $GID $GROUP && \ + /usr/sbin/addgroup $USERNAME $GROUP +USER $USERNAME + +# Run the service on the given port with the given config file. +ENTRYPOINT ["/bin/dts"] +CMD ["/etc/dts.yaml"] diff --git a/deployment/deploy-to-spin.sh b/deployment/deploy-to-spin.sh new file mode 100755 index 00000000..6476fb53 --- /dev/null +++ b/deployment/deploy-to-spin.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash + +# This script builds a Docker image for DTS and deploys it to the +# NERSC Spin Registry. Run it like so from your top-level DTS folder: +# +# `./deployment/deploy-to-spin.sh ` +# +# where the arguments refer to your NERSC username, user ID, the group to +# which you want your user to belong within the Docker container, and the +# corresponding group ID. These identifiers are used to provide proper +# access to resources on NERSC global filesystems. +# +# For this script to work, Docker must be installed on your machine. When the +# `docker login` command is run, you'll be prompted for your NERSC password +# (WITHOUT a one-time code). +TAG=$1 +USERNAME=$2 +USERID=$3 +GROUP=$4 +GID=$5 + +if [[ "$1" == "" || "$2" == "" || "$3" == "" || "$4" == "" || "$5" == "" ]]; then + echo "Usage: $0 " + exit +fi + +# Build the image locally. It's a multi-stage build (see Dockerfile up top), so +# make sure we prune the "builder" image afterward. +docker build -f deployment/Dockerfile -t dts:$TAG --network=host \ + --build-arg CONTACT_NAME="Jeffrey N. 
Johnson" \ + --build-arg CONTACT_EMAIL="jeff@cohere-llc.com" \ + --build-arg SERVER_URL="https:\/\/dts.kbase.us" \ + --build-arg TERMS_OF_SERVICE_URL="TBD" \ + --build-arg USERNAME=$USERNAME \ + --build-arg UID=$USERID \ + --build-arg GROUP=$GROUP \ + --build-arg GID=$GID \ + . +if [[ "$?" != "0" ]]; then + exit +fi +docker image prune -f --filter label=stage=builder + +# Tag the image as instructed in Lesson 2 of the NERSC Spin Overview +# (https://docs.nersc.gov/services/spin/getting_started/lesson-2/) +docker image tag dts:$TAG registry.nersc.gov/kbase/dts:$TAG + +# Log in to the NERSC Spin Registry, push the image, and log out. +echo "Please enter $USERNAME's NERSC password (without a one-time token) below." +docker login -u $USERNAME https://registry.nersc.gov/ +docker image push registry.nersc.gov/kbase/dts:$TAG +docker logout https://registry.nersc.gov/ + diff --git a/deployment/dts.yaml b/deployment/dts.yaml new file mode 100644 index 00000000..73b7fb6c --- /dev/null +++ b/deployment/dts.yaml @@ -0,0 +1,67 @@ +# This file defines a configuration for the Data Transfer Service (DTS) +# running in a Docker container. 
The host environment must define the following +# environment variables: +# +# DATA_DIRECTORY: an absolute path to the directory used by DTS to manage its +# own data +# MANIFEST_DIRECTORY: an absolute path to the directory used by DTS to write +# transfer manifests (must be visible to an endpoint) +# DEBUG: true to enable debug-level logging, false to disable +# GLOBUS_CLIENT_ID: the Globus client ID registered for the DTS application +# GLOBUS_CLIENT_SECRET: the Globus client secret associated with the client ID +# JDP_ENDPOINT_ID: a UUID identifying the JGI Data Portal Globus endpoint +# KBASE_ENDPOINT_ID: a UUID identifying the KBase Globus endpoint +# LOCAL_ENDPOINT_ID: a UUID identifying the DTS local Globus endpoint +# (for writing transfer manifests) +# MAX_CONNECTIONS: the maximum number of connections allowed for accepting +# and queueing queries. Any additional connections are +# rejected. +# POLL_INTERVAL: the interval at which DTS updates tasks (milliseconds) +# PORT: the port on which the service listens +# PURGE_INTERVAL: the interval at which DTS purges completed task records +# (seconds) + +service: + port: ${PORT} + max_connections: ${MAX_CONNECTIONS} + poll_interval: ${POLL_INTERVAL} + endpoint: globus-local + data_dir: ${DATA_DIRECTORY} + manifest_dir: ${MANIFEST_DIRECTORY} + delete_after: ${PURGE_INTERVAL} + debug: ${DEBUG} + +endpoints: + globus-local: + name: DTS Local Endpoint + id: ${LOCAL_ENDPOINT_ID} + provider: globus + auth: + client_id: ${GLOBUS_CLIENT_ID} + client_secret: ${GLOBUS_CLIENT_SECRET} + globus-jdp: + name: DTS JGI Share + id: ${JDP_ENDPOINT_ID} + provider: globus + root: /dm_archive + auth: + client_id: ${GLOBUS_CLIENT_ID} + client_secret: ${GLOBUS_CLIENT_SECRET} + globus-kbase: + name: KBase Bulk Share + id: ${KBASE_ENDPOINT_ID} + provider: globus + root: /jeff_cohere + auth: + client_id: ${GLOBUS_CLIENT_ID} + client_secret: ${GLOBUS_CLIENT_SECRET} + +databases: # databases between which files can be transferred + jdp: + 
    name: JGI Data Portal organization: Joint Genome Institute endpoint: globus-jdp kbase: name: KBase Workspace Service (KSS) organization: KBase endpoint: globus-kbase diff --git a/docs/admin/config.md b/docs/admin/config.md new file mode 100644 index 00000000..cecbf0b6 --- /dev/null +++ b/docs/admin/config.md @@ -0,0 +1,83 @@ +# Configuring DTS + +You can configure a DTS instance by creating a [YAML](https://yaml.org/) text +file similar to [dts.yaml.example](https://github.com/kbase/dts/blob/main/dts.yaml.example) +in the repository. Typically this file is named `dts.yaml`, and is passed as an +argument to the `dts` executable. Here we describe the different sections in +this file and how they affect your DTS instance. + +## Configuration File Sections + +Click on any of the links below to see the relevant details for a section. + +* [service](config.md#service): configures settings for the DTS web service + such as the port on which it listens, the maximum number of connections, + intervals for polling and scrubbing completed tasks, data directories, and + diagnostics +* [endpoints](config.md#endpoints): configures the endpoints used to transfer + files from one place to another +* [databases](config.md#databases): configures databases for organizations that + integrate with the DTS + +Each of these sections is described below, with a motivating example. 
+ +## `service` + +```yaml +service: + port: 8080 + max_connections: 100 + poll_interval: 60000 + endpoint: globus-local + data_dir: /path/to/dir + delete_after: 604800 + debug: true +``` + +**TODO: write some stuff!** + +## `endpoints` + +```yaml +endpoints: + globus-local: + name: name-of-local-endpoint + id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx + provider: globus + auth: + client_id: + client_secret: + globus-jdp: + name: name-of-jdp-endpoint + id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx + provider: globus + auth: + client_id: + client_secret: + globus-kbase: + name: name-of-kbase-endpoint + id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx + provider: globus + auth: + client_id: + client_secret: +``` + +**TODO: Things and stuff** + +## `databases` + +```yaml +databases: + jdp: + name: JGI Data Portal + organization: Joint Genome Institute + endpoint: globus-jdp + kbase: + name: KBase Workspace Service (KSS) + organization: KBase + endpoint: globus-kbase +``` + +**TODO: Alll the things** + diff --git a/docs/admin/deployment.md b/docs/admin/deployment.md new file mode 100644 index 00000000..86928bfb --- /dev/null +++ b/docs/admin/deployment.md @@ -0,0 +1,107 @@ +# Deploying DTS via Docker + +You can use the `Dockerfile` and `dts.yaml` files in the `deployment` folder to +build a Docker image for DTS. The Docker image contains two files: + +1. `/bin/dts`: the statically-linked `dts` executable +2. `/etc/dts.yaml`: a [DTS configuration file](config.md) with embedded + environment variables that control parameters of interest + +This image can be deployed in any Docker-friendly environment. The use of +environment variables in the configuration file allows you to configure +DTS without regenerating the image. + +## Deploying to NERSC's Spin Environment + +DTS is hosted in NERSC's [Spin](https://www.nersc.gov/systems/spin/) +environment under [Rancher 2](https://rancher2.spin.nersc.gov/login). +It runs in the `Production` environment under the `kbase` organization. 
+You can read about Spin in NERSC's documentation, and Rancher 2 +[here](https://rancher.com/docs/rancher/v2.x/en/). The documentation +isn't great, but fortunately there's not a lot to know--most of the +materials you'll need are right here in the `deployment` folder. + +Deploying DTS to Spin involves + +1. updating and pushing a new Docker image with any code changes and + documentation updates +2. editing the `dts` Spin deployment via NERSC's + [Rancher 2](https://rancher2.spin.nersc.gov/login) console + +Each of these steps are fairly simple. + +Before you perform an update, take some time to familiarize yourself +with the Rancher 2 console and the `dts` production deployment. +The most important details are: + +* The service and its supporting executables and configuration data are + supplied by its Docker image +* Configurable settings for the service are stored in environment variables + that can be set in the Rancher 2 console +* The DTS data directory (used for keeping track of ongoing tasks and for + generating transfer manifests) resides on the NERSC Community File System + (CFS) under `/global/cfs/cdirs/kbase/dts/`. This volume is visible to the + service as `/data`, so the `DATA_DIRECTORY` environment variable should be + set to `/data`. +* The DTS manifest directory (used for writing transfer manifest files that + get transferred to destination endpoints) also resides on the NERSC + Community File System (CFS), but under `/global/cfs/cdirs/kbase/gsharing/dts/manifests` + so that it is accessible via a Globus endpoint. This volume is visible to + the service as `/manifests`, so the `MANIFEST_DIRECTORY` environment variable + should be set to `/manifests`. **NOTE: the directory must be the same when + viewed by the service and the Globus Collection! If there is a mismatch, + the service will not be able to write the manifest OR Globus will not be + able to transfer it.** + +Let's walk through the process of updating and redeploying the DTS in Spin. 
+ +### 1. Update and Push a New Docker Image to Spin + +From within a clone of the [DTS GitHub repo](https://github.com/kbase/dts), make +sure the repo is up to date by typing `git pull` in the `main` branch. + +Then, sitting in the top-level `dts` source folder of your clone, execute +the `deploy-to-spin.sh` script, passing as arguments + +1. the name of a tag to identify the new Docker image +2. the name of the NERSC user whose permissions are used for CFS +3. the UID of the NERSC user +4. the group used to determine the user's group permissions +5. the GID of the above group + +For example, + +``` +./deployment/deploy-to-spin.sh v1.1 johnson 52710 kbase 54643 +``` + +builds a new DTS Docker image to be run as the user `johnson`, +with the tag `v1.1`. The script pushes the Docker image to [Harbor, the +NERSC Docker registry](https://registry.nersc.gov). Make sure the tag +indicates the current version of `dts` (e.g. `v1.1`) for clarity. + +After building the Docker image and tagging it, the script prompts you for the +NERSC password for the user you specified. This allows it to push the image to +Harbor so it can be accessed via the Rancher 2 console. + +### 2. Edit the Deployment in Rancher 2 and Restart the Service + +Now log in to [Rancher 2](https://rancher2.spin.nersc.gov/login) and +navigate to the `dts` deployment. + +1. Click on the `dts` pod to view its status and information +2. Click on the three dots near the right side of the screen and select + `Edit` to update its configuration. +3. If needed, navigate to the `Volumes` section and edit the CFS directory for + the volume mounted at `/data`. Usually, this is set to `/global/cfs/cdirs/kbase/dts/`, + so you usually don't need to edit this. Similarly, check the volume mounted + at `/manifests` (usually set to `/global/cfs/cdirs/kbase/gsharing/manifests/`). +4. 
Edit the Docker image for the deployment, changing the tag after the colon + to match the tag of the Docker image pushed by `deploy-to-spin.sh` above. +5. Make sure that the Scaling/Upgrade Policy on the Deployment is set to + `Recreate: KILL ALL pods, then start new pods.` This ensures that the + service in the existing pod can save a record of its ongoing tasks before a + service in a new pod tries to restore them. +6. Click `Save` to restart the deployment with this new information. + +That's it! You've now updated the service with new features and bugfixes. diff --git a/docs/admin/globus.md b/docs/admin/globus.md new file mode 100644 index 00000000..e7431a57 --- /dev/null +++ b/docs/admin/globus.md @@ -0,0 +1,52 @@ +# Granting DTS Access to a Globus Endpoint + +The Data Transfer Service relies heavily on [Globus](https://www.globus.org/) +for performing file transfers between different databases. Globus is an elaborate +and continuously evolving platform, so configuring access from an application +can be confusing. Here we describe all the things you need to know to grant +DTS access to a Globus endpoint. + +## Globus Glossary + +Globus has its own set of terminology that is slightly different from that we've + used to describe DTS, so let's clarify some definitions first. + +* **Globus Endpoint**: A Globus endpoint is a server running Globus software, + providing access to a filesystem that can be shared with Globus users. To DTS, + an endpoint is "a thing that can send and receive files." +* **Globus Collection**: A Globus Collection is a portion of a filesystem on a + Globus Endpoint associated with roles and permissions for Globus users. It is + not a server--it's a set of metadata that tells Globus which users have what + access privileges on a Globus Endpoint. +* **Globus Guest Collection**: A Guest Collection is a Globus Collection that allows + a Globus user to share files on with other Globus users and with applications. 
+ In particular, a Guest Collection is the _only mechanism_ that can provide + client applications with access to resources on Globus endpoints. This is the + closest concept to what DTS considers an endpoint. + +## Setting up Access to a Globus Endpoint + +[This guide](https://docs.globus.org/guides/recipes/automate-with-service-account/) +gives a complete set of instructions using the terminology above. Below, we briefly +summarize the steps in the guide. Of course, you need a Globus user account to play +this game. + +1. **Obtain an Application/Service Credential for DTS.** The credential consists of + a unique client ID and an associated client secret. The client ID can be used to + identify the DTS as an entity that can be granted access permissions. Of course, + the primary instance of the DTS already has one of these. + +2. **Create a Guest Collection on the Globus Endpoint or on an existing Collection.** + Without a Guest Collection, you can't grant the DTS access to anything. You might + have to poke around a bit to find an endpoint or existing collection that (a) you + have access to and (b) that exposes the resources that you want to grant to the + DTS. + +3. **Grant DTS read or read/write access to the Guest Collection.** Since the DTS + has its own client ID, you can grant it access to a Guest Collection just as you + would any other Globus user. + +The DTS stores its Globus credentials (client ID, client secret) in environment +variables to prevent them from being read from a file or mined from the executable. +The [deployment](deployment.md) section describes how these environment variables +are managed in practice. 
diff --git a/docs/admin/index.md b/docs/admin/index.md new file mode 100644 index 00000000..04d3e4a5 --- /dev/null +++ b/docs/admin/index.md @@ -0,0 +1,8 @@ +# DTS Administrator Guide + +* [Installing DTS Locally](installation.md) +* [Deploying DTS via Docker](deployment.md) +* [Configuring DTS](config.md) +* [Granting DTS Access to a Globus Endpoint](globus.md) + +More soon! diff --git a/docs/admin/installation.md b/docs/admin/installation.md new file mode 100644 index 00000000..3bb04032 --- /dev/null +++ b/docs/admin/installation.md @@ -0,0 +1,59 @@ +# Installing DTS Locally + +Here we describe how to build, test, and install the Data Transfer Service +in a local environment. + +## Building and Testing + +DTS is written in [Go](https://go.dev/), so you'll need a working Go compiler +to build, test, and run it locally. If you have a Go compiler, you can clone +this repository and build it from the top-level directory: + +``` +go build +``` + +### Running Unit Tests + +DTS comes with several unit tests that demonstrate its capabilities, and you can +run these tests as you would any other Go project: + +``` +go test ./... +``` + +You can add a `-v` flag to see output from the tests. + +Because DTS is primarily an orchestrator of network resources, its unit tests +must be able to connect to and utilize these resources. Accordingly, you must +set the following environment variables to make sure DTS can do what it needs +to do: + +* `DTS_KBASE_DEV_TOKEN`: a developer token for the KBase **production** + environment (available to [KBase developers](https://docs.kbase.us/development/create-a-kbase-developer-account) + used to connect to the KBase Auth Server, which provides a context for + authenticating and authorizing DTS for its basic operations. You can create + a token [from your KBase developer account](https://kbase.github.io/kb_sdk_docs/tutorial/3_initialize.html#set-up-your-developer-credentials). 
+* `DTS_KBASE_TEST_ORCID`: an [ORCID](https://orcid.org/) identifier that can be + used to run DTS's unit test. This identifier must match a registered ORCID ID + associated with a [KBase user account](https://narrative.kbase.us/#signup). +* `DTS_KBASE_TEST_USER`: the KBase user associated with the ORCID specified + by `DTS_KBASE_TEST_ORCID`. **NOTE: at the time of writing, KBase does not have + a mechanism for mapping ORCID IDs to local users, so the DTS uses a file in + its data directory called `kbase_users.json` consisting of a single JSON + object whose keys are ORCID IDs and whose values are local usernames.** +* `DTS_GLOBUS_CLIENT_ID`: a client ID registered using the + [Globus Developers](https://docs.globus.org/globus-connect-server/v5/use-client-credentials/#register-application) + web interface. This ID must be registered specifically for an instance of DTS. +* `DTS_GLOBUS_CLIENT_SECRET`: a client secret associated with the client ID + specified by `DTS_GLOBUS_CLIENT_ID` +* `DTS_GLOBUS_TEST_ENDPOINT`: a Globus endpoint used to test DTS's transfer + capabilities +* `DTS_JDP_SECRET`: a string containing a shared secret that allows the DTS to + authenticate with the JGI Data Portal + +## Installation + +The only remaining step is to copy the `dts` executable from your source +directory to wherever you want it to reside. This executable is statically +linked against all libraries, so it's completely portable. 
diff --git a/docs/developer/auth.md b/docs/developer/auth.md new file mode 100644 index 00000000..dbc602d7 --- /dev/null +++ b/docs/developer/auth.md @@ -0,0 +1,3 @@ +# The `auth` Package + +**TODO: stuff goes here.** diff --git a/docs/developer/config.md b/docs/developer/config.md new file mode 100644 index 00000000..aea6358b --- /dev/null +++ b/docs/developer/config.md @@ -0,0 +1,3 @@ +# The `config` Package + +**TODO: stuff goes here.** diff --git a/docs/developer/credit.md b/docs/developer/credit.md new file mode 100644 index 00000000..422e33e6 --- /dev/null +++ b/docs/developer/credit.md @@ -0,0 +1,4 @@ +# The `credit` Package + +**TODO: stuff goes here.** + diff --git a/docs/developer/databases.md b/docs/developer/databases.md new file mode 100644 index 00000000..bc639fa3 --- /dev/null +++ b/docs/developer/databases.md @@ -0,0 +1,4 @@ +# The `databases` Package + +**TODO: stuff goes here.** + diff --git a/docs/developer/endpoints.md b/docs/developer/endpoints.md new file mode 100644 index 00000000..9fd9d723 --- /dev/null +++ b/docs/developer/endpoints.md @@ -0,0 +1,3 @@ +# The `endpoints` Package + +**TODO: stuff goes here.** diff --git a/docs/developer/frictionless.md b/docs/developer/frictionless.md new file mode 100644 index 00000000..10b144b5 --- /dev/null +++ b/docs/developer/frictionless.md @@ -0,0 +1,4 @@ +# The `frictionless` Package + +**TODO: stuff goes here.** + diff --git a/docs/developer/index.md b/docs/developer/index.md new file mode 100644 index 00000000..a7b2bd39 --- /dev/null +++ b/docs/developer/index.md @@ -0,0 +1,27 @@ +# DTS Developer Guide + +This guide explains the internal workings of the Data Transfer Service (DTS). + +## Code Organization + +The following [packages](https://go.dev/doc/code) implement the features in +the Data Transfer Service. 
+ +* [auth](auth.md): handles the authorization of the DTS using KBase's + authentication/authorization server +* [config](config.md): handles the parsing of the DTS [YAML configuration + file](../admin/config.md), placing the data into read-only global variables + for use by other packages +* [credit](credit.md): defines metadata types used by the Credit Engine to + establish the provenance of transferred data +* [databases](databases.md): defines database types that implement the + integration of DTS with database providers +* [endpoints](endpoints.md): defines endpoint types for file transfer + providers used by DTS, such as [Globus](https://globus.org) +* [frictionless](frictionless.md): defines [data structures](https://frictionlessdata.io/) + that describe data for [individual files](https://specs.frictionlessdata.io/data-resource/) + and [packages containing multiple files](https://specs.frictionlessdata.io/data-package/) +* [services](services.md): defines types that implement the REST endpoints + provided by the DTS +* [tasks](tasks.md): implements the "heart" of the DTS, which creates and + manages transfer tasks through their entire lifecycle diff --git a/docs/developer/services.md b/docs/developer/services.md new file mode 100644 index 00000000..ea971358 --- /dev/null +++ b/docs/developer/services.md @@ -0,0 +1,4 @@ +# The `services` Package + +**TODO: stuff goes here.** + diff --git a/docs/developer/tasks.md b/docs/developer/tasks.md new file mode 100644 index 00000000..24b9483e --- /dev/null +++ b/docs/developer/tasks.md @@ -0,0 +1,5 @@ +# The `tasks` Package + +**TODO: stuff goes here.** + + diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 00000000..19147bc6 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,20 @@ +# Overview + +The Data Transfer Service (DTS) is a web service that handles requests for file +transfers between participating organizations interested in exchanging +data. 
The DTS provides a single point of access for these +organizations, allowing an end user or another service to + +* search for datasets / files within any participating organization based on + criteria specified in an [ElasticSearch](https://www.elastic.co/elasticsearch/) + style query +* select any or all files from a search and request a transfer from the source + organization to another participating organization + +DTS is designed for easy deployment and maintenance behind a gateway that +provides TLS/SSL encryption. Requests to the DTS include headers with +authentication information, so these requests rely on the HTTPS protocol to +protect this information. + +It's very easy to deploy DTS in a Docker environment and configure it using +environment variables. diff --git a/dts.yaml.example b/dts.yaml.example index 6b1201d9..61d79e9f 100644 --- a/dts.yaml.example +++ b/dts.yaml.example @@ -3,13 +3,15 @@ # service parameters service: - port: 8080 # port on which the service listenѕ - max_connections: 100 # maximum number of incoming HTTP connections - poll_interval: 60000 # interval at which DTS checks transfer statuses (ms) - endpoint: globus-local # name of endpoint used for manifest generation - data_dir: /path/to/dir # directory DTS uses for data storage - delete_after: 604800 # period after which info about completed transfers - # is deleted (seconds) + port: 8080 # port on which the service listens + max_connections: 100 # maximum number of incoming HTTP connections + poll_interval: 60000 # interval at which DTS checks transfer statuses (ms) + endpoint: globus-local # name of endpoint used for manifest generation + data_dir: /path/to/dir # directory DTS uses for internal data storage + manifest_dir: /path/to/dir # directory DTS uses for writing transfer manifests + delete_after: 604800 # period after which info about completed transfers + # is deleted (seconds) + debug: true # set to enable debug-level logging and other tools endpoints: # file 
transfer endpoints globus-local: diff --git a/endpoints/endpoints.go b/endpoints/endpoints.go index cda9d259..95af1c26 100644 --- a/endpoints/endpoints.go +++ b/endpoints/endpoints.go @@ -56,6 +56,8 @@ const ( type TransferStatus struct { // status code (see above) Code TransferStatusCode + // message describing a failure status + Message string // total number of files being transferred NumFiles int // number of files that have been transferred diff --git a/endpoints/globus/endpoint.go b/endpoints/globus/endpoint.go index ea39183e..5d31c5de 100644 --- a/endpoints/globus/endpoint.go +++ b/endpoints/globus/endpoint.go @@ -48,13 +48,25 @@ const ( globusTransferApiVersion = "v0.10" ) -// this type captures results from Globus Transfer API responses, including -// any errors encountered (https://docs.globus.org/api/transfer/overview/#errors) -type globusResult struct { - // string indicating the Globus error condition (e.g. "EndpointNotFound") - Code string `json:"code"` - // error message +// this error type is returned when a Globus operation fails for any reason +type GlobusError struct { + Code string `json:"code"` Message string `json:"message"` + + // ConsentRequired error field + RequiredScopes []string `json:"required_scopes"` +} + +func (e GlobusError) Error() string { + return fmt.Sprintf("%s (%s)", e.Message, e.Code) +} + +// returns true if a Globus response body matches an error +func responseIsError(body []byte) bool { + bodyStr := string(body) + return strings.Contains(bodyStr, "\"code\"") && + !strings.Contains(bodyStr, "\"code\": \"Accepted\"") && + strings.Contains(string(body), "\"message\"") } // this type satisfies the endpoints.Endpoint interface for Globus endpoints @@ -63,12 +75,18 @@ type Endpoint struct { Name string // endpoint UUID (obtained from config) Id uuid.UUID - // root directory of endpoint (host_root) - root string + // root directory for endpoint + RootDir string // HTTP client that caches queries Client http.Client // OAuth2 
access token AccessToken string + // access scopes + Scopes []string + + // authentication stuff + ClientId uuid.UUID + ClientSecret string } // creates a new Globus endpoint using the information supplied in the @@ -81,62 +99,51 @@ func NewEndpoint(endpointName string) (endpoints.Endpoint, error) { if epConfig.Provider != "globus" { return nil, fmt.Errorf("'%s' is not a Globus endpoint", endpointName) } - if epConfig.Root != "" { - return nil, fmt.Errorf("As a Globus endpoint, '%s' cannot have its root directory specified", endpointName) - } + defaultScopes := []string{"urn:globus:auth:scope:transfer.api.globus.org:all"} ep := &Endpoint{ - Name: epConfig.Name, - Id: epConfig.Id, + Name: epConfig.Name, + Id: epConfig.Id, + Scopes: defaultScopes, + ClientId: epConfig.Auth.ClientId, + ClientSecret: epConfig.Auth.ClientSecret, } // if needed, authenticate to obtain a Globus Transfer API access token var zeroId uuid.UUID - if epConfig.Auth.ClientId != zeroId { // nonzero value - err := ep.authenticate(epConfig.Auth.ClientId, epConfig.Auth.ClientSecret) + if ep.ClientId != zeroId { + err := ep.authenticate() if err != nil { return ep, err } } - // fetch the endpoint's root path - resource := fmt.Sprintf("endpoint/%s", ep.Id.String()) - resp, err := ep.get(resource, url.Values{}) - if err != nil { - return ep, err - } - if resp.StatusCode != 200 { - return ep, err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return ep, err - } - type EndpointDocument struct { - HostRoot string `json:"host_root"` - } - var endpointResp EndpointDocument - err = json.Unmarshal(body, &endpointResp) - if err != nil { - return ep, err + // if present, the root entry overrides the endpoint's root, and is expressed + // as a path relative to it + if epConfig.Root != "" { + ep.RootDir = epConfig.Root + } else { + ep.RootDir = "/" } - ep.root = endpointResp.HostRoot + slog.Debug(fmt.Sprintf("Endpoint %s: root directory is %s", + endpointName, 
epConfig.Root)) + return ep, nil } -// authenticates with Globus using a client ID and secret to obtain an access -// token (https://docs.globus.org/api/auth/reference/#client_credentials_grant) -func (ep *Endpoint) authenticate(clientId uuid.UUID, clientSecret string) error { +// (re)authenticates with Globus using its client ID and secret to obtain an +// access token with consents for its relevant list of scopes +// (https://docs.globus.org/api/auth/reference/#client_credentials_grant) +func (ep *Endpoint) authenticate() error { authUrl := "https://auth.globus.org/v2/oauth2/token" data := url.Values{} - data.Set("scope", "urn:globus:auth:scope:transfer.api.globus.org:all") + data.Set("scope", strings.Join(ep.Scopes, "+")) data.Set("grant_type", "client_credentials") req, err := http.NewRequest(http.MethodPost, authUrl, strings.NewReader(data.Encode())) if err != nil { return err } - req.SetBasicAuth(clientId.String(), clientSecret) + req.SetBasicAuth(ep.ClientId.String(), ep.ClientSecret) req.Header.Add("Content-Type", "application-x-www-form-urlencoded") // send the request @@ -160,39 +167,73 @@ func (ep *Endpoint) authenticate(clientId uuid.UUID, clientSecret string) error ExpiresIn int `json:"expires_in"` TokenType string `json:"token_type"` } - var authResponse AuthResponse err = json.Unmarshal(body, &authResponse) if err != nil { return err } + // FIXME: check the scopes to see if they match our requested ones? 
+ // stash the access token ep.AccessToken = authResponse.AccessToken return nil } -// constructs a new request to the auth server with the correct headers, etc -// * method can be http.MethodGet, http.MethodPut, http.MethodPost, etc -// * resource is the name of the desired endpoint/resource -// * body can be http.NoBody -func (ep *Endpoint) newRequest(method, resource string, - body io.Reader) (*http.Request, error) { - req, err := http.NewRequest(method, - fmt.Sprintf("%s/%s/%s", globusTransferBaseURL, globusTransferApiVersion, resource), - body, - ) +// This helper sends the given HTTP request, parsing the response for +// Globus-style error codes/messages and handling the ones that can be +// handled automatically (e.g. consent/scope related errors). In any case, +// it returns a byte slice containing the body of the response or an +// error indicating failure. +func (ep *Endpoint) sendRequest(request *http.Request) ([]byte, error) { + // send the initial request and read its contents + resp, err := ep.Client.Do(request) if err != nil { return nil, err } - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ep.AccessToken)) - return req, err + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body.Close() + + // check the response for a Globus-style error code / message + if responseIsError(body) { + var errResp GlobusError + err = json.Unmarshal(body, &errResp) + if err != nil { + return nil, err + } + if errResp.Code == "ConsentRequired" { + // we're missing a required scope, so reauthenticate with it + ep.Scopes = errResp.RequiredScopes + err = ep.authenticate() + if err != nil { + return nil, err + } + // try the request again + resp, err = ep.Client.Do(request) + if err != nil { + return nil, err + } + body, err = io.ReadAll(resp.Body) + resp.Body.Close() + } else { + // other errors are propagated + err = &errResp + } + } + return body, err } -// performs a GET request on the given resource, returning the resulting 
-// response and error -func (ep *Endpoint) get(resource string, values url.Values) (*http.Response, error) { +// Performs a GET request on the given Globus resource, handling any obvious +// errors and returning a byte slice containing the body of the response, +// and/or any unhandled error. +// This method handles scope-related errors by reauthenticating as needed and +// retrying the operation. See https://docs.globus.org/api/flows/working-with-consents/ +// for details on Globus scopes and consents. +func (ep *Endpoint) get(resource string, values url.Values) ([]byte, error) { u, err := url.ParseRequestURI(globusTransferBaseURL) if err != nil { return nil, err @@ -206,12 +247,17 @@ func (ep *Endpoint) get(resource string, values url.Values) (*http.Response, err return nil, err } req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ep.AccessToken)) - return ep.Client.Do(req) + + return ep.sendRequest(req) } -// performs a POST request on the given resource, returning the resulting -// response and error -func (ep *Endpoint) post(resource string, body io.Reader) (*http.Response, error) { +// Performs a POST request on the given Globus resource, handling any obvious +// errors and returning a byte slice containing the body of the response, +// and/or any unhandled error. +// This method handles scope-related errors by reauthenticating as needed and +// retrying the operation. See https://docs.globus.org/api/flows/working-with-consents/ +// for details on Globus scopes and consents. 
+func (ep *Endpoint) post(resource string, body io.Reader) ([]byte, error) { u, err := url.ParseRequestURI(globusTransferBaseURL) if err != nil { return nil, err @@ -224,11 +270,12 @@ func (ep *Endpoint) post(resource string, body io.Reader) (*http.Response, error return nil, err } req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ep.AccessToken)) - return ep.Client.Do(req) + + return ep.sendRequest(req) } func (ep *Endpoint) Root() string { - return ep.root + return ep.RootDir } func (ep *Endpoint) FilesStaged(files []frictionless.DataResource) (bool, error) { @@ -236,29 +283,30 @@ func (ep *Endpoint) FilesStaged(files []frictionless.DataResource) (bool, error) filesInDir := make(map[string][]string) for _, resource := range files { dir, file := filepath.Split(resource.Path) + dir = filepath.Join(ep.RootDir, dir) if _, found := filesInDir[dir]; !found { filesInDir[dir] = make([]string, 0) } filesInDir[dir] = append(filesInDir[dir], file) } - // for each directory, check that its files are present + // for each directory, check for its existence and that its files are present // (https://docs.globus.org/api/transfer/file_operations/#list_directory_contents) for dir, files := range filesInDir { values := url.Values{} values.Add("path", "/"+dir) values.Add("orderby", "name ASC") resource := fmt.Sprintf("operation/endpoint/%s/ls", ep.Id.String()) - - resp, err := ep.get(resource, values) - if err != nil { - return false, err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + body, err := ep.get(resource, values) if err != nil { + // it's okay if the directory doesn't exist -- it might need to be staged + globusErr := err.(*GlobusError) + if globusErr.Code == "ClientError.NotFound" { + return false, nil + } return false, err } + // https://docs.globus.org/api/transfer/file_operations/#dir_listing_response type DirListingResponse struct { Data []struct { @@ -291,12 +339,7 @@ func (ep *Endpoint) Transfers() ([]uuid.UUID, error) { 
values.Add("limit", "1000") values.Add("orderby", "name ASC") - resp, err := ep.get("task_list", url.Values{}) - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + body, err := ep.get("task_list", url.Values{}) if err != nil { return nil, err } @@ -322,12 +365,7 @@ func (ep *Endpoint) Transfers() ([]uuid.UUID, error) { // https://docs.globus.org/api/transfer/task_submit/#get_submission_id func (ep *Endpoint) getSubmissionId() (uuid.UUID, error) { var id uuid.UUID - resp, err := ep.get("submission_id", url.Values{}) - if err != nil { - return id, err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + body, err := ep.get("submission_id", url.Values{}) if err != nil { return id, err } @@ -341,7 +379,8 @@ func (ep *Endpoint) getSubmissionId() (uuid.UUID, error) { // https://docs.globus.org/api/transfer/task_submit/#submit_transfer_task // https://docs.globus.org/api/transfer/task_submit/#transfer_item_fields -func (ep *Endpoint) submitTransfer(destination endpoints.Endpoint, submissionId uuid.UUID, files []endpoints.FileTransfer) (uuid.UUID, error) { +func (ep *Endpoint) submitTransfer(destination endpoints.Endpoint, + submissionId uuid.UUID, files []endpoints.FileTransfer) (uuid.UUID, error) { var xferId uuid.UUID type TransferItem struct { @@ -366,7 +405,7 @@ func (ep *Endpoint) submitTransfer(destination endpoints.Endpoint, submissionId for i, file := range files { xferItems[i] = TransferItem{ DataType: "transfer_item", - SourcePath: file.SourcePath, + SourcePath: filepath.Join(ep.RootDir, file.SourcePath), DestinationPath: file.DestinationPath, ExternalChecksum: file.Hash, ChecksumAlgorithm: file.HashAlgorithm, @@ -393,19 +432,20 @@ func (ep *Endpoint) submitTransfer(destination endpoints.Endpoint, submissionId if err != nil { return xferId, err } - resp, err := ep.post("transfer", bytes.NewReader(data)) + body, err := ep.post("transfer", bytes.NewReader(data)) if err != nil { return xferId, err 
} - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { + if responseIsError(body) { + var globusErr GlobusError + err = json.Unmarshal(body, &globusErr) + if err == nil { + err = &globusErr + } return xferId, err } type SubmissionResponse struct { - TaskId uuid.UUID `json:"task_id"` - Code string `json:"code"` - Message string `json:"message"` + TaskId uuid.UUID `json:"task_id"` } var gResp SubmissionResponse @@ -414,10 +454,6 @@ func (ep *Endpoint) submitTransfer(destination endpoints.Endpoint, submissionId return xferId, err } xferId = gResp.TaskId - var zeroId uuid.UUID - if xferId == zeroId { // trouble! - return xferId, fmt.Errorf("%s (%s)", gResp.Message, gResp.Code) - } return xferId, nil } @@ -456,32 +492,66 @@ var statusCodesForStrings = map[string]endpoints.TransferStatusCode{ func (ep *Endpoint) Status(id uuid.UUID) (endpoints.TransferStatus, error) { resource := fmt.Sprintf("task/%s", id.String()) - resp, err := ep.get(resource, url.Values{}) + body, err := ep.get(resource, url.Values{}) if err != nil { return endpoints.TransferStatus{}, err } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { + if responseIsError(body) { + var globusErr GlobusError + err := json.Unmarshal(body, &globusErr) + if err == nil { + err = &globusErr + } return endpoints.TransferStatus{}, err } type TaskResponse struct { - Files int `json:"files"` - FilesSkipped int `json:"files_skipped"` - FilesTransferred int `json:"files_transferred"` - IsPaused bool `json:"is_paused"` - Status string `json:"status"` - // the following fields are present only when an error occurs - Code string `json:"code"` - Message string `json:"message"` + Files int `json:"files"` + FilesSkipped int `json:"files_skipped"` + FilesTransferred int `json:"files_transferred"` + IsPaused bool `json:"is_paused"` + NiceStatus string `json:"nice_status"` + NiceStatusShortDescription string `json:"nice_status_short_description"` + Status string 
`json:"status"` } var response TaskResponse err = json.Unmarshal(body, &response) if err != nil { return endpoints.TransferStatus{}, err } - if strings.Contains(response.Code, "ClientError") { // e.g. not found - return endpoints.TransferStatus{}, fmt.Errorf(response.Message) + // check for an error condition in NiceStatus + if response.NiceStatus != "" && response.NiceStatus != "OK" && response.NiceStatus != "Queued" { + // get the event list for this task + resource := fmt.Sprintf("task/%s/event_list", id.String()) + body, err := ep.get(resource, url.Values{}) + if err != nil { + // fine, we'll just use the "nice status" + return endpoints.TransferStatus{}, fmt.Errorf(response.NiceStatusShortDescription) + } + type Event struct { + DataType string `json:"DATA_TYPE"` + Code string `json:"code"` + IsError bool `json:"is_error"` + Description string `json:"description"` + Details string `json:"details"` + Time string `json:"time"` + } + type EventList struct { + Data []Event `json:"DATA"` + } + var eventList EventList + err = json.Unmarshal(body, &eventList) + if err == nil { + // find the first error event + for _, event := range eventList.Data { + if event.IsError { + return endpoints.TransferStatus{}, + fmt.Errorf("%s (%s):\n%s", event.Description, event.Code, + event.Details) + } + } + } + // fall back to the "nice status" + return endpoints.TransferStatus{}, fmt.Errorf(response.NiceStatusShortDescription) } return endpoints.TransferStatus{ Code: statusCodesForStrings[response.Status], diff --git a/endpoints/local/endpoint.go b/endpoints/local/endpoint.go index ecfa5438..64ea1d9b 100644 --- a/endpoints/local/endpoint.go +++ b/endpoints/local/endpoint.go @@ -177,7 +177,7 @@ func (ep *Endpoint) Transfer(dst endpoints.Endpoint, files []endpoints.FileTrans var xferId uuid.UUID _, ok := dst.(*Endpoint) if !ok { - return xferId, fmt.Errorf("Destination endpoint must be local!") + return xferId, fmt.Errorf("Cannot transfer files between a local endpoint and another 
type of endpoint!") } // first, we check that all requested files are staged on this endpoint diff --git a/main.go b/main.go index 943b2360..11fa76f7 100644 --- a/main.go +++ b/main.go @@ -45,7 +45,7 @@ import ( // gives it an endpoint prefix of "docs". To enable these endpoints, you must // use the "docs" build: go build -tags docs -// Prints usage info. +// prints usage info func usage() { fmt.Fprintf(os.Stderr, "%s: usage:\n", os.Args[0]) fmt.Fprintf(os.Stderr, "%s \n", os.Args[0]) @@ -53,51 +53,52 @@ func usage() { os.Exit(1) } +func enableLogging() { + logLevel := new(slog.LevelVar) + if config.Service.Debug { + logLevel.Set(slog.LevelDebug) + } else { + logLevel.Set(slog.LevelInfo) + } + handler := slog.NewJSONHandler(os.Stdout, + &slog.HandlerOptions{Level: logLevel}) + slog.SetDefault(slog.New(handler)) + slog.Debug("Debug logging enabled.") +} + func main() { - // The only argument is the configuration filename. + // the only argument is the configuration filename if len(os.Args) < 2 { usage() } configFile := os.Args[1] - // enables a default structured logger with JSON output - logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) - slog.SetDefault(logger) - - // Read the configuration file. + // read the configuration file and initialize the config package log.Printf("Reading configuration from '%s'...\n", configFile) file, err := os.Open(configFile) if err != nil { log.Panicf("Couldn't open %s: %s\n", configFile, err.Error()) } defer file.Close() - b, err := io.ReadAll(file) if err != nil { log.Panicf("Couldn't read configuration data: %s\n", err.Error()) } - - // Initialize our configuration and create the service. err = config.Init(b) if err != nil { log.Panicf("Couldn't initialize the configuration: %s\n", err.Error()) } + + enableLogging() + service, err := services.NewDTSPrototype() if err != nil { log.Panicf("Couldn't create the service: %s\n", err.Error()) } - // Start the service in a goroutine so it doesn't block. 
- go func() { - err = service.Start(config.Service.Port) - if err != nil { - log.Println(err.Error()) - } - }() - - // Intercept the SIGINT, SIGHUP, SIGTERM, and SIGQUIT signals, shutting down - // the service as gracefully as possible if they are encountered. + // intercept the SIGINT, SIGHUP, SIGTERM, and SIGQUIT signals so we can shut + // down the service gracefully if they are encountered sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, @@ -105,14 +106,24 @@ func main() { syscall.SIGTERM, syscall.SIGQUIT) - // Block till we receive one of the above signals. + // start the service in a goroutine so it doesn't block + go func() { + err = service.Start(config.Service.Port) + if err != nil { // on error, log the error message and issue a SIGINT + log.Println(err.Error()) + thisProcess, _ := os.FindProcess(os.Getpid()) + thisProcess.Signal(os.Interrupt) + } + }() + + // block till we receive one of the above signals <-sigChan - // Create a deadline to wait for. + // create a deadline to wait for ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Wait for connections to close until the deadline elapses. 
+ // wait for connections to close until the deadline elapses service.Shutdown(ctx) log.Println("Shutting down") os.Exit(0) diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 00000000..19be731f --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,61 @@ +site_name: "DTS: A Data Transfer System" + +nav: + - 'Home': 'index.md' + - 'Administrator Guide': + - 'Overview': 'admin/index.md' + - 'Installing DTS Locally': 'admin/installation.md' + - 'Deploying DTS via Docker': 'admin/deployment.md' + - 'Configuring DTS': 'admin/config.md' + - 'Granting DTS Access to a Globus Endpoint': 'admin/globus.md' + - 'Developer Guide': + - 'Overview': 'developer/index.md' + - 'Code Organization': + - 'config package': 'developer/config.md' + - 'credit package': 'developer/credit.md' + - 'databases package': 'developer/databases.md' + - 'endpoints package': 'developer/endpoints.md' + - 'frictionless package': 'developer/frictionless.md' + - 'services package': 'developer/services.md' + - 'tasks package': 'developer/tasks.md' + +edit_uri: "" + +theme: + name: material + palette: + palette: + - media: "(prefers-color-scheme: light)" + scheme: default + toggle: + icon: material/weather-sunny + name: Switch to dark mode + - media: "(prefers-color-scheme: dark)" + scheme: slate + primary: cyan + toggle: + icon: material/weather-night + name: Switch to light mode + features: + - navigation.indices + - navigation.instant + - navigation.sections + - navigation.top +# - navigation.tabs + +markdown_extensions: + - admonition + - pymdownx.highlight + - pymdownx.superfences + - pymdownx.tabbed: + alternate_style: true + - pymdownx.arithmatex: + generic: true + - tables + +extra_javascript: + - https://polyfill.io/v3/polyfill.min.js?features=es6 + +repo_url: https://github.com/kbase/dts + +use_directory_urls: false diff --git a/services/prototype.go b/services/prototype.go index 6365e4dd..d40923a1 100644 --- a/services/prototype.go +++ b/services/prototype.go @@ -7,7 +7,7 @@ import ( 
"encoding/json" "fmt" "io" - "log" + "log/slog" "net" "net/http" "slices" @@ -98,12 +98,12 @@ func (service *prototype) getRoot(w http.ResponseWriter, _, _, err := getAuthInfo(r.Header) if err != nil { - log.Print(err.Error()) + slog.Error(err.Error()) writeError(w, err.Error(), http.StatusUnauthorized) return } - log.Printf("Querying root endpoint...") + slog.Info("Querying root endpoint...") data := RootResponse{ Name: service.Name, Version: service.Version, @@ -129,12 +129,12 @@ func (service *prototype) getDatabases(w http.ResponseWriter, _, _, err := getAuthInfo(r.Header) if err != nil { - log.Print(err.Error()) + slog.Error(err.Error()) writeError(w, err.Error(), http.StatusUnauthorized) return } - log.Printf("Querying organizational databases...") + slog.Info("Querying organizational databases...") dbs := make([]dbMetadata, 0) for dbName, db := range config.Databases { dbs = append(dbs, dbMetadata{ @@ -156,7 +156,7 @@ func (service *prototype) getDatabase(w http.ResponseWriter, _, _, err := getAuthInfo(r.Header) if err != nil { - log.Print(err.Error()) + slog.Error(err.Error()) writeError(w, err.Error(), http.StatusUnauthorized) return } @@ -164,11 +164,11 @@ func (service *prototype) getDatabase(w http.ResponseWriter, vars := mux.Vars(r) dbName := vars["db"] - log.Printf("Querying database %s...", dbName) + slog.Info(fmt.Sprintf("Querying database %s...", dbName)) db, ok := config.Databases[dbName] if !ok { errStr := fmt.Sprintf("Database %s not found", dbName) - log.Print(errStr) + slog.Error(errStr) writeError(w, errStr, http.StatusNotFound) } else { data, _ := json.Marshal(dbMetadata{ @@ -180,7 +180,7 @@ func (service *prototype) getDatabase(w http.ResponseWriter, } } -// this helper translates an array of engines.SearchResults to a JSON object +// this helper translates an array of SearchResults to a JSON object // containing search results for the query (including the database name) func jsonFromSearchResults(dbName string, query string, results 
databases.SearchResults) ([]byte, error) { @@ -231,7 +231,7 @@ func (service *prototype) searchDatabase(w http.ResponseWriter, _, orcid, err := getAuthInfo(r.Header) if err != nil { - log.Print(err.Error()) + slog.Error(err.Error()) writeError(w, err.Error(), http.StatusUnauthorized) return } @@ -243,7 +243,7 @@ func (service *prototype) searchDatabase(w http.ResponseWriter, _, ok := config.Databases[dbName] if !ok { errStr := fmt.Sprintf("Database %s not found", dbName) - log.Print(errStr) + slog.Error(errStr) writeError(w, errStr, http.StatusNotFound) return } @@ -255,7 +255,7 @@ func (service *prototype) searchDatabase(w http.ResponseWriter, return } - log.Printf("Searching database %s for files...", dbName) + slog.Info(fmt.Sprintf("Searching database %s for files...", dbName)) db, err := databases.NewDatabase(orcid, dbName) if err != nil { writeError(w, err.Error(), http.StatusNotFound) @@ -363,7 +363,7 @@ func (service *prototype) getTransferStatus(w http.ResponseWriter, _, _, err := getAuthInfo(r.Header) if err != nil { - log.Print(err.Error()) + slog.Error(err.Error()) writeError(w, err.Error(), http.StatusUnauthorized) return } @@ -389,6 +389,7 @@ func (service *prototype) getTransferStatus(w http.ResponseWriter, resp := TransferStatusResponse{ Id: xferId.String(), Status: statusAsString(status.Code), + Message: status.Message, NumFiles: status.NumFiles, NumFilesTransferred: status.NumFilesTransferred, } @@ -402,7 +403,7 @@ func (service *prototype) deleteTransfer(w http.ResponseWriter, _, _, err := getAuthInfo(r.Header) if err != nil { - log.Print(err.Error()) + slog.Error(err.Error()) writeError(w, err.Error(), http.StatusUnauthorized) return } @@ -482,8 +483,8 @@ func NewDTSPrototype() (TransferService, error) { // starts the prototype data transfer service func (service *prototype) Start(port int) error { - log.Printf("Starting %s service on port %d...", service.Name, port) - log.Printf("(Accepting up to %d connections)", config.Service.MaxConnections) 
+ slog.Info(fmt.Sprintf("Starting %s service on port %d...", service.Name, port)) + slog.Info(fmt.Sprintf("(Accepting up to %d connections)", config.Service.MaxConnections)) service.StartTime = time.Now() @@ -497,7 +498,10 @@ func (service *prototype) Start(port int) error { listener = netutil.LimitListener(listener, config.Service.MaxConnections) // start tasks processing - tasks.Start() + err = tasks.Start() + if err != nil { + return err + } // start the server service.Server = &http.Server{ @@ -507,19 +511,23 @@ func (service *prototype) Start(port int) error { // we don't report the server closing as an error if err != http.ErrServerClosed { return err - } else { - return nil } + return nil } // gracefully shuts down the service without interrupting active connections func (service *prototype) Shutdown(ctx context.Context) error { tasks.Stop() - return service.Server.Shutdown(ctx) + if service.Server != nil { + return service.Server.Shutdown(ctx) + } + return nil } // closes down the service abruptly, freeing all resources func (service *prototype) Close() { tasks.Stop() - service.Server.Close() + if service.Server != nil { + service.Server.Close() + } } diff --git a/services/prototype_test.go b/services/prototype_test.go index f4e52673..aa339997 100644 --- a/services/prototype_test.go +++ b/services/prototype_test.go @@ -55,6 +55,7 @@ service: max_connections: 100 poll_interval: 100 data_dir: TESTING_DIR/data + manifest_dir: TESTING_DIR/manifests delete_after: 24 endpoint: local-endpoint databases: @@ -75,7 +76,6 @@ endpoints: name: Local endpoint id: 8816ec2d-4a48-4ded-b68a-5ab46a4417b6 provider: local - root: TESTING_DIR source-endpoint: name: Endpoint 1 id: 26d61236-39f6-4742-a374-8ec709347f2f @@ -167,8 +167,9 @@ func setup() { dtstest.RegisterDatabase("destination1", nil) dtstest.RegisterDatabase("destination2", nil) - // create the DTS data directory + // create the DTS data and manifest directories os.Mkdir(config.Service.DataDirectory, 0755) + 
os.Mkdir(config.Service.ManifestDirectory, 0755) // Start the service. log.Print("Starting test mapping service...\n") diff --git a/services/transfer_service.go b/services/transfer_service.go index 4e015f33..10b0f062 100644 --- a/services/transfer_service.go +++ b/services/transfer_service.go @@ -40,23 +40,6 @@ func writeError(w http.ResponseWriter, message string, code int) { w.Write(data) } -// This "enum" type identifies the status of a file transfer operation, -// including both staging and endpoint-to-endpoint transfer stages. Contrast -// this with the endpoints.TransferStatus type, which describes the status only -// for endpoint-to-endpoint transfer stages. -type TransferStatus int - -// possible statuses of file transfers -const ( - Unknown TransferStatus = iota // unknown transfer or status not available - Staging // requested files are being staged at source - StagingSucceeded // requested files have been staged - StagingFailed // requested files could not be staged - Transferring // files are being transferred between source/destination endpoints - TransferSucceeded // files have been successfully transferred - TransferFailed // files could not be transferred -) - // a response for an ElasticSearch query (GET) type ElasticSearchResponse struct { // name of organization database @@ -91,6 +74,8 @@ type TransferStatusResponse struct { Id string `json:"id"` // transfer job status Status string `json:"status"` + // message (if any) related to status + Message string `json:"message,omitempty"` // number of files being transferred NumFiles int `json:"num_files"` // number of files that have been completely transferred diff --git a/tasks/tasks.go b/tasks/tasks.go index c253addb..14a7cd5e 100644 --- a/tasks/tasks.go +++ b/tasks/tasks.go @@ -25,7 +25,9 @@ import ( "bytes" "encoding/gob" "encoding/json" + "errors" "fmt" + "io/fs" "log/slog" "os" "path/filepath" @@ -78,7 +80,7 @@ type taskType struct { CompletionTime time.Time // time at which the transfer 
completed } -// starts a task going, initiating staging +// starts a task going, initiating staging if needed func (task *taskType) start() error { source, err := databases.NewDatabase(task.Orcid, task.Source) if err != nil { @@ -91,18 +93,32 @@ func (task *taskType) start() error { return err } - // tell the source DB to stage the files, stash the task, and return - // its new ID - task.Staging.UUID, err = source.StageFiles(task.FileIds) - task.Staging.Valid = true + // are the files already staged? + sourceEndpoint, err := source.Endpoint() if err != nil { return err } - task.Status = TransferStatus{ - Code: TransferStatusStaging, - NumFiles: len(task.FileIds), + staged, err := sourceEndpoint.FilesStaged(task.Resources) + if err != nil { + return err } - return nil + + if staged { + err = task.beginTransfer() + } else { + // tell the source DB to stage the files, stash the task, and return + // its new ID + task.Staging.UUID, err = source.StageFiles(task.FileIds) + task.Staging.Valid = true + if err != nil { + return err + } + task.Status = TransferStatus{ + Code: TransferStatusStaging, + NumFiles: len(task.FileIds), + } + } + return err } // updates the status of a canceled task depending on where it is in its @@ -119,9 +135,11 @@ func (task *taskType) checkCancellation() error { return err } task.Status, err = endpoint.Status(task.Id) + return err } else { - // at any other point in the lifecycle, cut and run + // at any other point in the lifecycle, terminate the task task.Status.Code = TransferStatusFailed + task.Status.Message = "Task canceled at user request" } if task.Completed() { task.CompletionTime = time.Now() @@ -129,6 +147,55 @@ func (task *taskType) checkCancellation() error { return nil } +// initiates a file transfer on a set of staged files +func (task *taskType) beginTransfer() error { + source, err := databases.NewDatabase(task.Orcid, task.Source) + if err != nil { + return err + } + destination, err := databases.NewDatabase(task.Orcid, 
task.Destination) + if err != nil { + return err + } + + // construct the source/destination file paths + username, err := destination.LocalUser(task.Orcid) + if err != nil { + return err + } + fileXfers := make([]FileTransfer, len(task.Resources)) + for i, resource := range task.Resources { + destinationPath := filepath.Join(username, task.Id.String(), resource.Path) + fileXfers[i] = FileTransfer{ + SourcePath: resource.Path, + DestinationPath: destinationPath, + Hash: resource.Hash, + } + } + + // initiate the transfer + sourceEndpoint, err := source.Endpoint() + if err != nil { + return err + } + destinationEndpoint, err := destination.Endpoint() + if err != nil { + return err + } + task.Transfer.UUID, err = sourceEndpoint.Transfer(destinationEndpoint, fileXfers) + if err != nil { + return err + } + + task.Status = TransferStatus{ + Code: TransferStatusActive, + NumFiles: len(task.FileIds), + } + task.Staging = uuid.NullUUID{} + task.Transfer.Valid = true + return nil +} + // checks whether files for a task are finished staging and, if so, // initiates the transfer process func (task *taskType) checkStaging() error { @@ -145,44 +212,9 @@ func (task *taskType) checkStaging() error { return err } if staged { - destination, err := databases.NewDatabase(task.Orcid, task.Destination) - if err != nil { - return err - } - destinationEndpoint, err := destination.Endpoint() - if err != nil { - return err - } - - // construct the source/destination file paths - username, err := source.LocalUser(task.Orcid) - if err != nil { - return err - } - fileXfers := make([]FileTransfer, len(task.Resources)) - for i, resource := range task.Resources { - destinationPath := filepath.Join(username, task.Id.String(), resource.Path) - fileXfers[i] = FileTransfer{ - SourcePath: resource.Path, - DestinationPath: destinationPath, - Hash: resource.Hash, - } - } - - // initiate the transfer - task.Transfer.UUID, err = sourceEndpoint.Transfer(destinationEndpoint, fileXfers) - if err != nil { - 
return err - } - - task.Status = TransferStatus{ - Code: TransferStatusActive, - NumFiles: len(task.FileIds), - } - task.Staging = uuid.NullUUID{} - task.Transfer.Valid = true + err = task.beginTransfer() } - return nil + return err } // checks whether files for a task are finished transferring and, if so, @@ -221,52 +253,54 @@ func (task *taskType) checkTransfer() error { var manifestBytes []byte manifestBytes, err = json.Marshal(manifest) if err != nil { - return err + return fmt.Errorf("marshalling manifest content: %s", err.Error()) } var manifestFile *os.File - manifestFile, err = os.CreateTemp(localEndpoint.Root(), "manifest.json") + manifestFile, err = os.CreateTemp(config.Service.ManifestDirectory, + "manifest.json") if err != nil { - return err + return fmt.Errorf("creating manifest file: %s", err.Error()) } _, err = manifestFile.Write(manifestBytes) if err != nil { - return err + return fmt.Errorf("writing manifest file content: %s", err.Error()) } task.ManifestFile = manifestFile.Name() err = manifestFile.Close() if err != nil { - return err + return fmt.Errorf("closing manifest file: %s", err.Error()) } // construct the source/destination file manifest paths - username, err := source.LocalUser(task.Orcid) + destination, err := databases.NewDatabase(task.Orcid, task.Destination) + if err != nil { + return err + } + username, err := destination.LocalUser(task.Orcid) if err != nil { return err } fileXfers := []FileTransfer{ FileTransfer{ - SourcePath: filepath.Base(task.ManifestFile), // relative to root! 
+ SourcePath: task.ManifestFile, DestinationPath: filepath.Join(username, task.Id.String(), "manifest.json"), }, } // begin transferring the manifest - destination, err := databases.NewDatabase(task.Orcid, task.Destination) - if err != nil { - return err - } destinationEndpoint, err := destination.Endpoint() if err != nil { return err } task.Manifest.UUID, err = localEndpoint.Transfer(destinationEndpoint, fileXfers) if err != nil { - return err + return fmt.Errorf("transferring manifest file: %s", err.Error()) } task.Status = TransferStatus{ Code: TransferStatusFinalizing, } + task.Transfer.Valid = false task.Manifest.Valid = true } } @@ -291,6 +325,7 @@ func (task *taskType) checkManifest() error { os.Remove(task.ManifestFile) task.ManifestFile = "" task.Status.Code = xferStatus.Code + task.Status.Message = "" task.CompletionTime = time.Now() } return nil @@ -298,7 +333,7 @@ func (task *taskType) checkManifest() error { // returns the duration since the task completed (successfully or otherwise), // or 0 if the task has not completed -func (task *taskType) Age() time.Duration { +func (task taskType) Age() time.Duration { if task.Status.Code == TransferStatusSucceeded || task.Status.Code == TransferStatusFailed { return time.Since(task.CompletionTime) @@ -308,35 +343,28 @@ func (task *taskType) Age() time.Duration { } // returns true if the task has completed (successfully or not), false otherwise -func (task *taskType) Completed() bool { +func (task taskType) Completed() bool { return task.Status.Code == TransferStatusSucceeded || task.Status.Code == TransferStatusFailed } // requests that the task be canceled -func (task *taskType) Cancel() { +func (task *taskType) Cancel() error { task.Canceled = true // mark as canceled // fetch the source endpoint var endpoint endpoints.Endpoint source, err := databases.NewDatabase(task.Orcid, task.Source) if err != nil { - goto errorOccurred + return err } endpoint, err = source.Endpoint() if err != nil { - goto errorOccurred 
+ return err } // request that the task be canceled - err = endpoint.Cancel(task.Id) - if err != nil { - goto errorOccurred - } - return -errorOccurred: - slog.Error(fmt.Sprintf("Task %s: error in cancellation: %s", - task.Id, err.Error())) + return endpoint.Cancel(task.Id) } // this function updates the state of a task, setting its status as necessary @@ -397,6 +425,11 @@ func saveTasks(tasks map[uuid.UUID]taskType, dataFile string) error { return fmt.Errorf("Writing task file %s: %s", dataFile, err.Error()) } slog.Debug(fmt.Sprintf("Saved %d tasks to %s", len(tasks), dataFile)) + } else { + _, err := os.Stat(dataFile) + if !errors.Is(err, fs.ErrNotExist) { // file exists + os.Remove(dataFile) + } } return nil } @@ -446,7 +479,14 @@ func processTasks() { case taskId := <-cancelTaskChan: // Cancel() called if task, found := tasks[taskId]; found { slog.Info(fmt.Sprintf("Task %s: received cancellation request", taskId.String())) - task.Cancel() + err := task.Cancel() + if err != nil { + task.Status.Code = TransferStatusUnknown + task.Status.Message = fmt.Sprintf("error in cancellation: %s", err.Error()) + task.CompletionTime = time.Now() + slog.Error(fmt.Sprintf("Task %s: %s", task.Id.String(), task.Status.Message)) + tasks[task.Id] = task + } } else { err := NotFoundError{Id: taskId} errorChan <- err @@ -464,8 +504,12 @@ func processTasks() { oldStatus := task.Status err := task.Update() if err != nil { - // we log task update errors but do not propagate them - slog.Error(err.Error()) + // We log task update errors but do not propagate them. All + // task errors result in a failed status. 
+ task.Status.Code = TransferStatusFailed + task.Status.Message = err.Error() + task.CompletionTime = time.Now() + slog.Error(fmt.Sprintf("Task %s: %s", task.Id.String(), err.Error())) } if task.Status.Code != oldStatus.Code { switch task.Status.Code { @@ -516,9 +560,9 @@ func heartbeat(pollInterval time.Duration, pollChan chan<- struct{}) { // this function checks for the existence of the data directory and whether it // is readable/writeable, returning a non-nil error if any of these conditions // are not met -func validateDataDirectory(dir string) error { +func validateDirectory(dirType, dir string) error { if dir == "" { - return fmt.Errorf("no data directory was specified!") + return fmt.Errorf("no %s directory was specified!", dirType) } info, err := os.Stat(dir) if err != nil { @@ -526,9 +570,9 @@ func validateDataDirectory(dir string) error { } if !info.IsDir() { return &os.PathError{ - Op: "validateDataDirectory", + Op: "validateDirectory", Path: dir, - Err: fmt.Errorf("%s is not a directory!", dir), + Err: fmt.Errorf("%s is not a valid %s directory!", dir, dirType), } } @@ -538,9 +582,9 @@ func validateDataDirectory(dir string) error { err = os.WriteFile(testFile, writtenTestData, 0644) if err != nil { return &os.PathError{ - Op: "validateDataDirectory", + Op: "validateDirectory", Path: dir, - Err: fmt.Errorf("Could not write to data directory %s!", dir), + Err: fmt.Errorf("Could not write to %s directory %s!", dirType, dir), } } readTestData, err := os.ReadFile(testFile) @@ -549,9 +593,9 @@ func validateDataDirectory(dir string) error { } if err != nil || !bytes.Equal(readTestData, writtenTestData) { return &os.PathError{ - Op: "validateDataDirectory", + Op: "validateDirectory", Path: dir, - Err: fmt.Errorf("Could not read from data directory %s!", dir), + Err: fmt.Errorf("Could not read from %s directory %s!", dirType, dir), } } return nil @@ -605,8 +649,12 @@ func Start() error { firstCall = false } - // does the directory exist and is it 
writable/readable? - err := validateDataDirectory(config.Service.DataDirectory) + // do the necessary directories exist, and are they writable/readable? + err := validateDirectory("data", config.Service.DataDirectory) + if err != nil { + return err + } + err = validateDirectory("manifest", config.Service.ManifestDirectory) if err != nil { return err } diff --git a/tasks/tasks_test.go b/tasks/tasks_test.go index fecbc0a9..1147facd 100644 --- a/tasks/tasks_test.go +++ b/tasks/tasks_test.go @@ -60,6 +60,7 @@ service: max_connections: 100 poll_interval: 50 # milliseconds data_dir: TESTING_DIR/data + manifest_dir: TESTING_DIR/manifests delete_after: 2 # seconds endpoint: local-endpoint databases: @@ -76,7 +77,6 @@ endpoints: name: Local endpoint id: 8816ec2d-4a48-4ded-b68a-5ab46a4417b6 provider: test - root: TESTING_DIR source-endpoint: name: Endpoint 1 id: 26d61236-39f6-4742-a374-8ec709347f2f @@ -139,8 +139,9 @@ func setup() { // register test databases/endpoints referred to in config file dtstest.RegisterTestFixturesFromConfig(endpointOptions, testResources) - // Create the data directory used to save/restore tasks + // Create the data and manifest directories os.Mkdir(config.Service.DataDirectory, 0755) + os.Mkdir(config.Service.ManifestDirectory, 0755) } // this function gets called after all tests have been run @@ -187,11 +188,11 @@ func (t *SerialTests) TestCreateTask() { assert.Nil(err) assert.Equal(TransferStatusUnknown, status.Code) - // make sure the status switches to staging + // make sure the status switches to staging or active time.Sleep(pause + pollInterval) status, err = Status(taskId) assert.Nil(err) - assert.Equal(TransferStatusStaging, status.Code) + assert.True(status.Code == TransferStatusStaging || status.Code == TransferStatusActive) // wait for the staging to complete and then check its status // again (should be actively transferring)