-
Notifications
You must be signed in to change notification settings - Fork 2
/
pino-eventhub.js
121 lines (108 loc) · 3.57 KB
/
pino-eventhub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#! /usr/bin/env node
'use strict'
const Writable = require('readable-stream').Writable
const split = require('split2')
const pump = require('pump')
const utf8 = require('utf8')
const crypto = require('crypto')
const https = require('https')
const debug = require('debug')('pino-eventhub')
const Parse = require('fast-json-parse')
const socketCount = 10
function giveSecurityWarning () {
console.warn('It is poor security practice to share your Shared Access Policy Key. It is better to calculate the Shared Access Signature, and share that.')
console.log("'pino-eventhub.createSignature' can be used to calculate the Shared Access Signature.")
}
function createSignature (uri, ttl, sapk, warn) {
if (warn) {
giveSecurityWarning()
}
const signature = uri + '\n' + ttl
const signatureUTF8 = utf8.encode(signature)
const hash = crypto.createHmac('sha256', sapk)
.update(signatureUTF8)
.digest('base64')
return encodeURIComponent(hash)
}
function pinoEventHub (opts) {
const splitter = split(function (line) {
return line
})
const agent = new https.Agent({keepAlive: true, maxSockets: opts.max || socketCount})
const options = {
method: 'POST',
host: opts.host.slice(8), // remove 'https://'
port: opts.port,
agent: agent,
path: '/' + opts.eh + '/messages?timeout=60&api-version=2014-01',
headers: {
Authorization: 'SharedAccessSignature sr=' + opts.sr + '&sig=' + opts.sig + '&se=' + opts.se + '&skn=' + opts.skn,
'Content-Type': 'application/atom+xml;type=entry;charset=utf-8'
}
}
const bulkHeaders = Object.assign({}, options.headers,
{ 'Content-Type': 'application/vnd.microsoft.servicebus.json' })
const bulkOptions = Object.assign({}, options,
{ headers: bulkHeaders })
function callback (done) {
return function inner (response) {
debug('response.statusCode =', response.statusCode)
debug('response.statusMessage =', response.statusMessage)
if (response.statusCode !== 201) {
splitter.emit('error', new Error(response.statusCode))
}
response.on('data', function (data) {
debug('data =', data.toString())
})
response.on('end', function () {
debug('call completed')
done()
})
}
}
const writable = new Writable({
objectMode: true,
highWaterMark: opts['bulk-size'] || 500,
writev: function (blocks, done) {
// https://docs.microsoft.com/en-us/rest/api/eventhub/send-batch-events
const events = blocks
.map(block => block.chunk)
.map(line => {
// check if console output is a string or object
const parsed = new Parse(line)
return (parsed.err)
? JSON.stringify({ Body: line })
: `{"UserProperties":${line}}`
})
.join(',')
const body = `[${events}]`
debug(`body =`, body)
const req = https.request(bulkOptions, callback(done))
req.on('error', (e) => {
console.error(`request error =`, e)
done()
})
req.write(body)
req.end()
},
write: function (line, enc, done) {
debug(`write: line =`, line)
debug(`write: typeof ===`, typeof line)
if (line) {
const req = https.request(options, callback(done))
req.on('error', (e) => {
console.error(`request error =`, e)
})
req.write(line)
req.end()
} else {
done()
}
}
})
pump(splitter, writable)
return splitter
}
module.exports = pinoEventHub
module.exports.createSignature = createSignature
module.exports.giveSecurityWarning = giveSecurityWarning