diff --git a/fcrepo-api-x-integration/pom.xml b/fcrepo-api-x-integration/pom.xml
index 8aeb92c..50133c4 100644
--- a/fcrepo-api-x-integration/pom.xml
+++ b/fcrepo-api-x-integration/pom.xml
@@ -287,6 +287,13 @@
test
+
+ commons-io
+ commons-io
+ 2.5
+ test
+
+
org.fcrepo
diff --git a/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/KarafIT.java b/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/KarafIT.java
index 8d52bde..3884761 100644
--- a/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/KarafIT.java
+++ b/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/KarafIT.java
@@ -30,6 +30,7 @@
import java.io.File;
import java.io.FileInputStream;
+import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -204,10 +205,19 @@ public default Option deployFile(String path) {
* @return the resulting WebResource
*/
public default WebResource testResource(String path) {
+ return testResource(path, "text/turtle");
+ }
+ /**
+ * Get a test resource from test-classes
+ *
+ * @param path the resource path relative to {@link #testResources}
+ * @return the resulting WebResource
+ */
+ public default WebResource testResource(String path, String contentType) {
final File file = new File(testResources, path);
try {
- return WebResource.of(new FileInputStream(file), "text/turtle", URI.create(FilenameUtils.getBaseName(
+ return WebResource.of(new FileInputStream(file), contentType, URI.create(FilenameUtils.getBaseName(
path)), null);
} catch (final Exception e) {
throw new RuntimeException(e);
@@ -225,6 +235,34 @@ public default URI postFromTestResource(final String filePath, final URI intoCon
}
}
+ public default URI postFromTestResource(final String filePath, final URI intoContainer, final String contentType)
+ throws Exception {
+ return postFromTestResource(filePath, intoContainer, contentType,
+ String.format("%s_%s", testMethodName(), getBaseName(filePath)));
+ }
+
+ public default URI postFromTestResource(final String filePath, final URI intoContainer,
+ final String contentType, final String slug) throws Exception {
+ try (final WebResource object = testResource(filePath, contentType);
+ final FcrepoResponse response = client.post(intoContainer)
+ .body(object.representation(), object.contentType())
+ .slug(slug)
+ .perform()) {
+ return response.getLocation();
+ }
+ }
+
+ public default URI postFromStream(final InputStream in, final URI intoContainer, final String contentType,
+ final String slug) throws Exception {
+ try (final WebResource object = WebResource.of(in, contentType);
+ final FcrepoResponse response = client.post(intoContainer)
+ .body(object.representation(), object.contentType())
+ .slug(slug)
+ .perform()) {
+ return response.getLocation();
+ }
+ }
+
public static T attempt(final int times, final Callable it) {
Exception caught = null;
diff --git a/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/StreamingIT.java b/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/StreamingIT.java
new file mode 100644
index 0000000..0e57533
--- /dev/null
+++ b/fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/StreamingIT.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to DuraSpace under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership.
+ *
+ * DuraSpace licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.fcrepo.apix.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.AdviceWithRouteBuilder;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.jena.rdf.model.ModelFactory;
+import org.apache.jena.rdf.model.ResourceFactory;
+import org.fcrepo.client.FcrepoOperationFailedException;
+import org.fcrepo.client.FcrepoResponse;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests relating to streaming content proxied by API-X
+ *
+ * @author esm
+ */
+@RunWith(PaxExam.class)
+public class StreamingIT implements KarafIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingIT.class);
+
+ private static final URI APIX_BASE_URI = URI.create(apixBaseURI);
+
+ private static final String INTERCEPT_ROUTE_ID = "execute-intercept";
+
+ private static final String CONTEXT_NAME = "apix-core";
+
+ private static final String CONTEXT_ROLE = "routing-context";
+
+ private URI binaryContainer;
+
+ private URI binaryResource;
+
+ private String binaryResourceSha;
+
+ private static MessageDigest sha1;
+
+ @Rule
+ public TestName name = new TestName();
+
+ @Inject
+ @Filter("(role=" + CONTEXT_ROLE + ")")
+ private CamelContext ctx;
+
+ @Override
+ public String testClassName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String testMethodName() {
+ return name.getMethodName();
+ }
+
+ @BeforeClass
+ public static void initMessageDigest() throws NoSuchAlgorithmException {
+ sha1 = MessageDigest.getInstance("SHA-1");
+ }
+
+ /**
+ * Creates a container and a binary resource of 2MiB + 1 bytes long. Retrieves checksum of the resource.
+ *
+ * @throws FcrepoOperationFailedException if unexpected things go wrong
+ * @throws IOException if unexpected things go wrong
+ * @throws URISyntaxException if unexpected things go wrong
+ */
+ @Before
+ public void initBinaryResources() throws FcrepoOperationFailedException, IOException, URISyntaxException {
+ binaryContainer = URI.create(String.format("%s%s", fcrepoBaseURI, testClassName() + "/binaries/"));
+
+ // Create container if it doesn't already exist
+ if (!resourceExists(binaryContainer)) {
+ try (FcrepoResponse r = client.put(binaryContainer)
+ .body(
+ new FileInputStream(
+ new File(testResources, "objects/binary_container.ttl")), "text/turtle")
+ .perform()) {
+ assertEquals("Failed to create binary container '" + binaryContainer + "'",
+ 201, r.getStatusCode());
+ }
+ }
+
+ // Create the binary resource if it doesn't already exist
+ URI expectedBinaryResource = appendToPath(binaryContainer, "large-binary");
+ if (!resourceExists(expectedBinaryResource)) {
+ LOG.warn("Expected resource did not exist {}", expectedBinaryResource);
+ try {
+ binaryResource = postFromStream(
+ new NullInputStream((2 * 1024 * 1024) + 1), binaryContainer,
+ "application/octet-stream", "large-binary");
+ } catch (Exception e) {
+ fail(String.format("Failed to create binary LDPR: %s", e.getMessage()));
+ }
+ } else {
+ binaryResource = expectedBinaryResource;
+ }
+
+ // Retrieve the checksum calculated by Fedora
+ binaryResourceSha = ModelFactory.createDefaultModel()
+ .read(client
+ .get(appendToPath(binaryResource, "/fcr:metadata"))
+ .accept("application/rdf+xml")
+ .perform().getBody(),
+ null)
+ .listObjectsOfProperty(
+ ResourceFactory
+ .createProperty("http://www.loc.gov/premis/rdf/v1#", "hasMessageDigest"))
+ .mapWith((digestValue) -> digestValue.toString().substring("urn:sha1:".length()))
+ .next();
+ assertNotNull("Missing http://www.loc.gov/premis/rdf/v1#hasMessageDigest on " +
+ appendToPath(binaryResource, "/fcr:metadata").toString(), binaryResourceSha);
+
+ }
+
+ /**
+ * Smoke test insuring Karaf is doing what we think it is doing
+ */
+ @Before
+ public void verifyContextAndRoute() {
+ assertNotNull("No context", ctx);
+ assertEquals("Unexpected context " + ctx.getName(), CONTEXT_NAME, ctx.getName());
+ assertNotNull("No route (ctx name: " + ctx.getName() + ")", ctx.getRouteDefinition(INTERCEPT_ROUTE_ID));
+ }
+
+ /**
+ * Verify the binary can be retrieved from Fedora. The request should not be intercepted.
+ *
+ * @throws Exception if unexpected things go wrong
+ */
+ @Test
+ public void testRetrieveLargeBinaryFromFedora() throws Exception {
+
+ // Record 'true' if the intercepting route is triggered
+ AtomicBoolean intercepted = new AtomicBoolean(false);
+ ctx.getRouteDefinition(INTERCEPT_ROUTE_ID).adviceWith((ModelCamelContext) ctx, new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ weaveAddFirst().process((ex) -> intercepted.set(true));
+ }
+ });
+
+ long expectedSize = (2 * 1024 * 1024) + 1;
+ long actualSize;
+ String actualDigest;
+
+ try (FcrepoResponse r = client.get(binaryResource).perform();
+ DigestInputStream body = new DigestInputStream(r.getBody(), sha1)) {
+ actualSize = drain(body);
+ actualDigest = asHex(body.getMessageDigest().digest());
+ }
+
+ // The resource can be retrieved intact
+ assertEquals(expectedSize, actualSize);
+ assertEquals(binaryResourceSha, actualDigest);
+
+ // And the request was not proxied by API-X
+ assertFalse(String.format("Unexpected interception of a Fedora resource URI %s by route %s",
+ binaryResource.toString(), INTERCEPT_ROUTE_ID), intercepted.get());
+ }
+
+ /**
+ * Verify the binary can be retrieved through the API-X proxy. The request should be intercepted and proxied
+ * by API-X.
+ *
+ * @throws Exception if unexpected things go wrong
+ */
+ @Test
+ public void testRetrieveLargeBinaryFromApix() throws Exception {
+
+ // Record 'true' if the intercepting route is triggered
+ AtomicBoolean intercepted = new AtomicBoolean(false);
+ ctx.getRouteDefinition(INTERCEPT_ROUTE_ID).adviceWith((ModelCamelContext) ctx, new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ weaveAddFirst().process((ex) -> intercepted.set(true));
+ }
+ });
+
+ long expectedSize = (2 * 1024 * 1024) + 1;
+ long actualSize;
+ String actualDigest;
+
+ final URI proxiedResource = proxied(binaryResource);
+ try (FcrepoResponse r = KarafIT.attempt(30, () -> client.get(proxiedResource).perform());
+ DigestInputStream body = new DigestInputStream(r.getBody(), sha1)) {
+ actualSize = drain(body);
+ actualDigest = asHex(body.getMessageDigest().digest());
+ }
+
+ // The request _was_ proxied by API-X
+ assertTrue(String.format("Expected the retrieval of %s to be proxied by API-X, route id %s",
+ proxiedResource, INTERCEPT_ROUTE_ID), intercepted.get());
+
+ // And resource can be retrieved intact
+ assertEquals(expectedSize, actualSize);
+ assertEquals(binaryResourceSha, actualDigest);
+ }
+
+ /**
+ * Returns true if the URI exists (i.e. responds with a 200 to a HEAD request).
+ *
+ * @param resource some HTTP resource
+ * @return true if the resource exists, false otherwise
+ * @throws IOException if there is an error determining whether the resource exists
+ */
+ private boolean resourceExists(URI resource) throws IOException {
+ try (FcrepoResponse r = client.head(resource).perform()) {
+ if (r.getStatusCode() == 200) {
+ return true;
+ }
+ } catch (FcrepoOperationFailedException e) {
+ // Probably the resource doesn't exist.
+ LOG.debug("Error retrieving resource '" + resource + "': " + e.getMessage(), e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Reads the input stream to exhaustion and returns the number of bytes read.
+ *
+ * @param in the stream to exhaust
+ * @return the number of bytes read
+ * @throws IOException
+ */
+ private static long drain(InputStream in) throws IOException {
+ byte[] buf = new byte[1024 * 128];
+ long size = 0;
+
+ for (int i = in.read(buf, 0, buf.length); i > -1; i = in.read(buf, 0, i)) {
+ size += i;
+ }
+
+ return size;
+ }
+
+ /**
+ * Coverts the supplied byte array to a String hexadecimal representation, starting with the most significant bit.
+ *
+ * @param digest a byte array containing a message digest
+ * @return a hexadecimal string representation of the message digest
+ */
+ private static String asHex(byte[] digest) {
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < digest.length; i++) {
+ buf.append(
+ String.format("%02x", Byte.toUnsignedInt(digest[i])));
+ }
+
+ return buf.toString();
+ }
+
+ /**
+ * Assuming the supplied URI targets the Fedora repository (i.e. a URI that is not proxied by
+ * API-X), returns an equivalent URI that targets the same resource through the API-X proxy.
+ *
+ * @param toProxy a URI that targets an un-proxied Fedora resource
+ * @return an equivalent URI targeting the proxied Fedora resource
+ * @throws URISyntaxException
+ */
+ private static URI proxied(URI toProxy) throws URISyntaxException {
+ if (isProxied(toProxy)) {
+ return toProxy;
+ }
+ return appendToPath(APIX_BASE_URI, toProxy.getPath());
+ }
+
+ /**
+ * Appends the path to the URI. All other components of the URI are preserved.
+ *
+ * @param uri the URI with the path being appended to
+ * @param toAppend the path to be appended to the URI
+ * @return a new URI with a path component ending with {@code toAppend}
+ * @throws URISyntaxException
+ */
+ private static URI appendToPath(URI uri, String toAppend) throws URISyntaxException {
+ return new URI(uri.getScheme(),
+ uri.getUserInfo(),
+ uri.getHost(),
+ uri.getPort(),
+ uri.getPath() + toAppend,
+ uri.getRawQuery(),
+ uri.getRawFragment());
+ }
+
+ /**
+ * Returns true if the supplied URI will be proxied by API-X
+ *
+ * @param uri a candidate uri that might be proxied API-X
+ * @return true if the supplied URI will be proxied by API-X, false otherwise
+ */
+ private static boolean isProxied(final URI uri) {
+ return uri.getScheme().equals(APIX_BASE_URI.getScheme())
+ && uri.getHost().equals(APIX_BASE_URI.getHost())
+ && uri.getPort() == APIX_BASE_URI.getPort();
+ }
+
+}
diff --git a/fcrepo-api-x-integration/src/test/resources/objects/binary_container.ttl b/fcrepo-api-x-integration/src/test/resources/objects/binary_container.ttl
new file mode 100644
index 0000000..9a3a1a0
--- /dev/null
+++ b/fcrepo-api-x-integration/src/test/resources/objects/binary_container.ttl
@@ -0,0 +1,3 @@
+@prefix ldp: .
+
+<> a ldp:Container , ldp:BasicContainer .
\ No newline at end of file
diff --git a/fcrepo-api-x-routing/pom.xml b/fcrepo-api-x-routing/pom.xml
index 5d07921..db6534b 100644
--- a/fcrepo-api-x-routing/pom.xml
+++ b/fcrepo-api-x-routing/pom.xml
@@ -50,6 +50,13 @@
provided
+
+ org.apache.camel
+ camel-http4
+ ${camel.version}
+ provided
+
+
org.osgi
org.osgi.service.component.annotations
diff --git a/fcrepo-api-x-routing/src/main/feature/feature.xml b/fcrepo-api-x-routing/src/main/feature/feature.xml
index fd09797..eca9ed0 100644
--- a/fcrepo-api-x-routing/src/main/feature/feature.xml
+++ b/fcrepo-api-x-routing/src/main/feature/feature.xml
@@ -5,6 +5,7 @@
camel-blueprint
- camel-jetty
+ camel-jetty
+ camel-http4
\ No newline at end of file
diff --git a/fcrepo-api-x-routing/src/main/java/org/fcrepo/apix/routing/impl/RoutingImpl.java b/fcrepo-api-x-routing/src/main/java/org/fcrepo/apix/routing/impl/RoutingImpl.java
index 618ee7c..402c9c6 100644
--- a/fcrepo-api-x-routing/src/main/java/org/fcrepo/apix/routing/impl/RoutingImpl.java
+++ b/fcrepo-api-x-routing/src/main/java/org/fcrepo/apix/routing/impl/RoutingImpl.java
@@ -63,6 +63,8 @@ public class RoutingImpl extends RouteBuilder {
private URI fcrepoBaseURI;
+ private URI fcrepoProxyURI;
+
public static final String EXECUTION_EXPOSE_MODALITY = "direct:execute_expose";
public static final String EXPOSING_EXTENSION = "CamelApixExposingExtension";
@@ -96,6 +98,14 @@ public void setFcrepoBaseURI(final URI uri) {
this.fcrepoBaseURI = uri;
}
+ /**
+ * Fedora's proxyURI
+ * @param fcrepoProxyURI
+ */
+ public void setFcrepoProxyURI(final URI fcrepoProxyURI) {
+ this.fcrepoProxyURI = fcrepoProxyURI;
+ }
+
/**
* Set the discovery component for generating a service doc.
*
@@ -186,7 +196,7 @@ public void configure() throws Exception {
.choice().when(e -> !e.getIn().getHeaders().containsKey(
HEADER_INVOKE_STATUS) || e.getIn().getHeader(
HEADER_INVOKE_STATUS, Integer.class) < 300)
- .to("jetty:{{fcrepo.proxyURI}}" +
+ .to("http4:" + stripHttpScheme(fcrepoProxyURI) +
"?bridgeEndpoint=true" +
"&throwExceptionOnFailure=false" +
"&disableStreamCache=true" +
@@ -303,6 +313,17 @@ private URI fcrepoResourceFromPath(final String proxied) {
}
}
+ private static String stripHttpScheme(URI uri) {
+ if (uri.getScheme().startsWith("http")) {
+ return uri.toString().substring("http://".length());
+ }
+
+ if (uri.getScheme().startsWith("https")) {
+ return uri.toString().substring("https://".length());
+ }
+ return uri.toString();
+ }
+
private static T exactlyOne(final Collection of, final String errMsg) {
if (of.size() != 1) {
throw new ResourceNotFoundException(errMsg);
diff --git a/fcrepo-api-x-routing/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/fcrepo-api-x-routing/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 1a29bdd..1968980 100644
--- a/fcrepo-api-x-routing/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/fcrepo-api-x-routing/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,6 +58,7 @@
+
@@ -92,8 +93,15 @@
-
+
+
+
+
+
+
+
+