Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Plugin Capabilities implementation #384

Merged
merged 25 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7c1fbc2
WIP: capabilities implementation
neilkakkar May 17, 2021
7804942
Merge branch 'master' of https://github.com/PostHog/plugin-server int…
neilkakkar May 17, 2021
bf03525
add tests for plugin capabilities
neilkakkar May 17, 2021
95670b5
address comments, update tests
neilkakkar May 18, 2021
a04a197
fix tests
neilkakkar May 18, 2021
7cf3b44
clean up
neilkakkar May 18, 2021
e5bd9d1
merge master
neilkakkar May 18, 2021
063a7e5
fix tests relevant to new types
neilkakkar May 18, 2021
8dcb16c
update capabilities type definition
neilkakkar May 18, 2021
1697073
update tests, only set capabilities if theyve changed
neilkakkar May 18, 2021
b2fd679
fix remaining tests
neilkakkar May 18, 2021
378f5bc
address comments
neilkakkar May 19, 2021
227aa19
merge master
neilkakkar May 19, 2021
a6837d5
more typing fixes
neilkakkar May 19, 2021
56e0bee
address comments
neilkakkar May 20, 2021
902db6e
merge master
neilkakkar May 20, 2021
f9fc935
Merge branch 'master' into capabilities
neilkakkar May 20, 2021
c72b680
Merge branch 'master' of https://github.com/PostHog/plugin-server int…
neilkakkar May 21, 2021
c1d4d95
address comments new tests
neilkakkar May 24, 2021
8d7f160
prettify
neilkakkar May 24, 2021
caf3cde
fix failing test
neilkakkar May 24, 2021
56fda74
Merge branch 'master' into capabilities
neilkakkar May 25, 2021
81bc013
Merge branch 'master' into capabilities
mariusandra May 25, 2021
a4ee3ab
merge master
neilkakkar May 25, 2021
f560a8f
few more server->hub
neilkakkar May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,14 @@ TODO

TODO

#### How VM Extensions Work

TODO

### Plugins vs. plugin configs

### End Notes

An `organization_id` is tied to a _company_ and its _installed plugins_, a `team_id` is tied to a _project_ and its _plugin configs_ (enabled/disabled+extra config).

