Skip to content

Commit

Permalink
added more verbose logging of uprocessed requests to DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
albogdano committed Mar 24, 2021
1 parent 46c6a99 commit 24af8f0
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import com.erudika.para.utils.Pager;
import java.lang.annotation.Annotation;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -581,7 +583,9 @@ protected static <P extends ParaObject> void batchGet(Map<String, KeysAndAttribu

if (result.unprocessedKeys() != null && !result.unprocessedKeys().isEmpty()) {
Thread.sleep((long) backoff * 1000L);
logger.warn("{} UNPROCESSED read requests!", result.unprocessedKeys().size());
logger.warn("{} UNPROCESSED read requests for keys {}!", result.unprocessedKeys().size(),
result.unprocessedKeys().values().stream().flatMap(k -> k.keys().stream()).
flatMap(r -> r.keySet().stream()).collect(Collectors.joining(",")));
batchGet(result.unprocessedKeys(), results, backoff * 2);
}
} catch (ProvisionedThroughputExceededException ex) {
Expand Down Expand Up @@ -620,7 +624,10 @@ protected static void batchWrite(Map<String, List<WriteRequest>> items, int back

if (result.unprocessedItems() != null && !result.unprocessedItems().isEmpty()) {
Thread.sleep((long) backoff * 1000L);
logger.warn("{} UNPROCESSED write requests!", result.unprocessedItems().size());
logger.warn("{} UNPROCESSED write requests for keys {}!", result.unprocessedItems().size(),
result.unprocessedItems().values().stream().flatMap(Collection::stream).
map(r -> r.getValueForField(Config._KEY, String.class).orElse("")).
collect(Collectors.joining(",")));
batchWrite(result.unprocessedItems(), backoff * 2);
}
} catch (ProvisionedThroughputExceededException ex) {
Expand Down Expand Up @@ -831,7 +838,7 @@ private static void waitForActive(String table, String region) throws Interrupte

protected static void throwIfNecessary(Throwable t) {
if (t != null && Config.getConfigBoolean("fail_on_write_errors", true)) {
throw new RuntimeException("DAO write operation failed!", t);
throw new RuntimeException("DAO write operation failed! - " + t.getMessage(), t);
}
}

Expand Down

0 comments on commit 24af8f0

Please sign in to comment.