diff --git a/packages/test/local-server-stress-tests/.eslintrc.cjs b/packages/test/local-server-stress-tests/.eslintrc.cjs new file mode 100644 index 000000000000..e9e7183c15eb --- /dev/null +++ b/packages/test/local-server-stress-tests/.eslintrc.cjs @@ -0,0 +1,18 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +module.exports = { + extends: [ + require.resolve("@fluidframework/eslint-config-fluid/minimal-deprecated"), + "prettier", + ], + rules: { + "import/no-nodejs-modules": "off", + "@fluid-internal/fluid/no-unchecked-record-access": "warn", + }, + parserOptions: { + project: ["./src/test/tsconfig.json"], + }, +}; diff --git a/packages/test/local-server-stress-tests/.gitignore b/packages/test/local-server-stress-tests/.gitignore new file mode 100644 index 000000000000..ee26a5e7bdbf --- /dev/null +++ b/packages/test/local-server-stress-tests/.gitignore @@ -0,0 +1,52 @@ +# Compiled TypeScript and CSS +dist +lib + +# Babel +public/scripts/es5 + +# Logs +logs +*.log + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt +.cache-loader + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git- +node_modules + +# Typings +typings + +# Debug log from npm +npm-debug.log + +# Code coverage +nyc +.nyc_output/ + +# Chart dependencies +**/charts/*.tgz + +# Generated modules +intel_modules/ +temp_modules/ diff --git a/packages/test/local-server-stress-tests/.mocharc.cjs b/packages/test/local-server-stress-tests/.mocharc.cjs new file mode 100644 index 000000000000..cddbf0e44d55 --- /dev/null +++ b/packages/test/local-server-stress-tests/.mocharc.cjs @@ -0,0 +1,12 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +"use strict"; + +const getFluidTestMochaConfig = require("@fluid-internal/mocha-test-setup/mocharc-common"); + +const packageDir = __dirname; +const config = getFluidTestMochaConfig(packageDir); +module.exports = config; diff --git a/packages/test/local-server-stress-tests/.npmignore b/packages/test/local-server-stress-tests/.npmignore new file mode 100644 index 000000000000..f518002fc4dd --- /dev/null +++ b/packages/test/local-server-stress-tests/.npmignore @@ -0,0 +1,7 @@ +nyc +*.log +**/*.tsbuildinfo +src/test +dist/test +lib/test +**/_api-extractor-temp/** diff --git a/packages/test/local-server-stress-tests/LICENSE b/packages/test/local-server-stress-tests/LICENSE new file mode 100644 index 000000000000..60af0a6a40e9 --- /dev/null +++ b/packages/test/local-server-stress-tests/LICENSE @@ -0,0 +1,21 @@ +Copyright (c) Microsoft Corporation and contributors. All rights reserved. + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/test/local-server-stress-tests/biome.jsonc b/packages/test/local-server-stress-tests/biome.jsonc new file mode 100644 index 000000000000..11302bab0e3e --- /dev/null +++ b/packages/test/local-server-stress-tests/biome.jsonc @@ -0,0 +1,7 @@ +{ + "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", + "extends": ["../../../biome.jsonc"], + "files": { + "ignore": ["results/**"] + } +} diff --git a/packages/test/local-server-stress-tests/package.json b/packages/test/local-server-stress-tests/package.json new file mode 100644 index 000000000000..9b767493562f --- /dev/null +++ b/packages/test/local-server-stress-tests/package.json @@ -0,0 +1,120 @@ +{ + "name": "@fluid-internal/local-server-stress-tests", + "version": "2.23.0", + "private": true, + "description": "Stress tests that can only run against the local server", + "homepage": "https://fluidframework.com", + "repository": { + "type": "git", + "url": "https://github.com/microsoft/FluidFramework.git", + "directory": "packages/test/local-server-stress-tests" + }, + "license": "MIT", + "author": "Microsoft and contributors", + "sideEffects": false, + "type": "commonjs", + "scripts": { + "build": "fluid-build . --task build", + "build:compile": "fluid-build . --task compile", + "build:test": "tsc --project ./src/test/tsconfig.json", + "check:biome": "biome check .", + "check:format": "npm run check:biome", + "check:prettier": "prettier --check . --cache --ignore-path ../../../.prettierignore", + "clean": "rimraf --glob dist lib \"**/*.tsbuildinfo\" \"**/*.build.log\" nyc", + "eslint": "eslint --format stylish src", + "eslint:fix": "eslint --format stylish src --fix --fix-type problem,suggestion,layout", + "format": "npm run format:biome", + "format:biome": "biome check . --write", + "format:prettier": "prettier --write . --cache --ignore-path ../../../.prettierignore", + "lint": "fluid-build . --task lint", + "lint:fix": "fluid-build . --task eslint:fix --task format", + "test": "npm run test:mocha", + "test:coverage": "c8 npm test", + "test:mocha": "mocha \"lib/test/**/*.spec.*js\" --exit", + "test:mocha:verbose": "cross-env FLUID_TEST_VERBOSE=1 npm run test:mocha" + }, + "c8": { + "all": true, + "cache-dir": "nyc/.cache", + "exclude": [ + "src/test/**/*.*ts", + "dist/test/**/*.*js", + "lib/test/**/*.*js" + ], + "exclude-after-remap": false, + "include": [ + "src/**/*.*ts", + "dist/**/*.*js", + "lib/**/*.*js" + ], + "report-dir": "nyc/report", + "reporter": [ + "cobertura", + "html", + "text" + ], + "temp-directory": "nyc/.nyc_output" + }, + "dependencies": { + "@fluid-experimental/tree": "workspace:~", + "@fluid-internal/client-utils": "workspace:~", + "@fluid-internal/mocha-test-setup": "workspace:~", + "@fluid-private/stochastic-test-utils": "workspace:~", + "@fluid-private/test-dds-utils": "workspace:~", + "@fluidframework/aqueduct": "workspace:~", + "@fluidframework/build-common": "^2.0.3", + "@fluidframework/build-tools": "^0.53.0", + "@fluidframework/container-definitions": "workspace:~", + "@fluidframework/container-loader": "workspace:~", + "@fluidframework/container-runtime": "workspace:~", + "@fluidframework/container-runtime-definitions": "workspace:~", + "@fluidframework/core-interfaces": "workspace:~", + "@fluidframework/core-utils": "workspace:~", + "@fluidframework/datastore": "workspace:~", + "@fluidframework/datastore-definitions": "workspace:~", + "@fluidframework/driver-definitions": "workspace:~", + "@fluidframework/driver-utils": "workspace:~", + "@fluidframework/eslint-config-fluid": "^5.7.3", + "@fluidframework/id-compressor": "workspace:~", + "@fluidframework/local-driver": "workspace:~", + "@fluidframework/map": "workspace:~", + "@fluidframework/runtime-definitions": "workspace:~", + "@fluidframework/runtime-utils": "workspace:~", + "@fluidframework/sequence": "workspace:~", + "@fluidframework/server-local-server": "^5.0.0", + "@fluidframework/telemetry-utils": "workspace:~", + "@fluidframework/test-utils": "workspace:~", + "@fluidframework/tree": "workspace:~", + "uuid": "^9.0.0" + }, + "devDependencies": { + "@biomejs/biome": "~1.9.3", + "@types/mocha": "^10.0.10", + "@types/node": "^18.19.0", + "@types/uuid": "^9.0.2", + "c8": "^8.0.1", + "cross-env": "^7.0.3", + "eslint": "~8.55.0", + "mocha": "^10.2.0", + "mocha-multi-reporters": "^1.5.1", + "prettier": "~3.0.3", + "rimraf": "^4.4.0", + "ts-loader": "^9.5.1", + "typescript": "~5.4.5" + }, + "fluidBuild": { + "tasks": { + "build:test": [ + "^tsc", + "^api-extractor:commonjs", + "@fluidframework/sequence#build:test", + "@fluidframework/map#build:test" + ] + } + }, + "typeValidation": { + "disabled": true, + "broken": {}, + "entrypoint": "internal" + } +} diff --git a/packages/test/local-server-stress-tests/prettier.config.cjs b/packages/test/local-server-stress-tests/prettier.config.cjs new file mode 100644 index 000000000000..d4870022599f --- /dev/null +++ b/packages/test/local-server-stress-tests/prettier.config.cjs @@ -0,0 +1,8 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +module.exports = { + ...require("@fluidframework/build-common/prettier.config.cjs"), +}; diff --git a/packages/test/local-server-stress-tests/src/ddsModels.ts b/packages/test/local-server-stress-tests/src/ddsModels.ts new file mode 100644 index 000000000000..7b2df9db91a1 --- /dev/null +++ b/packages/test/local-server-stress-tests/src/ddsModels.ts @@ -0,0 +1,68 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { done, type AsyncGenerator } from "@fluid-private/stochastic-test-utils"; +import { DDSFuzzModel, DDSFuzzTestState } from "@fluid-private/test-dds-utils"; +import type { IChannelFactory } from "@fluidframework/datastore-definitions/internal"; +// eslint-disable-next-line import/no-internal-modules +import { baseMapModel, baseDirModel } from "@fluidframework/map/internal/test"; +import { + baseSharedStringModel, + baseIntervalModel, + // eslint-disable-next-line import/no-internal-modules +} from "@fluidframework/sequence/internal/test"; + +function repeatFactoryAsync( + factory: () => AsyncGenerator, +): AsyncGenerator { + let generator = factory(); + return async (state: TState) => { + const next = await generator(state); + if (next !== done) { + return next; + } + generator = factory(); + return generator(state); + }; +} + +const generateSubModelMap = ( + ...models: Omit, "workloadName">[] +) => { + const modelMap = new Map< + string, + { + factory: IChannelFactory; + generator: AsyncGenerator>; + reducer: DDSFuzzModel["reducer"]; + validateConsistency: DDSFuzzModel["validateConsistency"]; + minimizationTransforms?: DDSFuzzModel["minimizationTransforms"]; + } + >(); + for (const model of models) { + const { reducer, generatorFactory, factory, validateConsistency, minimizationTransforms } = + model; + const generator = repeatFactoryAsync(generatorFactory); + modelMap.set(factory.attributes.type, { + generator, + reducer, + factory, + validateConsistency, + minimizationTransforms, + }); + } + + return modelMap; +}; + +/** + * here we import the dds models, and do some minor changes to make this easier to nest in the local server stress model. + */ +export const ddsModelMap = generateSubModelMap( + baseMapModel, + baseDirModel, + baseSharedStringModel, + baseIntervalModel, +); diff --git a/packages/test/local-server-stress-tests/src/ddsOperations.ts b/packages/test/local-server-stress-tests/src/ddsOperations.ts new file mode 100644 index 000000000000..e51a0b2d5f73 --- /dev/null +++ b/packages/test/local-server-stress-tests/src/ddsOperations.ts @@ -0,0 +1,157 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { type AsyncGenerator, type AsyncReducer } from "@fluid-private/stochastic-test-utils"; +import { DDSFuzzTestState, Client as DDSClient } from "@fluid-private/test-dds-utils"; +import { fluidHandleSymbol } from "@fluidframework/core-interfaces"; +import { assert, isObject } from "@fluidframework/core-utils/internal"; +import type { + IChannel, + IChannelFactory, +} from "@fluidframework/datastore-definitions/internal"; +import { toFluidHandleInternal } from "@fluidframework/runtime-utils/internal"; + +import { ddsModelMap } from "./ddsModels.js"; +import { LocalServerStressState, Client } from "./localServerStressHarness.js"; +import { makeUnreachableCodePathProxy } from "./utils.js"; + +export interface DDSModelOp { + type: "DDSModelOp"; + op: unknown; +} + +const createDDSClient = (channel: IChannel): DDSClient => { + return { + channel, + containerRuntime: makeUnreachableCodePathProxy("containerRuntime"), + dataStoreRuntime: makeUnreachableCodePathProxy("dataStoreRuntime"), + }; +}; + +const covertLocalServerStateToDdsState = async ( + state: LocalServerStressState, +): Promise> => { + const channels = await state.datastore.getChannels(); + const allHandles = [ + ...channels.map((c) => ({ tag: c.id, handle: c.handle })), + ...(await state.client.entryPoint.getContainerObjects()).filter( + (v) => v.handle !== undefined, + ), + ]; + return { + clients: makeUnreachableCodePathProxy("clients"), + client: createDDSClient(state.channel), + containerRuntimeFactory: makeUnreachableCodePathProxy("containerRuntimeFactory"), + isDetached: state.isDetached, + summarizerClient: makeUnreachableCodePathProxy("containerRuntimeFactory"), + random: { + ...state.random, + handle: () => { + /** + * here we do some funky stuff with handles so we can serialize them like json for output, but not bind them, + * as they may not be attached. look at the reduce code to see how we deserialized these fake handles into real + * handles. + */ + const { tag, handle } = state.random.pick(allHandles); + const realHandle = toFluidHandleInternal(handle); + return { + tag, + absolutePath: realHandle.absolutePath, + get [fluidHandleSymbol]() { + return realHandle[fluidHandleSymbol]; + }, + async get() { + return realHandle.get(); + }, + get isAttached() { + return realHandle.isAttached; + }, + }; + }, + }, + }; +}; + +export const DDSModelOpGenerator: AsyncGenerator = async ( + state, +) => { + const channel = state.channel; + const model = ddsModelMap.get(channel.attributes.type); + assert(model !== undefined, "must have model"); + + const op = await model.generator(await covertLocalServerStateToDdsState(state)); + + return { + type: "DDSModelOp", + op, + } satisfies DDSModelOp; +}; + +export const DDSModelOpReducer: AsyncReducer = async ( + state, + op, +) => { + const baseModel = ddsModelMap.get(state.channel.attributes.type); + assert(baseModel !== undefined, "must have base model"); + const channels = await state.datastore.getChannels(); + const globalObjects = await state.client.entryPoint.getContainerObjects(); + const allHandles = [ + ...channels.map((c) => ({ tag: c.id, handle: c.handle })), + ...globalObjects.filter((v) => v.handle !== undefined), + ]; + + // we always serialize and then deserialize withe a handle look + // up, as this ensure we all do the same thing, regardless of if + // we are replaying from a file with serialized generated operations, or + // running live with in-memory generated operations. + const subOp = JSON.parse(JSON.stringify(op.op), (key, value: unknown) => { + if (isObject(value) && "absolutePath" in value && "tag" in value) { + const entry = allHandles.find((h) => h.tag === value.tag); + assert(entry !== undefined, "entry must exist"); + return entry.handle; + } + return value; + }); + await baseModel.reducer(await covertLocalServerStateToDdsState(state), subOp); +}; + +export const validateConsistencyOfAllDDS = async (clientA: Client, clientB: Client) => { + const buildChannelMap = async (client: Client) => { + /** + * here we build a map of all the channels in the container based on their absolute path, + * once we have this we can match channels in different container (clientA and clientB), + * and then reuse the per dds validators to ensure eventual consistency. + */ + const channelMap = new Map(); + for (const entry of (await client.entryPoint.getContainerObjects()).map((v) => + v.type === "stressDataObject" ? v : undefined, + )) { + if (entry !== undefined) { + const stressDataObject = entry?.stressDataObject; + if (stressDataObject?.attached === true) { + const channels = await stressDataObject.getChannels(); + for (const channel of channels) { + if (channel.isAttached()) { + channelMap.set(`${entry.tag}/${channel.id}`, channel); + } + } + } + } + } + return channelMap; + }; + const aMap = await buildChannelMap(clientA); + const bMap = await buildChannelMap(clientB); + assert(aMap.size === bMap.size, "channel maps should be the same size"); + for (const key of aMap.keys()) { + const aChannel = aMap.get(key); + const bChannel = bMap.get(key); + assert(aChannel !== undefined, "channel must exist"); + assert(aChannel.attributes.type === bChannel?.attributes.type, "channel types must match"); + const model = ddsModelMap.get(aChannel.attributes.type); + assert(model !== undefined, "model must exist"); + await model.validateConsistency(createDDSClient(aChannel), createDDSClient(bChannel)); + } +}; diff --git a/packages/test/local-server-stress-tests/src/localServerStressHarness.ts b/packages/test/local-server-stress-tests/src/localServerStressHarness.ts new file mode 100644 index 000000000000..ff69c7e02643 --- /dev/null +++ b/packages/test/local-server-stress-tests/src/localServerStressHarness.ts @@ -0,0 +1,1223 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import { mkdirSync, readFileSync } from "node:fs"; + +import type { + AsyncGenerator, + AsyncReducer, + BaseFuzzTestState, + BaseOperation, + IRandom, + MinimizationTransform, + SaveDestination, + SaveInfo, +} from "@fluid-private/stochastic-test-utils"; +import { + ExitBehavior, + FuzzTestMinimizer, + asyncGeneratorFromArray, + chainAsync, + createFuzzDescribe, + defaultOptions, + done, + generateTestSeeds, + getSaveDirectory, + getSaveInfo, + interleaveAsync, + isOperationType, + makeRandom, + performFuzzActionsAsync, + saveOpsToFile, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; +import { + type ICodeDetailsLoader, + type IContainer, + type IFluidCodeDetails, +} from "@fluidframework/container-definitions/internal"; +import { + ConnectionState, + createDetachedContainer, + loadExistingContainer, +} from "@fluidframework/container-loader/internal"; +import type { FluidObject } from "@fluidframework/core-interfaces"; +import { unreachableCase } from "@fluidframework/core-utils/internal"; +import type { IChannel } from "@fluidframework/datastore-definitions/internal"; +import { + createLocalResolverCreateNewRequest, + LocalDocumentServiceFactory, + LocalResolver, +} from "@fluidframework/local-driver/internal"; +import { + ILocalDeltaConnectionServer, + LocalDeltaConnectionServer, +} from "@fluidframework/server-local-server"; +import { LocalCodeLoader } from "@fluidframework/test-utils/internal"; + +import { + createRuntimeFactory, + StressDataObject, + type DefaultStressDataObject, +} from "./stressDataObject.js"; +import { makeUnreachableCodePathProxy } from "./utils.js"; + +export interface Client { + container: IContainer; + tag: `client-${number}`; + entryPoint: DefaultStressDataObject; +} + +/** + * @internal + */ +export interface LocalServerStressState extends BaseFuzzTestState { + localDeltaConnectionServer: ILocalDeltaConnectionServer; + codeLoader: ICodeDetailsLoader; + validationClient: Client; + random: IRandom; + clients: Client[]; + client: Client; + datastore: StressDataObject; + channel: IChannel; + isDetached: boolean; + tag(prefix: T): `${T}-${number}`; +} + +/** + * @internal + */ +interface SelectedClientSpec { + clientTag: `client-${number}`; + datastoreTag: `datastore-${number}`; + channelTag: `channel-${number}`; +} + +/** + * @internal + */ +interface Attach { + type: "attach"; +} + +/** + * @internal + */ +interface AddClient { + type: "addClient"; + clientTag: `client-${number}`; + url: string; +} + +/** + * @internal + */ +interface RemoveClient { + type: "removeClient"; + clientTag: `client-${number}`; +} + +/** + * @internal + */ +interface Synchronize { + type: "synchronize"; + clients?: Client[]; +} + +export interface LocalServerStressModel< + TOperation extends BaseOperation, + TState extends LocalServerStressState = LocalServerStressState, +> { + /** + * Name for this model. This is used for test case naming, and should generally reflect properties + * about the kinds of operations that are generated. + * For example, SharedString might fuzz test several different workloads--some involving intervals, + * some without, some that never delete text, etc. + * This name should also be relatively friendly for file system; if the "save to disk" option of + * {@link (createLocalServerStressSuite:function)} is enabled, it will be kebab cased for failure files. + */ + workloadName: string; + + /** + * Factory which creates a generator for this model. + * @remarks DDS model generators can decide to use the "channel" or "client" field to decide which + * client to perform the operation on. + */ + generatorFactory: () => AsyncGenerator; + + /** + * Reducer capable of updating the test state according to the operations generated. + */ + reducer: AsyncReducer; + + /** + * Equivalence validation function, which should verify that the provided channels contain the same data. + * This is run at each synchronization point for all connected clients (as disconnected clients won't + * necessarily have the same set of ops applied). + * @throws - An informative error if the channels don't have equivalent data. + */ + validateConsistency: (channelA: Client, channelB: Client) => void | Promise; + + /** + * An array of transforms used during fuzz test minimization to reduce test + * cases. See {@link MinimizationTransform} for additional context. + * + * If no transforms are supplied, minimization will still occur, but the + * contents of the operations will remain unchanged. + */ + minimizationTransforms?: MinimizationTransform[]; +} + +/** + * @internal + */ +export interface LocalServerStressHarnessEvents { + /** + * Raised for each non-summarizer client created during fuzz test execution. + */ + (event: "clientCreate", listener: (client: Client) => void); + + /** + * Raised after creating the initialState but prior to performing the fuzzActions.. + */ + (event: "testStart", listener: (initialState: LocalServerStressState) => void); + + /** + * Raised after all fuzzActions have been completed. + */ + (event: "testEnd", listener: (finalState: LocalServerStressState) => void); + + /** + * Raised before each generated operation is run by its reducer. + */ + (event: "operationStart", listener: (operation: BaseOperation) => void); +} + +/** + * @internal + */ +export interface LocalServerStressOptions { + /** + * Number of tests to generate for correctness modes (which are run in the PR gate). + */ + defaultTestCount: number; + + /** + * Number of clients to perform operations on following the attach phase. + * This does not include the read-only client created for consistency validation + * and summarization--see {@link LocalServerStressState.summarizerClient}. + * + * See {@link LocalServerStressOptions.detachedStartOptions} for more details on the detached start phase. + * See {@link LocalServerStressOptions.clientJoinOptions} for more details on clients joining after those in the initial attach. + */ + numberOfClients: number; + + /** + * Options dictating if and when to simulate new clients joining the collaboration session. + * If not specified, no new clients will be added after the test starts. + * + * This option is useful for testing eventual consistency bugs related to summarization. + * + * @remarks Even without enabling this option, DDS fuzz models can generate {@link AddClient} + * operations with whatever strategy is appropriate. + * This is useful for nudging test cases towards a particular pattern of clients joining. + */ + clientJoinOptions: { + /** + * The maximum number of clients that will ever be added to the test. + * @remarks Due to current mock limitations, clients will only ever be added to the collaboration session, + * not removed. + * Adding an excessive number of clients may cause performance issues. + */ + maxNumberOfClients: number; + + /** + * The probability that a client will be added at any given operation. + * If the current number of clients has reached the maximum, this probability is ignored. + */ + clientAddProbability: number; + }; + + /** + * Dictates simulation of edits made to a DDS while that DDS is detached. + * + * When enabled, the fuzz test starts with a single client generating edits. After a certain number of ops (dictated by `numOpsBeforeAttach`), + * an attach op will be generated, at which point: + * - getAttachSummary will be invoked on this client + * - The remaining clients (as dictated by {@link LocalServerStressOptions.numberOfClients}) will load from this summary and join the session + * + * This setup simulates application code initializing state in a data store before attaching it, e.g. running code to edit a DDS from + * `DataObject.initializingFirstTime`. + * Default: tests are run with this setting enabled, with 5 ops being generated before an attach op. A new client is also rehydrated from + * summary. To disable the generation of rehydrate ops, set `rehydrateDisabled` to `true`. + */ + detachedStartOptions: { + numOpsBeforeAttach: number; + }; + + /** + * Strategy for validating eventual consistency of DDSes. + * In random mode, each generated operation has the specified probability to instead be a synchronization point + * (all connected clients process all ops) followed by validation that all clients agree on their shared state. + * In fixed interval mode, this synchronization happens on a predictable cadence: every `interval` operations + * generated. + */ + validationStrategy: + | { type: "random"; probability: number } + | { type: "fixedInterval"; interval: number } + // WIP: This validation strategy still currently synchronizes all clients. + | { type: "partialSynchronization"; probability: number; clientProbability: number }; + parseOperations: (serialized: string) => BaseOperation[]; + + /** + * Each non-synchronization option has this probability of instead generating a disconnect/reconnect. + * The reconnect operation currently *replaces* the operation generated by the model's generator. + * + * TODO: Expose options for how to inject reconnection in a more flexible way. + */ + reconnectProbability: number; + + /** + * Seed which should be replayed from disk. + * + * This option is intended for quick, by-hand minimization of failure JSON. As such, it adds a `.only` + * to the corresponding replay test. + * + * TODO: Improving workflows around fuzz test minimization, regression test generation for a particular seed, + * or more flexibility around replay of test files would be a nice value add to this harness. + */ + replay?: number; + + /** + * Runs only the provided seeds. + * + * @example + * + * ```typescript + * // Runs only seed 42 for the given model. + * createLocalServerStressSuite(model, { only: [42] }); + * ``` + * + * @remarks + * If you prefer, a variant of the standard `.only` syntax works. See {@link (createLocalServerStressSuite:namespace).only}. + */ + only: Iterable; + + /** + * Skips the provided seeds. + * + * @example + * + * ```typescript + * // Skips seed 42 for the given model. + * createLocalServerStressSuite(model, { skip: [42] }); + * ``` + * + * @remarks + * If you prefer, a variant of the standard `.skip` syntax works. See {@link (createLocalServerStressSuite:namespace).skip}. + */ + skip: Iterable; + + /** + * Whether failure files should be saved to disk, and if so, the directory in which they should be saved. + * Each seed will be saved in a subfolder of this directory obtained by kebab-casing the model name. + * + * Turning on this feature is encouraged for quick minimization. + */ + saveFailures: undefined | { directory: string }; + + /** + * Whether successful runs should be saved to disk and where. + * Minimization will be skipped for these files. + * + * This feature is useful to audit the scenarios generated by a given fuzz configuration. + */ + saveSuccesses: undefined | { directory: string }; + + /** + * Whether or not to skip minimization of fuzz failing test cases. This is useful + * when one only cares about the counts or types of errors, and not the + * exact contents of the test cases. + * + * Minimization only works when the failure occurs as part of a reducer, and is mostly + * useful if the model being tested defines {@link LocalServerStressModel.minimizationTransforms}. + * + * It can also add a couple seconds of overhead per failing + * test case. See {@link MinimizationTransform} for additional context. + */ + skipMinimization?: boolean; +} + +/** + * @internal + */ +const defaultLocalServerStressSuiteOptions: LocalServerStressOptions = { + defaultTestCount: defaultOptions.defaultTestCount, + detachedStartOptions: { + numOpsBeforeAttach: 5, + }, + numberOfClients: 3, + clientJoinOptions: { + clientAddProbability: 0.01, + maxNumberOfClients: 6, + }, + only: [], + skip: [], + parseOperations: (serialized: string) => JSON.parse(serialized) as BaseOperation[], + reconnectProbability: 0.01, + saveFailures: undefined, + saveSuccesses: undefined, + validationStrategy: { type: "random", probability: 0.05 }, +}; + +/** + * Mixes in functionality to add new clients to a DDS fuzz model. + * @privateRemarks This is currently file-exported for testing purposes, but it could be reasonable to + * expose at the package level if we want to expose some of the harness's building blocks. + */ +function mixinAddRemoveClient< + TOperation extends BaseOperation, + TState extends LocalServerStressState, +>( + model: LocalServerStressModel, + options: LocalServerStressOptions, +): LocalServerStressModel { + const generatorFactory: () => AsyncGenerator = + () => { + const baseGenerator = model.generatorFactory(); + return async ( + state: TState, + ): Promise => { + const { clients, random, isDetached, validationClient } = state; + if ( + options.clientJoinOptions !== undefined && + !isDetached && + random.bool(options.clientJoinOptions.clientAddProbability) + ) { + if (clients.length > options.numberOfClients && random.bool()) { + return { + type: "removeClient", + clientTag: random.pick(clients).tag, + } satisfies RemoveClient; + } + + if (clients.length < options.clientJoinOptions.maxNumberOfClients) { + const url = await validationClient.container.getAbsoluteUrl(""); + assert(url !== undefined, "url for client must exist"); + return { + type: "addClient", + url, + clientTag: state.tag("client"), + } satisfies AddClient; + } + } + return baseGenerator(state); + }; + }; + + const minimizationTransforms: MinimizationTransform< + TOperation | AddClient | RemoveClient + >[] = + (model.minimizationTransforms as + | MinimizationTransform[] + | undefined) ?? []; + + const reducer: AsyncReducer = async ( + state, + op, + ) => { + if (isOperationType("addClient", op)) { + const newClient = await loadClient( + state.localDeltaConnectionServer, + state.codeLoader, + op.clientTag, + op.url, + ); + state.clients.push(newClient); + return state; + } + if (isOperationType("removeClient", op)) { + const removed = state.clients.splice( + state.clients.findIndex((c) => c.tag === op.clientTag), + 1, + ); + removed[0].container.dispose(); + return state; + } + return model.reducer(state, op); + }; + + return { + ...model, + minimizationTransforms, + generatorFactory, + reducer, + }; +} + +/** + * Mixes in functionality to generate an 'attach' op, which + * @privateRemarks This is currently file-exported for testing purposes, but it could be reasonable to + * expose at the package level if we want to expose some of the harness's building blocks. + */ +function mixinAttach( + model: LocalServerStressModel, + options: LocalServerStressOptions, +): LocalServerStressModel { + const { numOpsBeforeAttach } = options.detachedStartOptions; + const attachOp = async (): Promise => { + return { type: "attach" }; + }; + + const generatorFactory: () => AsyncGenerator = () => { + const baseGenerator = model.generatorFactory(); + return chainAsync( + takeAsync(numOpsBeforeAttach, baseGenerator), + takeAsync(1, attachOp), + baseGenerator, + ); + }; + + const minimizationTransforms = model.minimizationTransforms as + | MinimizationTransform[] + | undefined; + + const reducer: AsyncReducer = async (state, operation) => { + if (isOperationType("attach", operation)) { + state.isDetached = false; + assert.equal(state.clients.length, 1); + const clientA: Client = state.clients[0]; + + await clientA.container.attach(createLocalResolverCreateNewRequest("stress test")); + const url = await clientA.container.getAbsoluteUrl(""); + assert(url !== undefined, "container must have a url"); + const clients: Client[] = await Promise.all( + Array.from({ length: options.numberOfClients }, async (_, index) => + loadClient( + state.localDeltaConnectionServer, + state.codeLoader, + state.tag("client"), + url, + ), + ), + ); + + // After attaching, we use a newly loaded client as a read-only client for consistency comparison validation. + // This makes debugging easier as the state of a client is easier to interpret if it has no local changes. + const validationClient: Client = clients[0]; + clients[0] = state.clients[0]; + + return { + ...state, + isDetached: false, + clients, + validationClient, + } satisfies LocalServerStressState; + } + return model.reducer(state, operation); + }; + return { + ...model, + minimizationTransforms, + generatorFactory, + reducer, + }; +} + +/** + * Mixes in functionality to generate ops which synchronize all clients and assert the resulting state is consistent. + * @privateRemarks This is currently file-exported for testing purposes, but it could be reasonable to + * expose at the package level if we want to expose some of the harness's building blocks. + */ +function mixinSynchronization< + TOperation extends BaseOperation, + TState extends LocalServerStressState, +>( + model: LocalServerStressModel, + options: LocalServerStressOptions, +): LocalServerStressModel { + const { validationStrategy } = options; + let generatorFactory: () => AsyncGenerator; + + switch (validationStrategy.type) { + case "random": { + // passing 1 here causes infinite loops. passing close to 1 is wasteful + // as synchronization + eventual consistency validation should be idempotent. + // 0.5 is arbitrary but there's no reason anyone should want a probability near this. + assert(validationStrategy.probability < 0.5, "Use a lower synchronization probability."); + generatorFactory = (): AsyncGenerator => { + const baseGenerator = model.generatorFactory(); + return async (state: TState): Promise => + !state.isDetached && state.random.bool(validationStrategy.probability) + ? { type: "synchronize" } + : baseGenerator(state); + }; + break; + } + + case "fixedInterval": { + generatorFactory = (): AsyncGenerator => { + const baseGenerator = model.generatorFactory(); + return interleaveAsync( + baseGenerator, + async (state) => + state.isDetached ? baseGenerator(state) : ({ type: "synchronize" } as const), + validationStrategy.interval, + 1, + ExitBehavior.OnEitherExhausted, + ); + }; + break; + } + + case "partialSynchronization": { + // passing 1 here causes infinite loops. passing close to 1 is wasteful + // as synchronization + eventual consistency validation should be idempotent. + // 0.5 is arbitrary but there's no reason anyone should want a probability near this. + assert(validationStrategy.probability < 0.5, "Use a lower synchronization probability."); + generatorFactory = (): AsyncGenerator => { + const baseGenerator = model.generatorFactory(); + return async (state: TState): Promise => { + if (!state.isDetached && state.random.bool(validationStrategy.probability)) { + const selectedClients = new Set( + state.clients + .filter( + (client) => client.container.connectionState === ConnectionState.Connected, + ) + .filter(() => state.random.bool(validationStrategy.clientProbability)), + ); + + return { type: "synchronize", clients: [...selectedClients] }; + } else { + return baseGenerator(state); + } + }; + }; + break; + } + default: { + unreachableCase(validationStrategy); + } + } + + const minimizationTransforms = model.minimizationTransforms as + | MinimizationTransform[] + | undefined; + + const isSynchronizeOp = (op: BaseOperation): op is Synchronize => op.type === "synchronize"; + const reducer: AsyncReducer = async (state, operation) => { + // TODO: Only synchronize listed clients if specified + if (isSynchronizeOp(operation)) { + const { clients, validationClient } = state; + + const connectedClients = clients.filter((client) => { + if (client.container.closed || client.container.disposed === true) { + throw new Error(`Client ${client.tag} is closed`); + } + return client.container.connectionState !== ConnectionState.Disconnected; + }); + connectedClients.push(validationClient); + + const rejects = new Map void)[]>( + connectedClients.map((c) => [c, []]), + ); + + const cleanUps: (() => void)[] = []; + for (const c of connectedClients) { + const rejector = (err) => rejects.get(c)?.forEach((r) => r(err)); + c.container.once("closed", rejector); + c.container.once("disposed", rejector); + cleanUps.push(() => { + c.container.off("closed", rejector); + c.container.off("disposed", rejector); + }); + } + try { + await Promise.all( + connectedClients.map( + async (c) => + new Promise((resolve, reject) => { + if (c.container.connectionState !== ConnectionState.Connected) { + c.container.once("connected", () => resolve()); + rejects.get(c)?.push(reject); + } else { + resolve(); + } + }), + ), + ); + + await Promise.all( + connectedClients.map( + async (c) => + new Promise((resolve, reject) => { + if (c.container.isDirty) { + c.container.once("saved", () => resolve()); + rejects.get(c)?.push(reject); + } else { + resolve(); + } + }), + ), + ); + const maxSeq = Math.max( + ...connectedClients.map((c) => c.container.deltaManager.lastKnownSeqNumber), + ); + + const makeOpHandler = (c: Client, resolve: () => void, reject: () => void) => { + if (c.container.deltaManager.lastKnownSeqNumber < maxSeq) { + const handler = (msg) => { + if (msg.sequenceNumber >= maxSeq) { + c.container.off("op", handler); + resolve(); + } + }; + c.container.on("op", handler); + rejects.get(c)?.push(reject); + } else { + resolve(); + } + }; + await Promise.all( + connectedClients.map( + async (c) => + new Promise((resolve, reject) => makeOpHandler(c, resolve, reject)), + ), + ); + } finally { + cleanUps.forEach((f) => f()); + } + + if (connectedClients.length > 0) { + for (const client of connectedClients) { + try { + await model.validateConsistency(validationClient, client); + } catch (error: unknown) { + if (error instanceof Error) { + error.message = `Comparing client ${validationClient.tag} vs client ${client.tag}\n${error.message}`; + } + throw error; + } + } + } + + return state; + } + return model.reducer(state, operation); + }; + return { + ...model, + minimizationTransforms, + generatorFactory, + reducer, + }; +} + +const hasSelectedClientSpec = (op: unknown): op is SelectedClientSpec => + (op as SelectedClientSpec).clientTag !== undefined; + +/** + * Mixes in the ability to select a client to perform an operation on. + * Makes this available to existing generators and reducers in the passed-in model via {@link LocalServerStressState.client} + * and {@link @fluid-private/test-dds-utils#LocalServerStressTestState.channel}. + * + * @remarks This exists purely for convenience, as "pick a client to perform an operation on" is a common concern. + * @privateRemarks This is currently file-exported for testing purposes, but it could be reasonable to + * expose at the package level if we want to expose some of the harness's building blocks. + */ +function mixinClientSelection< + TOperation extends BaseOperation, + TState extends LocalServerStressState, +>( + model: LocalServerStressModel, + _: LocalServerStressOptions, +): LocalServerStressModel { + const generatorFactory: () => AsyncGenerator = () => { + const baseGenerator = model.generatorFactory(); + return async (state): Promise => { + // Pick a channel, and: + // 1. Make it available for the DDS model generators (so they don't need to + // do the boilerplate of selecting a client to perform the operation on) + // 2. Make it available to the subsequent reducer logic we're going to inject + // (so that we can recover the channel from serialized data) + const client = state.random.pick(state.clients); + const globalObjects = await client.entryPoint.getContainerObjects(); + const entry = state.random.pick( + globalObjects.filter((v) => v.type === "stressDataObject"), + ); + assert(entry?.type === "stressDataObject"); + const datastore = entry.stressDataObject; + const channels = await datastore.StressDataObject.getChannels(); + const channel = state.random.pick(channels); + assert(channel !== undefined, "channel must exist"); + const baseOp = await runInStateWithClient(state, client, datastore, channel, async () => + baseGenerator(state), + ); + return baseOp === done + ? done + : ({ + ...baseOp, + clientTag: client.tag, + datastoreTag: entry.tag, + channelTag: channel.id as `channel-${number}`, + } satisfies SelectedClientSpec); + }; + }; + + const reducer: AsyncReducer = async (state, operation) => { + assert(hasSelectedClientSpec(operation), "operation should have been given a client"); + const client = state.clients.find((c) => c.tag === operation.clientTag); + assert(client !== undefined); + const globalObjects = await client.entryPoint.getContainerObjects(); + const entry = globalObjects.find((e) => e.tag === operation.datastoreTag); + assert(entry?.type === "stressDataObject"); + const datastore = entry.stressDataObject; + const channels = await datastore.StressDataObject.getChannels(); + const channel = channels.find((c) => c.id === operation.channelTag); + assert(channel !== undefined, "channel must exist"); + await runInStateWithClient(state, client, datastore, channel, async () => { + await model.reducer(state, operation as TOperation); + }); + }; + return { + ...model, + generatorFactory, + reducer, + }; +} + +/** + * This modifies the value of "client" while callback is running, then restores it. + * This is does instead of copying the state since the state object is mutable, and running callback might make changes to state (like add new members) which are lost if state is just copied. + * + * Since the callback is async, this modification to the state could be an issue if multiple runs of this function are done concurrently. + */ +async function runInStateWithClient( + state: TState, + client: TState["client"], + datastore: TState["datastore"], + channel: TState["channel"], + callback: (state: TState) => Promise, +): Promise { + const old = { ...state }; + state.client = client; + state.datastore = datastore; + state.channel = channel; + try { + return await callback(state); + } finally { + // This code is explicitly trying to "update" to the old value. + + state.client = old.client; + state.datastore = old.datastore; + state.channel = old.channel; + } +} + +async function createDetachedClient( + localDeltaConnectionServer: ILocalDeltaConnectionServer, + codeLoader: ICodeDetailsLoader, + codeDetails: IFluidCodeDetails, + tag: `client-${number}`, +): Promise { + const container = await createDetachedContainer({ + codeLoader, + documentServiceFactory: new LocalDocumentServiceFactory(localDeltaConnectionServer), + urlResolver: new LocalResolver(), + codeDetails, + }); + + const maybe: FluidObject | undefined = + await container.getEntryPoint(); + assert(maybe.DefaultStressDataObject !== undefined, "must be DefaultStressDataObject"); + + const newClient: Client = { + container, + tag, + entryPoint: maybe.DefaultStressDataObject, + }; + return newClient; +} + +async function loadClient( + localDeltaConnectionServer: ILocalDeltaConnectionServer, + codeLoader: ICodeDetailsLoader, + tag: `client-${number}`, + url: string, +): Promise { + const container = await loadExistingContainer({ + documentServiceFactory: new LocalDocumentServiceFactory(localDeltaConnectionServer), + request: { url }, + urlResolver: new LocalResolver(), + codeLoader, + }); + + const maybe: FluidObject | undefined = + await container.getEntryPoint(); + assert(maybe.DefaultStressDataObject !== undefined, "must be DefaultStressDataObject"); + + return { + container, + tag, + entryPoint: maybe.DefaultStressDataObject, + }; +} +/** + * Runs the provided DDS fuzz model. All functionality is already assumed to be mixed in. + * @privateRemarks This is currently file-exported for testing purposes, but it could be reasonable to + * expose at the package level if we want to expose some of the harness's building blocks. + */ +async function runTestForSeed( + model: LocalServerStressModel, + options: Omit, + seed: number, + saveInfo?: SaveInfo, +): Promise { + const random = makeRandom(seed); + + const localDeltaConnectionServer = LocalDeltaConnectionServer.create(); + const codeDetails: IFluidCodeDetails = { + package: "local-server-stress-tests", + }; + const codeLoader = new LocalCodeLoader([[codeDetails, createRuntimeFactory()]]); + const tagCount: Partial> = {}; + const tag: LocalServerStressState["tag"] = (prefix) => + `${prefix}-${(tagCount[prefix] = (tagCount[prefix] ??= 0) + 1)}`; + + const detachedClient = await createDetachedClient( + localDeltaConnectionServer, + codeLoader, + codeDetails, + tag("client"), + ); + + const initialState: LocalServerStressState = { + clients: [detachedClient], + localDeltaConnectionServer, + codeLoader, + random, + validationClient: detachedClient, + client: makeUnreachableCodePathProxy("client"), + datastore: makeUnreachableCodePathProxy("datastore"), + channel: makeUnreachableCodePathProxy("channel"), + isDetached: true, + tag, + }; + + let operationCount = 0; + const generator = model.generatorFactory(); + const finalState = await performFuzzActionsAsync( + generator, + async (state, operation) => { + operationCount++; + return model.reducer(state, operation); + }, + initialState, + saveInfo, + ); + + // Sanity-check that the generator produced at least one operation. If it failed to do so, + // this usually indicates an error on the part of the test author. + assert(operationCount > 0, "Generator should have produced at least one operation."); + + finalState.clients.forEach((c) => c.container.dispose()); + finalState.validationClient.container.dispose(); +} + +function runTest( + model: LocalServerStressModel, + options: InternalOptions, + seed: number, + saveInfo: SaveInfo | undefined, +): void { + const itFn = options.only.has(seed) ? it.only : options.skip.has(seed) ? it.skip : it; + itFn(`workload: ${model.workloadName} seed: ${seed}`, async function () { + const inCi = process.env.TF_BUILD !== undefined; + const shouldMinimize: boolean = + options.skipMinimization !== true && + saveInfo !== undefined && + saveInfo.saveOnFailure !== false && + !inCi; + + // 10 seconds per test should be quite a bit more than is necessary, but + // a timeout during minimization can cause bad UX because it obfuscates + // the actual error + // + // it should be noted that if a timeout occurs during minimization, the + // intermediate results are not lost and will still be written to the file. + const noMinimizationTimeout = this.timeout() === 0 ? 0 : Math.max(2000, this.timeout()); + this.timeout(shouldMinimize ? 5 * noMinimizationTimeout : noMinimizationTimeout); + + try { + // don't write to files in CI + await runTestForSeed(model, options, seed, inCi ? undefined : saveInfo); + } catch (error) { + if (!shouldMinimize || saveInfo === undefined) { + throw error; + } + const savePath: string = (saveInfo?.saveOnFailure as SaveDestination).path; + let file: Buffer; + try { + file = readFileSync(savePath); + } catch { + // File could not be read and likely does not exist. + // Test may have failed outside of the fuzz test portion (on setup or teardown). + // Throw original error that made test fail. + throw error; + } + const operations = JSON.parse(file.toString()) as TOperation[]; + const minimizer = new FuzzTestMinimizer( + model.minimizationTransforms, + operations, + saveInfo, + async (generator) => replayTest(model, seed, generator, saveInfo, options), + 3, + ); + + const minimized = await minimizer.minimize(); + await saveOpsToFile(savePath, minimized); + + throw error; + } + }); +} + +type InternalOptions = Omit & { + only: Set; + skip: Set; +}; + +function isInternalOptions(options: LocalServerStressOptions): options is InternalOptions { + return options.only instanceof Set && options.skip instanceof Set; +} + +/** + * Performs the test again to verify if the DDS still fails with the same error message. + * + * @internal + */ +export async function replayTest( + ddsModel: LocalServerStressModel, + seed: number, + generator: AsyncGenerator, + saveInfo?: SaveInfo, + providedOptions?: Partial, +): Promise { + const options = { + ...defaultLocalServerStressSuiteOptions, + ...providedOptions, + only: new Set(providedOptions?.only ?? []), + skip: new Set(providedOptions?.skip ?? []), + }; + + const _model = getFullModel(ddsModel, options); + + const model = { + ..._model, + // We lose some type safety here because the options interface isn't generic + generatorFactory: () => generator, + }; + + await runTestForSeed(model, options, seed, saveInfo); +} + +/** + * Creates a suite of eventual consistency tests for a particular DDS model. + * @internal + */ +export function createLocalServerStressSuite( + ddsModel: LocalServerStressModel, + providedOptions?: Partial, +): void { + const options = { + ...defaultLocalServerStressSuiteOptions, + ...providedOptions, + }; + + const only = new Set(options.only); + const skip = new Set(options.skip); + Object.assign(options, { only, skip }); + assert(isInternalOptions(options)); + + const model = getFullModel(ddsModel, options); + + const describeFuzz = createFuzzDescribe({ defaultTestCount: options.defaultTestCount }); + describeFuzz(model.workloadName, ({ testCount, stressMode }) => { + before(() => { + if (options.saveFailures !== undefined) { + mkdirSync(getSaveDirectory(options.saveFailures.directory, model), { + recursive: true, + }); + } + if (options.saveSuccesses !== undefined) { + mkdirSync(getSaveDirectory(options.saveSuccesses.directory, model), { + recursive: true, + }); + } + }); + + const seeds = generateTestSeeds(testCount, stressMode); + for (const seed of seeds) { + runTest(model, options, seed, getSaveInfo(model, options, seed)); + } + + if (options.replay !== undefined) { + const seed = options.replay; + describe.only(`replay from file`, () => { + const saveInfo = getSaveInfo(model, options, seed); + assert( + saveInfo.saveOnFailure !== false, + "Cannot replay a file without a directory to save files in!", + ); + const operations = options.parseOperations( + readFileSync(saveInfo.saveOnFailure.path).toString(), + ); + + const replayModel = { + ...model, + // We lose some type safety here because the options interface isn't generic + generatorFactory: (): AsyncGenerator => + asyncGeneratorFromArray(operations as TOperation[]), + }; + runTest(replayModel, options, seed, undefined); + }); + } + }); +} + +/** + * @internal + */ +export interface ChangeConnectionState { + type: "changeConnectionState"; + connected: boolean; +} + +/** + * Mixes in functionality to disconnect and reconnect clients in a DDS fuzz model. + * @privateRemarks This is currently file-exported for testing purposes, but it could be reasonable to + * expose at the package level if we want to expose some of the harness's building blocks. + */ +export function mixinReconnect< + TOperation extends BaseOperation, + TState extends LocalServerStressState, +>( + model: LocalServerStressModel, + options: LocalServerStressOptions, +): LocalServerStressModel { + const generatorFactory: () => AsyncGenerator = + () => { + const baseGenerator = model.generatorFactory(); + return async (state): Promise => { + if (!state.isDetached && state.random.bool(options.reconnectProbability)) { + const client = state.clients.find((c) => c.tag === state.client.tag); + assert(client !== undefined); + return { + type: "changeConnectionState", + connected: client.container.connectionState === ConnectionState.Connected, + }; + } + + return baseGenerator(state); + }; + }; + + const minimizationTransforms = model.minimizationTransforms as + | MinimizationTransform[] + | undefined; + + const reducer: AsyncReducer = async ( + state, + operation, + ) => { + if (isOperationType("changeConnectionState", operation)) { + if (operation.connected) { + state.client.container.disconnect(); + } else { + state.client.container.connect(); + } + return state; + } else { + return model.reducer(state, operation); + } + }; + return { + ...model, + minimizationTransforms, + generatorFactory, + reducer, + }; +} + +const getFullModel = ( + ddsModel: LocalServerStressModel, + options: LocalServerStressOptions, +): LocalServerStressModel< + TOperation | AddClient | RemoveClient | Attach | Synchronize | ChangeConnectionState +> => + mixinAttach( + mixinSynchronization( + mixinAddRemoveClient( + mixinClientSelection(mixinReconnect(ddsModel, options), options), + options, + ), + options, + ), + options, + ); + +/** + * {@inheritDoc (createLocalServerStressSuite:function)} + * @internal + */ +// Explicit usage of namespace needed for api-extractor. +// eslint-disable-next-line @typescript-eslint/no-namespace +export namespace createLocalServerStressSuite { + /** + * Runs only the provided seeds. + * + * @example + * + * ```typescript + * // Runs only seed 42 for the given model. + * createLocalServerStressSuite.only(42)(model); + * ``` + * @internal + */ + export const only = + (...seeds: number[]) => + ( + ddsModel: LocalServerStressModel, + providedOptions?: Partial, + ): void => + createLocalServerStressSuite(ddsModel, { + ...providedOptions, + only: [...seeds, ...(providedOptions?.only ?? [])], + }); + + /** + * Skips the provided seeds. + * + * @example + * + * ```typescript + * // Skips seed 42 for the given model. + * createLocalServerStressSuite.skip(42)(model); + * ``` + * @internal + */ + export const skip = + (...seeds: number[]) => + ( + ddsModel: LocalServerStressModel, + providedOptions?: Partial, + ): void => + createLocalServerStressSuite(ddsModel, { + ...providedOptions, + skip: [...seeds, ...(providedOptions?.skip ?? [])], + }); +} diff --git a/packages/test/local-server-stress-tests/src/stressDataObject.ts b/packages/test/local-server-stress-tests/src/stressDataObject.ts new file mode 100644 index 000000000000..8e0a18ab2ff5 --- /dev/null +++ b/packages/test/local-server-stress-tests/src/stressDataObject.ts @@ -0,0 +1,313 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { stringToBuffer } from "@fluid-internal/client-utils"; +import { DataObject, DataObjectFactory } from "@fluidframework/aqueduct/internal"; +import { + AttachState, + type IRuntimeFactory, +} from "@fluidframework/container-definitions/internal"; +import { + loadContainerRuntime, + RuntimeHeaders, + type IContainerRuntimeOptionsInternal, +} from "@fluidframework/container-runtime/internal"; +// eslint-disable-next-line import/no-deprecated +import type { IContainerRuntimeWithResolveHandle_Deprecated } from "@fluidframework/container-runtime-definitions/internal"; +import type { + IFluidHandle, + FluidObject, + IFluidLoadable, +} from "@fluidframework/core-interfaces"; +import { assert, LazyPromise, unreachableCase } from "@fluidframework/core-utils/internal"; +import type { IChannel } from "@fluidframework/datastore-definitions/internal"; +import { ISharedMap, SharedMap } from "@fluidframework/map/internal"; +import { toFluidHandleInternal } from "@fluidframework/runtime-utils/internal"; + +import { ddsModelMap } from "./ddsModels.js"; +import { makeUnreachableCodePathProxy } from "./utils.js"; + +export interface UploadBlob { + type: "uploadBlob"; + tag: `blob-${number}`; +} +export interface CreateDataStore { + type: "createDataStore"; + asChild: boolean; + tag: `datastore-${number}`; +} + +export interface CreateChannel { + type: "createChannel"; + channelType: string; + tag: `channel-${number}`; +} + +export type StressDataObjectOperations = UploadBlob | CreateDataStore | CreateChannel; + +export class StressDataObject extends DataObject { + public static readonly factory: DataObjectFactory = new DataObjectFactory( + "StressDataObject", + StressDataObject, + [...ddsModelMap.values()].map((v) => v.factory), + {}, + [["StressDataObject", new LazyPromise(async () => StressDataObject.factory)]], + ); + + get StressDataObject() { + return this; + } + + private defaultStressObject: DefaultStressDataObject = makeUnreachableCodePathProxy( + "defaultStressDataObject", + ); + protected async getDefaultStressDataObject(): Promise { + const defaultDataStore = + await this.context.containerRuntime.getAliasedDataStoreEntryPoint("default"); + assert(defaultDataStore !== undefined, "default must exist"); + + const maybe: FluidObject | undefined = + await defaultDataStore.get(); + assert(maybe.DefaultStressDataObject !== undefined, "must be DefaultStressDataObject"); + return maybe.DefaultStressDataObject; + } + + /** + * this map is special, and doesn't participate in stress. it hold data + * about the name of channels which have been created. these created channel + * may or may not be attached and be available + */ + private channelNameMap: ISharedMap = makeUnreachableCodePathProxy("channelNameMap"); + protected async initializingFirstTime(props?: any): Promise { + this.channelNameMap = SharedMap.create(this.runtime, "channelNameMap"); + this.channelNameMap.bindToContext(); + this.channelNameMap.set("root", this.root.attributes.type); + } + + public async getChannels() { + const channels: IChannel[] = []; + for (const [name] of this.channelNameMap.entries()) { + // similar to container objects, the entries in this map + // can appear before the underlying channel is attached, + // so getting the channel can fail, and we need to try + // to get all channel each time, as we have no way to + // observer when a channel moves from detached to attached, + // especially on remove clients/ + const channel = await this.runtime.getChannel(name).catch(() => undefined); + if (channel !== undefined) { + channels.push(channel); + } + } + return channels; + } + + protected async hasInitialized(): Promise { + this.defaultStressObject = await this.getDefaultStressDataObject(); + + this.channelNameMap = (await this.runtime.getChannel( + "channelNameMap", + )) as any as ISharedMap; + } + + public get attached() { + return this.runtime.attachState === AttachState.Attached; + } + + public async uploadBlob(tag: `blob-${number}`, contents: string) { + const handle = await this.runtime.uploadBlob(stringToBuffer(contents, "utf-8")); + this.defaultStressObject.registerLocallyCreatedObject({ + type: "newBlob", + handle, + tag, + }); + } + + public createChannel(tag: `channel-${number}`, type: string) { + this.runtime.createChannel(tag, type); + this.channelNameMap.set(tag, type); + } + + public async createDataStore(tag: `datastore-${number}`, asChild: boolean) { + const dataStore = await this.context.containerRuntime.createDataStore( + asChild + ? [...this.context.packagePath, StressDataObject.factory.type] + : StressDataObject.factory.type, + ); + + const maybe: FluidObject | undefined = await dataStore.entryPoint.get(); + assert(maybe?.StressDataObject !== undefined, "must be stressDataObject"); + this.defaultStressObject.registerLocallyCreatedObject({ + type: "stressDataObject", + handle: dataStore.entryPoint, + tag, + stressDataObject: maybe.StressDataObject, + }); + } +} +export type ContainerObjects = + | { type: "newBlob"; handle: IFluidHandle; tag: `blob-${number}` } + | { + type: "stressDataObject"; + tag: `datastore-${number}`; + handle: IFluidHandle; + stressDataObject: StressDataObject; + }; + +export class DefaultStressDataObject extends StressDataObject { + public static readonly alias = "default"; + + public get DefaultStressDataObject() { + return this; + } + + /** + * these are object created in memory by this instance of the datastore, they + * will also be in these the containerObjectMap, but are not necessarily usable + * as they could be detached, in which can only this instance can access them. + */ + private readonly _locallyCreatedObjects: ContainerObjects[] = []; + public async getContainerObjects(): Promise[]> { + const containerObjects: Readonly[] = [...this._locallyCreatedObjects]; + const containerRuntime = // eslint-disable-next-line import/no-deprecated + this.context.containerRuntime as IContainerRuntimeWithResolveHandle_Deprecated; + for (const [url, entry] of this.containerObjectMap as any as [ + string, + ContainerObjects, + ][]) { + // the container objects map will see things before they are attached, + // so they may not be available to remote clients yet. + // Additionally, there is no way to observe when an + // object goes from detached to attached. + // Due to the both the above, we need to always try + // to resolve each object, and just ignore those which can't + // be found. + const resp = await containerRuntime.resolveHandle({ + url, + headers: { [RuntimeHeaders.wait]: false }, + }); + if (resp.status === 200) { + const maybe: FluidObject | undefined = resp.value; + const handle = maybe?.IFluidLoadable?.handle; + if (handle !== undefined) { + const type = entry?.type; + switch (type) { + case "newBlob": + containerObjects.push({ + ...entry, + handle, + }); + break; + case "stressDataObject": + assert(maybe?.StressDataObject !== undefined, "must be stressDataObject"); + + containerObjects.push({ + type: "stressDataObject", + tag: entry.tag, + handle, + stressDataObject: maybe.StressDataObject, + }); + break; + default: + unreachableCase(type, `${type}`); + } + } + } + } + return containerObjects; + } + + protected override async getDefaultStressDataObject(): Promise { + return this; + } + + /** + * this map is special, and doesn't participate in stress. it holds data + * about the name of container objects which have been created. these created objects + * may or may not be attached and be available + */ + private containerObjectMap: ISharedMap = makeUnreachableCodePathProxy("containerObjectMap"); + protected async initializingFirstTime(props?: any): Promise { + await super.initializingFirstTime(props); + this.containerObjectMap = SharedMap.create(this.runtime, "containerObjectMap"); + this.containerObjectMap.bindToContext(); + + this.registerLocallyCreatedObject({ + type: "stressDataObject", + handle: this.handle, + tag: `datastore-0`, + stressDataObject: this, + }); + } + + protected async initializingFromExisting(): Promise { + this.containerObjectMap = (await this.runtime.getChannel( + "containerObjectMap", + )) as any as ISharedMap; + } + + public registerLocallyCreatedObject(obj: ContainerObjects) { + if (obj.handle !== undefined) { + const handle = toFluidHandleInternal(obj.handle); + if (this.containerObjectMap.get(handle.absolutePath) === undefined) { + this.containerObjectMap.set(handle.absolutePath, { tag: obj.tag, type: obj.type }); + } + } + this._locallyCreatedObjects.push(obj); + } +} + +export const createRuntimeFactory = (): IRuntimeFactory => { + const defaultStressDataObjectFactory = new DataObjectFactory( + "DefaultStressDataObject", + DefaultStressDataObject, + [...ddsModelMap.values()].map((v) => v.factory), + {}, + [[StressDataObject.factory.type, StressDataObject.factory]], + ); + + const runtimeOptions: IContainerRuntimeOptionsInternal = { + summaryOptions: { + summaryConfigOverrides: { + maxOps: 3, + initialSummarizerDelayMs: 0, + } as any, + }, + }; + + return { + get IRuntimeFactory() { + return this; + }, + instantiateRuntime: async (context, existing) => { + const runtime = await loadContainerRuntime({ + context, + existing, + runtimeOptions, + registryEntries: [ + [ + defaultStressDataObjectFactory.type, + Promise.resolve(defaultStressDataObjectFactory), + ], + [StressDataObject.factory.type, Promise.resolve(StressDataObject.factory)], + ], + provideEntryPoint: async (rt) => { + const aliasedDefault = await rt.getAliasedDataStoreEntryPoint( + DefaultStressDataObject.alias, + ); + assert(aliasedDefault !== undefined, "default must exist"); + + return aliasedDefault.get(); + }, + }); + + if (!existing) { + const ds = await runtime.createDataStore(defaultStressDataObjectFactory.type); + await ds.trySetAlias(DefaultStressDataObject.alias); + } + + return runtime; + }, + }; +}; diff --git a/packages/test/local-server-stress-tests/src/test/dirname.cts b/packages/test/local-server-stress-tests/src/test/dirname.cts new file mode 100644 index 000000000000..ac1703eb418b --- /dev/null +++ b/packages/test/local-server-stress-tests/src/test/dirname.cts @@ -0,0 +1,15 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Problem: + * - `__dirname` is not defined in ESM + * - `import.meta.url` is not defined in CJS + * Solution: + * - Export '__dirname' from a .cjs file in the same directory. + * + * Note that *.cjs files are always CommonJS, but can be imported from ESM. + */ +export const _dirname = __dirname; diff --git a/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts b/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts new file mode 100644 index 000000000000..70678d4994bc --- /dev/null +++ b/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts @@ -0,0 +1,103 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import * as path from "node:path"; + +import { + type AsyncGenerator, + combineReducersAsync, + createWeightedAsyncGenerator, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; + +import { ddsModelMap } from "../ddsModels.js"; +import { + DDSModelOpGenerator, + DDSModelOpReducer, + validateConsistencyOfAllDDS, + type DDSModelOp, +} from "../ddsOperations"; +import { + createLocalServerStressSuite, + LocalServerStressModel, + type LocalServerStressState, +} from "../localServerStressHarness"; +import type { StressDataObjectOperations } from "../stressDataObject.js"; + +import { _dirname } from "./dirname.cjs"; + +type StressOperations = StressDataObjectOperations | DDSModelOp; + +const reducer = combineReducersAsync({ + createDataStore: async (state, op) => state.datastore.createDataStore(op.tag, op.asChild), + createChannel: async (state, op) => { + state.datastore.createChannel(op.tag, op.channelType); + }, + uploadBlob: async (state, op) => + // this will hang if we are offline due to disconnect, so we don't wait for blob upload + // this could potentially cause problems with replay if the blob upload doesn't finish + // before its handle is used. this hasn't been seen in practice, but nothing but timing and + // the fact that we assume local server is fast prevents it. + void state.datastore.uploadBlob(op.tag, state.random.string(state.random.integer(1, 16))), + DDSModelOp: DDSModelOpReducer, +}); + +function makeGenerator(): AsyncGenerator { + const asyncGenerator = createWeightedAsyncGenerator< + StressOperations, + LocalServerStressState + >([ + [ + async (state) => ({ + type: "createDataStore", + asChild: state.random.bool(), + tag: state.tag("datastore"), + }), + 1, + ], + [ + async (state) => ({ + type: "uploadBlob", + tag: state.tag("blob"), + }), + 10, + // local server doesn't support detached blobs + (state) => !state.isDetached, + ], + [ + async (state) => ({ + type: "createChannel", + channelType: state.random.pick([...ddsModelMap.keys()]), + tag: state.tag("channel"), + }), + 5, + ], + [DDSModelOpGenerator, 100], + ]); + + return async (state) => asyncGenerator(state); +} +export const saveFailures = { directory: path.join(_dirname, "../../src/test/results") }; +export const saveSuccesses = { directory: path.join(_dirname, "../../src/test/results") }; + +describe("Local Server Stress", () => { + const model: LocalServerStressModel = { + workloadName: "default", + generatorFactory: () => takeAsync(100, makeGenerator()), + reducer, + validateConsistency: validateConsistencyOfAllDDS, + }; + + createLocalServerStressSuite(model, { + defaultTestCount: 100, + // skipMinimization: true, + // Uncomment to replay a particular seed. + // replay: 93, + // only: [99], + saveFailures, + // saveSuccesses, + skip: [93], + }); +}); diff --git a/packages/test/local-server-stress-tests/src/test/tsconfig.json b/packages/test/local-server-stress-tests/src/test/tsconfig.json new file mode 100644 index 000000000000..3516d9e5ed83 --- /dev/null +++ b/packages/test/local-server-stress-tests/src/test/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../../../../common/build/build-common/tsconfig.test.node16.json", + "compilerOptions": { + "rootDir": "../", + "outDir": "../../lib", + "types": ["mocha", "node"], + "noUncheckedIndexedAccess": false, + "exactOptionalPropertyTypes": false, + }, +} diff --git a/packages/test/local-server-stress-tests/src/utils.ts b/packages/test/local-server-stress-tests/src/utils.ts new file mode 100644 index 000000000000..419b8fb0f6bf --- /dev/null +++ b/packages/test/local-server-stress-tests/src/utils.ts @@ -0,0 +1,13 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +export function makeUnreachableCodePathProxy(name: string): T { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return new Proxy({} as T, { + get: (): never => { + throw new Error(`Unexpected read of '${name}:' this indicates a bug in the harness.`); + }, + }); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4f27abe48090..2fd4c6f11173 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -13732,6 +13732,139 @@ importers: specifier: ^5.1.4 version: 5.1.4(webpack-bundle-analyzer@4.10.2)(webpack@5.97.1) + packages/test/local-server-stress-tests: + dependencies: + '@fluid-experimental/tree': + specifier: workspace:~ + version: link:../../../experimental/dds/tree + '@fluid-internal/client-utils': + specifier: workspace:~ + version: link:../../common/client-utils + '@fluid-internal/mocha-test-setup': + specifier: workspace:~ + version: link:../mocha-test-setup + '@fluid-private/stochastic-test-utils': + specifier: workspace:~ + version: link:../stochastic-test-utils + '@fluid-private/test-dds-utils': + specifier: workspace:~ + version: link:../../dds/test-dds-utils + '@fluidframework/aqueduct': + specifier: workspace:~ + version: link:../../framework/aqueduct + '@fluidframework/build-common': + specifier: ^2.0.3 + version: 2.0.3 + '@fluidframework/build-tools': + specifier: ^0.53.0 + version: 0.53.0(@types/node@18.19.67) + '@fluidframework/container-definitions': + specifier: workspace:~ + version: link:../../common/container-definitions + '@fluidframework/container-loader': + specifier: workspace:~ + version: link:../../loader/container-loader + '@fluidframework/container-runtime': + specifier: workspace:~ + version: link:../../runtime/container-runtime + '@fluidframework/container-runtime-definitions': + specifier: workspace:~ + version: link:../../runtime/container-runtime-definitions + '@fluidframework/core-interfaces': + specifier: workspace:~ + version: link:../../common/core-interfaces + '@fluidframework/core-utils': + specifier: workspace:~ + version: link:../../common/core-utils + '@fluidframework/datastore': + specifier: workspace:~ + version: link:../../runtime/datastore + '@fluidframework/datastore-definitions': + specifier: workspace:~ + version: link:../../runtime/datastore-definitions + '@fluidframework/driver-definitions': + specifier: workspace:~ + version: link:../../common/driver-definitions + '@fluidframework/driver-utils': + specifier: workspace:~ + version: link:../../loader/driver-utils + '@fluidframework/eslint-config-fluid': + specifier: ^5.7.3 + version: 5.7.3(eslint@8.55.0)(typescript@5.4.5) + '@fluidframework/id-compressor': + specifier: workspace:~ + version: link:../../runtime/id-compressor + '@fluidframework/local-driver': + specifier: workspace:~ + version: link:../../drivers/local-driver + '@fluidframework/map': + specifier: workspace:~ + version: link:../../dds/map + '@fluidframework/runtime-definitions': + specifier: workspace:~ + version: link:../../runtime/runtime-definitions + '@fluidframework/runtime-utils': + specifier: workspace:~ + version: link:../../runtime/runtime-utils + '@fluidframework/sequence': + specifier: workspace:~ + version: link:../../dds/sequence + '@fluidframework/server-local-server': + specifier: ^5.0.0 + version: 5.0.0 + '@fluidframework/telemetry-utils': + specifier: workspace:~ + version: link:../../utils/telemetry-utils + '@fluidframework/test-utils': + specifier: workspace:~ + version: link:../test-utils + '@fluidframework/tree': + specifier: workspace:~ + version: link:../../dds/tree + uuid: + specifier: ^9.0.0 + version: 9.0.1 + devDependencies: + '@biomejs/biome': + specifier: ~1.9.3 + version: 1.9.4 + '@types/mocha': + specifier: ^10.0.10 + version: 10.0.10 + '@types/node': + specifier: ^18.19.0 + version: 18.19.67 + '@types/uuid': + specifier: ^9.0.2 + version: 9.0.8 + c8: + specifier: ^8.0.1 + version: 8.0.1 + cross-env: + specifier: ^7.0.3 + version: 7.0.3 + eslint: + specifier: ~8.55.0 + version: 8.55.0 + mocha: + specifier: ^10.2.0 + version: 10.8.2 + mocha-multi-reporters: + specifier: ^1.5.1 + version: 1.5.1(mocha@10.8.2) + prettier: + specifier: ~3.0.3 + version: 3.0.3 + rimraf: + specifier: ^4.4.0 + version: 4.4.1 + ts-loader: + specifier: ^9.5.1 + version: 9.5.1(typescript@5.4.5)(webpack@5.97.1) + typescript: + specifier: ~5.4.5 + version: 5.4.5 + packages/test/local-server-tests: devDependencies: '@biomejs/biome':