Skip to content

Commit

Permalink
feat: add synchronous shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Goller <[email protected]>
  • Loading branch information
goller committed Nov 13, 2024
1 parent d568875 commit a7c3fa2
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 30 deletions.
4 changes: 4 additions & 0 deletions proto/depot/cloud/v3/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ service MachineService {
rpc PingMachineHealth(PingMachineHealthRequest) returns (PingMachineHealthResponse);
rpc ReportMachineHealth(stream ReportMachineHealthRequest) returns (ReportMachineHealthResponse);
rpc Usage(UsageRequest) returns (UsageResponse);
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
}

message RegisterMachineRequest {
Expand Down Expand Up @@ -159,3 +160,6 @@ message Cache {
}

message UsageResponse {}

// ShutdownRequest is the request message for MachineService.Shutdown; it has no fields.
message ShutdownRequest {}
// ShutdownResponse is the response message for MachineService.Shutdown; it has no fields.
message ShutdownResponse {}
11 changes: 11 additions & 0 deletions src/gen/ts/depot/cloud/v3/machine_connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
RegisterMachineResponse,
ReportMachineHealthRequest,
ReportMachineHealthResponse,
ShutdownRequest,
ShutdownResponse,
UsageRequest,
UsageResponse,
} from './machine_pb'
Expand Down Expand Up @@ -57,5 +59,14 @@ export const MachineService = {
O: UsageResponse,
kind: MethodKind.Unary,
},
/**
* @generated from rpc depot.cloud.v3.MachineService.Shutdown
*/
shutdown: {
name: 'Shutdown',
I: ShutdownRequest,
O: ShutdownResponse,
kind: MethodKind.Unary,
},
},
} as const
66 changes: 66 additions & 0 deletions src/gen/ts/depot/cloud/v3/machine_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1140,3 +1140,69 @@ export class UsageResponse extends Message<UsageResponse> {
return proto3.util.equals(UsageResponse, a, b)
}
}

/**
 * Request message used as the input type of the `Shutdown` RPC.
 * The message declares no fields.
 *
 * @generated from message depot.cloud.v3.ShutdownRequest
 */
export class ShutdownRequest extends Message<ShutdownRequest> {
// Optionally copies `data` into the new instance via proto3.util.initPartial.
constructor(data?: PartialMessage<ShutdownRequest>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.ShutdownRequest'
// Empty field list: this message has no fields.
static readonly fields: FieldList = proto3.util.newFieldList(() => [])

// Creates a fresh instance and delegates to its fromBinary method.
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ShutdownRequest {
return new ShutdownRequest().fromBinary(bytes, options)
}

// Creates a fresh instance and delegates to its fromJson method.
static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ShutdownRequest {
return new ShutdownRequest().fromJson(jsonValue, options)
}

// Creates a fresh instance and delegates to its fromJsonString method.
static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ShutdownRequest {
return new ShutdownRequest().fromJsonString(jsonString, options)
}

// Compares two values as ShutdownRequest messages using proto3.util.equals; either side may be undefined.
static equals(
a: ShutdownRequest | PlainMessage<ShutdownRequest> | undefined,
b: ShutdownRequest | PlainMessage<ShutdownRequest> | undefined,
): boolean {
return proto3.util.equals(ShutdownRequest, a, b)
}
}

/**
 * Response message used as the output type of the `Shutdown` RPC.
 * The message declares no fields.
 *
 * @generated from message depot.cloud.v3.ShutdownResponse
 */
export class ShutdownResponse extends Message<ShutdownResponse> {
// Optionally copies `data` into the new instance via proto3.util.initPartial.
constructor(data?: PartialMessage<ShutdownResponse>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.ShutdownResponse'
// Empty field list: this message has no fields.
static readonly fields: FieldList = proto3.util.newFieldList(() => [])

// Creates a fresh instance and delegates to its fromBinary method.
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ShutdownResponse {
return new ShutdownResponse().fromBinary(bytes, options)
}

// Creates a fresh instance and delegates to its fromJson method.
static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ShutdownResponse {
return new ShutdownResponse().fromJson(jsonValue, options)
}

// Creates a fresh instance and delegates to its fromJsonString method.
static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ShutdownResponse {
return new ShutdownResponse().fromJsonString(jsonString, options)
}

// Compares two values as ShutdownResponse messages using proto3.util.equals; either side may be undefined.
static equals(
a: ShutdownResponse | PlainMessage<ShutdownResponse> | undefined,
b: ShutdownResponse | PlainMessage<ShutdownResponse> | undefined,
): boolean {
return proto3.util.equals(ShutdownResponse, a, b)
}
}
81 changes: 55 additions & 26 deletions src/tasks/buildkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as fsp from 'fs/promises'
import {onShutdown, onShutdownError} from 'node-graceful-shutdown'
import {RegisterMachineResponse, RegisterMachineResponse_BuildKitTask} from '../gen/ts/depot/cloud/v3/machine_pb'
import {pathExists} from '../utils/common'
import {client} from '../utils/grpc'
import {ensureMounted, fstrim, mountExecutor, unmapBlockDevice, unmountDevice} from '../utils/mounts'
import {reportHealth} from './health'
import {reportUsage} from './usage'
Expand Down Expand Up @@ -209,6 +210,9 @@ keepBytes = ${cacheSizeBytes}
// Ignore this error, it's expected when the process is killed.
} else if (isAbortError(error)) {
// Ignore this error, it's expected when the process is killed.
} else if (error instanceof Error && error.message.includes('Command failed with exit code 2')) {
console.error(`BuildKit exited with panic: ${error}`)
throw error
} else {
throw error
}
Expand Down Expand Up @@ -237,41 +241,66 @@ keepBytes = ${cacheSizeBytes}
console.log(`BuildKit exited with error: ${error}`)
}

