diff --git a/src/main/java/io/sinistral/proteus/server/ServerRequest.java b/src/main/java/io/sinistral/proteus/server/ServerRequest.java index ea57d3c..1644a17 100644 --- a/src/main/java/io/sinistral/proteus/server/ServerRequest.java +++ b/src/main/java/io/sinistral/proteus/server/ServerRequest.java @@ -10,15 +10,13 @@ import java.util.Deque; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.xnio.channels.StreamSourceChannel; import io.sinistral.proteus.server.predicates.ServerPredicates; -import io.undertow.attribute.BytesSentAttribute; -import io.undertow.attribute.ExchangeAttributes; -import io.undertow.connector.PooledByteBuffer; +import io.undertow.io.Receiver; import io.undertow.security.api.SecurityContext; import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.form.FormData; @@ -28,7 +26,6 @@ import io.undertow.util.AttachmentKey; import io.undertow.util.FastConcurrentDirectDeque; import io.undertow.util.Headers; -import io.undertow.util.MalformedMessageException; /** * @@ -37,6 +34,16 @@ */ public class ServerRequest { + private static final LinkedBlockingDeque EXCEPTIONS = new LinkedBlockingDeque<>(); + + public static final Receiver.ErrorCallback ERROR_CALLBACK = new Receiver.ErrorCallback() { + @Override + public void error(HttpServerExchange exchange, IOException e) { + EXCEPTIONS.add(e); + exchange.endExchange(); + } + }; + public static final AttachmentKey BYTE_BUFFER_KEY = AttachmentKey.create(ByteBuffer.class); private static Logger log = LoggerFactory.getLogger(ServerRequest.class.getCanonicalName()); @@ -146,47 +153,90 @@ public SecurityContext getSecurityContext() return exchange.getSecurityContext(); } + private void extractBytes() throws IOException { - //log.debug("start extracting bytes!"); + log.debug("start extracting bytes!"); this.exchange.startBlocking(); - - try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()){ - ByteBuffer buf = pooled.getBuffer(); - - final StreamSourceChannel channel = this.exchange.getRequestChannel(); - - while (true) { - - buf.clear(); - - int c = channel.read(buf); - - if (c == -1) { - - int pos = buf.limit(); - - ByteBuffer buffer = ByteBuffer.allocate(pos); - - System.arraycopy(buf.array(), 0, buffer.array(), 0, pos); - - buffer.rewind(); - - log.debug("buffer " + new String(buffer.array())); - - exchange.putAttachment(BYTE_BUFFER_KEY, buffer); - - break; - - } else if (c != 0) { - buf.limit(c); - } - } - } catch (MalformedMessageException e) { - throw new IOException(e); - } + + this.exchange.getRequestReceiver().receiveFullBytes(new Receiver.FullBytesCallback() { + @Override + public void handle(HttpServerExchange exchange, byte[] message) { + + ByteBuffer buffer = ByteBuffer.wrap(message); + + exchange.putAttachment(BYTE_BUFFER_KEY, buffer); + + } + }, ERROR_CALLBACK); + + +// long contentLength; +// final ByteArrayOutputStream sb; +// +// String contentLengthString = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH); +// +// if (contentLengthString != null) { +// contentLength = Long.parseLong(contentLengthString); +// sb = new ByteArrayOutputStream((int) contentLength); +// } +// else { +// contentLength = -1; +// sb = new ByteArrayOutputStream(); +// } +// +// int s; +// +// final InputStream is = this.exchange.getInputStream(); +// +// try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()) { +// while ((s = is.read(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), pooled.getBuffer().remaining())) > 0) { +// sb.write(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), s); +// } +// } +// +// ByteBuffer buffer = ByteBuffer.wrap(sb.toByteArray()); +// +// exchange.putAttachment(BYTE_BUFFER_KEY, buffer); + +// try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()){ +// ByteBuffer buf = pooled.getBuffer(); +// +// log.debug("buf: " + buf); +// +// final StreamSourceChannel channel = this.exchange.getRequestChannel(); +// +// while (true) { +// +// buf.clear(); +// +// int c = channel.read(buf); +// +// if (c == -1) { +// +// int pos = buf.limit(); +// +// ByteBuffer buffer = ByteBuffer.allocate(pos); +// +// System.arraycopy(buf.array(), 0, buffer.array(), 0, pos); +// +// buffer.rewind(); +// +// log.debug("buffer " + buffer + "\n" + new String(buffer.array())); +// +// exchange.putAttachment(BYTE_BUFFER_KEY, buffer); +// +// break; +// +// } else if (c != 0) { +// buf.limit(c); +// } +// } +// } catch (MalformedMessageException e) { +// throw new IOException(e); +// } }