Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3 #4

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
3 changes: 0 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ jobs:
strategy:
matrix:
node-version:
- '14'
- '16'
- '18'
- '20'
- '22'

Expand Down
150 changes: 137 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,82 @@
'use strict'

const createDebug = require('debug')
const {FeedHeader} = require('gtfs-rt-bindings')
const {Writable} = require('stream')
const createEntitiesStore = require('./lib/entities-store')

const {DIFFERENTIAL} = FeedHeader.Incrementality

const debug = createDebug('gtfs-rt-differential-to-full-dataset')

// Thrown when a FeedMessage cannot be processed as a whole
// (e.g. unsupported GTFS-RT version or non-DIFFERENTIAL incrementality).
// The offending message is attached as `err.feedMessage` by the thrower.
class UnsupportedFeedMessageError extends Error {
	constructor (...args) {
		super(...args)
		// Subclasses of Error inherit `name === 'Error'`; set it explicitly so
		// that logs & `String(err)` identify the kind of failure.
		this.name = 'UnsupportedFeedMessageError'
	}
}

// Thrown when a FeedEntity has none of the supported payload fields
// (`trip_update`, `vehicle`, `alert`).
// The offending entity is attached as `err.feedEntity` by the thrower.
class UnsupportedKindOfFeedEntityError extends Error {
	constructor (...args) {
		super(...args)
		// Subclasses of Error inherit `name === 'Error'`; set it explicitly so
		// that logs & `String(err)` identify the kind of failure.
		this.name = 'UnsupportedKindOfFeedEntityError'
	}
}

// Thrown when no signature (storage key) could be determined for a FeedEntity.
// The offending entity is attached as `err.feedEntity` by the thrower.
class FeedEntitySignatureError extends Error {
	constructor (...args) {
		super(...args)
		// Subclasses of Error inherit `name === 'Error'`; set it explicitly so
		// that logs & `String(err)` identify the kind of failure.
		this.name = 'FeedEntitySignatureError'
	}
}

// Derives a signature (storage key fragment) identifying the trip that a
// TripUpdate/VehiclePosition-like object `u` refers to.
//
// Precedence, most specific first:
// 1. `trip.trip_id` + `trip.start_date`
// 2. `trip.trip_id` alone
// 3. `trip.route_id` + `vehicle.id`
// Returns `null` if none of these can be built.
const tripSignature = (u) => {
	const trip = u.trip
	// `trip` is optional; without it there is nothing to build a signature from.
	if (!trip) return null

	// This check must come *before* the plain `trip_id` one, otherwise it is
	// unreachable (any object passing it also passes the plain check).
	if (trip.trip_id && trip.start_date) {
		return trip.trip_id + '-' + trip.start_date
	}
	if (trip.trip_id) return trip.trip_id

	// `vehicle` is optional as well, so guard it instead of crashing
	// with a TypeError on `u.vehicle.id`.
	if (trip.route_id && u.vehicle && u.vehicle.id) {
		return trip.route_id + '-' + u.vehicle.id
	}
	// todo: u.trip.route_id + slugg(u.vehicle.label) ?
	return null
}

