Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for -type runAll as described in documentation #28

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ After building the tool, run it with customizable parameters:
- `-largeDocs`: Use large documents (2K) (default: false).
- `-dropDb`: Drop the database before running the test (default: true).
- `-uri`: MongoDB connection URI.
- `-type`: Type of test to run. Accepts `insert`, `update`, `delete`, `upsert`, or `runAll`:
- `-createIndex`: Create indexes before the test starts (only applies in `insertdoc` mode).
- `-out`: Output file prefix. (Default: empty, using "benchmark_results_*")

- `-type`: Type of test to run. Accepts `insert`, `insertdoc`, `update`, `delete`, `upsert`, or `runAll`:
- `insert`: The tool will insert new documents.
- `insertdoc`: The tool will insert new, more complex documents that simulate real-life data.
- `update`: The tool will update existing documents (requires that documents have been inserted in a prior run).
- `delete`: The tool will delete existing documents. (just if `docs` is given)
- `upsert`: The tool will perform upserts, repeatedly updating a specified range. (just if `docs` is given)
- `runAll`: Runs the `insert`, `update`, `delete`, and `upsert` tests sequentially. (just if `docs` is given)
- `runAll`: Runs the `insert`, `update` tests sequentially. (just if `duration` is given)
- `runAll`: Runs the `insert`, `update`, `delete`, `upsert`, and `insertdoc` tests sequentially. (only when `docs` is given)
- `runAll`: Runs the `insert`, `insertdoc`, and `update` tests sequentially. (only when `duration` is given)

### Example Commands

Expand Down
71 changes: 59 additions & 12 deletions docs_testing_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@ import (
"context"
"encoding/csv"
"fmt"
"github.com/rcrowley/go-metrics"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
"math/rand"
"os"
"sync"
"time"

"github.com/rcrowley/go-metrics"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type DocCountTestingStrategy struct{}

func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, config TestingConfig) {
tests := []string{"insert", "update", "delete", "upsert"}
tests := []string{"insert", "update", "delete", "upsert", "insertdoc"}
for _, test := range tests {
t.runTest(collection, test, config, fetchDocumentIDs)
}
}

func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) {
if testType == "insert" || testType == "upsert" {
if testType == "insert" || testType == "upsert" || testType == "insertdoc" {
if config.DropDb {
if err := collection.Drop(context.Background()); err != nil {
log.Fatalf("Failed to drop collection: %v", err)
Expand All @@ -38,6 +40,33 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
log.Printf("Starting %s test...\n", testType)
}

// Create indexes before insertdoc test begins
if testType == "insertdoc" && config.CreateIndex == true {
log.Println("Creating indexes for insertdoc benchmark...")

indexes := []mongo.IndexModel{
{Keys: bson.D{{Key: "author", Value: 1}}},
{Keys: bson.D{{Key: "tags", Value: 1}}},
{Keys: bson.D{{Key: "timestamp", Value: -1}}},
{Keys: bson.D{{Key: "content", Value: "text"}}},
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

mongoColl, ok := collection.(*MongoDBCollection)
if !ok {
log.Println("Index creation skipped: Collection is not a MongoDBCollection")
} else {
_, err := mongoColl.Indexes().CreateMany(ctx, indexes)
if err != nil {
log.Printf("Failed to create indexes: %v", err)
} else {
log.Println("Indexes created successfully.")
}
}
}

var partitions [][]primitive.ObjectID

var threads = config.Threads
Expand All @@ -56,7 +85,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
partitions[i%threads] = append(partitions[i%threads], id)
}

case "insert", "upsert":
case "insert", "upsert", "insertdoc":
partitions = make([][]primitive.ObjectID, threads)
for i := 0; i < docCount; i++ {
partitions[i%threads] = append(partitions[i%threads], primitive.NewObjectID())
Expand All @@ -73,6 +102,8 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
docID := docIDs[rand.Intn(len(docIDs))]
partitions[i%threads] = append(partitions[i%threads], docID)
}
default:
log.Fatalf("Unknown or unsupported test type, exiting...")
}

// Start the ticker just before starting the main workload goroutines
Expand All @@ -81,10 +112,11 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"})

var doc interface{}
var data = make([]byte, 1024*2)
generator := NewDocumentGenerator()
/*var data = make([]byte, 1024*2)
for i := 0; i < len(data); i++ {
data[i] = byte(rand.Intn(256))
}
}*/

secondTicker := time.NewTicker(1 * time.Second)
defer secondTicker.Stop()
Expand Down Expand Up @@ -123,16 +155,26 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
switch testType {
case "insert":
if config.LargeDocs {
doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data}
//doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data}
doc = generator.GenerateLarge(i)
} else {
doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
//doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
doc = generator.GenerateSimple(i)
}
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Insert failed: %v", err)
}
case "insertdoc":
doc = generator.GenerateComplex(i)
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Insertdoc failed: %v", err)
}
case "update":
randomDocID := partition[rand.Intn(len(partition))]
filter := bson.M{"_id": randomDocID}
Expand Down Expand Up @@ -192,7 +234,12 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
}
records = append(records, finalRecord)

