Skip to content
This repository has been archived by the owner on Apr 9, 2021. It is now read-only.

Fixed reading incomplete event JSON strings from the stream #97

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 28 additions & 47 deletions packages/event_store/src/eventHandler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
const stream = require('stream');
const { promisify } = require('util');
const got = require('got');
const fs = require('fs');
const readline = require('readline');
const Storage = require('./storage');
const models = require('../src/models/index');
const env = process.env.NODE_ENV || 'development';
Expand All @@ -18,8 +16,12 @@ class EventHandler {
*/
async createInputStream(url) {
try {
const readStream = got.stream(url);
// Still to implement retry on failed connection
const readStream = readline.createInterface({
input: got.stream(url),
crlfDelay: Infinity
});

return readStream;
} catch (err) {
if (err instanceof got.stream.RequestError) {
Expand All @@ -33,7 +35,7 @@ class EventHandler {
/**
* Returns writeable stream pointed to the storage component
*/
async createOutputStream() {
async createStorage() {

// Sync database schema.
if (env == "production") {
Expand All @@ -45,53 +47,32 @@ class EventHandler {
}

// Initialise storage
let storage = new Storage(models);

// Extend empty writeable object
let outputStream = new stream.Writable();

outputStream._write = async (chunk, encoding, done) => {
// Removes 'data:' prefix from the event to convert it to JSON
chunk.toString().split("\n")
.filter(str => str.startsWith('data'))
.forEach(async str => {
let event;
try {
event = JSON.parse(str.substr(5));
if (event.DeployProcessed) {
await storage.onDeployProcessed(event.DeployProcessed);
} else if (event.BlockAdded) {
await storage.onBlockAdded(event.BlockAdded);
}
} catch (err) {
console.log(`Error while processing an event.\nEvent: ${str}\nError: ${err}`);
}
});
done();
}

return outputStream;
return new Storage(models);
}

/**
* Attempts to create a streaming pipeline given an input and output stream.
*
* @param {stream.Readable} inputStream
* @param {stream.Writable} outputStream
* @param {readline.Interface} inputStream
* @param {Storage} storage
*/
async createPipeline(inputStream, outputStream) {

// initialise pipeline
const pipeline = promisify(stream.pipeline);
async createPipeline(inputStream, storage) {
inputStream.on('line', async (eventString) => {
if (!eventString.startsWith('data')) {
return;
}

try {
await pipeline(
inputStream,
outputStream
);
} catch (err) {
console.error(err);
}
try {
const event = JSON.parse(eventString.substr(5));
if (event.DeployProcessed) {
await storage.onDeployProcessed(event.DeployProcessed);
} else if (event.BlockAdded) {
await storage.onBlockAdded(event.BlockAdded);
}
} catch (err) {
console.log(`Error while processing an event.\nEvent: ${eventString}\nError: ${err}`);
}
});
}


Expand Down Expand Up @@ -144,9 +125,9 @@ if (env !== 'test') {
let eventHandler = new EventHandler();
let nodeUrl = await eventHandler.formURL();
let eventStream = await eventHandler.createInputStream(nodeUrl);
let storageStream = await eventHandler.createOutputStream();
let storage = await eventHandler.createStorage();

eventHandler.createPipeline(eventStream, storageStream);
eventHandler.createPipeline(eventStream, storage);
}

runEventHandler();
Expand Down