Skip to content

Commit

Permalink
refactor: improve startup robustness
Browse files Browse the repository at this point in the history
The GraphQL server is now delayed until all modules are initialized,
and running. If `cardano-db-sync` reboots, all modules are shutdown,
and restarted and therefore links the server availability with a fully
initialized stack.
  • Loading branch information
rhyslbw committed Jun 24, 2021
1 parent c81aad8 commit 7054a59
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 48 deletions.
15 changes: 9 additions & 6 deletions packages/api-cardano-db-hasura/src/CardanoNodeClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AssetSupply, Transaction } from './graphql_types'
import pRetry from 'p-retry'
import util, { errors } from '@cardano-graphql/util'
import util, { errors, ModuleState } from '@cardano-graphql/util'
import {
ConnectionConfig,
createStateQueryClient,
Expand All @@ -23,17 +23,17 @@ export class CardanoNodeClient {
public adaCirculatingSupply: AssetSupply['circulating']
private stateQueryClient: StateQueryClient
private txSubmissionClient: TxSubmissionClient
private isInitialized: boolean
private state: ModuleState

constructor (
readonly lastConfiguredMajorVersion: number,
private logger: Logger = dummyLogger
) {
this.isInitialized = false
this.state = null
}

public async getTipSlotNo () {
if (!this.isInitialized) {
if (this.state !== 'initialized') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'getTipSlotNo')
}
const tip = await this.stateQueryClient.ledgerTip()
Expand All @@ -43,7 +43,7 @@ export class CardanoNodeClient {
}

public async getProtocolParams (): Promise<Schema.ProtocolParametersShelley> {
if (!this.isInitialized) {
if (this.state !== 'initialized') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'getProtocolParams')
}
const protocolParams = await this.stateQueryClient.currentProtocolParameters()
Expand All @@ -52,6 +52,8 @@ export class CardanoNodeClient {
}

public async initialize (ogmiosConnectionConfig?: ConnectionConfig) {
if (this.state !== null) return
this.state = 'initializing'
this.logger.info({ module: MODULE_NAME }, 'Initializing')
this.isInitialized = true
await pRetry(async () => {
Expand All @@ -70,6 +72,7 @@ export class CardanoNodeClient {
this.logger
)
})
this.state = 'initialized'
this.logger.info({ module: MODULE_NAME }, 'Initialized')
}

Expand All @@ -91,7 +94,7 @@ export class CardanoNodeClient {
}

public async submitTransaction (transaction: string): Promise<Transaction['hash']> {
if (!this.isInitialized) {
if (this.state !== 'initialized') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'submitTransaction')
}
try {
Expand Down
18 changes: 12 additions & 6 deletions packages/api-cardano-db-hasura/src/ChainFollower.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from '@cardano-ogmios/client'
import { errors } from '@cardano-graphql/util'
import { Config } from './Config'
import { errors, RunnableModuleState } from '@cardano-graphql/util'
import { Asset } from './graphql_types'
import { HasuraClient } from './HasuraClient'
import PgBoss from 'pg-boss'
Expand All @@ -23,21 +24,23 @@ const MODULE_NAME = 'ChainFollower'
export class ChainFollower {
private chainSyncClient: ChainSyncClient
private queue: PgBoss
private isInitialized: boolean
private state: RunnableModuleState

constructor (
readonly hasuraClient: HasuraClient,
private logger: Logger = dummyLogger,
queueConfig: Config['db']
) {
this.isInitialized = false
this.state = null
this.queue = new PgBoss({
application_name: 'cardano-graphql',
...queueConfig
})
}

public async initialize (ogmiosConfig: Config['ogmios']) {
if (this.state !== null) return
this.state = 'initializing'
this.logger.info({ module: MODULE_NAME }, 'Initializing')
this.chainSyncClient = await createChainSyncClient({
rollBackward: async ({ point, tip }, requestNext) => {
Expand Down Expand Up @@ -85,13 +88,15 @@ export class ChainFollower {
}
requestNext()
}
}, { connection: ogmiosConfig })
this.isInitialized = true
},
this.logger.error,
{ connection: ogmiosConfig })
this.state = 'initialized'
this.logger.info({ module: MODULE_NAME }, 'Initialized')
}

public async start (points: Schema.Point[]) {
if (!this.isInitialized) {
if (this.state !== 'initialized') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'start')
}
this.logger.info({ module: MODULE_NAME }, 'Starting')
Expand All @@ -101,12 +106,13 @@ export class ChainFollower {
}

public async shutdown () {
if (!this.isInitialized) {
if (this.state !== 'running') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'shutdown')
}
this.logger.info({ module: MODULE_NAME }, 'Shutting down')
await this.chainSyncClient.shutdown()
await this.queue.stop()
this.state = 'initialized'
this.logger.info(
{ module: MODULE_NAME },
'Shutdown complete')
Expand Down
16 changes: 15 additions & 1 deletion packages/api-cardano-db-hasura/src/Db.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
import { ClientConfig } from 'pg'
import createSubscriber, { Subscriber } from 'pg-listen'
import { dummyLogger, Logger } from 'ts-log'
import { ModuleState } from '@cardano-graphql/util'

const MODULE_NAME = 'Db'

export class Db {
private state: ModuleState
pgSubscriber: Subscriber

constructor (
pgClientConfig: ClientConfig,
private logger: Logger = dummyLogger
) {
this.state = null
this.pgSubscriber = createSubscriber(pgClientConfig, {
parse: (value) => value
})
}

public async init ({ onDbSetup }: { onDbSetup: Function }): Promise<void> {
public async init ({
onDbInit,
onDbSetup
}: {
onDbInit: () => void,
onDbSetup: () => void
}): Promise<void> {
if (this.state !== null) return
this.state = 'initializing'
this.pgSubscriber.events.on('connected', async () => {
this.logger.debug({ module: MODULE_NAME }, 'pgSubscriber: Connected')
await onDbSetup()
Expand All @@ -32,6 +43,7 @@ export class Db {
switch (payload) {
case 'init' :
this.logger.warn({ module: 'Db' }, 'pgSubscriber: cardano-db-sync-extended starting, schema will be reset')
await onDbInit()
break
case 'db-setup' :
await onDbSetup()
Expand All @@ -43,12 +55,14 @@ export class Db {
try {
await this.pgSubscriber.connect()
await this.pgSubscriber.listenTo('cardano_db_sync_startup')
this.state = 'initialized'
} catch (error) {
this.logger.error({ err: error })
}
}

public async shutdown () {
if (this.state !== 'initialized') return
await this.pgSubscriber.close()
}
}
28 changes: 17 additions & 11 deletions packages/api-cardano-db-hasura/src/HasuraClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import util, { DataFetcher } from '@cardano-graphql/util'
import { Schema } from '@cardano-ogmios/client'
import { exec } from 'child_process'
import util, { DataFetcher, ModuleState } from '@cardano-graphql/util'
import fetch from 'cross-fetch'
import { DocumentNode, GraphQLSchema, print } from 'graphql'
import { GraphQLClient, gql } from 'graphql-request'
Expand Down Expand Up @@ -35,6 +36,7 @@ export class HasuraClient {
private applyingSchemaAndMetadata: boolean
public adaPotsToCalculateSupplyFetcher: DataFetcher<AdaPotsToCalculateSupply>
public currentProtocolVersionFetcher: DataFetcher<ShelleyProtocolParams['protocolVersion']>
private state: ModuleState
public schema: GraphQLSchema

constructor (
Expand All @@ -44,6 +46,7 @@ export class HasuraClient {
readonly lastConfiguredMajorVersion: number,
private logger: Logger = dummyLogger
) {
this.state = null
this.applyingSchemaAndMetadata = false
this.adaPotsToCalculateSupplyFetcher = new DataFetcher<AdaPotsToCalculateSupply>(
'AdaPotsToCalculateSupply',
Expand Down Expand Up @@ -86,7 +89,7 @@ export class HasuraClient {
private async getAdaPotsToCalculateSupply (): Promise<AdaPotsToCalculateSupply> {
const result = await this.client.request(
gql`query {
cardano {
cardano {
currentEpoch {
adaPots {
reserves
Expand Down Expand Up @@ -152,6 +155,8 @@ export class HasuraClient {
}

public async initialize () {
if (this.state !== null) return
this.state = 'initializing'
this.logger.info({ module: 'HasuraClient' }, 'Initializing')
await this.applySchemaAndMetadata()
await pRetry(async () => {
Expand Down Expand Up @@ -190,6 +195,7 @@ export class HasuraClient {
this.logger.debug({ module: 'HasuraClient' }, 'DB is in current era')
await this.currentProtocolVersionFetcher.initialize()
await this.adaPotsToCalculateSupplyFetcher.initialize()
this.state = 'initialized'
this.logger.info({ module: 'HasuraClient' }, 'Initialized')
}

Expand Down Expand Up @@ -372,7 +378,7 @@ export class HasuraClient {
}
}
url
policyId
policyId
}
quantity
}
Expand Down Expand Up @@ -615,16 +621,16 @@ export class HasuraClient {
)
const result = await this.client.request(
gql`mutation InsertAssets($assets: [Asset_insert_input!]!) {
insert_assets(objects: $assets) {
returning {
name
policyId
description
assetName
assetId
insert_assets(objects: $assets) {
returning {
name
policyId
description
assetName
assetId
}
affected_rows
}
affected_rows
}
}`,
{
assets
Expand Down
15 changes: 9 additions & 6 deletions packages/api-cardano-db-hasura/src/Worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import axios, { AxiosInstance } from 'axios'
import { errors } from '@cardano-graphql/util'
import { errors, RunnableModuleState } from '@cardano-graphql/util'
import hash from 'object-hash'
import { dummyLogger, Logger } from 'ts-log'
import { AssetMetadata } from './AssetMetadata'
Expand All @@ -18,7 +18,7 @@ const MODULE_NAME = 'Worker'
export class Worker {
private axiosClient: AxiosInstance
private queue: PgBoss
private isInitialized: boolean
private state: RunnableModuleState

constructor (
readonly hasuraClient: HasuraClient,
Expand All @@ -31,7 +31,7 @@ export class Worker {
}
}
) {
this.isInitialized = false
this.state = null
this.queue = new PgBoss({
application_name: 'cardano-graphql',
...queueConfig
Expand Down Expand Up @@ -76,14 +76,16 @@ export class Worker {
}

public async initialize () {
if (this.state !== null) return
this.state = 'initializing'
this.logger.info({ module: MODULE_NAME }, 'Initializing')
await this.ensureMetadataServerIsAvailable()
this.isInitialized = true
this.state = 'initialized'
this.logger.info({ module: MODULE_NAME }, 'Initialized')
}

public async start () {
if (!this.isInitialized) {
if (this.state !== 'initialized') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'start')
}
this.logger.info({ module: MODULE_NAME }, 'Starting')
Expand Down Expand Up @@ -149,14 +151,15 @@ export class Worker {
}

public async shutdown () {
if (!this.isInitialized) {
if (this.state !== 'running') {
throw new errors.ModuleIsNotInitialized(MODULE_NAME, 'shutdown')
}
this.logger.info({ module: MODULE_NAME }, 'Shutting down')
await Promise.all([
this.queue.unsubscribe(ASSET_METADATA_FETCH_INITIAL),
this.queue.unsubscribe(ASSET_METADATA_FETCH_UPDATE)
])
this.state = 'initialized'
this.logger.info({ module: MODULE_NAME }, 'Shutdown complete')
}
}
21 changes: 15 additions & 6 deletions packages/server/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { allowListMiddleware } from './express_middleware'
import { dummyLogger, Logger } from 'ts-log'
import { setIntervalAsync, SetIntervalAsyncTimer } from 'set-interval-async/dynamic'
import { clearIntervalAsync } from 'set-interval-async'
import { RunnableModuleState } from '@cardano-graphql/util'

export type Config = {
allowIntrospection: boolean
Expand All @@ -30,6 +31,7 @@ export type Config = {

export class Server {
public app: express.Application
public state: RunnableModuleState
private apolloServer: ApolloServer
private httpServer: http.Server
private schemas: GraphQLSchema[]
Expand All @@ -41,10 +43,14 @@ export class Server {
private logger: Logger = dummyLogger
) {
this.app = express()
this.state = null
this.schemas = schemas
}

async init () {
if (this.state !== null) return
this.state = 'initializing'
this.logger.info({ module: 'Server' }, 'Initializing')
let allowList: AllowList
const plugins: PluginDefinition[] = []
const validationRules = []
Expand Down Expand Up @@ -90,10 +96,13 @@ export class Server {
} : undefined,
path: '/'
})
this.state = 'initialized'
}

async start () {
if (this.state !== 'initialized') return
this.httpServer = await listenPromise(this.app, this.config.apiPort, this.config.listenAddress)
this.state = 'running'
this.logger.info({ module: 'Server' }, `GraphQL HTTP server at http://${this.config.listenAddress}:` +
`${this.config.apiPort}${this.apolloServer.graphqlPath} started`
)
Expand Down Expand Up @@ -125,12 +134,12 @@ export class Server {
}

async shutdown () {
if (this.state !== 'running') return
await clearIntervalAsync(this.syncProgress)
if (this.httpServer !== undefined) {
this.httpServer.close()
this.logger.info({ module: 'Server' }, `GraphQL HTTP server at http://${this.config.listenAddress}:` +
`${this.config.apiPort}${this.apolloServer.graphqlPath} shutting down`
)
}
this.httpServer.close()
this.logger.info({ module: 'Server' }, `GraphQL HTTP server at http://${this.config.listenAddress}:` +
`${this.config.apiPort}${this.apolloServer.graphqlPath} shutting down`
)
this.state = 'initialized'
}
}
Loading

0 comments on commit 7054a59

Please sign in to comment.