const gtfsRtAsDump = (opt = {}) => {
// Default expiry strategy for TripUpdates.
//
// - `tU`: the TripUpdate
// - `getNow`: function returning the current time
// - `defaultTtl`: time-to-live, added to the reference time
//
// Reference time, in order of preference:
// 1. the latest integer arrival/departure `time` among `tU.stop_time_update`
// 2. `tU.timestamp`, if it is an integer
// 3. `getNow()`
const defaultTripUpdateExpiresAt = (tU, getNow, defaultTtl) => {
	const stopTimeUpdates = Array.isArray(tU.stop_time_update)
		? tU.stop_time_update
		: []

	// -1 serves as the "no usable time found" marker
	const latestArrDep = stopTimeUpdates.reduce((latest, sTU) => {
		for (const stopTimeEvent of [sTU.arrival, sTU.departure]) {
			if (stopTimeEvent && Number.isInteger(stopTimeEvent.time)) {
				latest = Math.max(latest, stopTimeEvent.time)
			}
		}
		return latest
	}, -1)
	if (latestArrDep !== -1) return latestArrDep + defaultTtl

	// todo: fall back to tU.trip.start_{date,time} + buffer if available? – handle canceled trips without sTUs!
	const referenceTime = Number.isInteger(tU.timestamp)
		? tU.timestamp
		: getNow()
	return referenceTime + defaultTtl
}

// Default expiry strategy for VehiclePositions: `vP.timestamp` (if it is an
// integer) or otherwise `getNow()`, plus `defaultTtl`.
const defaultVehiclePositionExpiresAt = (vP, getNow, defaultTtl) => {
	// todo: first use vP.trip.start_{date,time} + buffer if available?
	const referenceTime = Number.isInteger(vP.timestamp)
		? vP.timestamp
		: getNow()
	return referenceTime + defaultTtl
}

// Default expiry strategy for Alerts.
//
// - `alert`: the Alert
// - `getNow`: function returning the current time
// - `defaultTtl`: time-to-live, added to the reference time
//
// In GTFS Realtime, `Alert.active_period` is a *repeated* TimeRange, so it is
// usually an array. A single TimeRange object is still accepted.
// Each period expires at its `end` if that is an integer, otherwise at
// `start + defaultTtl` if `start` is an integer. The alert expires when its
// last period does. If no period yields a usable time (or there are none),
// it expires at `getNow() + defaultTtl`.
const defaultAlertExpiresAt = (alert, getNow, defaultTtl) => {
	const activePeriod = alert.active_period
	let periods = []
	if (Array.isArray(activePeriod)) {
		periods = activePeriod
	} else if (activePeriod) {
		periods = [activePeriod]
	}

	let expiresAt = null
	for (const period of periods) {
		if (!period) continue
		let periodExpiresAt = null
		if (Number.isInteger(period.end)) {
			periodExpiresAt = period.end
		} else if (Number.isInteger(period.start)) {
			periodExpiresAt = period.start + defaultTtl
		}
		// Periods with neither a usable `end` nor `start` are skipped, because
		// using them would turn the result into NaN.
		if (periodExpiresAt === null) continue
		if (expiresAt === null || periodExpiresAt > expiresAt) {
			expiresAt = periodExpiresAt
		}
	}
	if (expiresAt !== null) return expiresAt

	// todo: fall back to alert.informed_entity.trip.start_{date,time} if available?
	return getNow() + defaultTtl
}

const gtfsRtDifferentialToFullDataset = (opt = {}) => {
const {
ttl,
timestamp,
ttl: defaultTtlMs,
timestamp: getNow,
tripUpdateSignature,
vehiclePositionSignature,
alertSignature,
tripUpdateExpiresAt,
vehiclePositionExpiresAt,
alertExpiresAt,
} = {
ttl: 5 * 60 * 1000, // 5 minutes
timestamp: () => Date.now() / 1000 | 0,
Expand All @@ -31,30 +90,85 @@ const gtfsRtAsDump = (opt = {}) => {
const tripSig = tripSignature(p)
return tripSig ? 'vehicle_position-' + tripSig : null
},
alertSignature: (a) => {
// todo: see #1
// // todo: sort informed entities & their keys
// const informed = JSON.stringify(a.informed_entity.map(ie => Object.entries(ie).filter((k, v) => v !== null)))
// // todo: also use .cause, .effect, .url & .severity_level?
return null
},
tripUpdateExpiresAt: defaultTripUpdateExpiresAt,
vehiclePositionExpiresAt: defaultVehiclePositionExpiresAt,
alertExpiresAt: defaultAlertExpiresAt,
...opt
}
const defaultTtl = Math.round(defaultTtlMs / 1000)

// Determines when a FeedEntity expires by asking the configured
// `*ExpiresAt()` function for each payload present on the entity
// (`trip_update`, `vehicle`, `alert`) and taking the latest result.
//
// Each `*ExpiresAt()` function must return an integer or `null`:
// - an integer is the point in time at which the entity expires
// - `null` means "no expiry"; it is passed on as `null`
// Anything else throws a TypeError. Previously, such a value was only
// "checked" by a `Number.isInteger()` call whose result was discarded, and
// `null` was coerced to 0 by `Math.max()`, i.e. treated as already expired.
//
// If no payload yields an expiry, `getNow() + defaultTtl` is returned.
const entityExpiresAt = (entity) => {
	const expiresAtFns = [
		['trip_update', 'tripUpdateExpiresAt', tripUpdateExpiresAt],
		['vehicle', 'vehiclePositionExpiresAt', vehiclePositionExpiresAt],
		['alert', 'alertExpiresAt', alertExpiresAt],
	]

	let expiresAt = -1 // -1 serves as the "nothing determined yet" marker
	let neverExpires = false
	for (const [field, fnName, fn] of expiresAtFns) {
		if (!entity[field]) continue
		const _expiresAt = fn(entity[field], getNow, defaultTtl)
		if (_expiresAt === null) {
			neverExpires = true
			continue
		}
		if (!Number.isInteger(_expiresAt)) {
			throw new TypeError(fnName + '() must return an integer or null')
		}
		expiresAt = Math.max(expiresAt, _expiresAt)
	}

	if (neverExpires) return null
	return expiresAt === -1
		? getNow() + defaultTtl
		: expiresAt
}

const entitiesStore = createEntitiesStore(ttl, timestamp)
const entitiesStore = createEntitiesStore(getNow)

const write = (entity) => {
const processFeedEntity = (entity) => {
// If the entity is not being deleted, exactly one of 'trip_update', 'vehicle' and 'alert' fields should be populated.
// https://developers.google.com/transit/gtfs-realtime/reference#message-feedentity
let sig = null
if (entity.trip_update) {
sig = tripUpdateSignature(entity.trip_update)
} else if (entity.vehicle) {
sig = vehiclePositionSignature(entity.vehicle)
} else if (entity.alert) {
sig = alertSignature(entity.alert)
} else {
const err = new UnsupportedKindOfFeedEntityError('invalid/unsupported kind of FeedEntity')
err.feedEntity = entity
throw err
}
// todo: alert

if (sig !== null) {
entitiesStore.put(sig, entity)
const expiresAt = entityExpiresAt(entity)
debug('storing entity', sig, 'expiresAt', expiresAt, 'entity', entity)
entitiesStore.put(sig, entity, expiresAt)
return;
}
const err = new Error('invalid/unsupported kind of FeedEntity')
const err = new FeedEntitySignatureError('could not determine FeedEntity signature')
err.feedEntity = entity
throw err
}
// Validates a FeedMessage's header and processes each of its entities.
// Throws an UnsupportedFeedMessageError (with the message attached as
// `err.feedMessage`) unless the message is GTFS-RT 2.0 and DIFFERENTIAL.
const processFeedMessage = (msg) => {
	const unsupported = (reason) => {
		const error = new UnsupportedFeedMessageError(reason)
		error.feedMessage = msg
		return error
	}

	const {header} = msg
	if (header.gtfs_realtime_version !== '2.0') {
		throw unsupported('FeedMessage GTFS-RT version must be 2.0')
	}
	if (header.incrementality !== DIFFERENTIAL) {
		throw unsupported('FeedMessage must be DIFFERENTIAL')
	}

	for (const feedEntity of msg.entity) processFeedEntity(feedEntity)
}

let feedMessage = null
const asFeedMessage = () => {
Expand All @@ -63,13 +177,13 @@ const gtfsRtAsDump = (opt = {}) => {

const out = new Writable({
objectMode: true,
write: (entity, _, cb) => {
write(entity)
write: (feedMsg, _, cb) => {
processFeedMessage(feedMsg)
out.emit('change')
cb(null)
},
writev: (chunks, cb) => {
for (const {chunk: entity} of chunks) write(entity)
for (const {chunk: feedMsg} of chunks) processFeedMessage(feedMsg)
out.emit('change')
cb(null)
},
Expand All @@ -84,7 +198,17 @@ const gtfsRtAsDump = (opt = {}) => {
// todo: let asFeedMessage return this
out.timeModified = () => entitiesStore.getTimestamp()
out.nrOfEntities = entitiesStore.nrOfEntities

// todo [breaking]: change return value to a regular object
return out
}

module.exports = gtfsRtAsDump
module.exports = {
gtfsRtDifferentialToFullDataset,
UnsupportedFeedMessageError,
UnsupportedKindOfFeedEntityError,
FeedEntitySignatureError,
defaultTripUpdateExpiresAt,
defaultVehiclePositionExpiresAt,
defaultAlertExpiresAt,
}
76 changes: 53 additions & 23 deletions lib/entities-store.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const createDebug = require('debug')
const {ok} = require('assert')
const schema = require('gtfs-rt-bindings/gtfs-realtime.schema.json')
const {varint} = require('protocol-buffers-encodings')
const {FeedEntity, FeedHeader, FeedMessage} = require('gtfs-rt-bindings')
Expand All @@ -13,6 +15,8 @@ const FEED_MSG_ENTITIES = feedMsgFields.entity.id
// https://developers.google.com/protocol-buffers/docs/encoding#structure
const LENGTH_DELIMITED = 2

const debug = createDebug('gtfs-rt-differential-to-full-dataset:entities-store')

// https://developers.google.com/protocol-buffers/docs/encoding#structure
const encodeField = (fieldNumber, wireType, dataLength) => {
// If I'm not wrong, 4 bytes of varint allow for 268.435.456b
Expand All @@ -36,19 +40,26 @@ const entityTimestamp = (entity) => {
return NaN
}

const createEntitiesStore = (ttl, now) => {
const createEntitiesStore = (now) => {
const timers = new Map()
const datas = new Map()
const fields = new Map()
let timestamps = []
const timestampsById = new Map()
let cache = null // cached final `FeedMessage` buffer

const del = (id) => {
if (!timers.has(id)) return;

clearTimeout(timers.get(id))
timers.delete(id)
let lastMod = now()

const del = (id, debugLog = true) => {
const t0 = now()
if (debugLog) debug('del', id)
if (!datas.has(id)) {
if (debugLog) debug('entity does not exist', id)
return;
}
if (timers.has(id)) {
clearTimeout(timers.get(id))
timers.delete(id)
}
datas.delete(id)
fields.delete(id)
if (timestampsById.has(id)) {
Expand All @@ -57,24 +68,39 @@ const createEntitiesStore = (ttl, now) => {
timestampsById.delete(id)
}
cache = null
lastMod = t0
}

const put = (id, entity) => {
// console.error('put', id, entity) // todo: remove
del(id)

{
// todo: use sth more memory-efficient than closures?
// todo: set expiry relative to entity's timestamp?
const timer = setTimeout(del, ttl, id)
timers.set(id, timer)
if (timer && timer.unref) {
// allow Node.js to exit by not requiring its event loop to remain active
// note: not available in browsers
timer.unref()
const put = (id, entity, expiresAt) => {
const t0 = now()
debug('put', id, entity, expiresAt)
del(id, false)

if (expiresAt !== null) {
ok(Number.isInteger(expiresAt), 'expiresAt must be an integer or null')
const timeLeft = (expiresAt - t0) * 1000
if(timeLeft <= 0) {
debug('ignoring already expired entity', id, entity, expiresAt, timeLeft)
return;
}
// > When delay is larger than 2147483647 […], the delay will be set to 1.
// https://nodejs.org/docs/latest-v18.x/api/timers.html#settimeoutcallback-delay-args
if (timeLeft <= 2147483647) {
// todo: use sth more memory-efficient than closures?
const timer = setTimeout(del, timeLeft, id)
timers.set(id, timer)
if (timer && timer.unref) {
// allow Node.js to exit by not requiring its event loop to remain active
// note: not available in browsers
timer.unref()
}
} else {
// todo: what else? – periodic interval, setting timeouts for soon-to-expire entities?
}
}

cache = null

FeedEntity.verify(entity)
const data = FeedEntity.encode(entity).finish()
datas.set(id, data)
Expand All @@ -92,30 +118,34 @@ const createEntitiesStore = (ttl, now) => {
timestampsById.set(id, timestamp)

cache = null
lastMod = t0
}

// Removes all entities from the store: cancels every pending expiry timer,
// empties all per-entity maps and the timestamps list, invalidates the cached
// FeedMessage buffer and records the time of this modification.
const flush = () => {
	// taken first, so that `lastMod` reflects when the flush began
	const flushedAt = now()
	debug('flush')

	timers.forEach((timer) => {
		clearTimeout(timer)
	})
	for (const perEntityMap of [timers, datas, fields, timestampsById]) {
		perEntityMap.clear()
	}
	timestamps = []

	cache = null
	lastMod = flushedAt
}

const nrOfEntities = () => timers.size
const nrOfEntities = () => datas.size

const getTimestamp = () => {
return timestamps.length > 0
? timestamps[timestamps.length - 1] // highest
: now()
: lastMod
}

const asFeedMessage = () => {
if (cache !== null) return cache

const ids = Array.from(timers.keys())
const ids = Array.from(datas.keys())
const chunks = new Array(2 + ids.length * 2)

const rawHeader = {
Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "gtfs-rt-differential-to-full-dataset",
"description": "Transform a differential GTFS Realtime feed into a full dataset/dump.",
"version": "2.1.2",
"description": "Transform a stream of DIFFERENTIAL-mode GTFS Realtime (GTFS-RT) FeedMessages into a FULL_DATASET-mode feed.",
"version": "3.0.0-alpha.1",
"main": "index.js",
"files": [
"index.js",
Expand All @@ -16,14 +16,15 @@
"differential"
],
"author": "Jannis R <[email protected]>",
"homepage": "https://github.com/derhuerst/gtfs-rt-differential-to-full-dataset",
"homepage": "https://github.com/derhuerst/gtfs-rt-differential-to-full-dataset/tree/3.0.0-alpha.1",
"repository": "derhuerst/gtfs-rt-differential-to-full-dataset",
"bugs": "https://github.com/derhuerst/gtfs-rt-differential-to-full-dataset/issues",
"license": "ISC",
"engines": {
"node": ">=14"
"node": ">=20"
},
"dependencies": {
"debug": "^4.4.0",
"gtfs-rt-bindings": "^4.0.0",
"protocol-buffers-encodings": "^1.1.0",
"sorted-array-functions": "^1.3.0"
Expand Down
Loading
Loading