// Remove estargz cache because we will rely on the buildkit layer cache instead.
await execa('rm', ['-rf', `${rootDir}/runc-stargz/snapshots/stargz`], {stdio: 'inherit'}).catch((err) => {
console.error(err)
})

// Print the time it takes to sync the filesystem.
const start = Date.now()
// sync the filesystem to ensure all data is written to disk.
await execa('sync', {stdio: 'inherit'}).catch((err) => {
console.error(err)
})
console.log(`sync took ${Date.now() - start}ms`)

for (const mount of task.mounts) {
if (mount.cephVolume) {
if (!task.disableFstrim) {
await fstrim(mount.path)
}
await unmountDevice(mount.path)
await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec)
} else {
await unmountDevice(mount.path)
}
}
await shutdown(rootDir, task)
})

try {
await Promise.all([
const [result] = await Promise.allSettled([
buildkit,
reportHealth({signal, headers, path: rootDir}),
reportHealth({controller, headers, path: rootDir}),
reportUsage({machineId, signal, headers}),
])
if (result.status === 'rejected') {
throw result.reason
}

// If we have successfully stopped buildkit, we can shutdown.
await shutdown(rootDir, task)
} catch (error) {
throw error
} finally {
controller.abort()
}
}

/**
 * Tears down the machine's storage after BuildKit has stopped and then
 * reports the shutdown to the API.
 *
 * Steps, in order:
 *  1. Delete the estargz snapshot directory under `rootDir` (failure is logged, not thrown).
 *  2. Run `sync` and log how long it took (failure is logged, not thrown).
 *  3. For each mount in `task.mounts`: optionally fstrim (ceph volumes only,
 *     unless `task.disableFstrim` is set), unmount, and for ceph volumes
 *     unmap the block device. Errors from these helpers propagate.
 *  4. Call `reportShutdown()`; its rejection propagates to the caller.
 *
 * @param rootDir Directory that contains the `runc-stargz` snapshot tree.
 * @param task BuildKit task whose `mounts` and `disableFstrim` drive the teardown.
 */
async function shutdown(rootDir: string, task: RegisterMachineResponse_BuildKitTask) {
  const logAndContinue = (err: unknown) => {
    console.error(err)
  }

  // The buildkit layer cache is relied on instead, so the estargz cache is dropped.
  const stargzSnapshots = `${rootDir}/runc-stargz/snapshots/stargz`
  await execa('rm', ['-rf', stargzSnapshots], {stdio: 'inherit'}).catch(logAndContinue)

  // Flush pending writes to disk and record how long the flush took.
  const syncStartedAt = Date.now()
  await execa('sync', {stdio: 'inherit'}).catch(logAndContinue)
  console.log(`sync took ${Date.now() - syncStartedAt}ms`)

  for (const mount of task.mounts) {
    const {path: mountPath, cephVolume} = mount

    // Only ceph-backed mounts are trimmed, and only when fstrim is not disabled.
    if (cephVolume && !task.disableFstrim) {
      await fstrim(mountPath)
    }

    await unmountDevice(mountPath)

    // Ceph-backed mounts additionally need their block device unmapped.
    if (cephVolume) {
      await unmapBlockDevice(cephVolume.volumeName, cephVolume.imageSpec)
    }
  }

  // Tell the API that this machine is no longer available.
  await reportShutdown()
}

/**
 * Calls the `Shutdown` RPC on the shared gRPC client, aborting the call if it
 * has not settled within 5 seconds.
 *
 * @returns The RPC's response; a rejection (including one caused by the
 *   abort) propagates to the caller.
 */
async function reportShutdown() {
  const aborter = new AbortController()

  // Start the RPC first, then arm the timer, matching the original ordering.
  const pendingCall = client.shutdown({}, {signal: aborter.signal})

  const timeoutMs = 5000
  const timer = setTimeout(() => aborter.abort(), timeoutMs)

  try {
    return await pendingCall
  } finally {
    // Always disarm the timer so it cannot fire after the call has settled.
    clearTimeout(timer)
  }
}
18 changes: 14 additions & 4 deletions src/tasks/health.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {isAbortError} from 'abort-controller-x'
import {execa} from 'execa'
import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb'
import {sleep} from '../utils/common'
import {stats} from '../utils/disk'
import {client} from '../utils/grpc'

export interface ReportHealthParams {
signal: AbortSignal
controller: AbortController
headers: HeadersInit
path: string
}

export async function reportHealth({signal, headers, path}: ReportHealthParams) {
export async function reportHealth({controller, headers, path}: ReportHealthParams) {
const signal = controller.signal

while (!signal.aborted) {
await waitForBuildKitWorkers(signal)

Expand All @@ -34,9 +37,16 @@ export async function reportHealth({signal, headers, path}: ReportHealthParams)
}

const res = await client.reportMachineHealth(stream(), {headers, signal})
if (res.shouldTerminate) return
if (res.shouldTerminate) {
console.log('shutdown requested')
controller.abort()

return
}
} catch (error) {
console.log('Error reporting health:', error)
if (!isAbortError(error)) {
console.log('Error reporting health:', error)
}
}
await sleep(1000)
}
Expand Down

0 comments on commit a7c3fa2

Please sign in to comment.