Skip to content

Commit

Permalink
Let undertow handle the dirty work.
Browse files Browse the repository at this point in the history
  • Loading branch information
noboomu committed Aug 2, 2018
1 parent 20d00c8 commit 23b1c46
Showing 1 changed file with 91 additions and 41 deletions.
132 changes: 91 additions & 41 deletions src/main/java/io/sinistral/proteus/server/ServerRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.channels.StreamSourceChannel;

import io.sinistral.proteus.server.predicates.ServerPredicates;
import io.undertow.attribute.BytesSentAttribute;
import io.undertow.attribute.ExchangeAttributes;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.io.Receiver;
import io.undertow.security.api.SecurityContext;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.form.FormData;
Expand All @@ -28,7 +26,6 @@
import io.undertow.util.AttachmentKey;
import io.undertow.util.FastConcurrentDirectDeque;
import io.undertow.util.Headers;
import io.undertow.util.MalformedMessageException;

/**
*
Expand All @@ -37,6 +34,16 @@
*/
public class ServerRequest
{
private static final LinkedBlockingDeque<IOException> EXCEPTIONS = new LinkedBlockingDeque<>();

public static final Receiver.ErrorCallback ERROR_CALLBACK = new Receiver.ErrorCallback() {
@Override
public void error(HttpServerExchange exchange, IOException e) {
EXCEPTIONS.add(e);
exchange.endExchange();
}
};

public static final AttachmentKey<ByteBuffer> BYTE_BUFFER_KEY = AttachmentKey.create(ByteBuffer.class);

private static Logger log = LoggerFactory.getLogger(ServerRequest.class.getCanonicalName());
Expand Down Expand Up @@ -146,47 +153,90 @@ public SecurityContext getSecurityContext()
return exchange.getSecurityContext();
}



private void extractBytes() throws IOException
{
//log.debug("start extracting bytes!");
log.debug("start extracting bytes!");

this.exchange.startBlocking();

try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()){
ByteBuffer buf = pooled.getBuffer();

final StreamSourceChannel channel = this.exchange.getRequestChannel();

while (true) {

buf.clear();

int c = channel.read(buf);

if (c == -1) {

int pos = buf.limit();

ByteBuffer buffer = ByteBuffer.allocate(pos);

System.arraycopy(buf.array(), 0, buffer.array(), 0, pos);

buffer.rewind();

log.debug("buffer " + new String(buffer.array()));

exchange.putAttachment(BYTE_BUFFER_KEY, buffer);

break;

} else if (c != 0) {
buf.limit(c);
}
}
} catch (MalformedMessageException e) {
throw new IOException(e);
}

this.exchange.getRequestReceiver().receiveFullBytes(new Receiver.FullBytesCallback() {
@Override
public void handle(HttpServerExchange exchange, byte[] message) {

ByteBuffer buffer = ByteBuffer.wrap(message);

exchange.putAttachment(BYTE_BUFFER_KEY, buffer);

}
}, ERROR_CALLBACK);


// long contentLength;
// final ByteArrayOutputStream sb;
//
// String contentLengthString = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH);
//
// if (contentLengthString != null) {
// contentLength = Long.parseLong(contentLengthString);
// sb = new ByteArrayOutputStream((int) contentLength);
// }
// else {
// contentLength = -1;
// sb = new ByteArrayOutputStream();
// }
//
// int s;
//
// final InputStream is = this.exchange.getInputStream();
//
// try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()) {
// while ((s = is.read(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), pooled.getBuffer().remaining())) > 0) {
// sb.write(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), s);
// }
// }
//
// ByteBuffer buffer = ByteBuffer.wrap(sb.toByteArray());
//
// exchange.putAttachment(BYTE_BUFFER_KEY, buffer);

// try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()){
// ByteBuffer buf = pooled.getBuffer();
//
// log.debug("buf: " + buf);
//
// final StreamSourceChannel channel = this.exchange.getRequestChannel();
//
// while (true) {
//
// buf.clear();
//
// int c = channel.read(buf);
//
// if (c == -1) {
//
// int pos = buf.limit();
//
// ByteBuffer buffer = ByteBuffer.allocate(pos);
//
// System.arraycopy(buf.array(), 0, buffer.array(), 0, pos);
//
// buffer.rewind();
//
// log.debug("buffer " + buffer + "\n" + new String(buffer.array()));
//
// exchange.putAttachment(BYTE_BUFFER_KEY, buffer);
//
// break;
//
// } else if (c != 0) {
// buf.limit(c);
// }
// }
// } catch (MalformedMessageException e) {
// throw new IOException(e);
// }

}

Expand Down

0 comments on commit 23b1c46

Please sign in to comment.