filename := fmt.Sprintf("benchmark_results_%s.csv", testType)
filenamePrefix := "benchmark_results"
if config.OutputFilePrefix != "" {
filenamePrefix = config.OutputFilePrefix
}

filename := fmt.Sprintf("%s_%s.csv", filenamePrefix, testType)
file, err := os.Create(filename)
if err != nil {
log.Fatalf("Failed to create CSV file: %v", err)
Expand Down
90 changes: 90 additions & 0 deletions document_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
	"math/rand"
	"strings"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// DocumentGenerator produces the BSON documents inserted by the benchmark
// workloads. It bundles a private random source, a payload buffer, and the
// word pools the generator methods draw their field values from.
type DocumentGenerator struct {
	// rnd is the generator's own pseudo-random source.
	rnd *rand.Rand
	// data is the byte buffer associated with the binary payload of large
	// documents.
	data []byte
	// tags, authors and category are the value pools sampled for the
	// corresponding document fields; lorem holds the filler sentences used to
	// assemble free text.
	tags, authors, category, lorem []string
}

// NewDocumentGenerator returns a generator whose random source is seeded from
// the current wall-clock time, with a zero-filled 2 KiB payload buffer and the
// fixed tag, author, category and filler-sentence pools.
func NewDocumentGenerator() *DocumentGenerator {
	gen := new(DocumentGenerator)

	// Seed the private source exactly once, at construction time.
	gen.rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
	gen.data = make([]byte, 1024*2)

	gen.tags = []string{
		"MongoDB", "Benchmark", "CMS", "Database", "Performance",
		"WebApp", "Scalability", "Indexing", "Query Optimization", "Sharding",
	}
	gen.authors = []string{
		"Alice Example", "John Doe", "Maria Sample", "Max Mustermann", "Sophie Miller",
		"Liam Johnson", "Emma Brown", "Noah Davis", "Olivia Wilson", "William Martinez",
	}
	gen.category = []string{"Tech", "Business", "Science", "Health", "Sports", "Education"}
	gen.lorem = []string{
		"Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
		"Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
		"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.",
		"Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.",
		"Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.",
	}

	return gen
}

// GenerateSimple returns a three-field document: the given run counter under
// "threadRunCount", a random non-negative 63-bit integer under "rnd", and the
// constant 1 under "v".
func (g *DocumentGenerator) GenerateSimple(threadRunCount int) bson.M {
	doc := make(bson.M, 3)
	doc["threadRunCount"] = threadRunCount
	doc["rnd"] = rand.Int63()
	doc["v"] = 1
	return doc
}

// GenerateLarge returns a document with the same fields as GenerateSimple plus
// a "data" field holding a random binary payload as long as the generator's
// data buffer.
//
// Every call allocates its own payload instead of refilling g.data in place.
// The benchmark strategies share one generator between their worker
// goroutines, so rewriting a common buffer would be a data race and would
// change the bytes of documents that were returned earlier and may still be
// in the middle of being inserted.
func (g *DocumentGenerator) GenerateLarge(threadRunCount int) bson.M {
	payload := make([]byte, len(g.data))
	for i := range payload {
		payload[i] = byte(rand.Intn(256))
	}
	return bson.M{"threadRunCount": threadRunCount, "rnd": rand.Int63(), "v": 1, "data": payload}
}

// GenerateComplex returns an article-like document meant to resemble
// real-life content: a fresh ObjectID, the run counter, a random value,
// generated title/summary/content text, an author with 1-3 co-authors, 4-6
// tags, a category, a timestamp up to two years in the past and random
// engagement counters.
//
// The "rnd" value comes from the package-level math/rand functions, like in
// the other generators. Those are safe for concurrent use, whereas the
// generator's private *rand.Rand is not, and the benchmark strategies call
// this method from several goroutines that share one generator.
func (g *DocumentGenerator) GenerateComplex(threadRunCount int) bson.M {
	numTags := rand.Intn(3) + 4      // 4-6 tags
	numCoAuthors := rand.Intn(3) + 1 // 1-3 co-authors

	tags := g.randomSample(g.tags, numTags)
	coAuthors := g.randomSample(g.authors, numCoAuthors)
	category := g.category[rand.Intn(len(g.category))]
	author := g.authors[rand.Intn(len(g.authors))]

	// Content is between 2000 and 4999 bytes long.
	contentLen := 2000 + rand.Intn(3000)
	// Age of the article in whole days, 0-729.
	ageDays := rand.Intn(365 * 2)

	return bson.M{
		"_id":            primitive.NewObjectID(),
		"threadRunCount": threadRunCount,
		"rnd":            rand.Int63(),
		"v":              1,
		"title":          g.generateLoremIpsum(30),
		"author":         author,
		"co_authors":     coAuthors,
		"summary":        g.generateLoremIpsum(100),
		"content":        g.generateLoremIpsum(contentLen),
		"tags":           tags,
		"category":       category,
		"timestamp":      time.Now().Add(-time.Duration(ageDays) * 24 * time.Hour),
		"views":          rand.Intn(10000),
		"comments":       rand.Intn(500),
		"likes":          rand.Intn(1000),
		"shares":         rand.Intn(200),
	}
}

// generateLoremIpsum returns filler text that is exactly minLen bytes long. It
// is built by joining randomly chosen sentences from g.lorem, each followed by
// a space, and cutting the result at minLen (so the text may end
// mid-sentence).
//
// It returns "" when minLen is not positive or when the sentence pool is
// empty, instead of panicking. Sentences are picked with the package-level
// math/rand functions, which are safe for concurrent use; the generator is
// shared between worker goroutines.
func (g *DocumentGenerator) generateLoremIpsum(minLen int) string {
	if minLen <= 0 || len(g.lorem) == 0 {
		return ""
	}

	// A Builder avoids re-copying the whole text on every appended sentence.
	var b strings.Builder
	b.Grow(minLen)
	for b.Len() < minLen {
		b.WriteString(g.lorem[rand.Intn(len(g.lorem))])
		b.WriteByte(' ')
	}
	return b.String()[:minLen]
}

// randomSample returns n distinct elements of list in random order. n is
// clamped to the range [0, len(list)], so an oversized or negative request
// yields a shorter (possibly empty) result instead of a panic.
//
// The sample is taken from a private copy: list itself is never reordered and
// the returned slice does not share memory with it. This matters because the
// callers pass the generator's own pools and store the result inside
// documents; shuffling in place would let later calls rewrite the contents of
// documents handed out earlier. Shuffling uses the package-level math/rand
// functions, which are safe for concurrent use.
func (g *DocumentGenerator) randomSample(list []string, n int) []string {
	if n > len(list) {
		n = len(list)
	}
	if n < 0 {
		n = 0
	}

	pool := make([]string, len(list))
	copy(pool, list)
	rand.Shuffle(len(pool), func(i, j int) { pool[i], pool[j] = pool[j], pool[i] })

	// Cap the capacity so an append by the caller cannot expose the unused
	// tail of the shuffled copy.
	return pool[:n:n]
}
72 changes: 61 additions & 11 deletions duration_testing_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,31 @@ import (
"context"
"encoding/csv"
"fmt"
"go.mongodb.org/mongo-driver/bson/primitive"
"log"
"math/rand"
"os"
"sync"
"time"

"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"

"github.com/rcrowley/go-metrics"
"go.mongodb.org/mongo-driver/bson"
)

type DurationTestingStrategy struct{}

func (t DurationTestingStrategy) runTestSequence(collection CollectionAPI, config TestingConfig) {
tests := []string{"insert", "update"}
tests := []string{"insert", "insertdoc", "update"}
for _, test := range tests {
t.runTest(collection, test, config, fetchDocumentIDs)
}
}

func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) {
var partitions [][]primitive.ObjectID
if testType == "insert" {
if testType == "insert" || testType == "insertdoc" {
if config.DropDb {
if err := collection.Drop(context.Background()); err != nil {
log.Fatalf("Failed to clear collection before test: %v", err)
Expand All @@ -35,6 +37,35 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri
} else {
log.Println("Collection stays. Dropping disabled.")
}

// todo: prevent code duplicates
// Create indexes before insertdoc test begins
if testType == "insertdoc" && config.CreateIndex == true {
log.Println("Creating indexes for insertdoc benchmark...")

indexes := []mongo.IndexModel{
{Keys: bson.D{{Key: "author", Value: 1}}},
{Keys: bson.D{{Key: "tags", Value: 1}}},
{Keys: bson.D{{Key: "timestamp", Value: -1}}},
{Keys: bson.D{{Key: "content", Value: "text"}}},
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

mongoColl, ok := collection.(*MongoDBCollection)
if !ok {
log.Println("Index creation skipped: Collection is not a MongoDBCollection")
} else {
_, err := mongoColl.Indexes().CreateMany(ctx, indexes)
if err != nil {
log.Printf("Failed to create indexes: %v", err)
} else {
log.Println("Indexes created successfully.")
}
}
}

} else if testType == "update" {
docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType)
if err != nil {
Expand All @@ -53,10 +84,7 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri
}

var doc interface{}
var data = make([]byte, 1024*2)
for i := 0; i < len(data); i++ {
data[i] = byte(rand.Intn(256))
}
generator := NewDocumentGenerator()

endTime := time.Now().Add(time.Duration(config.Duration) * time.Second)
insertRate := metrics.NewMeter()
Expand Down Expand Up @@ -99,10 +127,11 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri

for time.Now().Before(endTime) {
if config.LargeDocs {
doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data}

//doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data}
doc = generator.GenerateLarge(i)
} else {
doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
//doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
doc = generator.GenerateSimple(i)
}
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
Expand All @@ -113,6 +142,22 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri
}
}()
}
} else if testType == "insertdoc" {
for i := 0; i < config.Threads; i++ {
go func() {
defer wg.Done()

for time.Now().Before(endTime) {
doc = generator.GenerateComplex(i)
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Insertdoc failed: %v", err)
}
}
}()
}
} else {
for i := 0; i < config.Threads; i++ {
// Check if the partition is non-empty for this thread
Expand Down Expand Up @@ -167,7 +212,12 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri
records = append(records, finalRecord)

// Write metrics to CSV file
filename := fmt.Sprintf("benchmark_results_%s.csv", testType)
filenamePrefix := "benchmark_results"
if config.OutputFilePrefix != "" {
filenamePrefix = config.OutputFilePrefix
}

filename := fmt.Sprintf("%s_%s.csv", filenamePrefix, testType)
file, err := os.Create(filename)
if err != nil {
log.Fatalf("Failed to create CSV file: %v", err)
Expand Down
Loading