Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Commit

Permalink
Plugin Capabilities implementation (#384)
Browse files Browse the repository at this point in the history
* add tests for plugin capabilities
* address comments, update tests
* clean up
* update capabilities type definition
  • Loading branch information
neilkakkar authored May 25, 2021
1 parent 6c98426 commit 09f1c34
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 109 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,14 @@ TODO

TODO

#### How VM Extensions Work

TODO

### Plugins vs. plugin configs

### End Notes

An `organization_id` is tied to a _company_ and its _installed plugins_, a `team_id` is tied to a _project_ and its _plugin configs_ (enabled/disabled+extra config).

## Questions?
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ test('test vm memory usage', async () => {

for (let i = 0; i < numEventsPerVM; i++) {
for (let j = 0; j < numVMs; j++) {
await vms[j].methods.processEvent(createEvent(i + j))
await vms[j].methods.processEvent!(createEvent(i + j))
}
if (debug || i === numEventsPerVM - 1) {
global?.gc?.()
Expand Down
20 changes: 13 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ export interface Plugin {
from_web?: boolean
created_at: string
updated_at: string
capabilities?: Record<string, any>
capabilities?: PluginCapabilities
}

export interface PluginCapabilities {
jobs?: string[]
scheduled_tasks?: string[]
methods?: string[]
}

export interface PluginConfig {
Expand Down Expand Up @@ -255,13 +261,13 @@ export type WorkerMethods = {
export interface PluginConfigVMReponse {
vm: VM
methods: {
setupPlugin: () => Promise<void>
teardownPlugin: () => Promise<void>
onEvent: (event: PluginEvent) => Promise<void>
onSnapshot: (event: PluginEvent) => Promise<void>
processEvent: (event: PluginEvent) => Promise<PluginEvent>
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
onEvent?: (event: PluginEvent) => Promise<void>
onSnapshot?: (event: PluginEvent) => Promise<void>
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch: (batch: PluginEvent[]) => Promise<PluginEvent[]>
processEventBatch?: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}
tasks: Record<PluginTaskType, Record<string, PluginTask>>
}
Expand Down
39 changes: 24 additions & 15 deletions src/utils/db/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
Hub,
Plugin,
PluginAttachmentDB,
PluginCapabilities,
PluginConfig,
PluginConfigId,
PluginError,
Expand All @@ -22,8 +23,8 @@ function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
)`
}

export async function getPluginRows(server: Hub): Promise<Plugin[]> {
const { rows: pluginRows }: { rows: Plugin[] } = await server.db.postgresQuery(
export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
const { rows: pluginRows }: { rows: Plugin[] } = await hub.db.postgresQuery(
`SELECT posthog_plugin.* FROM posthog_plugin
WHERE id IN (${pluginConfigsInForceQuery('plugin_id')} GROUP BY posthog_pluginconfig.plugin_id)`,
undefined,
Expand All @@ -32,8 +33,8 @@ export async function getPluginRows(server: Hub): Promise<Plugin[]> {
return pluginRows
}

export async function getPluginAttachmentRows(server: Hub): Promise<PluginAttachmentDB[]> {
const { rows }: { rows: PluginAttachmentDB[] } = await server.db.postgresQuery(
export async function getPluginAttachmentRows(hub: Hub): Promise<PluginAttachmentDB[]> {
const { rows }: { rows: PluginAttachmentDB[] } = await hub.db.postgresQuery(
`SELECT posthog_pluginattachment.* FROM posthog_pluginattachment
WHERE plugin_config_id IN (${pluginConfigsInForceQuery('id')})`,
undefined,
Expand All @@ -42,39 +43,47 @@ export async function getPluginAttachmentRows(server: Hub): Promise<PluginAttach
return rows
}

export async function getPluginConfigRows(server: Hub): Promise<PluginConfig[]> {
const { rows }: { rows: PluginConfig[] } = await server.db.postgresQuery(
export async function getPluginConfigRows(hub: Hub): Promise<PluginConfig[]> {
const { rows }: { rows: PluginConfig[] } = await hub.db.postgresQuery(
pluginConfigsInForceQuery(),
undefined,
'getPluginConfigRows'
)
return rows
}

export async function setError(
server: Hub,
pluginError: PluginError | null,
pluginConfig: PluginConfig
export async function setPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
capabilities: PluginCapabilities
): Promise<void> {
await server.db.postgresQuery(
await hub.db.postgresQuery(
'UPDATE posthog_plugin SET capabilities = ($1) WHERE id = $2',
[capabilities, pluginConfig.plugin_id],
'setPluginCapabilities'
)
}

export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise<void> {
await hub.db.postgresQuery(
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
[pluginError, typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
'updatePluginConfigError'
)
if (pluginError) {
await server.db.createPluginLogEntry(
await hub.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.Plugin,
PluginLogEntryType.Error,
pluginError.message,
server.instanceId,
hub.instanceId,
pluginError.time
)
}
}

export async function disablePlugin(server: Hub, pluginConfigId: PluginConfigId): Promise<void> {
await server.db.postgresQuery(
export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): Promise<void> {
await hub.db.postgresQuery(
`UPDATE posthog_pluginconfig SET enabled='f' WHERE id=$1 AND enabled='t'`,
[pluginConfigId],
'disablePlugin'
Expand Down
2 changes: 1 addition & 1 deletion src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function runProcessEvent(server: Hub, event: PluginEvent): Promise<

try {
returnedEvent = (await processEvent(returnedEvent)) || null
if (returnedEvent.team_id != teamId) {
if (returnedEvent.team_id !== teamId) {
returnedEvent.team_id = teamId
throw new IllegalOperationError('Plugin tried to change event.team_id')
}
Expand Down
1 change: 0 additions & 1 deletion src/worker/plugins/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { teardownPlugins } from './teardown'

export async function setupPlugins(server: Hub): Promise<void> {
const { plugins, pluginConfigs, pluginConfigsPerTeam } = await loadPluginsFromDB(server)

const pluginVMLoadPromises: Array<Promise<any>> = []
for (const [id, pluginConfig] of pluginConfigs) {
const plugin = plugins.get(pluginConfig.plugin_id)
Expand Down
75 changes: 62 additions & 13 deletions src/worker/vm/lazy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import equal from 'fast-deep-equal'

import {
Hub,
PluginCapabilities,
PluginConfig,
PluginConfigVMReponse,
PluginLogEntrySource,
Expand All @@ -8,41 +11,42 @@ import {
PluginTaskType,
} from '../../types'
import { clearError, processError } from '../../utils/db/error'
import { disablePlugin } from '../../utils/db/sql'
import { disablePlugin, setPluginCapabilities } from '../../utils/db/sql'
import { status } from '../../utils/status'
import { createPluginConfigVM } from './vm'

export class LazyPluginVM {
initialize?: (server: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo: string) => Promise<void>
initialize?: (hub: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo: string) => Promise<void>
failInitialization?: () => void
resolveInternalVm: Promise<PluginConfigVMReponse | null>

constructor() {
this.resolveInternalVm = new Promise((resolve) => {
this.initialize = async (server: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo = '') => {
this.initialize = async (hub: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo = '') => {
try {
const vm = await createPluginConfigVM(server, pluginConfig, indexJs)
await server.db.createPluginLogEntry(
const vm = await createPluginConfigVM(hub, pluginConfig, indexJs)
await hub.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.System,
PluginLogEntryType.Info,
`Plugin loaded (instance ID ${server.instanceId}).`,
server.instanceId
`Plugin loaded (instance ID ${hub.instanceId}).`,
hub.instanceId
)
status.info('🔌', `Loaded ${logInfo}`)
void clearError(server, pluginConfig)
void clearError(hub, pluginConfig)
await this.inferPluginCapabilities(hub, pluginConfig, vm)
resolve(vm)
} catch (error) {
await server.db.createPluginLogEntry(
await hub.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.System,
PluginLogEntryType.Error,
`Plugin failed to load and was disabled (instance ID ${server.instanceId}).`,
server.instanceId
`Plugin failed to load and was disabled (instance ID ${hub.instanceId}).`,
hub.instanceId
)
status.warn('⚠️', `Failed to load ${logInfo}`)
void disablePlugin(server, pluginConfig.id)
void processError(server, pluginConfig, error)
void disablePlugin(hub, pluginConfig.id)
void processError(hub, pluginConfig, error)
resolve(null)
}
}
Expand Down Expand Up @@ -79,4 +83,49 @@ export class LazyPluginVM {
async getTasks(type: PluginTaskType): Promise<Record<string, PluginTask>> {
return (await this.resolveInternalVm)?.tasks?.[type] || {}
}

private async inferPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
vm: PluginConfigVMReponse
): Promise<void> {
if (!pluginConfig.plugin) {
throw new Error(`'PluginConfig missing plugin: ${pluginConfig}`)
}

const capabilities: Required<PluginCapabilities> = { scheduled_tasks: [], jobs: [], methods: [] }

const tasks = vm?.tasks
const methods = vm?.methods

if (methods) {
for (const [key, value] of Object.entries(methods)) {
if (value) {
capabilities.methods.push(key)
}
}
}

if (tasks?.schedule) {
for (const [key, value] of Object.entries(tasks.schedule)) {
if (value) {
capabilities.scheduled_tasks.push(key)
}
}
}

if (tasks?.job) {
for (const [key, value] of Object.entries(tasks.job)) {
if (value) {
capabilities.jobs.push(key)
}
}
}

const prevCapabilities = pluginConfig.plugin.capabilities
if (!equal(prevCapabilities, capabilities)) {
await setPluginCapabilities(hub, pluginConfig, capabilities)
pluginConfig.plugin.capabilities = capabilities
}
}
}
10 changes: 9 additions & 1 deletion tests/helpers/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export const plugin60: Plugin = {
is_preinstalled: false,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
capabilities: {},
capabilities: {}, // inferred on setup
}

export const pluginAttachment1: PluginAttachmentDB = {
Expand Down Expand Up @@ -139,3 +139,11 @@ export function mockPluginTempFolder(indexJs: string, pluginJson?: string): [Plu
},
]
}

export const mockPluginSourceCode = (indexJs: string): Plugin => ({
...plugin60,
archive: null,
plugin_type: 'source',
url: undefined,
source: indexJs,
})
21 changes: 14 additions & 7 deletions tests/helpers/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,15 @@ async function insertRow(db: Pool, table: string, object: Record<string, any>):
const params = Object.keys(object)
.map((_, i) => `\$${i + 1}`)
.join(',')
const values = Object.values(object).map((value) => {
if (Array.isArray(value) && value.length > 0) {
return JSON.stringify(value)
}
return value
})

try {
await db.query(`INSERT INTO ${table} (${keys}) VALUES (${params})`, Object.values(object))
await db.query(`INSERT INTO ${table} (${keys}) VALUES (${params})`, values)
} catch (error) {
console.error(`Error on table ${table} when inserting object:\n`, object, '\n', error)
throw error
Expand Down Expand Up @@ -124,11 +131,11 @@ export async function createUserTeamAndOrganization(
organization_id: organizationId,
app_urls: [],
name: 'TEST PROJECT',
event_names: JSON.stringify([]),
event_names_with_usage: JSON.stringify([]),
event_properties: JSON.stringify([]),
event_properties_with_usage: JSON.stringify([]),
event_properties_numerical: JSON.stringify([]),
event_names: [],
event_names_with_usage: [],
event_properties: [],
event_properties_with_usage: [],
event_properties_numerical: [],
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
anonymize_ips: false,
Expand All @@ -142,7 +149,7 @@ export async function createUserTeamAndOrganization(
api_token: `THIS IS NOT A TOKEN FOR TEAM ${teamId}`,
test_account_filters: [],
timezone: 'UTC',
data_attributes: JSON.stringify(['data-attr']),
data_attributes: ['data-attr'],
})
}

Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/sqlMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@ export const getPluginAttachmentRows = (s.getPluginAttachmentRows as unknown) as
export const getPluginConfigRows = (s.getPluginConfigRows as unknown) as jest.MockedFunction<
UnPromisify<typeof s.getPluginConfigRows>
>
export const setPluginCapabilities = (s.setPluginCapabilities as unknown) as jest.MockedFunction<
UnPromisify<typeof s.setPluginCapabilities>
>
export const setError = (s.setError as unknown) as jest.MockedFunction<UnPromisify<typeof s.setError>>
export const disablePlugin = (s.disablePlugin as unknown) as jest.MockedFunction<UnPromisify<void>>
Loading

0 comments on commit 09f1c34

Please sign in to comment.