## Questions?
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ test('test vm memory usage', async () => {

for (let i = 0; i < numEventsPerVM; i++) {
for (let j = 0; j < numVMs; j++) {
await vms[j].methods.processEvent(createEvent(i + j))
await vms[j].methods.processEvent!(createEvent(i + j))
}
if (debug || i === numEventsPerVM - 1) {
global?.gc?.()
Expand Down
20 changes: 13 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ export interface Plugin {
from_web?: boolean
created_at: string
updated_at: string
capabilities?: Record<string, any>
capabilities?: PluginCapabilities
}

export interface PluginCapabilities {
jobs?: string[]
scheduled_tasks?: string[]
methods?: string[]
}

export interface PluginConfig {
Expand Down Expand Up @@ -255,13 +261,13 @@ export type WorkerMethods = {
export interface PluginConfigVMReponse {
vm: VM
methods: {
setupPlugin: () => Promise<void>
teardownPlugin: () => Promise<void>
onEvent: (event: PluginEvent) => Promise<void>
onSnapshot: (event: PluginEvent) => Promise<void>
processEvent: (event: PluginEvent) => Promise<PluginEvent>
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
onEvent?: (event: PluginEvent) => Promise<void>
onSnapshot?: (event: PluginEvent) => Promise<void>
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch: (batch: PluginEvent[]) => Promise<PluginEvent[]>
processEventBatch?: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}
tasks: Record<PluginTaskType, Record<string, PluginTask>>
}
Expand Down
39 changes: 24 additions & 15 deletions src/utils/db/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
Hub,
Plugin,
PluginAttachmentDB,
PluginCapabilities,
PluginConfig,
PluginConfigId,
PluginError,
Expand All @@ -22,8 +23,8 @@ function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
)`
}

export async function getPluginRows(server: Hub): Promise<Plugin[]> {
const { rows: pluginRows }: { rows: Plugin[] } = await server.db.postgresQuery(
export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
const { rows: pluginRows }: { rows: Plugin[] } = await hub.db.postgresQuery(
`SELECT posthog_plugin.* FROM posthog_plugin
WHERE id IN (${pluginConfigsInForceQuery('plugin_id')} GROUP BY posthog_pluginconfig.plugin_id)`,
undefined,
Expand All @@ -32,8 +33,8 @@ export async function getPluginRows(server: Hub): Promise<Plugin[]> {
return pluginRows
}

export async function getPluginAttachmentRows(server: Hub): Promise<PluginAttachmentDB[]> {
const { rows }: { rows: PluginAttachmentDB[] } = await server.db.postgresQuery(
export async function getPluginAttachmentRows(hub: Hub): Promise<PluginAttachmentDB[]> {
const { rows }: { rows: PluginAttachmentDB[] } = await hub.db.postgresQuery(
`SELECT posthog_pluginattachment.* FROM posthog_pluginattachment
WHERE plugin_config_id IN (${pluginConfigsInForceQuery('id')})`,
undefined,
Expand All @@ -42,39 +43,47 @@ export async function getPluginAttachmentRows(server: Hub): Promise<PluginAttach
return rows
}

export async function getPluginConfigRows(server: Hub): Promise<PluginConfig[]> {
const { rows }: { rows: PluginConfig[] } = await server.db.postgresQuery(
export async function getPluginConfigRows(hub: Hub): Promise<PluginConfig[]> {
const { rows }: { rows: PluginConfig[] } = await hub.db.postgresQuery(
pluginConfigsInForceQuery(),
undefined,
'getPluginConfigRows'
)
return rows
}

export async function setError(
server: Hub,
pluginError: PluginError | null,
pluginConfig: PluginConfig
export async function setPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
capabilities: PluginCapabilities
): Promise<void> {
await server.db.postgresQuery(
await hub.db.postgresQuery(
'UPDATE posthog_plugin SET capabilities = ($1) WHERE id = $2',
[capabilities, pluginConfig.plugin_id],
'setPluginCapabilities'
)
}

export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise<void> {
await hub.db.postgresQuery(
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
[pluginError, typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
'updatePluginConfigError'
)
if (pluginError) {
await server.db.createPluginLogEntry(
await hub.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.Plugin,
PluginLogEntryType.Error,
pluginError.message,
server.instanceId,
hub.instanceId,
pluginError.time
)
}
}

export async function disablePlugin(server: Hub, pluginConfigId: PluginConfigId): Promise<void> {
await server.db.postgresQuery(
export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): Promise<void> {
await hub.db.postgresQuery(
`UPDATE posthog_pluginconfig SET enabled='f' WHERE id=$1 AND enabled='t'`,
[pluginConfigId],
'disablePlugin'
Expand Down
2 changes: 1 addition & 1 deletion src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function runProcessEvent(server: Hub, event: PluginEvent): Promise<

try {
returnedEvent = (await processEvent(returnedEvent)) || null
if (returnedEvent.team_id != teamId) {
if (returnedEvent.team_id !== teamId) {
returnedEvent.team_id = teamId
throw new IllegalOperationError('Plugin tried to change event.team_id')
}
Expand Down
1 change: 0 additions & 1 deletion src/worker/plugins/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { teardownPlugins } from './teardown'

export async function setupPlugins(server: Hub): Promise<void> {
const { plugins, pluginConfigs, pluginConfigsPerTeam } = await loadPluginsFromDB(server)

const pluginVMLoadPromises: Array<Promise<any>> = []
for (const [id, pluginConfig] of pluginConfigs) {
const plugin = plugins.get(pluginConfig.plugin_id)
Expand Down
75 changes: 62 additions & 13 deletions src/worker/vm/lazy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import equal from 'fast-deep-equal'

import {
Hub,
PluginCapabilities,
PluginConfig,
PluginConfigVMReponse,
PluginLogEntrySource,
Expand All @@ -8,41 +11,42 @@ import {
PluginTaskType,
} from '../../types'
import { clearError, processError } from '../../utils/db/error'
import { disablePlugin } from '../../utils/db/sql'
import { disablePlugin, setPluginCapabilities } from '../../utils/db/sql'
import { status } from '../../utils/status'
import { createPluginConfigVM } from './vm'

export class LazyPluginVM {
initialize?: (server: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo: string) => Promise<void>
initialize?: (hub: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo: string) => Promise<void>
failInitialization?: () => void
resolveInternalVm: Promise<PluginConfigVMReponse | null>

constructor() {
this.resolveInternalVm = new Promise((resolve) => {
this.initialize = async (server: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo = '') => {
this.initialize = async (hub: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo = '') => {
try {
const vm = await createPluginConfigVM(server, pluginConfig, indexJs)
await server.db.createPluginLogEntry(
const vm = await createPluginConfigVM(hub, pluginConfig, indexJs)
await hub.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.System,
PluginLogEntryType.Info,
`Plugin loaded (instance ID ${server.instanceId}).`,
server.instanceId
`Plugin loaded (instance ID ${hub.instanceId}).`,
hub.instanceId
)
status.info('🔌', `Loaded ${logInfo}`)
void clearError(server, pluginConfig)
void clearError(hub, pluginConfig)
await this.inferPluginCapabilities(hub, pluginConfig, vm)
resolve(vm)
} catch (error) {
await server.db.createPluginLogEntry(
await hub.db.createPluginLogEntry(
pluginConfig,
PluginLogEntrySource.System,
PluginLogEntryType.Error,
`Plugin failed to load and was disabled (instance ID ${server.instanceId}).`,
server.instanceId
`Plugin failed to load and was disabled (instance ID ${hub.instanceId}).`,
hub.instanceId
)
status.warn('⚠️', `Failed to load ${logInfo}`)
void disablePlugin(server, pluginConfig.id)
void processError(server, pluginConfig, error)
void disablePlugin(hub, pluginConfig.id)
void processError(hub, pluginConfig, error)
resolve(null)
}
}
Expand Down Expand Up @@ -79,4 +83,49 @@ export class LazyPluginVM {
async getTasks(type: PluginTaskType): Promise<Record<string, PluginTask>> {
return (await this.resolveInternalVm)?.tasks?.[type] || {}
}

private async inferPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
vm: PluginConfigVMReponse
): Promise<void> {
if (!pluginConfig.plugin) {
throw new Error(`'PluginConfig missing plugin: ${pluginConfig}`)
}

const capabilities: Required<PluginCapabilities> = { scheduled_tasks: [], jobs: [], methods: [] }

const tasks = vm?.tasks
const methods = vm?.methods

if (methods) {
for (const [key, value] of Object.entries(methods)) {
if (value) {
capabilities.methods.push(key)
}
}
}

if (tasks?.schedule) {
for (const [key, value] of Object.entries(tasks.schedule)) {
if (value) {
capabilities.scheduled_tasks.push(key)
}
}
}

if (tasks?.job) {
for (const [key, value] of Object.entries(tasks.job)) {
if (value) {
capabilities.jobs.push(key)
}
}
}

const prevCapabilities = pluginConfig.plugin.capabilities
if (!equal(prevCapabilities, capabilities)) {
await setPluginCapabilities(hub, pluginConfig, capabilities)
pluginConfig.plugin.capabilities = capabilities
}
}
}
10 changes: 9 additions & 1 deletion tests/helpers/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export const plugin60: Plugin = {
is_preinstalled: false,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
capabilities: {},
capabilities: {}, // inferred on setup
}

export const pluginAttachment1: PluginAttachmentDB = {
Expand Down Expand Up @@ -139,3 +139,11 @@ export function mockPluginTempFolder(indexJs: string, pluginJson?: string): [Plu
},
]
}

export const mockPluginSourceCode = (indexJs: string): Plugin => ({
...plugin60,
archive: null,
plugin_type: 'source',
url: undefined,
source: indexJs,
})
21 changes: 14 additions & 7 deletions tests/helpers/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,15 @@ async function insertRow(db: Pool, table: string, object: Record<string, any>):
const params = Object.keys(object)
.map((_, i) => `\$${i + 1}`)
.join(',')
const values = Object.values(object).map((value) => {
if (Array.isArray(value) && value.length > 0) {
return JSON.stringify(value)
}
return value
})

try {
await db.query(`INSERT INTO ${table} (${keys}) VALUES (${params})`, Object.values(object))
await db.query(`INSERT INTO ${table} (${keys}) VALUES (${params})`, values)
} catch (error) {
console.error(`Error on table ${table} when inserting object:\n`, object, '\n', error)
throw error
Expand Down Expand Up @@ -124,11 +131,11 @@ export async function createUserTeamAndOrganization(
organization_id: organizationId,
app_urls: [],
name: 'TEST PROJECT',
event_names: JSON.stringify([]),
event_names_with_usage: JSON.stringify([]),
event_properties: JSON.stringify([]),
event_properties_with_usage: JSON.stringify([]),
event_properties_numerical: JSON.stringify([]),
event_names: [],
event_names_with_usage: [],
event_properties: [],
event_properties_with_usage: [],
event_properties_numerical: [],
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
anonymize_ips: false,
Expand All @@ -142,7 +149,7 @@ export async function createUserTeamAndOrganization(
api_token: `THIS IS NOT A TOKEN FOR TEAM ${teamId}`,
test_account_filters: [],
timezone: 'UTC',
data_attributes: JSON.stringify(['data-attr']),
data_attributes: ['data-attr'],
})
}

Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/sqlMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@ export const getPluginAttachmentRows = (s.getPluginAttachmentRows as unknown) as
export const getPluginConfigRows = (s.getPluginConfigRows as unknown) as jest.MockedFunction<
UnPromisify<typeof s.getPluginConfigRows>
>
export const setPluginCapabilities = (s.setPluginCapabilities as unknown) as jest.MockedFunction<
UnPromisify<typeof s.setPluginCapabilities>
>
export const setError = (s.setError as unknown) as jest.MockedFunction<UnPromisify<typeof s.setError>>
export const disablePlugin = (s.disablePlugin as unknown) as jest.MockedFunction<UnPromisify<void>>
Loading