forked from awslabs/lambda-streams-to-firehose
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transformer.js
149 lines (133 loc) · 4.85 KB
/
transformer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
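/**
 * Example transformer that adds a newline to each event.
 *
 * Transformer arguments:
 *
 * data - object or string containing the data to be transformed
 *
 * callback(err, Buffer) - callback to invoke once the transformation is
 * complete. If the callback is given a null/undefined output (for example,
 * when filtering), nothing will be sent to Firehose.
 * NOTE(review): transformRecords in this module treats any non-Buffer output,
 * including null/undefined, as a "malformed" error for the whole batch rather
 * than filtering the record out - confirm the intended behaviour.
 */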
var async = require('async');
require('./constants');
// Verbose-logging switch: holds the raw DEBUG environment value when it is
// non-empty, otherwise false. Only its truthiness is consulted in this file.
var debug = process.env.DEBUG ? process.env.DEBUG : false;
/**
 * Example transformer that appends a newline to each event.
 *
 * data - string (or any value with a string form) to transform
 * callback(err, Buffer) - receives `data + "\n"` encoded with targetEncoding
 *   (targetEncoding is not declared in this file; presumably a global set up
 *   by ./constants - confirm)
 */
function addNewlineTransformer(data, callback) {
    // Buffer.from replaces the deprecated new Buffer() constructor (DEP0005)
    callback(null, Buffer.from(data + "\n", targetEncoding));
}
exports.addNewlineTransformer = addNewlineTransformer;
/**
 * Convert data to its JSON string representation followed by a newline,
 * so each record becomes one line of JSON text.
 *
 * data - value to serialise with JSON.stringify
 * callback(err, Buffer) - receives the serialised text plus "\n", encoded
 *   with targetEncoding
 */
function jsonToStringTransformer(data, callback) {
    // Buffer.from replaces the deprecated new Buffer() constructor (DEP0005)
    callback(null, Buffer.from(JSON.stringify(data) + "\n", targetEncoding));
}
exports.jsonToStringTransformer = jsonToStringTransformer;
/**
 * Pass-through transformer: wraps the data in a new Buffer without modifying
 * it (no newline is appended).
 *
 * data - a string (encoded with targetEncoding) or anything else Buffer.from
 *   accepts, such as a Buffer or array of bytes (which is copied)
 * callback(err, Buffer) - receives the new Buffer
 */
function doNothingTransformer(data, callback) {
    // Buffer.from replaces the deprecated new Buffer() constructor (DEP0005)
    callback(null, Buffer.from(data, targetEncoding));
}
exports.doNothingTransformer = doNothingTransformer;
/**
 * Example transformer that converts the capture groups of a regular
 * expression match into delimited text.
 *
 * Intended to be partially applied, e.g.
 * regexToDelimiter.bind(undefined, /(myregex) (.*)/, "|")
 *
 * regex - regular expression whose capture groups become the output fields.
 *   It is matched against JSON.stringify(data), so a string input is matched
 *   including its surrounding double quotes. NOTE: with a /g regex,
 *   String.prototype.match returns all matches instead of capture groups.
 * delimiter - string placed between the captured fields
 * data - object or string to match
 * callback(err, Buffer) - receives the joined capture groups plus "\n", or
 *   an error message string (and null) when the regex does not match
 */
function regexToDelimiter(regex, delimiter, data, callback) {
    var tokens = JSON.stringify(data).match(regex);
    if (!tokens) {
        callback("Configured Regular Expression does not match any tokens", null);
        return;
    }
    // index 0 is the full match; only the capture groups are emitted
    callback(null, Buffer.from(tokens.slice(1).join(delimiter) + "\n"));
}
exports.regexToDelimiter = regexToDelimiter;
//
// example regex transformer
// var transformer = exports.regexToDelimiter.bind(undefined, /(myregex) (.*)/,
// "|");
/**
 * Decode (for Kinesis) and transform every record in a batch.
 *
 * serviceName - event source name; when it equals KINESIS_SERVICE_NAME the
 *   record's base64 `data` field is decoded to a string in targetEncoding,
 *   otherwise the record object itself is handed to the transformer
 * transformer - function(data, callback(err, Buffer)) applied to each record
 * userRecords - array of records to transform
 * callback(err, Array<Buffer>) - called with only the error when a
 *   transformer fails or returns a non-Buffer; otherwise with (null, buffers)
 *   in the same order as userRecords (async.map preserves order)
 */
function transformRecords(serviceName, transformer, userRecords, callback) {
    async.map(userRecords, function(userRecord, userRecordCallback) {
        // Kinesis payloads arrive base64-encoded; other sources pass the record as-is
        var dataItem = serviceName === KINESIS_SERVICE_NAME ? Buffer.from(userRecord.data, 'base64').toString(targetEncoding) : userRecord;
        transformer.call(undefined, dataItem, function(err, transformed) {
            if (err) {
                console.log(JSON.stringify(err));
                userRecordCallback(err);
            } else if (transformed instanceof Buffer) {
                // the transformed Buffer is used directly as a Firehose batch entry
                userRecordCallback(null, transformed);
            } else {
                // null/undefined or any non-Buffer output is rejected, not filtered
                userRecordCallback("Output of Transformer was malformed. Must be instance of Buffer or routable Object");
            }
        });
    }, function(err, transformed) {
        // on error, do not forward async.map's partial results to the caller
        if (err) {
            callback(err);
        } else {
            callback(null, transformed);
        }
    });
}
exports.transformRecords = transformRecords;
/**
 * Resolve the transformer function to apply to stream records.
 *
 * Resolution order (later steps win):
 *
 * 1. jsonToStringTransformer is the default.
 * 2. If process.env[STREAM_DATATYPE_ENV] is a key of
 *    supportedDatatypeTransformerMappings, the function named by
 *    transformerRegistry for that datatype is used; an unknown datatype
 *    keeps the default.
 * 3. If process.env[TRANSFORMER_FUNCTION_ENV] is set, it must equal one of
 *    the values of transformerRegistry and names the function to use.
 *
 * Named transformers are looked up on this module's exports, so this works
 * even when it is called without `this` bound to the module.
 *
 * callback(err, transformer) - invoked exactly once: with an error message
 *   string when the override is not a registered transformer, otherwise with
 *   (null, transformer) where transformer is the chosen function bound to
 *   undefined.
 */
function setupTransformer(callback) {
    var streamDatatype = process.env[STREAM_DATATYPE_ENV];
    var transformerOverride = process.env[TRANSFORMER_FUNCTION_ENV];

    // set the default transformer
    var t = jsonToStringTransformer.bind(undefined);

    // set the transformer based on specified datatype of the stream
    if (streamDatatype) {
        if (Object.keys(supportedDatatypeTransformerMappings).indexOf(streamDatatype) !== -1) {
            if (debug) {
                console.log("Setting data transformer based on Stream Datatype configuration");
            }
            // cross reference: datatype -> registry key -> exported function name
            t = exports[transformerRegistry[supportedDatatypeTransformerMappings[streamDatatype]]].bind(undefined);
        }
    } else if (debug) {
        console.log("No Stream Datatype Environment Configuration found");
    }

    // check if the transformer has been overridden by environment settings
    if (transformerOverride) {
        if (debug) {
            console.log(transformerOverride);
        }
        var isRegistered = Object.keys(transformerRegistry).some(function(key) {
            return transformerRegistry[key] === transformerOverride;
        });
        if (!isRegistered) {
            // return so the callback is not also invoked with the default transformer below
            callback("Configured Transformer function " + transformerOverride + " is not a valid transformation method in the transformer.js module");
            return;
        }
        if (debug) {
            console.log("Setting data transformer based on Transformer Override configuration");
        }
        // dynamically bind in the transformer function
        t = exports[transformerOverride].bind(undefined);
    } else if (debug) {
        console.log("No Transformer Override Environment Configuration found");
    }

    if (debug) {
        console.log("Using Transformer function " + t.name);
    }
    callback(null, t);
}
exports.setupTransformer = setupTransformer;