
Internals

This section is not needed to use the client. It's here for the curious.

Internal Dependencies

The library has a small number of internal runtime dependencies, among them OkHttp and RxJava (see Stream Processing below).

These dependencies were selected because they have sane versioning models, are robust, don't hold global/classloader state, and have no transitive dependencies of their own. They are all shaded - using the client does not require declaring them as dependencies. Shading has downsides, notably a larger published jar, but given the project's goals that is preferable to dependency clashes (and these libraries avoid doing funky global/classloader things). The internal dependencies are considered private and may be revisited or removed at any time. (As a side note, the libraries were picked ahead of the client API design because of the goal of minimised dependencies.)

Logging

The project uses SLF4J, logging at debug, info, warn and error levels. If there's no binding on the classpath you'll see a warning, but the client will continue to work.

SLF4J is the only dependency declared by the project, which breaks one of its goals (minimised dependencies) in favour of another (operational observability). The project could have used java.util.logging (an early prototype did), but that meant every user had to configure it, whether or not they used it, to avoid noise in their application, and we observe that SLF4J is in far wider use than java.util.logging.

It's not something we're entirely comfortable with, and the dependency choice may be revisited in the future (eg if the client were considered sufficiently robust and well-instrumented that internal logging was no longer needed).

Business and Undefined Event Categories

These two categories have special serialization and deserialization ("serdes") handling. In the API definition they are both effectively raw JSON objects, although in the Business case there is a well-known Nakadi-defined field called metadata. Because of this they are problematic to represent as a defined API object - there are no fields to bind data to. This limits a client API to something like a String or a HashMap, or, where dependencies and versions are not a concern, a parsed representation of the JSON based on whichever JSON library the client happens to use.

You can "solve" this by just defining a generic for the entire business or undefined object but since part of the goal of this client is a complete implementation of the Nakadi API, there are two classes for these two categories, called BusinessEventMapped and UndefinedEventMapped. They work by shifting the custom part of the event to a field called data. When you create one these events and post it, all the fields in the data field are lifted to direct children of the posted JSON object. That is, the posted JSON won't contain a field called data that carries the custom information. When an event is marshalled all the fields that are direct children of the JSON object are remapped into the classes data field, and for the business event the standard metadata field is marshalled into an EventMetdata field as per its Nakadi API definition.

By comparison, the DataChangeEvent doesn't have this issue, because the Nakadi API defines that category with a placeholder for custom data, which gives a marshaller something to grab onto. The DataChangeEvent class uses a generic type T, which provides a much cleaner way to produce and consume custom data.
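
By way of contrast, a sketch of the generic approach (the OrderUpdate type is made up here, and the setter names are again assumptions):

```java
// A local domain type acting as the custom payload.
class OrderUpdate {
  String orderNumber;
  String status;
}

// Because the Nakadi definition provides a placeholder for this category's
// custom data, the payload binds directly to a user type with no remapping
// step. The setter names are illustrative only.
DataChangeEvent<OrderUpdate> event = new DataChangeEvent<OrderUpdate>()
    .metadata(new EventMetadata())
    .data(new OrderUpdate());
```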

The pre-1.0.0 API uses a Map<String, Object> for the data field. It's very likely this will be superseded by a generic T field for 1.0.0, as that allows users to define the data with a local domain model instead of performing a second serdes step (it's a Java Map at the moment to verify the remapping idea was feasible to begin with). It also aligns with the DataChangeEvent class and provides a potential way forward to unify all three categories - currently they share a marker interface called Event, but for practical purposes they are disjoint types.

Stream Processing

Understanding how streams are consumed requires going into some details.

Both the event and subscription based consumer API streams use a combination of RxJava and OkHttp. Which API is used depends on whether a subscription id or event type name is set on the StreamConfiguration supplied to the StreamProcessor.
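
As a rough sketch of that selection (the builder calls here are assumptions based on the class names above; the real methods are in the client's API docs):

```java
// Hypothetical sketch: which stream API is used follows from what is set
// on the StreamConfiguration handed to the StreamProcessor.
StreamConfiguration eventTypeStream =
    new StreamConfiguration().eventTypeName("orders"); // event type stream

StreamConfiguration subscriptionStream =
    new StreamConfiguration()
        .subscriptionId("49bbaf7c-ee58-48e5-b96a-d13bce73d857"); // subscription stream
```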

Batch Consumption

Batches are read using a BufferedReader wrapped around the underlying OkHttp reader; iterating over its lines() method tokenizes the stream into newline delimited chunks of JSON. Those chunks are marshalled to a StreamBatchRecord<T>, where T represents the type of the event required by the StreamObserver implementation. For a subscription based stream, the X-Nakadi-StreamId header, which is sent with the stream's response headers but not in each batch, is supplied in the StreamCursorContext for that batch along with the batch cursor.

Empty keepalive batches use the same technique and are also sent along to the StreamObserver.
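
The tokenization itself relies only on the JDK; a minimal standalone sketch of the technique (independent of the client's own classes) looks like this:

```java
import java.io.BufferedReader;
import java.io.StringReader;

public class BatchLineSplitter {
  public static void main(String[] args) throws Exception {
    // Stand-in for the OkHttp response reader: one event batch followed by
    // an empty keepalive batch, each on its own line.
    String stream =
        "{\"cursor\":{\"partition\":\"0\",\"offset\":\"10\"},\"events\":[{\"foo\":\"bar\"}]}\n"
      + "{\"cursor\":{\"partition\":\"0\",\"offset\":\"10\"}}\n";

    try (BufferedReader reader = new BufferedReader(new StringReader(stream))) {
      // lines() yields one newline delimited JSON chunk per batch; each chunk
      // would then be marshalled to a StreamBatchRecord<T> and handed to the
      // StreamObserver, keepalives included.
      reader.lines().forEach(json -> System.out.println("batch: " + json));
    }
  }
}
```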

Thread Scheduling

When a StreamProcessor is started, it's placed into an executor and run in the background away from the calling thread. Internally an RxJava Observable is set up to consume the batch data coming from the HTTP response, and the user-supplied StreamObserver is registered for callbacks via an RxJava Observer that is subscribed to that Observable.

The setup runs on two standard RxJava schedulers, io and computation. The io scheduler is used to consume the stream from the HTTP response, and the computation scheduler is where the user's StreamObserver runs along with the underlying stream batch processor.
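
A minimal RxJava sketch of that split (written here with RxJava 2 style imports for illustration; not the client's actual pipeline):

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class SchedulerSplit {
  public static void main(String[] args) throws InterruptedException {
    Observable.just("batch-1", "batch-2", "batch-3")   // stand-in for HTTP response batches
        .subscribeOn(Schedulers.io())                  // io scheduler: consume the stream
        .observeOn(Schedulers.computation())           // computation scheduler: batch processing + StreamObserver
        .subscribe(batch ->
            System.out.println(batch + " handled on " + Thread.currentThread().getName()));

    Thread.sleep(500); // give the asynchronous pipeline time to finish in this demo
  }
}
```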

Because the supplied StreamObserver runs on the same computation scheduler as the stream's batch processor, it blocks batch processing but not the underlying io consumption layer (at least not until there's sufficient backpressure to cascade). This means there is a small level of decoupling between network and batch processing. If the system upstream of your StreamObserver becomes transiently slow, that doesn't immediately affect the HTTP connection. This may be useful in scenarios where your upstream event handler is falling behind or rate limiting you (eg Elasticsearch is throttling on batch inserts).

Observer and Offset Observer Interaction

The StreamOffsetObserver supplied to the StreamObserver blocks the StreamObserver when it is invoked - it is not called asynchronously, which in turn means it blocks the underlying computation scheduler - if the StreamOffsetObserver slows down, it slows down overall batch processing. This is done to make it easier to reason about the consequences of a problem with a custom offset observer that is checkpointing the stream, and to reduce the likelihood of issues where offsets are processed incorrectly or out of order. If it fails, the StreamObserver can see and handle that directly, eg by retrying with a backoff or, in extreme cases, shutting down the stream. An example of this is the default StreamOffsetObserver used for the Subscription based stream; it calls the Nakadi server to checkpoint progress, and if it crashes it will throw an exception directly back to the StreamObserver.
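
As an illustration of that contract (the method name below is an assumption; the real StreamOffsetObserver signature is in the client's source), a custom checkpointing observer runs synchronously and lets failures propagate:

```java
// Hypothetical sketch, not the client's actual interface. The point is that
// the checkpoint call runs in the batch processing path on the computation
// scheduler: if it is slow it slows batch processing, and if it throws, the
// failure surfaces directly to the StreamObserver.
class CheckpointingOffsetObserver /* implements StreamOffsetObserver */ {

  void onNext(Object /* StreamCursorContext */ cursorContext) {
    // eg call the Nakadi server to checkpoint the cursor for this batch.
    // Any RuntimeException thrown here is seen directly by the
    // StreamObserver, which can retry with a backoff or stop the stream.
    checkpoint(cursorContext);
  }

  private void checkpoint(Object cursorContext) {
    // elided: commit the cursor for the subscription
  }
}
```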

Resumption

The stream has two levels of resumption - retries and restarts.

The first, inner level is a basic retry with an exponential backoff that will increment up to a maximum delay. This is invoked for transient errors such as dropped connections, or where the server is returning either 5xx level responses or retryable 4xx responses (such as a 429). The subscription based stream will also retry if it cannot obtain a partition to consume from, eg because all partitions have been taken by other clients - this allows some level of recovery such that a client will periodically check and compete for a partition to consume. When a retry occurs, the StreamObserver's onStop and onStart methods are called. Some errors are considered unretryable and will cause the stream to stop - one example is where the connection has been given cursors the server knows nothing about.
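
The inner retry amounts to a capped exponential backoff; a generic standalone sketch of the idea (the delay values are made up, not the client's defaults):

```java
import java.util.concurrent.TimeUnit;

public class CappedBackoffRetry {
  public static void main(String[] args) throws InterruptedException {
    long delayMillis = 500;             // initial delay (illustrative value)
    final long maxDelayMillis = 30_000; // maximum delay the backoff increments up to

    for (int attempt = 1; ; attempt++) {
      if (tryOpenStream(attempt)) {
        System.out.println("connected on attempt " + attempt);
        return;
      }
      System.out.println("transient failure, retrying in " + delayMillis + "ms");
      TimeUnit.MILLISECONDS.sleep(delayMillis);
      delayMillis = Math.min(delayMillis * 2, maxDelayMillis); // exponential, capped
    }
  }

  // Stand-in for opening the HTTP stream; fails a few times to show the backoff.
  private static boolean tryOpenStream(int attempt) {
    return attempt >= 4;
  }
}
```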

The second, outer level restarts the entire stream connection from scratch. This is invoked after the StreamObserver onCompleted or onError methods are called. Clients that wish to stop the stream connection from being restarted can call the StreamProcessor stop method. An exception to the restart behaviour is when the stream is set up with a batch limit that asks for a bounded number of events. Once that number of events is consumed, the stream is closed and remains closed.